diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/RemoteProcedureDispatcher.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/RemoteProcedureDispatcher.java index 73f6d473c09..4c54c57b9e4 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/RemoteProcedureDispatcher.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/RemoteProcedureDispatcher.java @@ -23,25 +23,21 @@ import java.lang.Thread.UncaughtExceptionHandler; import java.util.HashSet; import java.util.List; import java.util.Set; -import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.DelayQueue; -import java.util.concurrent.Future; -import java.util.concurrent.FutureTask; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; - import org.apache.hadoop.conf.Configuration; -import org.apache.yetus.audience.InterfaceAudience; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.apache.hadoop.hbase.procedure2.util.DelayedUtil; import org.apache.hadoop.hbase.procedure2.util.DelayedUtil.DelayedContainerWithTimestamp; import org.apache.hadoop.hbase.procedure2.util.DelayedUtil.DelayedWithTimeout; import org.apache.hadoop.hbase.procedure2.util.StringUtils; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.Threads; +import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.apache.hbase.thirdparty.com.google.common.collect.ArrayListMultimap; @@ -52,7 +48,7 @@ import org.apache.hbase.thirdparty.com.google.common.collect.ArrayListMultimap; *
  • Each server queue has a dispatch buffer
  • *
  • Once the dispatch buffer reaches a threshold-size/time we send
  • * - *

    Call {@link #start()} and then {@link #submitTask(Callable)}. When done, + *

    Call {@link #start()} and then {@link #submitTask(Runnable)}. When done, * call {@link #stop()}. */ @InterfaceAudience.Private @@ -139,14 +135,7 @@ public abstract class RemoteProcedureDispatcher submitTask(Callable task) { - return threadPool.submit(task); + protected final void submitTask(Runnable task) { + threadPool.execute(task); } - protected Future submitTask(Callable task, long delay, TimeUnit unit) { - final FutureTask futureTask = new FutureTask(task); - timeoutExecutor.add(new DelayedTask(futureTask, delay, unit)); - return futureTask; + protected final void submitTask(Runnable task, long delay, TimeUnit unit) { + timeoutExecutor.add(new DelayedTask(task, delay, unit)); } protected abstract void remoteDispatch(TRemote key, Set operations); @@ -254,19 +241,19 @@ public abstract class RemoteProcedureDispatcher - * @param */ public interface RemoteNode { TRemote getKey(); + void add(RemoteProcedure operation); + void dispatch(); } protected ArrayListMultimap, RemoteOperation> buildAndGroupRequestByType(final TEnv env, final TRemote remote, final Set remoteProcedures) { final ArrayListMultimap, RemoteOperation> requestByType = ArrayListMultimap.create(); - for (RemoteProcedure proc: remoteProcedures) { + for (RemoteProcedure proc : remoteProcedures) { RemoteOperation operation = proc.remoteCallBuild(env, remote); requestByType.put(operation.getClass(), operation); } @@ -297,9 +284,9 @@ public abstract class RemoteProcedureDispatcher * used to submit something later to the thread-pool. */ - private static final class DelayedTask extends DelayedContainerWithTimestamp> { - public DelayedTask(final FutureTask task, final long delay, final TimeUnit unit) { + private static final class DelayedTask extends DelayedContainerWithTimestamp { + public DelayedTask(Runnable task, long delay, TimeUnit unit) { super(task, EnvironmentEdgeManager.currentTime() + unit.toMillis(delay)); } }; diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestRemoteProcedureDispatcherUncaughtExceptionHandler.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestRemoteProcedureDispatcherUncaughtExceptionHandler.java new file mode 100644 index 00000000000..7f44fc31322 --- /dev/null +++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestRemoteProcedureDispatcherUncaughtExceptionHandler.java @@ -0,0 +1,141 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.procedure2; + +import java.lang.Thread.UncaughtExceptionHandler; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseCommonTestingUtility; +import org.apache.hadoop.hbase.testclassification.MasterTests; +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.junit.After; +import org.junit.Before; +import org.junit.ClassRule; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.ExpectedException; + +/** + * Make sure the {@link UncaughtExceptionHandler} will be called when there are unchecked exceptions + * thrown in the task. + *

    + * See HBASE-21875 and HBASE-21890 for more details. + */ +@Category({ MasterTests.class, SmallTests.class }) +public class TestRemoteProcedureDispatcherUncaughtExceptionHandler { + + private static HBaseCommonTestingUtility UTIL = new HBaseCommonTestingUtility(); + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestRemoteProcedureDispatcherUncaughtExceptionHandler.class); + + private static final class ExceptionHandler implements UncaughtExceptionHandler { + + private Throwable error; + + @Override + public synchronized void uncaughtException(Thread t, Throwable e) { + this.error = e; + notifyAll(); + } + + public synchronized void get() throws Throwable { + while (error == null) { + wait(); + } + throw error; + } + } + + private static final class Dispatcher extends RemoteProcedureDispatcher { + + private final UncaughtExceptionHandler handler; + + public Dispatcher(UncaughtExceptionHandler handler) { + super(UTIL.getConfiguration()); + this.handler = handler; + } + + @Override + protected UncaughtExceptionHandler getUncaughtExceptionHandler() { + return handler; + } + + @Override + protected void remoteDispatch(Integer key, Set operations) { + } + + @Override + protected void abortPendingOperations(Integer key, Set operations) { + } + } + + @Rule + public ExpectedException thrown = ExpectedException.none(); + + private ExceptionHandler handler; + + private Dispatcher dispatcher; + + @Before + public void setUp() { + handler = new ExceptionHandler(); + dispatcher = new Dispatcher(handler); + dispatcher.start(); + } + + @After + public void tearDown() { + dispatcher.stop(); + dispatcher = null; + handler = null; + } + + @Test + public void testSubmit() throws Throwable { + String message = "inject error"; + thrown.expect(RuntimeException.class); + thrown.expectMessage(message); + dispatcher.submitTask(new Runnable() { + + @Override + public void run() { + throw new RuntimeException(message); + } + }); + handler.get(); + } + + @Test + public void testDelayedSubmit() throws Throwable { + String message = "inject error"; + thrown.expect(RuntimeException.class); + thrown.expectMessage(message); + dispatcher.submitTask(new Runnable() { + + @Override + public void run() { + throw new RuntimeException(message); + } + }, 100, TimeUnit.MILLISECONDS); + handler.get(); + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/RSProcedureDispatcher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/RSProcedureDispatcher.java index ba4737b7b22..88a4db8fb03 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/RSProcedureDispatcher.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/RSProcedureDispatcher.java @@ -18,9 +18,9 @@ package org.apache.hadoop.hbase.master.procedure; import java.io.IOException; +import java.lang.Thread.UncaughtExceptionHandler; import java.util.List; import java.util.Set; -import java.util.concurrent.Callable; import java.util.concurrent.TimeUnit; import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.ServerName; @@ -81,6 +81,17 @@ public class RSProcedureDispatcher RS_RPC_STARTUP_WAIT_TIME_CONF_KEY, DEFAULT_RS_RPC_STARTUP_WAIT_TIME); } + @Override + protected UncaughtExceptionHandler getUncaughtExceptionHandler() { + return new UncaughtExceptionHandler() { + + @Override + public void uncaughtException(Thread t, Throwable e) { + LOG.error("Unexpected error caught, this may cause the procedure to hang forever", e); + } + }; + } + @Override public boolean start() { if (!super.start()) { @@ -139,9 +150,7 @@ public class RSProcedureDispatcher /** * Base remote call */ - protected abstract class AbstractRSRemoteCall implements Callable { - @Override - public abstract Void call(); + protected abstract class AbstractRSRemoteCall implements Runnable { private final ServerName serverName; @@ -272,10 +281,9 @@ public class RSProcedureDispatcher } @Override - public Void call() { + public void run() { remoteCallFailed(procedureEnv, new RegionServerStoppedException("Server " + getServerName() + " is not online")); - return null; } } @@ -295,7 +303,7 @@ public class RSProcedureDispatcher } @Override - public Void call() { + public void run() { request = ExecuteProceduresRequest.newBuilder(); if (LOG.isTraceEnabled()) { LOG.trace("Building request with operations count=" + remoteProcedures.size()); @@ -312,7 +320,6 @@ public class RSProcedureDispatcher remoteCallFailed(procedureEnv, e); } } - return null; } @Override @@ -380,7 +387,7 @@ public class RSProcedureDispatcher } @Override - public Void call() { + public void run() { final OpenRegionRequest request = buildOpenRegionRequest(procedureEnv, getServerName(), operations); @@ -394,7 +401,6 @@ public class RSProcedureDispatcher remoteCallFailed(procedureEnv, e); } } - return null; } private OpenRegionResponse sendRequest(final ServerName serverName, @@ -427,7 +433,7 @@ public class RSProcedureDispatcher } @Override - public Void call() { + public void run() { final CloseRegionRequest request = operation.buildCloseRegionRequest(getServerName()); try { CloseRegionResponse response = sendRequest(getServerName(), request); @@ -440,7 +446,6 @@ public class RSProcedureDispatcher remoteCallFailed(procedureEnv, e); } } - return null; } private CloseRegionResponse sendRequest(final ServerName serverName,