HBASE-26012 Improve logging and dequeue logic in DelayQueue (#3397)
Signed-off-by: Duo Zhang <zhangduo@apache.org>
This commit is contained in:
parent
3a6441c946
commit
3ecfd35f6a
|
@ -311,8 +311,14 @@ public abstract class RemoteProcedureDispatcher<TEnv, TRemote extends Comparable
|
|||
@Override
|
||||
public void run() {
|
||||
while (running.get()) {
|
||||
final DelayedWithTimeout task = DelayedUtil.takeWithoutInterrupt(queue);
|
||||
final DelayedWithTimeout task = DelayedUtil.takeWithoutInterrupt(queue,
|
||||
20, TimeUnit.SECONDS);
|
||||
if (task == null || task == DelayedUtil.DELAYED_POISON) {
|
||||
if (task == null && queue.size() > 0) {
|
||||
LOG.error("DelayQueue for RemoteProcedureDispatcher is not empty when timed waiting"
|
||||
+ " elapsed. If this is repeated consistently, it means no element is getting expired"
|
||||
+ " from the queue and it might freeze the system. Queue: {}", queue);
|
||||
}
|
||||
// the executor may be shutting down, and the task is just the shutdown request
|
||||
continue;
|
||||
}
|
||||
|
|
|
@ -18,6 +18,7 @@
|
|||
package org.apache.hadoop.hbase.procedure2;
|
||||
|
||||
import java.util.concurrent.DelayQueue;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import org.apache.hadoop.hbase.procedure2.util.DelayedUtil;
|
||||
import org.apache.hadoop.hbase.procedure2.util.DelayedUtil.DelayedWithTimeout;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
|
@ -52,7 +53,8 @@ class TimeoutExecutorThread<TEnvironment> extends StoppableThread {
|
|||
@Override
|
||||
public void run() {
|
||||
while (executor.isRunning()) {
|
||||
final DelayedWithTimeout task = DelayedUtil.takeWithoutInterrupt(queue);
|
||||
final DelayedWithTimeout task = DelayedUtil.takeWithoutInterrupt(queue, 20,
|
||||
TimeUnit.SECONDS);
|
||||
if (task == null || task == DelayedUtil.DELAYED_POISON) {
|
||||
// the executor may be shutting down,
|
||||
// and the task is just the shutdown request
|
||||
|
|
|
@ -77,9 +77,10 @@ public final class DelayedUtil {
|
|||
/**
|
||||
* @return null (if an interrupt) or an instance of E; resets interrupt on calling thread.
|
||||
*/
|
||||
public static <E extends Delayed> E takeWithoutInterrupt(final DelayQueue<E> queue) {
|
||||
public static <E extends Delayed> E takeWithoutInterrupt(final DelayQueue<E> queue,
|
||||
final long timeout, final TimeUnit timeUnit) {
|
||||
try {
|
||||
return queue.take();
|
||||
return queue.poll(timeout, timeUnit);
|
||||
} catch (InterruptedException e) {
|
||||
Thread.currentThread().interrupt();
|
||||
return null;
|
||||
|
|
Loading…
Reference in New Issue