diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/exceptions/ClientExceptionsUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/exceptions/ClientExceptionsUtil.java
index 126571b6766..5402a0e7db5 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/exceptions/ClientExceptionsUtil.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/exceptions/ClientExceptionsUtil.java
@@ -149,7 +149,7 @@ public final class ClientExceptionsUtil {
         || e instanceof ClosedChannelException || e instanceof SyncFailedException
         || e instanceof EOFException || e instanceof TimeoutException
         || e instanceof CallTimeoutException || e instanceof ConnectionClosingException
-        || e instanceof FailedServerException);
+        || e instanceof FailedServerException || e instanceof ConnectionClosedException);
   }
 
   /**
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/exceptions/ConnectionClosedException.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/exceptions/ConnectionClosedException.java
new file mode 100644
index 00000000000..c595c1422c7
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/exceptions/ConnectionClosedException.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.exceptions;
+
+import org.apache.hadoop.hbase.HBaseIOException;
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * Thrown when the connection is closed
+ */
+
+@InterfaceAudience.Public
+public class ConnectionClosedException extends HBaseIOException {
+
+  private static final long serialVersionUID = -8938225073412971497L;
+
+  public ConnectionClosedException(String string) {
+    super(string);
+  }
+
+}
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java
index 974994e61c7..5e38732b8e1 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hbase.ipc;
 
+import org.apache.hadoop.hbase.exceptions.ConnectionClosedException;
 import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
 import org.apache.hbase.thirdparty.com.google.protobuf.CodedOutputStream;
 import org.apache.hbase.thirdparty.com.google.protobuf.Message;
@@ -176,6 +177,10 @@ class IPCUtil {
     } else if (exception instanceof DoNotRetryIOException) {
       return (IOException) new DoNotRetryIOException(
           "Call to " + addr + " failed on local exception: " + exception).initCause(exception);
+    } else if (exception instanceof ConnectionClosedException) {
+      return (ConnectionClosedException) exception;
+    } else if (exception instanceof StoppedRpcClientException) {
+      return (StoppedRpcClientException) exception;
     } else {
       return (IOException) new IOException(
           "Call to " + addr + " failed on local exception: " + exception).initCause(exception);
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcDuplexHandler.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcDuplexHandler.java
index f6d338b04e1..649375a89c1 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcDuplexHandler.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcDuplexHandler.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hbase.ipc;
 
+import org.apache.hadoop.hbase.exceptions.ConnectionClosedException;
 import org.apache.hbase.thirdparty.com.google.protobuf.Message;
 import org.apache.hbase.thirdparty.com.google.protobuf.Message.Builder;
 import org.apache.hbase.thirdparty.com.google.protobuf.TextFormat;
@@ -207,7 +208,7 @@ class NettyRpcDuplexHandler extends ChannelDuplexHandler {
   @Override
   public void channelInactive(ChannelHandlerContext ctx) throws Exception {
     if (!id2Call.isEmpty()) {
-      cleanupCalls(ctx, new IOException("Connection closed"));
+      cleanupCalls(ctx, new ConnectionClosedException("Connection closed"));
     }
     conn.shutdown();
     ctx.fireChannelInactive();
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/security/CryptoAESWrapHandler.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/CryptoAESWrapHandler.java
index ad1aa69cb28..ceb3f35c0c7 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/security/CryptoAESWrapHandler.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/CryptoAESWrapHandler.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.hbase.security;
 
+import org.apache.hadoop.hbase.exceptions.ConnectionClosedException;
 import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf;
 import org.apache.hbase.thirdparty.io.netty.buffer.Unpooled;
 import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandlerContext;
@@ -29,7 +30,6 @@ import org.apache.hbase.thirdparty.io.netty.util.concurrent.PromiseCombiner;
 import org.apache.yetus.audience.InterfaceAudience;
 
 import org.apache.hadoop.hbase.io.crypto.aes.CryptoAES;
-import java.io.IOException;
 
 /**
  * wrap messages with Crypto AES.
@@ -91,7 +91,7 @@ public class CryptoAESWrapHandler extends ChannelOutboundHandlerAdapter {
   @Override
   public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
     if (!queue.isEmpty()) {
-      queue.releaseAndFailAll(new IOException("Connection closed"));
+      queue.releaseAndFailAll(new ConnectionClosedException("Connection closed"));
     }
     ctx.close(promise);
   }
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/security/NettyHBaseSaslRpcClientHandler.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/NettyHBaseSaslRpcClientHandler.java
index 8da3fdef52b..eb4f20545f7 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/security/NettyHBaseSaslRpcClientHandler.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/NettyHBaseSaslRpcClientHandler.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hbase.security;
 
+import org.apache.hadoop.hbase.exceptions.ConnectionClosedException;
 import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf;
 import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandlerContext;
 import org.apache.hbase.thirdparty.io.netty.channel.SimpleChannelInboundHandler;
@@ -150,7 +151,7 @@ public class NettyHBaseSaslRpcClientHandler extends SimpleChannelInboundHandler<
   @Override
   public void channelInactive(ChannelHandlerContext ctx) throws Exception {
     saslRpcClient.dispose();
-    saslPromise.tryFailure(new IOException("Connection closed"));
+    saslPromise.tryFailure(new ConnectionClosedException("Connection closed"));
     ctx.fireChannelInactive();
   }
 
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/security/SaslWrapHandler.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/SaslWrapHandler.java
index 949d8bb116d..62c127e2dfb 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/security/SaslWrapHandler.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/SaslWrapHandler.java
@@ -17,6 +17,11 @@
  */
 package org.apache.hadoop.hbase.security;
 
+import javax.security.sasl.SaslClient;
+
+import org.apache.hadoop.hbase.exceptions.ConnectionClosedException;
+import org.apache.yetus.audience.InterfaceAudience;
+
 import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf;
 import org.apache.hbase.thirdparty.io.netty.buffer.Unpooled;
 import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandlerContext;
@@ -26,11 +31,6 @@ import org.apache.hbase.thirdparty.io.netty.channel.CoalescingBufferQueue;
 import org.apache.hbase.thirdparty.io.netty.util.ReferenceCountUtil;
 import org.apache.hbase.thirdparty.io.netty.util.concurrent.PromiseCombiner;
 
-import java.io.IOException;
-
-import javax.security.sasl.SaslClient;
-
-import org.apache.yetus.audience.InterfaceAudience;
 
 /**
  * wrap sasl messages.
@@ -92,7 +92,7 @@ public class SaslWrapHandler extends ChannelOutboundHandlerAdapter {
   @Override
   public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
     if (!queue.isEmpty()) {
-      queue.releaseAndFailAll(new IOException("Connection closed"));
+      queue.releaseAndFailAll(new ConnectionClosedException("Connection closed"));
     }
     ctx.close(promise);
   }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/AssignmentManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/AssignmentManager.java
index f5cd0a36519..60a2349b65c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/AssignmentManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/AssignmentManager.java
@@ -1016,7 +1016,7 @@ public class AssignmentManager implements ServerListener {
     if (regionNode.isInState(State.OPENING, State.OPEN)) {
       if (!regionNode.getRegionLocation().equals(serverName)) {
         throw new UnexpectedStateException(regionNode.toString() +
-          "reported OPEN on server=" + serverName +
+          " reported OPEN on server=" + serverName +
           " but state has otherwise.");
       } else if (regionNode.isInState(State.OPENING)) {
         try {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionTransitionProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionTransitionProcedure.java
index 40547782069..0db86766843 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionTransitionProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionTransitionProcedure.java
@@ -217,7 +217,7 @@ public abstract class RegionTransitionProcedure
       final ServerName serverName, final IOException exception) {
     final RegionStateNode regionNode = getRegionState(env);
     LOG.warn("Remote call failed {}; {}; {}; exception={}", serverName,
-      this, regionNode.toShortString(), exception.getClass().getSimpleName());
+      this, regionNode.toShortString(), exception.getClass().getSimpleName(), exception);
     if (remoteCallFailed(env, regionNode, exception)) {
       // NOTE: This call to wakeEvent puts this Procedure back on the scheduler.
      // Thereafter, another Worker can be in here so DO NOT MESS WITH STATE beyond
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/RSProcedureDispatcher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/RSProcedureDispatcher.java
index 3959af7aa8f..638f9d3461b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/RSProcedureDispatcher.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/RSProcedureDispatcher.java
@@ -18,7 +18,6 @@
 package org.apache.hadoop.hbase.master.procedure;
 
 import java.io.IOException;
-import java.net.SocketTimeoutException;
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.Callable;
@@ -26,6 +25,7 @@ import java.util.concurrent.TimeUnit;
 import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil;
 import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
 import org.apache.hadoop.hbase.master.MasterServices;
 import org.apache.hadoop.hbase.master.ServerListener;
@@ -192,11 +192,11 @@ public class RSProcedureDispatcher
       return false;
     }
 
-    // In case socket is timed out and the region server is still online,
+    // In case it is a connection exception and the region server is still online,
     // the openRegion RPC could have been accepted by the server and
-    // just the response didn't go through.  So we will retry to
+    // just the response didn't go through. So we will retry to
     // open the region on the same server.
-    final boolean retry = !hold && (e instanceof SocketTimeoutException
+    final boolean retry = !hold && (ClientExceptionsUtil.isConnectionException(e)
       && master.getServerManager().isServerOnline(serverName));
     if (retry) {
       // we want to retry as many times as needed as long as the RS is not dead.
@@ -204,10 +204,9 @@
         LOG.debug(String.format("Retrying to same RegionServer %s because: %s",
           serverName, e.getMessage()), e);
       }
-      submitTask(this);
+      submitTask(this, 100, TimeUnit.MILLISECONDS);
       return true;
     }
 
-    // trying to send the request elsewhere instead
     LOG.warn(String.format("Failed dispatch to server=%s try=%d",
       serverName, numberOfAttemptsSoFar), e);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterAbortAndRSGotKilled.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterAbortAndRSGotKilled.java
new file mode 100644
index 00000000000..41a8001145f
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterAbortAndRSGotKilled.java
@@ -0,0 +1,138 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master;
+
+import java.io.IOException;
+import java.util.Optional;
+import java.util.concurrent.CountDownLatch;
+
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
+import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.coprocessor.RegionObserver;
+import org.apache.hadoop.hbase.master.assignment.MoveRegionProcedure;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.testclassification.MasterTests;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.JVMClusterUtil;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Category({ MasterTests.class, MediumTests.class })
+public class TestMasterAbortAndRSGotKilled {
+  private static Logger LOG = LoggerFactory
+      .getLogger(TestMasterAbortAndRSGotKilled.class.getName());
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+      HBaseClassTestRule.forClass(TestMasterAbortAndRSGotKilled.class);
+
+  private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
+
+  private static TableName TABLE_NAME = TableName.valueOf("test");
+
+  private static CountDownLatch countDownLatch = new CountDownLatch(1);
+
+  private static byte[] CF = Bytes.toBytes("cf");
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    UTIL.getConfiguration().setStrings(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
+        DelayCloseCP.class.getName());
+    UTIL.startMiniCluster(3);
+    UTIL.getAdmin().balancerSwitch(false, true);
+    UTIL.createTable(TABLE_NAME, CF);
+    UTIL.waitTableAvailable(TABLE_NAME);
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    UTIL.shutdownMiniCluster();
+  }
+
+  @Test
+  public void test() throws Exception {
+    // find the RS that hosts a region of the table, and that region
+    JVMClusterUtil.RegionServerThread rsThread = null;
+    for (JVMClusterUtil.RegionServerThread t : UTIL.getMiniHBaseCluster()
+        .getRegionServerThreads()) {
+      if (!t.getRegionServer().getRegions(TABLE_NAME).isEmpty()) {
+        rsThread = t;
+        break;
+      }
+    }
+    HRegionServer rs = rsThread.getRegionServer();
+    RegionInfo hri = rs.getRegions(TABLE_NAME).get(0).getRegionInfo();
+    MoveRegionProcedure moveRegionProcedure = new MoveRegionProcedure(
+        UTIL.getMiniHBaseCluster().getMaster().getMasterProcedureExecutor()
+            .getEnvironment(),
+        new RegionPlan(hri, rs.getServerName(), rs.getServerName()), true);
+    long procID = UTIL.getMiniHBaseCluster().getMaster()
+        .getMasterProcedureExecutor().submitProcedure(moveRegionProcedure);
+    countDownLatch.await();
+    UTIL.getMiniHBaseCluster().stopMaster(0);
+    UTIL.getMiniHBaseCluster().startMaster();
+    // wait until the new master is initialized
+    UTIL.waitFor(30000,
+        () -> UTIL.getMiniHBaseCluster().getMaster() != null && UTIL
+            .getMiniHBaseCluster().getMaster().isInitialized());
+    Assert.assertTrue("Should be 3 RS after master restart",
+        UTIL.getMiniHBaseCluster().getLiveRegionServerThreads().size() == 3);
+  }
+
+  public static class DelayCloseCP implements RegionCoprocessor, RegionObserver {
+    @Override
+    public void preClose(ObserverContext<RegionCoprocessorEnvironment> c,
+        boolean abortRequested) throws IOException {
+      try {
+        if (!c.getEnvironment().getRegion().getRegionInfo().getTable().isSystemTable()) {
+          LOG.error("begin to sleep");
+          countDownLatch.countDown();
+          // Sleep here so the close-region RPC stays stuck on the RS
+          Thread.sleep(10000);
+          LOG.error("finish sleep");
+        }
+      } catch (Throwable t) {
+        // ignore; this is test-only stalling code
+      }
+    }
+
+    @Override
+    public Optional<RegionObserver> getRegionObserver() {
+      return Optional.of(this);
+    }
+  }
+}
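
Taken together, the client-side pieces above make a dropped connection surface as ConnectionClosedException, which ClientExceptionsUtil.isConnectionException() now recognizes, which in turn lets RSProcedureDispatcher retry the same RegionServer instead of failing the dispatch outright. Below is a minimal sketch of that decision path, not part of the patch: shouldRetrySameServer and its two boolean parameters are hypothetical stand-ins for the dispatcher's hold flag and its ServerManager.isServerOnline(serverName) check.

import java.io.IOException;

import org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil;
import org.apache.hadoop.hbase.exceptions.ConnectionClosedException;

public class RetryDecisionSketch {

  // Mirrors the patched condition: a connection-level failure against a
  // still-online RegionServer is retried on the same server, because the
  // open-region RPC may have been accepted before the connection dropped.
  static boolean shouldRetrySameServer(IOException e, boolean hold, boolean serverOnline) {
    return !hold && ClientExceptionsUtil.isConnectionException(e) && serverOnline;
  }

  public static void main(String[] args) {
    IOException e = new ConnectionClosedException("Connection closed");
    // Before the patch only SocketTimeoutException qualified, so a closed
    // connection to a live RS fell through to "Failed dispatch"; now the
    // dispatch is resubmitted (with the 100 ms delay added above).
    System.out.println(shouldRetrySameServer(e, false, true)); // prints: true
  }
}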