HBASE-26012 Improve logging and dequeue logic in DelayQueue (#3397)
Signed-off-by: Duo Zhang <zhangduo@apache.org>
This commit is contained in:
parent
193a94e523
commit
7e6b66c867
|
@ -311,8 +311,14 @@ public abstract class RemoteProcedureDispatcher<TEnv, TRemote extends Comparable
|
||||||
@Override
|
@Override
|
||||||
public void run() {
|
public void run() {
|
||||||
while (running.get()) {
|
while (running.get()) {
|
||||||
final DelayedWithTimeout task = DelayedUtil.takeWithoutInterrupt(queue);
|
final DelayedWithTimeout task = DelayedUtil.takeWithoutInterrupt(queue,
|
||||||
|
20, TimeUnit.SECONDS);
|
||||||
if (task == null || task == DelayedUtil.DELAYED_POISON) {
|
if (task == null || task == DelayedUtil.DELAYED_POISON) {
|
||||||
|
if (task == null && queue.size() > 0) {
|
||||||
|
LOG.error("DelayQueue for RemoteProcedureDispatcher is not empty when timed waiting"
|
||||||
|
+ " elapsed. If this is repeated consistently, it means no element is getting expired"
|
||||||
|
+ " from the queue and it might freeze the system. Queue: {}", queue);
|
||||||
|
}
|
||||||
// the executor may be shutting down, and the task is just the shutdown request
|
// the executor may be shutting down, and the task is just the shutdown request
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
|
@ -18,6 +18,7 @@
|
||||||
package org.apache.hadoop.hbase.procedure2;
|
package org.apache.hadoop.hbase.procedure2;
|
||||||
|
|
||||||
import java.util.concurrent.DelayQueue;
|
import java.util.concurrent.DelayQueue;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
import org.apache.hadoop.hbase.procedure2.util.DelayedUtil;
|
import org.apache.hadoop.hbase.procedure2.util.DelayedUtil;
|
||||||
import org.apache.hadoop.hbase.procedure2.util.DelayedUtil.DelayedWithTimeout;
|
import org.apache.hadoop.hbase.procedure2.util.DelayedUtil.DelayedWithTimeout;
|
||||||
import org.apache.yetus.audience.InterfaceAudience;
|
import org.apache.yetus.audience.InterfaceAudience;
|
||||||
|
@ -52,7 +53,8 @@ class TimeoutExecutorThread<TEnvironment> extends StoppableThread {
|
||||||
@Override
|
@Override
|
||||||
public void run() {
|
public void run() {
|
||||||
while (executor.isRunning()) {
|
while (executor.isRunning()) {
|
||||||
final DelayedWithTimeout task = DelayedUtil.takeWithoutInterrupt(queue);
|
final DelayedWithTimeout task = DelayedUtil.takeWithoutInterrupt(queue, 20,
|
||||||
|
TimeUnit.SECONDS);
|
||||||
if (task == null || task == DelayedUtil.DELAYED_POISON) {
|
if (task == null || task == DelayedUtil.DELAYED_POISON) {
|
||||||
// the executor may be shutting down,
|
// the executor may be shutting down,
|
||||||
// and the task is just the shutdown request
|
// and the task is just the shutdown request
|
||||||
|
|
|
@ -77,9 +77,10 @@ public final class DelayedUtil {
|
||||||
/**
|
/**
|
||||||
* @return null (if an interrupt) or an instance of E; resets interrupt on calling thread.
|
* @return null (if an interrupt) or an instance of E; resets interrupt on calling thread.
|
||||||
*/
|
*/
|
||||||
public static <E extends Delayed> E takeWithoutInterrupt(final DelayQueue<E> queue) {
|
public static <E extends Delayed> E takeWithoutInterrupt(final DelayQueue<E> queue,
|
||||||
|
final long timeout, final TimeUnit timeUnit) {
|
||||||
try {
|
try {
|
||||||
return queue.take();
|
return queue.poll(timeout, timeUnit);
|
||||||
} catch (InterruptedException e) {
|
} catch (InterruptedException e) {
|
||||||
Thread.currentThread().interrupt();
|
Thread.currentThread().interrupt();
|
||||||
return null;
|
return null;
|
||||||
|
|
Loading…
Reference in New Issue