HashedWheelTimer算法理解

George Varghese 和 Tony Lauck 1996 年的论文:Hashed and Hierarchical Timing Wheels: data structures to efficiently implement a timer facility提出了一种定时轮的方式来管理和维护大量的Timer调度算法.Linux 内核中的定时器采用的就是这个方案。而Redisson的实现中会调用scheduleExpirationRenewal方法创建一个定时任务来刷新锁的过期时间,防止任务执行完毕前锁就过期释放了,其中使用的定时任务就是Netty提供的HashedWheelTimer

定时任务的本质

一个Timer本质上是这样的一个数据结构:deadline越近的任务拥有越高优先级,提供以下3中基本操作:

  1. schedule新增
  2. cancel删除
  3. expire执行到期的任务
  4. updateDeadline更行到期时间(可选)

expire通常有两种工作方式:

  1. 轮询

    每隔一个时间片就去查找哪些任务已经到期

  2. 睡眠/唤醒

    1. 不停地查找deadline最近的任务,如到期则执行;否则sleep直到其到期
    2. 在sleep期间,如果有任务被cancelschedule,则deadline最近的任务有可能改变,线程会被唤醒并重新进行1的逻辑

具体实现的数据结构可以有很多选择:(假设任务持有自己在总体任务集合中对应的节点,cancel时不需要查找的过程)

  1. 有序链表
    • schedule:O(n)
    • cancel:O(1) //双向链表的节点删除
    • expire:O(1) //不停地查看第一个就可以了
  2. 堆heap
    • schedule:O(log2N) //调整heap
    • cancel:O(log2N) //调整heap
    • expire:O(1)

原理

一个Hash Wheel Timer是一个环形结构,可以想象成时钟,分为很多格子,一个格子代表一段时间(越短Timer精度越高),并用一个List保存在该格子上到期的所有任务,同时一个指针随着时间流逝一格一格转动,并执行对应List中所有到期的任务。任务通过取模决定应该放入哪个格子。

环形结构可以根据超时时间的 hash 值(这个 hash 值实际上就是ticks & mask)将 task 分布到不同的槽位中, 当 tick 到那个槽位时, 只需要遍历那个槽位的 task 即可知道哪些任务会超时(而使用线性结构, 你每次 tick 都需要遍历所有 task), 所以, 我们任务量大的时候, 相应的增加 wheel 的 ticksPerWheel 值, 可以减少 tick 时遍历任务的个数.

结构图

img

以上图为例,假设一个格子是1秒,则整个wheel能表示的时间段为8s,假如当前指针指向2,此时需要调度一个3s后执行的任务,显然应该加入到(2+3=5)的方格中,指针再走3次就可以执行了;如果任务要在10s后执行,应该等指针走完一个round零2格再执行,因此应放入4,同时将round(1)保存到任务中。检查到期任务时应当只执行round为0的,格子上其他任务的round应减1。

效率

  • 添加任务:O(1)
  • 删除/取消任务:O(1)
  • 过期/执行任务:最差情况为O(n)->也就是当HashMap里面的元素全部hash冲突,退化为一条链表的情况。平均O(1)

槽位越多,每个槽位上的链表就越短,这里需要权衡时间与空间。

netty3.10的实现

相关参数

  • tickDuration: 每 tick 一次的时间间隔, 每 tick 一次就会到达下一个槽位
  • ticksPerWheel: 轮中的 slot 数,hash算法计算目标槽位
1
2
3
4
5
6
7
8
9
10
11
/**
* Creates a new timer with the default thread factory
* ({@link Executors#defaultThreadFactory()}).
*
* @param tickDuration the duration between tick
* @param unit the time unit of the {@code tickDuration}
* @param ticksPerWheel the size of the wheel
*/
public HashedWheelTimer(long tickDuration, TimeUnit unit, int ticksPerWheel) {
this(Executors.defaultThreadFactory(), tickDuration, unit, ticksPerWheel);
}

HashedWheelBucket定义

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
/**
* Bucket that stores HashedWheelTimeouts. These are stored in a linked-list like datastructure to allow easy
* removal of HashedWheelTimeouts in the middle. Also the HashedWheelTimeout act as nodes themself and so no
* extra object creation is needed.
*/
private static final class HashedWheelBucket {

// Used for the linked-list datastructure
private HashedWheelTimeout head;
private HashedWheelTimeout tail;

/**
* Add {@link HashedWheelTimeout} to this bucket.
*/
public void addTimeout(HashedWheelTimeout timeout) {
assert timeout.bucket == null;
timeout.bucket = this;
if (head == null) {
head = tail = timeout;
} else {
tail.next = timeout;
timeout.prev = tail;
tail = timeout;
}
}

/**
* Expire all {@link HashedWheelTimeout}s for the given {@code deadline}.
*/
public void expireTimeouts(long deadline) {
HashedWheelTimeout timeout = head;

// process all timeouts
while (timeout != null) {
boolean remove = false;
if (timeout.remainingRounds <= 0) {
if (timeout.deadline <= deadline) {
timeout.expire();
} else {
// The timeout was placed into a wrong slot. This should never happen.
throw new IllegalStateException(String.format(
"timeout.deadline (%d) > deadline (%d)", timeout.deadline, deadline));
}
remove = true;
} else if (timeout.isCancelled()) {
remove = true;
} else {
timeout.remainingRounds --;
}
// store reference to next as we may null out timeout.next in the remove block.
HashedWheelTimeout next = timeout.next;
if (remove) {
remove(timeout);
}
timeout = next;
}
}

public void remove(HashedWheelTimeout timeout) {
HashedWheelTimeout next = timeout.next;
// remove timeout that was either processed or cancelled by updating the linked-list
if (timeout.prev != null) {
timeout.prev.next = next;
}
if (timeout.next != null) {
timeout.next.prev = timeout.prev;
}

if (timeout == head) {
// if timeout is also the tail we need to adjust the entry too
if (timeout == tail) {
tail = null;
head = null;
} else {
head = next;
}
} else if (timeout == tail) {
// if the timeout is the tail modify the tail to be the prev node.
tail = timeout.prev;
}
// null out prev, next and bucket to allow for GC.
timeout.prev = null;
timeout.next = null;
timeout.bucket = null;
}

/**
* Clear this bucket and return all not expired / cancelled {@link Timeout}s.
*/
public void clearTimeouts(Set<Timeout> set) {
for (;;) {
HashedWheelTimeout timeout = pollTimeout();
if (timeout == null) {
return;
}
if (timeout.isExpired() || timeout.isCancelled()) {
continue;
}
set.add(timeout);
}
}

private HashedWheelTimeout pollTimeout() {
HashedWheelTimeout head = this.head;
if (head == null) {
return null;
}
HashedWheelTimeout next = head.next;
if (next == null) {
tail = this.head = null;
} else {
this.head = next;
next.prev = null;
}

// null out prev and next to allow for GC.
head.next = null;
head.prev = null;
return head;
}
}
  • HashedWheelTimeout
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
private static final class HashedWheelTimeout implements Timeout {

private static final int ST_INIT = 0;
private static final int ST_IN_BUCKET = 1;
private static final int ST_CANCELLED = 2;
private static final int ST_EXPIRED = 3;
private static final AtomicIntegerFieldUpdater<HashedWheelTimeout> STATE_UPDATER =
AtomicIntegerFieldUpdater.newUpdater(HashedWheelTimeout.class, "state");

private final HashedWheelTimer timer;
private final TimerTask task;
private final long deadline;

@SuppressWarnings({"unused", "FieldMayBeFinal", "RedundantFieldInitialization" })
private volatile int state = ST_INIT;

// remainingRounds will be calculated and set by Worker.transferTimeoutsToBuckets() before the
// HashedWheelTimeout will be added to the correct HashedWheelBucket.
long remainingRounds;

// This will be used to chain timeouts in HashedWheelTimerBucket via a double-linked-list.
// As only the workerThread will act on it there is no need for synchronization / volatile.
HashedWheelTimeout next;
HashedWheelTimeout prev;

// The bucket to which the timeout was added
HashedWheelBucket bucket;

HashedWheelTimeout(HashedWheelTimer timer, TimerTask task, long deadline) {
this.timer = timer;
this.task = task;
this.deadline = deadline;
}

public Timer getTimer() {
return timer;
}

public TimerTask getTask() {
return task;
}

public void cancel() {
int state = state();
if (state >= ST_CANCELLED) {
// fail fast if the task was cancelled or expired before.
return;
}
if (state != ST_IN_BUCKET && compareAndSetState(ST_INIT, ST_CANCELLED)) {
// Was cancelled before the HashedWheelTimeout was added to its HashedWheelBucket.
// In this case we can just return here as it will be discarded by the WorkerThread when handling
// the adding of HashedWheelTimeout to the HashedWheelBuckets.
return;
}
// only update the state it will be removed from HashedWheelBucket on next tick.
if (!compareAndSetState(ST_IN_BUCKET, ST_CANCELLED)) {
return;
}
// Add the HashedWheelTimeout back to the timeouts queue so it will be picked up on the next tick
// and remove this HashedTimeTask from the HashedWheelBucket. After this is done it is ready to get
// GC'ed once the user has no reference to it anymore.
timer.timeouts.add(this);
}

public void remove() {
if (bucket != null) {
bucket.remove(this);
}
}

public boolean compareAndSetState(int expected, int state) {
return STATE_UPDATER.compareAndSet(this, expected, state);
}

public int state() {
return state;
}

public boolean isCancelled() {
return state == ST_CANCELLED;
}

public boolean isExpired() {
return state > ST_IN_BUCKET;
}

public HashedWheelTimeout value() {
return this;
}

public void expire() {
if (!compareAndSetState(ST_IN_BUCKET, ST_EXPIRED)) {
assert state() != ST_INIT;
return;
}

try {
task.run(this);
} catch (Throwable t) {
if (logger.isWarnEnabled()) {
logger.warn("An exception was thrown by " + TimerTask.class.getSimpleName() + '.', t);
}
}
}

@Override
public String toString() {
final long currentTime = System.nanoTime();
long remaining = deadline - currentTime + timer.startTime;

StringBuilder buf = new StringBuilder(192);
buf.append(getClass().getSimpleName());
buf.append('(');

buf.append("deadline: ");
if (remaining > 0) {
buf.append(remaining);
buf.append(" ns later");
} else if (remaining < 0) {
buf.append(-remaining);
buf.append(" ns ago");
} else {
buf.append("now");
}

if (isCancelled()) {
buf.append(", cancelled");
}

buf.append(", task: ");
buf.append(getTask());

return buf.append(')').toString();
}
}

HashedWheelBucket创建

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
private static HashedWheelBucket[] createWheel(int ticksPerWheel) {
if (ticksPerWheel <= 0) {
throw new IllegalArgumentException(
"ticksPerWheel must be greater than 0: " + ticksPerWheel);
}
if (ticksPerWheel > 1073741824) {
throw new IllegalArgumentException(
"ticksPerWheel may not be greater than 2^30: " + ticksPerWheel);
}

ticksPerWheel = normalizeTicksPerWheel(ticksPerWheel);
HashedWheelBucket[] wheel = new HashedWheelBucket[ticksPerWheel];
for (int i = 0; i < wheel.length; i ++) {
wheel[i] = new HashedWheelBucket();
}
return wheel;
}

private static int normalizeTicksPerWheel(int ticksPerWheel) {
int normalizedTicksPerWheel = 1;
while (normalizedTicksPerWheel < ticksPerWheel) {
normalizedTicksPerWheel <<= 1;
}
return normalizedTicksPerWheel;
}

timeouts队列

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
private final Queue<HashedWheelTimeout> timeouts = new ConcurrentLinkedQueue<HashedWheelTimeout>();

public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) {
if (task == null) {
throw new NullPointerException("task");
}
if (unit == null) {
throw new NullPointerException("unit");
}
start();

// Add the timeout to the timeout queue which will be processed on the next tick.
// During processing all the queued HashedWheelTimeouts will be added to the correct HashedWheelBucket.
long deadline = System.nanoTime() + unit.toNanos(delay) - startTime;
HashedWheelTimeout timeout = new HashedWheelTimeout(this, task, deadline);
timeouts.add(timeout);
return timeout;
}

Worker

HashedWheelTimer的核心,主要处理tick的转动、过期任务。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
private final class Worker implements Runnable {
private final Set<Timeout> unprocessedTimeouts = new HashSet<Timeout>();

private long tick;

public void run() {
// Initialize the startTime.
startTime = System.nanoTime();
if (startTime == 0) {
// We use 0 as an indicator for the uninitialized value here, so make sure it's not 0 when initialized.
startTime = 1;
}

// Notify the other threads waiting for the initialization at start().
startTimeInitialized.countDown();

do {
final long deadline = waitForNextTick();
if (deadline > 0) {
transferTimeoutsToBuckets();
HashedWheelBucket bucket =
wheel[(int) (tick & mask)];
bucket.expireTimeouts(deadline);
tick++;
}
} while (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_STARTED);

// Fill the unprocessedTimeouts so we can return them from stop() method.
for (HashedWheelBucket bucket: wheel) {
bucket.clearTimeouts(unprocessedTimeouts);
}
for (;;) {
HashedWheelTimeout timeout = timeouts.poll();
if (timeout == null) {
break;
}
unprocessedTimeouts.add(timeout);
}
}

private void transferTimeoutsToBuckets() {
// transfer only max. 100000 timeouts per tick to prevent a thread to stale the workerThread when it just
// adds new timeouts in a loop.
for (int i = 0; i < 100000; i++) {
HashedWheelTimeout timeout = timeouts.poll();
if (timeout == null) {
// all processed
break;
}
if (timeout.state() == HashedWheelTimeout.ST_CANCELLED
|| !timeout.compareAndSetState(HashedWheelTimeout.ST_INIT, HashedWheelTimeout.ST_IN_BUCKET)) {
// Was cancelled in the meantime. So just remove it and continue with next HashedWheelTimeout
// in the queue
timeout.remove();
continue;
}
long calculated = timeout.deadline / tickDuration;
long remainingRounds = (calculated - tick) / wheel.length;
timeout.remainingRounds = remainingRounds;

final long ticks = Math.max(calculated, tick); // Ensure we don't schedule for past.
int stopIndex = (int) (ticks & mask);

HashedWheelBucket bucket = wheel[stopIndex];
bucket.addTimeout(timeout);
}
}
/**
* calculate goal nanoTime from startTime and current tick number,
* then wait until that goal has been reached.
* @return Long.MIN_VALUE if received a shutdown request,
* current time otherwise (with Long.MIN_VALUE changed by +1)
*/
private long waitForNextTick() {
long deadline = tickDuration * (tick + 1);

for (;;) {
final long currentTime = System.nanoTime() - startTime;
long sleepTimeMs = (deadline - currentTime + 999999) / 1000000;

if (sleepTimeMs <= 0) {
if (currentTime == Long.MIN_VALUE) {
return -Long.MAX_VALUE;
} else {
return currentTime;
}
}

// Check if we run on windows, as if thats the case we will need
// to round the sleepTime as workaround for a bug that only affect
// the JVM if it runs on windows.
//
// See https://github.com/netty/netty/issues/356
if (DetectionUtil.isWindows()) {
sleepTimeMs = sleepTimeMs / 10 * 10;
}

try {
Thread.sleep(sleepTimeMs);
} catch (InterruptedException e) {
if (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_SHUTDOWN) {
return Long.MIN_VALUE;
}
}
}
}

public Set<Timeout> unprocessedTimeouts() {
return Collections.unmodifiableSet(unprocessedTimeouts);
}
}
  • 定位槽位
1
2
3
HashedWheelBucket[] wheel = createWheel(ticksPerWheel);
mask = wheel.length - 1;
HashedWheelBucket bucket = wheel[(int) (tick & mask)];

比如有16个槽,则mask为15,假设当前tick=30,则槽位=14

  • 更新该槽位任务的remainingRounds
    每走一个tick都要更新该tick对应的槽位下面的任务的remainingRounds或者执行到期的任务
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
bucket.expireTimeouts(deadline);

public void expireTimeouts(long deadline) {
HashedWheelTimeout timeout = head;

// process all timeouts
while (timeout != null) {
boolean remove = false;
if (timeout.remainingRounds <= 0) {
if (timeout.deadline <= deadline) {
timeout.expire();
} else {
// The timeout was placed into a wrong slot. This should never happen.
throw new IllegalStateException(String.format(
"timeout.deadline (%d) > deadline (%d)", timeout.deadline, deadline));
}
remove = true;
} else if (timeout.isCancelled()) {
remove = true;
} else {
timeout.remainingRounds --;
}
// store reference to next as we may null out timeout.next in the remove block.
HashedWheelTimeout next = timeout.next;
if (remove) {
remove(timeout);
}
timeout = next;
}
}

执行到期任务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public void expire() {
if (!compareAndSetState(ST_IN_BUCKET, ST_EXPIRED)) {
assert state() != ST_INIT;
return;
}

try {
task.run(this);
} catch (Throwable t) {
if (logger.isWarnEnabled()) {
logger.warn("An exception was thrown by " + TimerTask.class.getSimpleName() + '.', t);
}
}
}

注意,这里是同步执行,会阻塞整个timer的,需要异步。

  • transfer
    每走一个tick的时候,要把task从queue中取出来,放到槽位。
1
2
3
4
5
6
7
8
9
long calculated = timeout.deadline / tickDuration;
long remainingRounds = (calculated - tick) / wheel.length;
timeout.remainingRounds = remainingRounds;

final long ticks = Math.max(calculated, tick); // Ensure we don't schedule for past.
int stopIndex = (int) (ticks & mask);

HashedWheelBucket bucket = wheel[stopIndex];
bucket.addTimeout(timeout);

使用实例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
/**
* tickDuration: 每 tick 一次的时间间隔, 每 tick 一次就会到达下一个槽位
* ticksPerWheel: 轮中的 slot 数
*/
@Test
public void testHashedWheelTimer() throws InterruptedException {
HashedWheelTimer hashedWheelTimer = new HashedWheelTimer(1000/**tickDuration**/, TimeUnit.MILLISECONDS, 16 /**ticksPerWheel**/);
System.out.println(LocalTime.now()+" submitted");
Timeout timeout = hashedWheelTimer.newTimeout((t) -> {
new Thread(){
@Override
public void run() {
System.out.println(new Date() + " executed");
System.out.println(hashedWheelTimer);
try {
TimeUnit.SECONDS.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(new Date() + " FINISH");
}
}.start();
}, 5, TimeUnit.SECONDS);

hashedWheelTimer.newTimeout((t) -> {
new Thread(){
@Override
public void run() {
System.out.println(new Date() + " TASK2 executed");
System.out.println(hashedWheelTimer);
try {
TimeUnit.SECONDS.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(new Date() + " TASK2 FINISH");
}
}.start();
}, 15, TimeUnit.SECONDS);

TimeUnit.SECONDS.sleep(500);
}

总结

总体来说,HashedWheelTimer使用的是一个比较朴素的算法,要点有两个:

  1. 添加定时任务
    1. 如果worker线程没有执行则启动worker线程。
    2. 将定时任务task包装成HashedWheelTimeout,然后添加到Queue<HashedWheelTimeout> timeouts队列中
  2. worker线程的执行
    1. 调用waitForNextTick方法等待直到下一个tick
    2. 调用processCancelledTasks方法处理被取消的任务。从Queue<HashedWheelTimeout> cancelledTimeouts队列(调用cancel方法取消任务时会将任务添加到该队列中)中取出被取消的任务,然后将其从格子的任务列表中移除。
    3. 计算当前tick所在的格子(bucket)
    4. 调用transferTimeoutsToBuckets方法将timeouts队列中新建的任务转移到所在格子的链表中
    5. 调用HashedWheelBucket.expireTimeouts方法执行到期的任务

这里有几个值的注意的数据结构:

  1. 任务并不是直接放在格子中的,而是维护了一个双向链表,这种数据结构非常便于插入和移除。
  2. 新添加的任务并不直接放入格子,而是先放入一个队列中,这是为了避免多线程插入任务的冲突。在每个tick运行任务之前由worker线程自动对任务进行归集和分类,插入到对应的槽位里面。

相关链接

QA

  • Dubbo在2.7x中也实现了HashedWheelTimer,为什么不直接使用netty的实现

​ Dubbo在2.7.x版本中实现了自己的HashedWheelTimer,而没有直接使用Netty的实现,主要是为了满足特定的定制化需求,减少对外部依赖的管理,优化性能,确保控制权和可维护性,以及保证在不同环境中的一致性和兼容性。这使得Dubbo能够更好地适应其分布式服务框架的要求。