HBASE-26012 Improve logging and dequeue logic in DelayQueue (#3397)

Signed-off-by: Duo Zhang <zhangduo@apache.org>
This commit is contained in:
Viraj Jasani 2021-06-24 15:27:11 +05:30 committed by GitHub
parent dcd0fb8b1d
commit d11dc81721
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 13 additions and 4 deletions

View File

@ -310,8 +310,14 @@ public abstract class RemoteProcedureDispatcher<TEnv, TRemote extends Comparable
@Override
public void run() {
while (running.get()) {
final DelayedWithTimeout task = DelayedUtil.takeWithoutInterrupt(queue);
final DelayedWithTimeout task = DelayedUtil.takeWithoutInterrupt(queue,
20, TimeUnit.SECONDS);
if (task == null || task == DelayedUtil.DELAYED_POISON) {
if (task == null && queue.size() > 0) {
LOG.error("DelayQueue for RemoteProcedureDispatcher is not empty when timed waiting"
+ " elapsed. If this is repeated consistently, it means no element is getting expired"
+ " from the queue and it might freeze the system. Queue: {}", queue);
}
// the executor may be shutting down, and the task is just the shutdown request
continue;
}

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.hbase.procedure2;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.hbase.procedure2.util.DelayedUtil;
import org.apache.hadoop.hbase.procedure2.util.DelayedUtil.DelayedWithTimeout;
import org.apache.yetus.audience.InterfaceAudience;
@ -52,7 +53,8 @@ class TimeoutExecutorThread<TEnvironment> extends StoppableThread {
@Override
public void run() {
while (executor.isRunning()) {
final DelayedWithTimeout task = DelayedUtil.takeWithoutInterrupt(queue);
final DelayedWithTimeout task = DelayedUtil.takeWithoutInterrupt(queue, 20,
TimeUnit.SECONDS);
if (task == null || task == DelayedUtil.DELAYED_POISON) {
// the executor may be shutting down,
// and the task is just the shutdown request

View File

@ -77,9 +77,10 @@ public final class DelayedUtil {
/**
* @return null (if an interrupt) or an instance of E; resets interrupt on calling thread.
*/
public static <E extends Delayed> E takeWithoutInterrupt(final DelayQueue<E> queue) {
public static <E extends Delayed> E takeWithoutInterrupt(final DelayQueue<E> queue,
final long timeout, final TimeUnit timeUnit) {
try {
return queue.take();
return queue.poll(timeout, timeUnit);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return null;