/** * Creates a new timer with the default thread factory * ({@link Executors#defaultThreadFactory()}). * * @param tickDuration the duration between tick * @param unit the time unit of the {@code tickDuration} * @param ticksPerWheel the size of the wheel */ publicHashedWheelTimer(long tickDuration, TimeUnit unit, int ticksPerWheel){ this(Executors.defaultThreadFactory(), tickDuration, unit, ticksPerWheel); }
/** * Bucket that stores HashedWheelTimeouts. These are stored in a linked-list like datastructure to allow easy * removal of HashedWheelTimeouts in the middle. Also the HashedWheelTimeout act as nodes themself and so no * extra object creation is needed. */ privatestatic final classHashedWheelBucket {
// Used for the linked-list datastructure private HashedWheelTimeout head; private HashedWheelTimeout tail;
/** * Expire all {@link HashedWheelTimeout}s for the given {@code deadline}. */ publicvoidexpireTimeouts(long deadline) { HashedWheelTimeout timeout = head;
// process all timeouts while (timeout != null) { boolean remove = false; if (timeout.remainingRounds <= 0) { if (timeout.deadline <= deadline) { timeout.expire(); } else { // The timeout was placed into a wrong slot. This should never happen. thrownew IllegalStateException(String.format( "timeout.deadline (%d) > deadline (%d)", timeout.deadline, deadline)); } remove = true; } elseif (timeout.isCancelled()) { remove = true; } else { timeout.remainingRounds --; } // store reference to next as we may null out timeout.next in the remove block. HashedWheelTimeout next = timeout.next; if (remove) { remove(timeout); } timeout = next; } }
publicvoidremove(HashedWheelTimeout timeout) { HashedWheelTimeout next = timeout.next; // remove timeout that was either processed or cancelled by updating the linked-list if (timeout.prev != null) { timeout.prev.next = next; } if (timeout.next != null) { timeout.next.prev = timeout.prev; }
if (timeout == head) { // if timeout is also the tail we need to adjust the entry too if (timeout == tail) { tail = null; head = null; } else { head = next; } } elseif (timeout == tail) { // if the timeout is the tail modify the tail to be the prev node. tail = timeout.prev; } // null out prev, next and bucket to allow for GC. timeout.prev = null; timeout.next = null; timeout.bucket = null; }
/** * Clear this bucket and return all not expired / cancelled {@link Timeout}s. */ publicvoidclearTimeouts(Set<Timeout> set) { for (;;) { HashedWheelTimeout timeout = pollTimeout(); if (timeout == null) { return; } if (timeout.isExpired() || timeout.isCancelled()) { continue; } set.add(timeout); } }
private HashedWheelTimeout pollTimeout() { HashedWheelTimeout head = this.head; if (head == null) { returnnull; } HashedWheelTimeout next = head.next; if (next == null) { tail = this.head = null; } else { this.head = next; next.prev = null; }
// null out prev and next to allow for GC. head.next = null; head.prev = null; return head; } }
@SuppressWarnings({"unused", "FieldMayBeFinal", "RedundantFieldInitialization" }) privatevolatileint state = ST_INIT;
// remainingRounds will be calculated and set by Worker.transferTimeoutsToBuckets() before the // HashedWheelTimeout will be added to the correct HashedWheelBucket. long remainingRounds;
// This will be used to chain timeouts in HashedWheelTimerBucket via a double-linked-list. // As only the workerThread will act on it there is no need for synchronization / volatile. HashedWheelTimeout next; HashedWheelTimeout prev;
// The bucket to which the timeout was added HashedWheelBucket bucket;
publicvoid cancel() { int state = state(); if (state >= ST_CANCELLED) { // fail fast if the task was cancelled or expired before. return; } if (state != ST_IN_BUCKET && compareAndSetState(ST_INIT, ST_CANCELLED)) { // Was cancelled before the HashedWheelTimeout was added to its HashedWheelBucket. // In this case we can just return here as it will be discarded by the WorkerThread when handling // the adding of HashedWheelTimeout to the HashedWheelBuckets. return; } // only update the state it will be removed from HashedWheelBucket on next tick. if (!compareAndSetState(ST_IN_BUCKET, ST_CANCELLED)) { return; } // Add the HashedWheelTimeout back to the timeouts queue so it will be picked up on the next tick // and remove this HashedTimeTask from the HashedWheelBucket. After this is done it is ready to get // GC'ed once the user has no reference to it anymore. timer.timeouts.add(this); }
privatestatic HashedWheelBucket[] createWheel(int ticksPerWheel) { if (ticksPerWheel <= 0) { thrownew IllegalArgumentException( "ticksPerWheel must be greater than 0: " + ticksPerWheel); } if (ticksPerWheel > 1073741824) { thrownew IllegalArgumentException( "ticksPerWheel may not be greater than 2^30: " + ticksPerWheel); }
ticksPerWheel = normalizeTicksPerWheel(ticksPerWheel); HashedWheelBucket[] wheel = new HashedWheelBucket[ticksPerWheel]; for (int i = 0; i < wheel.length; i ++) { wheel[i] = new HashedWheelBucket(); } return wheel; }
privatestaticintnormalizeTicksPerWheel(int ticksPerWheel) { int normalizedTicksPerWheel = 1; while (normalizedTicksPerWheel < ticksPerWheel) { normalizedTicksPerWheel <<= 1; } return normalizedTicksPerWheel; }
timeouts队列
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
private final Queue<HashedWheelTimeout> timeouts = new ConcurrentLinkedQueue<HashedWheelTimeout>();
public Timeout newTimeout(TimerTask task, longdelay, TimeUnit unit) { if (task == null) { throw newNullPointerException("task"); } if (unit == null) { throw newNullPointerException("unit"); } start();
// Add the timeout to the timeout queue which will be processed on the next tick. // During processing all the queued HashedWheelTimeouts will be added to the correct HashedWheelBucket. long deadline = System.nanoTime() + unit.toNanos(delay) - startTime; HashedWheelTimeout timeout = newHashedWheelTimeout(this, task, deadline); timeouts.add(timeout); return timeout; }
publicvoidrun() { // Initialize the startTime. startTime = System.nanoTime(); if (startTime == 0) { // We use 0 as an indicator for the uninitialized value here, so make sure it's not 0 when initialized. startTime = 1; }
// Notify the other threads waiting for the initialization at start(). startTimeInitialized.countDown();
do { finallongdeadline= waitForNextTick(); if (deadline > 0) { transferTimeoutsToBuckets(); HashedWheelBucketbucket= wheel[(int) (tick & mask)]; bucket.expireTimeouts(deadline); tick++; } } while (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_STARTED);
// Fill the unprocessedTimeouts so we can return them from stop() method. for (HashedWheelBucket bucket: wheel) { bucket.clearTimeouts(unprocessedTimeouts); } for (;;) { HashedWheelTimeouttimeout= timeouts.poll(); if (timeout == null) { break; } unprocessedTimeouts.add(timeout); } }
privatevoidtransferTimeoutsToBuckets() { // transfer only max. 100000 timeouts per tick to prevent a thread to stale the workerThread when it just // adds new timeouts in a loop. for (inti=0; i < 100000; i++) { HashedWheelTimeouttimeout= timeouts.poll(); if (timeout == null) { // all processed break; } if (timeout.state() == HashedWheelTimeout.ST_CANCELLED || !timeout.compareAndSetState(HashedWheelTimeout.ST_INIT, HashedWheelTimeout.ST_IN_BUCKET)) { // Was cancelled in the meantime. So just remove it and continue with next HashedWheelTimeout // in the queue timeout.remove(); continue; } longcalculated= timeout.deadline / tickDuration; longremainingRounds= (calculated - tick) / wheel.length; timeout.remainingRounds = remainingRounds;
finallongticks= Math.max(calculated, tick); // Ensure we don't schedule for past. intstopIndex= (int) (ticks & mask);
HashedWheelBucketbucket= wheel[stopIndex]; bucket.addTimeout(timeout); } } /** * calculate goal nanoTime from startTime and current tick number, * then wait until that goal has been reached. * @return Long.MIN_VALUE if received a shutdown request, * current time otherwise (with Long.MIN_VALUE changed by +1) */ privatelongwaitForNextTick() { longdeadline= tickDuration * (tick + 1);
if (sleepTimeMs <= 0) { if (currentTime == Long.MIN_VALUE) { return -Long.MAX_VALUE; } else { return currentTime; } }
// Check if we run on windows, as if thats the case we will need // to round the sleepTime as workaround for a bug that only affect // the JVM if it runs on windows. // // See https://github.com/netty/netty/issues/356 if (DetectionUtil.isWindows()) { sleepTimeMs = sleepTimeMs / 10 * 10; }
// process all timeouts while (timeout != null) { boolean remove = false; if (timeout.remainingRounds <= 0) { if (timeout.deadline <= deadline) { timeout.expire(); } else { // The timeout was placed into a wrong slot. This should never happen. thrownewIllegalStateException(String.format( "timeout.deadline (%d) > deadline (%d)", timeout.deadline, deadline)); } remove = true; } elseif (timeout.isCancelled()) { remove = true; } else { timeout.remainingRounds --; } // store reference to next as we may null out timeout.next in the remove block. HashedWheelTimeout next = timeout.next; if (remove) { remove(timeout); } timeout = next; } }