HashedWheelTimer timing wheel: a source code walkthrough
Introduction to the HashedWheelTimer timing wheel
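HashedWheelTimer is a basic utility class in Netty. It efficiently handles large numbers of timed tasks whose precision requirements are relatively loose, such as connection timeout management. Its drawback is relatively high memory usage. Tasks must not do time-consuming work; otherwise they block the Worker thread and the ticks drift.
At its core, HashedWheelTimer combines a queue of newly submitted timeouts with the timing wheel algorithm. The queue is an MPSC queue, as the field list in the source shows, not java.util.concurrent.DelayQueue.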
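As the figure below shows, HashedWheelTimer is built from a circular array in which every slot holds a linked list.
The figure below explains why tasks in a HashedWheelTimer must not be time-consuming: the worker executes tasks serially.
As the figure below shows, HashedWheelTimer consists of an array of HashedWheelBucket, linked lists of HashedWheelTimeout, and the Worker thread, so the source analysis starts from these classes.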
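To make the structure concrete before reading the real source, here is a minimal single-threaded sketch of a timing wheel. It is not Netty's code: `MiniWheel`, `Entry`, `schedule` and `advance` are hypothetical names, time is counted in whole ticks, and the caller drives the wheel by hand instead of a worker thread.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;

// Hypothetical, simplified model of a timing wheel (not Netty's implementation).
final class MiniWheel {
    private static final class Entry {
        final String name;
        long remainingRounds; // full wheel rotations to skip before firing

        Entry(String name, long remainingRounds) {
            this.name = name;
            this.remainingRounds = remainingRounds;
        }
    }

    private final List<LinkedList<Entry>> buckets = new ArrayList<>();
    private final int mask;
    private long tick; // next tick to be processed

    MiniWheel(int wheelLength) { // wheelLength is assumed to be a power of two
        for (int i = 0; i < wheelLength; i++) {
            buckets.add(new LinkedList<>());
        }
        mask = wheelLength - 1;
    }

    /** Schedules a named task to fire delayTicks ticks from now. */
    void schedule(String name, long delayTicks) {
        long target = tick + delayTicks;
        long rounds = delayTicks / buckets.size();
        buckets.get((int) (target & mask)).add(new Entry(name, rounds));
    }

    /** Processes the current bucket serially, then moves the pointer one slot. */
    List<String> advance() {
        List<String> fired = new ArrayList<>();
        Iterator<Entry> it = buckets.get((int) (tick & mask)).iterator();
        while (it.hasNext()) {
            Entry e = it.next();
            if (e.remainingRounds <= 0) {
                fired.add(e.name);
                it.remove();
            } else {
                e.remainingRounds--;
            }
        }
        tick++;
        return fired;
    }

    public static void main(String[] args) {
        MiniWheel wheel = new MiniWheel(8);
        wheel.schedule("a", 3);
        wheel.schedule("b", 11); // same bucket as "a", one rotation later
        for (int i = 0; i < 12; i++) {
            System.out.println("tick " + i + " fired " + wheel.advance());
        }
    }
}
```

Both tasks land in bucket 3. "a" fires when tick 3 is processed, and "b" waits one more rotation and fires at tick 11. Because `advance` walks the bucket in a plain loop, a slow task would delay everything behind it, which is the serial-execution caveat described above.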
Source code analysis
Basic fields of HashedWheelTimer

```java
// Global instance bookkeeping: warn once when too many timers exist.
private static final AtomicInteger INSTANCE_COUNTER = new AtomicInteger();
private static final AtomicBoolean WARNED_TOO_MANY_INSTANCES = new AtomicBoolean();
private static final int INSTANCE_COUNT_LIMIT = 64;
private static final long MILLISECOND_NANOS = TimeUnit.MILLISECONDS.toNanos(1);
private static final ResourceLeakDetector<HashedWheelTimer> leakDetector = ResourceLeakDetectorFactory.instance()
        .newResourceLeakDetector(HashedWheelTimer.class, 1);

private static final AtomicIntegerFieldUpdater<HashedWheelTimer> WORKER_STATE_UPDATER =
        AtomicIntegerFieldUpdater.newUpdater(HashedWheelTimer.class, "workerState");

private final ResourceLeakTracker<HashedWheelTimer> leak;
private final Worker worker = new Worker();
private final Thread workerThread;

// Lifecycle states of the worker.
public static final int WORKER_STATE_INIT = 0;
public static final int WORKER_STATE_STARTED = 1;
public static final int WORKER_STATE_SHUTDOWN = 2;
@SuppressWarnings({ "unused", "FieldMayBeFinal" })
private volatile int workerState;

private final long tickDuration;          // duration of one tick, in nanoseconds
private final HashedWheelBucket[] wheel;  // the wheel itself
private final int mask;                   // wheel.length - 1
private final CountDownLatch startTimeInitialized = new CountDownLatch(1);
private final Queue<HashedWheelTimeout> timeouts = PlatformDependent.newMpscQueue();
private final Queue<HashedWheelTimeout> cancelledTimeouts = PlatformDependent.newMpscQueue();
private final AtomicLong pendingTimeouts = new AtomicLong(0);
private final long maxPendingTimeouts;

private volatile long startTime;
```
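The fields above give a rough picture of what the timing wheel depends on. Tasks are handed to the worker through Queues. Note that these are the MPSC queues returned by PlatformDependent.newMpscQueue(), which are backed by JCTools for higher performance.
mask equals wheel.length - 1 and is used for bit operations: tick & mask yields the bucket index.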
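A small sketch of why the mask works, assuming the wheel length is a power of two. `WheelIndexDemo` and `indexFor` are hypothetical names used only for illustration.

```java
// Hypothetical helper showing the mask trick; not part of Netty.
final class WheelIndexDemo {
    /** Maps a non-negative tick to a bucket index; wheelLength must be a power of two. */
    static int indexFor(long tick, int wheelLength) {
        int mask = wheelLength - 1;   // e.g. 8 -> binary 0111
        return (int) (tick & mask);   // equals tick % wheelLength for tick >= 0
    }

    public static void main(String[] args) {
        int wheelLength = 8;
        for (long tick = 6; tick <= 10; tick++) {
            System.out.println("tick " + tick + " -> bucket " + indexFor(tick, wheelLength));
        }
    }
}
```

With a wheel of 8 slots, ticks 6 through 10 map to buckets 6, 7, 0, 1, 2: the pointer wraps around without a division.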
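Atomic classes keep the state consistent under concurrency.
What I find worth learning here is the attention to detail. The class tracks itself with a resource leak detector and, separately, logs a warning once more than 64 instances (INSTANCE_COUNT_LIMIT) have been created. Netty is very thorough about details like this, and that is something to carry into everyday coding.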
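The "warn only once" idiom can be sketched in isolation. This is an illustration under assumptions, not Netty's code: `InstanceLimit` and `register` are hypothetical names, and the counter is held per object here (Netty keeps it in static fields) so that the example is easy to test.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of a counter that triggers a single warning past a limit.
final class InstanceLimit {
    private final AtomicInteger counter = new AtomicInteger();
    private final AtomicBoolean warned = new AtomicBoolean();
    private final int limit;

    InstanceLimit(int limit) {
        this.limit = limit;
    }

    /** Counts one new instance; returns true only for the single call that should log. */
    boolean register() {
        // compareAndSet lets exactly one thread flip the flag, so the warning is logged once.
        return counter.incrementAndGet() > limit && warned.compareAndSet(false, true);
    }

    public static void main(String[] args) {
        InstanceLimit limit = new InstanceLimit(64);
        int warnings = 0;
        for (int i = 0; i < 100; i++) {
            if (limit.register()) {
                warnings++;
            }
        }
        System.out.println("warnings logged: " + warnings);
    }
}
```

The short-circuit `&&` means the flag is only touched after the limit is exceeded, so the common path costs one atomic increment.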
The HashedWheelTimer constructor

```java
public HashedWheelTimer(
        ThreadFactory threadFactory, // creates the worker thread
        long tickDuration,           // length of one tick, i.e. how often the pointer moves one slot
        TimeUnit unit,               // time unit of tickDuration
        int ticksPerWheel,           // number of slots in one rotation
        boolean leakDetection,       // whether to enable leak detection
        long maxPendingTimeouts      // maximum number of pending timeouts
) {
    if (threadFactory == null) {
        throw new NullPointerException("threadFactory");
    }
    if (unit == null) {
        throw new NullPointerException("unit");
    }
    if (tickDuration <= 0) {
        throw new IllegalArgumentException("tickDuration must be greater than 0: " + tickDuration);
    }
    if (ticksPerWheel <= 0) {
        throw new IllegalArgumentException("ticksPerWheel must be greater than 0: " + ticksPerWheel);
    }

    // Round ticksPerWheel up to a power of two and build the wheel.
    wheel = createWheel(ticksPerWheel);
    mask = wheel.length - 1;

    // Convert tickDuration to nanoseconds.
    long duration = unit.toNanos(tickDuration);

    // Guard against overflow of one full rotation.
    if (duration >= Long.MAX_VALUE / wheel.length) {
        throw new IllegalArgumentException(String.format(
                "tickDuration: %d (expected: 0 < tickDuration in nanos < %d",
                tickDuration, Long.MAX_VALUE / wheel.length));
    }

    // A tick is never shorter than 1 ms.
    if (duration < MILLISECOND_NANOS) {
        if (logger.isWarnEnabled()) {
            logger.warn("Configured tickDuration %d smaller then %d, using 1ms.",
                    tickDuration, MILLISECOND_NANOS);
        }
        this.tickDuration = MILLISECOND_NANOS;
    } else {
        this.tickDuration = duration;
    }

    workerThread = threadFactory.newThread(worker);

    leak = leakDetection || !workerThread.isDaemon() ? leakDetector.track(this) : null;

    this.maxPendingTimeouts = maxPendingTimeouts;

    // Warn once if too many timer instances have been created.
    if (INSTANCE_COUNTER.incrementAndGet() > INSTANCE_COUNT_LIMIT &&
            WARNED_TOO_MANY_INSTANCES.compareAndSet(false, true)) {
        reportTooManyInstances();
    }
}
```
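Note that ticksPerWheel defaults to 512 when a constructor overload that omits it is used.
Whatever unit is passed in, HashedWheelTimer converts the tick duration to nanoseconds internally.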
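The conversion and the 1 ms floor from the constructor can be tried out on their own. `TickDurationDemo` and `normalize` are hypothetical names. Only `TimeUnit.toNanos` is a real JDK call, and the overflow check from the constructor is left out for brevity.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of the tick-duration normalisation done in the constructor.
final class TickDurationDemo {
    static final long MILLISECOND_NANOS = TimeUnit.MILLISECONDS.toNanos(1);

    /** Converts the configured tick to nanoseconds and raises it to at least 1 ms. */
    static long normalize(long tickDuration, TimeUnit unit) {
        long nanos = unit.toNanos(tickDuration);
        return Math.max(nanos, MILLISECOND_NANOS);
    }

    public static void main(String[] args) {
        System.out.println(normalize(100, TimeUnit.MILLISECONDS)); // 100 ms in nanoseconds
        System.out.println(normalize(500, TimeUnit.MICROSECONDS)); // below 1 ms, raised to 1 ms
    }
}
```

A tick of 100 ms becomes 100000000 ns, while 500 microseconds is raised to 1000000 ns, matching the branch that logs "using 1ms".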
The createWheel method of HashedWheelTimer

```java
private static HashedWheelBucket[] createWheel(int ticksPerWheel) {
    if (ticksPerWheel <= 0) {
        throw new IllegalArgumentException(
                "ticksPerWheel must be greater than 0: " + ticksPerWheel);
    }
    if (ticksPerWheel > 1073741824) {
        throw new IllegalArgumentException(
                "ticksPerWheel may not be greater than 2^30: " + ticksPerWheel);
    }

    // Round up to a power of two, then fill every slot with an empty bucket.
    ticksPerWheel = normalizeTicksPerWheel(ticksPerWheel);
    HashedWheelBucket[] wheel = new HashedWheelBucket[ticksPerWheel];
    for (int i = 0; i < wheel.length; i++) {
        wheel[i] = new HashedWheelBucket();
    }
    return wheel;
}
```
Note that the wheel array can hold at most 2^30 slots.
The normalizeTicksPerWheel method of HashedWheelTimer

```java
private static int normalizeTicksPerWheel(int ticksPerWheel) {
    int normalizedTicksPerWheel = 1;
    // Double until the value is at least ticksPerWheel.
    while (normalizedTicksPerWheel < ticksPerWheel) {
        normalizedTicksPerWheel <<= 1;
    }
    return normalizedTicksPerWheel;
}
```
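A left shift rounds ticksPerWheel up to the next power of two, which becomes the number of slots on the wheel.
One concern: the larger the requested wheel size, the more iterations the loop needs. It runs at most 30 times and only once per timer, so the cost is small, but it could be replaced with the bit-smearing approach that JDK 1.8 HashMap uses to size its table [to look into further later].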
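As a sketch of that optimisation, the loop can be compared with a loop-free version written in the style of the JDK 8 `HashMap` table-size calculation. `PowerOfTwoDemo`, `loopVersion` and `bitVersion` are hypothetical names, and the input is assumed to be in the range 1 to 2^30, which `createWheel` already enforces.

```java
// Hypothetical comparison of two ways to round up to a power of two.
final class PowerOfTwoDemo {
    /** Same logic as normalizeTicksPerWheel: double until large enough. */
    static int loopVersion(int ticksPerWheel) {
        int n = 1;
        while (n < ticksPerWheel) {
            n <<= 1;
        }
        return n;
    }

    /** Loop-free variant; assumes 1 <= ticksPerWheel <= 2^30. */
    static int bitVersion(int ticksPerWheel) {
        int n = ticksPerWheel - 1;
        // Copy the highest set bit into every lower position.
        n |= n >>> 1;
        n |= n >>> 2;
        n |= n >>> 4;
        n |= n >>> 8;
        n |= n >>> 16;
        return n + 1;
    }

    public static void main(String[] args) {
        int[] samples = {1, 2, 3, 100, 512, 513};
        for (int s : samples) {
            System.out.println(s + " -> " + loopVersion(s) + " / " + bitVersion(s));
        }
    }
}
```

Both versions return the same value for every input in the allowed range, for example 100 rounds up to 128 and 513 to 1024. The bit version takes a fixed number of operations regardless of the wheel size.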
The start method of HashedWheelTimer (starts the timing wheel)

```java
public void start() {
    switch (WORKER_STATE_UPDATER.get(this)) {
        case WORKER_STATE_INIT:
            // Only the caller that wins the CAS starts the worker thread.
            if (WORKER_STATE_UPDATER.compareAndSet(this, WORKER_STATE_INIT, WORKER_STATE_STARTED)) {
                workerThread.start();
            }
            break;
        case WORKER_STATE_STARTED:
            break;
        case WORKER_STATE_SHUTDOWN:
            throw new IllegalStateException("cannot be started once stopped");
        default:
            throw new Error("Invalid WorkerState");
    }

    // Wait until the worker has initialised startTime.
    while (startTime == 0) {
        try {
            startTimeInitialized.await();
        } catch (InterruptedException ignore) {
            // Ignore - it will be ready very soon.
        }
    }
}
```
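The state transitions in `start` can be isolated in a small sketch. This is a simplified model, not Netty's code: `LifecycleDemo` is a hypothetical class, it uses an `AtomicInteger` instead of an `AtomicIntegerFieldUpdater`, and it leaves out the thread and the latch.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical model of the INIT -> STARTED -> SHUTDOWN lifecycle.
final class LifecycleDemo {
    static final int INIT = 0;
    static final int STARTED = 1;
    static final int SHUTDOWN = 2;

    private final AtomicInteger state = new AtomicInteger(INIT);

    /** Returns true only for the one caller that actually performs the start. */
    boolean start() {
        switch (state.get()) {
            case INIT:
                // Several threads may get here; the CAS lets exactly one proceed.
                return state.compareAndSet(INIT, STARTED);
            case STARTED:
                return false;
            case SHUTDOWN:
                throw new IllegalStateException("cannot be started once stopped");
            default:
                throw new Error("Invalid state");
        }
    }

    void stop() {
        state.set(SHUTDOWN);
    }

    public static void main(String[] args) {
        LifecycleDemo timer = new LifecycleDemo();
        System.out.println(timer.start()); // first call wins the CAS
        System.out.println(timer.start()); // already started, nothing to do
    }
}
```

Calling `start` is therefore idempotent while the timer is running, which is what allows `newTimeout` to call it on every submission.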
The stop method of HashedWheelTimer (stops the timing wheel)

```java
public Set<Timeout> stop() {
    // A TimerTask runs on the worker thread, so it must not stop the timer itself.
    if (Thread.currentThread() == workerThread) {
        throw new IllegalStateException(
                HashedWheelTimer.class.getSimpleName() +
                        ".stop() cannot be called from " +
                        TimerTask.class.getSimpleName());
    }

    if (!WORKER_STATE_UPDATER.compareAndSet(this, WORKER_STATE_STARTED, WORKER_STATE_SHUTDOWN)) {
        // The state was INIT or SHUTDOWN; force it to SHUTDOWN.
        if (WORKER_STATE_UPDATER.getAndSet(this, WORKER_STATE_SHUTDOWN) != WORKER_STATE_SHUTDOWN) {
            INSTANCE_COUNTER.decrementAndGet();
            if (leak != null) {
                boolean closed = leak.close(this);
                assert closed;
            }
        }
        return Collections.emptySet();
    }

    try {
        boolean interrupted = false;
        // Keep interrupting the worker until it has exited.
        while (workerThread.isAlive()) {
            workerThread.interrupt();
            try {
                workerThread.join(100);
            } catch (InterruptedException ignored) {
                interrupted = true;
            }
        }

        // Restore the caller's interrupt flag.
        if (interrupted) {
            Thread.currentThread().interrupt();
        }
    } finally {
        INSTANCE_COUNTER.decrementAndGet();
        if (leak != null) {
            boolean closed = leak.close(this);
            assert closed;
        }
    }
    return worker.unprocessedTimeouts();
}
```
The newTimeout method of HashedWheelTimer

```java
public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) {
    if (task == null) {
        throw new NullPointerException("task");
    }
    if (unit == null) {
        throw new NullPointerException("unit");
    }

    long pendingTimeoutsCount = pendingTimeouts.incrementAndGet();

    // Reject the task if the pending limit is enabled and exceeded.
    if (maxPendingTimeouts > 0 && pendingTimeoutsCount > maxPendingTimeouts) {
        pendingTimeouts.decrementAndGet();
        throw new RejectedExecutionException("Number of pending timeouts ("
                + pendingTimeoutsCount + ") is greater than or equal to maximum allowed pending "
                + "timeouts (" + maxPendingTimeouts + ")");
    }

    // Start the worker lazily on the first submission.
    start();

    // The deadline is relative to the timer's startTime.
    long deadline = System.nanoTime() + unit.toNanos(delay) - startTime;

    // Guard against overflow.
    if (delay > 0 && deadline < 0) {
        deadline = Long.MAX_VALUE;
    }
    // The timeout is queued here; the worker moves it into a bucket on the next tick.
    HashedWheelTimeout timeout = new HashedWheelTimeout(this, task, deadline);
    timeouts.add(timeout);
    return timeout;
}
```
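The overflow guard in `newTimeout` is easy to miss, so here is a sketch that isolates it. `DeadlineDemo` and `relativeDeadline` are hypothetical names, and the current time and start time are passed in as parameters (Netty reads `System.nanoTime()` and the `startTime` field) so the arithmetic is reproducible.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of the deadline computation with its overflow guard.
final class DeadlineDemo {
    /** Deadline in nanoseconds relative to startTimeNanos, clamped on overflow. */
    static long relativeDeadline(long nowNanos, long startTimeNanos, long delay, TimeUnit unit) {
        long deadline = nowNanos + unit.toNanos(delay) - startTimeNanos;
        // A positive delay that yields a negative deadline means the sum wrapped around.
        if (delay > 0 && deadline < 0) {
            deadline = Long.MAX_VALUE;
        }
        return deadline;
    }

    public static void main(String[] args) {
        System.out.println(relativeDeadline(1_000, 400, 1, TimeUnit.MICROSECONDS));
        System.out.println(relativeDeadline(1_000, 400, Long.MAX_VALUE, TimeUnit.DAYS));
    }
}
```

The first call yields 1600 (1000 + 1000 - 400). The second would wrap to a negative number, so it is clamped to `Long.MAX_VALUE`, which effectively means the task never fires.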
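The Worker class
Worker is the core thread class of the timing wheel. Advancing the tick and processing expired tasks both happen on this thread. Worker implements Runnable: the constructor passes it to threadFactory.newThread(worker), so the timer thread executes Worker's run() method.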
```java
private final class Worker implements Runnable {
    // Timeouts that were still pending when the timer was stopped.
    private final Set<Timeout> unprocessedTimeouts = new HashSet<Timeout>();

    // Number of ticks elapsed so far.
    private long tick;
}
```
The run method of the Worker class

```java
@Override
public void run() {
    // Initialise startTime; 0 is reserved to mean "not initialised yet".
    startTime = System.nanoTime();
    if (startTime == 0) {
        startTime = 1;
    }

    // Release any thread blocked in start().
    startTimeInitialized.countDown();

    do {
        final long deadline = waitForNextTick();
        if (deadline > 0) {
            int idx = (int) (tick & mask);
            processCancelledTasks();
            HashedWheelBucket bucket = wheel[idx];
            // Move newly submitted timeouts from the queue into their buckets.
            transferTimeoutsToBuckets();
            // Debug print added while studying the code.
            System.out.println("bucket" + bucket + ",idx" + idx);
            bucket.expireTimeouts(deadline);
            tick++;
        }
    } while (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_STARTED);

    // The timer was stopped: collect everything that has not run yet.
    for (HashedWheelBucket bucket : wheel) {
        bucket.clearTimeouts(unprocessedTimeouts);
    }
    for (;;) {
        HashedWheelTimeout timeout = timeouts.poll();
        if (timeout == null) {
            break;
        }
        if (!timeout.isCancelled()) {
            unprocessedTimeouts.add(timeout);
        }
    }
    processCancelledTasks();
}
```
The transferTimeoutsToBuckets method of the Worker class

```java
private void transferTimeoutsToBuckets() {
    // Transfer at most 100000 timeouts per tick so the worker is not starved
    // when other threads keep adding timeouts.
    for (int i = 0; i < 100000; i++) {
        HashedWheelTimeout timeout = timeouts.poll();
        if (timeout == null) {
            // Queue drained.
            break;
        }
        if (timeout.state() == HashedWheelTimeout.ST_CANCELLED) {
            // Cancelled before it reached a bucket.
            continue;
        }

        // Tick at which the timeout is due.
        long calculated = timeout.deadline / tickDuration;
        // Full rotations to wait before it may fire.
        timeout.remainingRounds = (calculated - tick) / wheel.length;

        // Never schedule into the past.
        final long ticks = Math.max(calculated, tick);
        // Debug print added while studying the code.
        System.out.println("tick:" + ticks);
        int stopIndex = (int) (ticks & mask);

        HashedWheelBucket bucket = wheel[stopIndex];
        bucket.addTimeout(timeout);
    }
}
```
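A worked example makes the slot arithmetic easier to follow. The sketch below repeats the three calculations from `transferTimeoutsToBuckets` with plain parameters. `SlotPlacementDemo` and `place` are hypothetical names, and the sample numbers (100 ms tick, 8 slots, current tick 3) are assumptions chosen for illustration.

```java
// Hypothetical sketch of how a deadline is turned into a bucket index and a round count.
final class SlotPlacementDemo {
    /** Returns {bucketIndex, remainingRounds}; wheelLength must be a power of two. */
    static long[] place(long deadlineNanos, long tickDurationNanos, long currentTick, int wheelLength) {
        long calculated = deadlineNanos / tickDurationNanos;           // tick at which the task is due
        long remainingRounds = (calculated - currentTick) / wheelLength;
        long ticks = Math.max(calculated, currentTick);                // overdue tasks go to the current slot
        int index = (int) (ticks & (wheelLength - 1));
        return new long[] {index, remainingRounds};
    }

    public static void main(String[] args) {
        long tickNanos = 100_000_000L; // 100 ms
        long[] future = place(2_550_000_000L, tickNanos, 3, 8);
        System.out.println("bucket " + future[0] + ", rounds " + future[1]);
        long[] overdue = place(100_000_000L, tickNanos, 3, 8);
        System.out.println("bucket " + overdue[0] + ", rounds " + overdue[1]);
    }
}
```

A deadline of 2.55 s is due at tick 25, so it goes into bucket 25 & 7 = 1 with (25 - 3) / 8 = 2 remaining rounds: the pointer passes bucket 1 at ticks 9 and 17 before firing at tick 25. The overdue deadline (tick 1 while the wheel is already at tick 3) is placed in the current bucket 3 with 0 rounds, so it fires on the current tick.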