Conversation
| this.checker = builder.checker; | ||
| this.recoveryTasks = new ConcurrentLinkedQueue<>(); | ||
| this.runningRecoveryTasks = new ConcurrentHashMap<>(); | ||
| this.recoveryFuture = builder.backOff == null ? initDefaultChecker(builder) : initBackOffChecker(builder); |
There was a problem hiding this comment.
defaultChecker是不是也可以用backoff实现?
There was a problem hiding this comment.
可以的,FixedBackOff 可以实现与当前一致的效果,但是考虑到 backoff 的实现还没有经过验证,所以没有直接替换 defaultChecker。
我建议先在几个客户端中手动替换掉(比如 HttpCall),稳定之后在下个版本再替换
|
|
||
| private CloseableSupplier<ScheduledFuture<?>> initBackOffChecker(WeightFailoverBuilder<T> builder) { | ||
| return lazy(() -> SharedCheckExecutorHolder.getInstance().scheduleWithFixedDelay(() -> { | ||
| if (closed) { |
There was a problem hiding this comment.
整个runnable需要做try/catch保护,避免没处理的意外导致可用性下降;
| runningRecoveryTasks.put(obj, task); | ||
| // 与外层共用一个线程池,貌似也没什么不妥... | ||
| ListenableFuture<Integer> future = | ||
| SharedCheckExecutorHolder.getInstance().schedule(task, task.nextBackOff(), MILLISECONDS); |
There was a problem hiding this comment.
schedule永远不会上抛异常吧?这里addCallback才能remove runningRecoveryTasks,是否要保护下异常分支?
| T obj = task.getObj(); | ||
| // 新建的 task 将其放入 running 队列中 | ||
| runningRecoveryTasks.put(obj, task); | ||
| // 与外层共用一个线程池,貌似也没什么不妥... |
There was a problem hiding this comment.
还有问题,这里调度任务是有可能有业务自定制的堵塞逻辑,scheduled线程池通常不多,所以这里需要设计调度线程池和检查线程池隔离,进一步,执行线程池的大小也需要确认,因为风险很大。
There was a problem hiding this comment.
您好 如果是定时调度线程池的话 咱觉着是不是可以参考一些咱正在进行的PR,里面有个FutureUtils会比较合适点
apache/pulsar#10544
There was a problem hiding this comment.
@PhantomThief 其实pulsar的BrokerService相关设计也很优雅 可以参考看下相关的调度线程池实现
No description provided.