BlazePool.java

/*
 * Copyright © 2011-2019 Chris Vest (mr.chrisvest@gmail.com)
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package stormpot;

import java.util.Objects;
import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.TimeUnit;

/**
 * BlazePool is a highly optimised {@link Pool} implementation that consists of
 * a queues of Poolable instances, the access to which is made faster with
 * clever use of ThreadLocals.
 *
 * Object allocation always happens in a dedicated thread, off-loading the
 * cost of allocating the pooled objects. This leads to reduced deviation
 * in the times it takes claim method to complete, provided the pool is not
 * depleted.
 *
 * BlazePool optimises for the case where the same threads need to claim and
 * release objects over and over again. On the other hand, if the releasing
 * thread tends to differ from the claiming thread, then the major optimisation
 * in BlazePool is defeated, and performance regresses to a slow-path that is
 * limited by contention on a blocking queue.
 *
 * @author Chris Vest <mr.chrisvest@gmail.com>
 * @param <T> The type of {@link Poolable} managed by this pool.
 */
final class BlazePool<T extends Poolable>
    extends Pool<T> implements ManagedPool {

  private static final Exception SHUTDOWN_POISON =
      new Exception("Stormpot Poison: Shutdown");
  static final Exception EXPLICIT_EXPIRE_POISON =
      new Exception("Stormpot Poison: Expired");

  private final LinkedTransferQueue<BSlot<T>> live;
  private final RefillPile<T> disregardPile;
  private final RefillPile<T> newAllocations;
  private final AllocationController<T> allocator;
  private final ThreadLocal<BSlotCache<T>> tlr;
  private final Expiration<? super T> deallocRule;
  private final MetricsRecorder metricsRecorder;

  /**
   * A special slot used to signal that the pool has been shut down.
   */
  private final BSlot<T> poisonPill;

  private volatile boolean shutdown;

  /**
   * Construct a new BlazePool instance based on the given {@link PoolBuilder}.
   * @param builder The pool configuration to use.
   */
  BlazePool(PoolBuilder<T> builder, AllocationProcess factory) {
    live = new LinkedTransferQueue<>();
    disregardPile = new RefillPile<>(live);
    newAllocations = new RefillPile<>(live);
    tlr = new ThreadLocalBSlotCache<>();
    poisonPill = new BSlot<>(live, null);
    poisonPill.poison = SHUTDOWN_POISON;
    deallocRule = builder.getExpiration();
    metricsRecorder = builder.getMetricsRecorder();
    allocator = factory.buildAllocationController(
        live, disregardPile, newAllocations, builder, poisonPill);
  }

  @Override
  public T claim(Timeout timeout)
      throws PoolException, InterruptedException {
    return tlrClaim(timeout, tlr.get());
  }

  T tlrClaim(Timeout timeout, BSlotCache<T> cache)
      throws PoolException, InterruptedException {
    Objects.requireNonNull(timeout, "Timeout cannot be null.");
    BSlot<T> slot = cache.slot;
    // Note that the TLR slot at this point might have been tried by another
    // thread, found to be expired, put on the dead-queue and deallocated.
    // We handle this because slots always transition to the dead state before
    // they are put on the dead-queue, and if they are dead, then the
    // slot.live2claimTlr() call will fail.
    // Then we will eventually find another slot from the live-queue that we
    // can claim and make our new TLR slot.
    if (slot != null && slot.live2claimTlr()) {
      // Attempt the claim before checking the validity, because we might
      // already have claimed it.
      // If we checked validity before claiming, then we might find that it
      // had expired, and throw it in the dead queue, causing a claimed
      // Poolable to be deallocated before it is released.
      if (!isInvalid(slot, cache, true)) {
        slot.incrementClaims();
        return slot.obj;
      }
      // We managed to tlr-claim the slot, but it turned out to be no good.
      // That means we now have to transition it from tlr-claimed to dead.
      // However, since we didn't pull it off of the live-queue, it might still
      // be in the live-queue. And since it might be in the live-queue, it
      // can't be put on the dead-queue. And since it can't be put on the
      // dead-queue, it also cannot transition to the dead state.
      // This effectively means that we have to transition it back to the live
      // state, and then let some pull it off of the live-queue, check it
      // again, and only then put it on the dead-queue.
      // It's cumbersome, but we have to do it this way, in order to prevent
      // duplicate entries in the queues. Otherwise we'd have a nasty memory
      // leak on our hands.
    }
    // The thread-local claim failed, so we have to go through the slow-path.
    return slowClaim(timeout, cache);
  }

  private T slowClaim(Timeout timeout, BSlotCache<T> cache)
      throws PoolException, InterruptedException {
    // The slow-path for claim is in its own method to allow the fast-path to
    // inline separately. At this point, taking a performance hit is
    // inevitable anyway, so we're allowed a bit more leeway.
    BSlot<T> slot;
    long startNanos = NanoClock.nanoTime();
    long timeoutNanos = timeout.getTimeoutInBaseUnit();
    long timeoutLeft = timeoutNanos;
    TimeUnit baseUnit = timeout.getBaseUnit();
    long maxWaitQuantum = baseUnit.convert(100, TimeUnit.MILLISECONDS);
    for (;;) {
      slot = newAllocations.pop();
      if (slot == null) {
        long pollWait = Math.min(timeoutLeft, maxWaitQuantum);
        slot = live.poll(pollWait, baseUnit);
      }
      if (slot == null) {
        if (timeoutLeft <= 0) {
          // We timed out while taking from the queue - just return null
          return null;
        } else {
          timeoutLeft = NanoClock.timeoutLeft(startNanos, timeoutNanos);
          disregardPile.refill();
          continue;
        }
      }

      if (slot.live2claim()) {
        if (isInvalid(slot, cache, false)) {
          timeoutLeft = NanoClock.timeoutLeft(startNanos, timeoutNanos);
          if (timeoutLeft <= 0) {
            // There is no time left to poll the queue again - just return null
            return null;
          }
        } else {
          break;
        }
      } else {
        disregardPile.push(slot);
      }
    }
    slot.incrementClaims();
    cache.slot = slot;
    return slot.obj;
  }

  private boolean isInvalid(BSlot<T> slot, BSlotCache<T> cache, boolean isTlr) {
    if (isUncommonlyInvalid(slot)) {
      return handleUncommonInvalidation(slot, cache, isTlr);
    }

    try {
      return deallocRule.hasExpired(slot)
          && handleCommonInvalidation(slot, cache, null);
    } catch (Throwable ex) {
      return handleCommonInvalidation(slot, cache, ex);
    }
  }

  private boolean isUncommonlyInvalid(BSlot<T> slot) {
    return shutdown | slot.poison != null;
  }

  private boolean handleUncommonInvalidation(
      BSlot<T> slot, BSlotCache<T> cache, boolean isTlr) {
    Exception poison = slot.poison;
    if (poison != null) {
      return dealWithSlotPoison(slot, cache, isTlr, poison);
    } else {
      kill(slot, cache);
      throw new IllegalStateException("Pool has been shut down");
    }
  }

  private boolean handleCommonInvalidation(
      BSlot<T> slot, BSlotCache<T> cache, Throwable exception) {
    kill(slot, cache);
    if (exception != null) {
      String msg = "Got exception when checking whether an object had expired";
      throw new PoolException(msg, exception);
    }
    return true;
  }

  private boolean dealWithSlotPoison(
      BSlot<T> slot, BSlotCache<T> cache, boolean isTlr, Exception poison) {
    if (poison == SHUTDOWN_POISON) {
      // The poison pill means the pool has been shut down. The pill was
      // transitioned from live to claimed just prior to this check, so we
      // must transition it back to live and put it back into the live-queue
      // before throwing our exception.
      // Because we always throw when we see it, it will never become a
      // tlr-slot, and so we don't need to worry about transitioning from
      // tlr-claimed to live.
      slot.claim2live();
      live.offer(poisonPill);
      throw new IllegalStateException("Pool has been shut down");
    } else {
      kill(slot, cache);
      if (isTlr || poison == EXPLICIT_EXPIRE_POISON) {
        return true;
      } else {
        throw new PoolException("Allocation failed", poison);
      }
    }
  }

  private void kill(BSlot<T> slot, BSlotCache<T> cache) {
    // The use of claim2dead() here ensures that we don't put slots into the
    // dead-queue more than once. Many threads might have this as their
    // TLR-slot and try to tlr-claim it, but only when a slot has been normally
    // claimed, that is, pulled off the live-queue, can it be put into the
    // dead-queue. This helps ensure that a slot will only ever be in at most
    // one queue.
    if (slot.isClaimed()) {
      slot.claim2dead();
      allocator.offerDeadSlot(slot);
    } else {
      slot.claimTlr2live();
      cache.slot = null;
    }
  }

  @Override
  public Completion shutdown() {
    shutdown = true;
    return allocator.shutdown();
  }

  @Override
  public void setTargetSize(int size) {
    if (size < 0) {
      throw new IllegalArgumentException(
          "Target pool size must be positive");
    }
    if (shutdown) {
      return;
    }
    allocator.setTargetSize(size);
  }

  @Override
  public int getTargetSize() {
    return allocator.getTargetSize();
  }

  @Override
  public ManagedPool getManagedPool() {
    return this;
  }

  @Override
  public PoolTap<T> getThreadLocalTap() {
    return new BlazePoolThreadLocalTap<>(this);
  }

  @Override
  public long getAllocationCount() {
    return allocator.getAllocationCount();
  }

  @Override
  public long getFailedAllocationCount() {
    return allocator.getFailedAllocationCount();
  }

  @Override
  public boolean isShutDown() {
    return shutdown;
  }

  @Override
  public double getObjectLifetimePercentile(double percentile) {
    if (metricsRecorder == null) {
      return Double.NaN;
    }
    return metricsRecorder.getObjectLifetimePercentile(percentile);
  }

  @Override
  public double getAllocationLatencyPercentile(double percentile) {
    if (metricsRecorder == null) {
      return Double.NaN;
    }
    return metricsRecorder.getAllocationLatencyPercentile(percentile);
  }

  @Override
  public double getAllocationFailureLatencyPercentile(double percentile) {
    if (metricsRecorder == null) {
      return Double.NaN;
    }
    return metricsRecorder.getAllocationFailureLatencyPercentile(percentile);
  }

  @Override
  public double getReallocationLatencyPercentile(double percentile) {
    if (metricsRecorder == null) {
      return Double.NaN;
    }
    return metricsRecorder.getReallocationLatencyPercentile(percentile);
  }

  @Override
  public double getReallocationFailureLatencyPercentile(double percentile) {
    if (metricsRecorder == null) {
      return Double.NaN;
    }
    return metricsRecorder.getReallocationFailurePercentile(percentile);
  }

  @Override
  public double getDeallocationLatencyPercentile(double percentile) {
    if (metricsRecorder == null) {
      return Double.NaN;
    }
    return metricsRecorder.getDeallocationLatencyPercentile(percentile);
  }

  @Override
  public long getLeakedObjectsCount() {
    return allocator.countLeakedObjects();
  }
}