HBASE-20867 RS may get killed while master restarts
This commit is contained in:
parent
0b283dae0d
commit
ac4b789f5f
|
@ -149,7 +149,7 @@ public final class ClientExceptionsUtil {
|
|||
|| e instanceof ClosedChannelException || e instanceof SyncFailedException
|
||||
|| e instanceof EOFException || e instanceof TimeoutException
|
||||
|| e instanceof CallTimeoutException || e instanceof ConnectionClosingException
|
||||
|| e instanceof FailedServerException);
|
||||
|| e instanceof FailedServerException || e instanceof ConnectionClosedException);
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -0,0 +1,36 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.exceptions;
|
||||
|
||||
import org.apache.hadoop.hbase.HBaseIOException;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
|
||||
/**
|
||||
* Thrown when the connection is closed
|
||||
*/
|
||||
|
||||
@InterfaceAudience.Public
|
||||
public class ConnectionClosedException extends HBaseIOException {
|
||||
|
||||
private static final long serialVersionUID = -8938225073412971497L;
|
||||
|
||||
public ConnectionClosedException(String string) {
|
||||
super(string);
|
||||
}
|
||||
|
||||
}
|
|
@ -17,6 +17,7 @@
|
|||
*/
|
||||
package org.apache.hadoop.hbase.ipc;
|
||||
|
||||
import org.apache.hadoop.hbase.exceptions.ConnectionClosedException;
|
||||
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
|
||||
import org.apache.hbase.thirdparty.com.google.protobuf.CodedOutputStream;
|
||||
import org.apache.hbase.thirdparty.com.google.protobuf.Message;
|
||||
|
@ -176,6 +177,10 @@ class IPCUtil {
|
|||
} else if (exception instanceof DoNotRetryIOException) {
|
||||
return (IOException) new DoNotRetryIOException(
|
||||
"Call to " + addr + " failed on local exception: " + exception).initCause(exception);
|
||||
} else if (exception instanceof ConnectionClosedException) {
|
||||
return (ConnectionClosedException) exception;
|
||||
} else if (exception instanceof StoppedRpcClientException) {
|
||||
return (StoppedRpcClientException) exception;
|
||||
} else {
|
||||
return (IOException) new IOException(
|
||||
"Call to " + addr + " failed on local exception: " + exception).initCause(exception);
|
||||
|
|
|
@ -17,6 +17,7 @@
|
|||
*/
|
||||
package org.apache.hadoop.hbase.ipc;
|
||||
|
||||
import org.apache.hadoop.hbase.exceptions.ConnectionClosedException;
|
||||
import org.apache.hbase.thirdparty.com.google.protobuf.Message;
|
||||
import org.apache.hbase.thirdparty.com.google.protobuf.Message.Builder;
|
||||
import org.apache.hbase.thirdparty.com.google.protobuf.TextFormat;
|
||||
|
@ -207,7 +208,7 @@ class NettyRpcDuplexHandler extends ChannelDuplexHandler {
|
|||
@Override
|
||||
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
|
||||
if (!id2Call.isEmpty()) {
|
||||
cleanupCalls(ctx, new IOException("Connection closed"));
|
||||
cleanupCalls(ctx, new ConnectionClosedException("Connection closed"));
|
||||
}
|
||||
conn.shutdown();
|
||||
ctx.fireChannelInactive();
|
||||
|
|
|
@ -18,6 +18,7 @@
|
|||
|
||||
package org.apache.hadoop.hbase.security;
|
||||
|
||||
import org.apache.hadoop.hbase.exceptions.ConnectionClosedException;
|
||||
import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf;
|
||||
import org.apache.hbase.thirdparty.io.netty.buffer.Unpooled;
|
||||
import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandlerContext;
|
||||
|
@ -29,7 +30,6 @@ import org.apache.hbase.thirdparty.io.netty.util.concurrent.PromiseCombiner;
|
|||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
import org.apache.hadoop.hbase.io.crypto.aes.CryptoAES;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
/**
|
||||
* wrap messages with Crypto AES.
|
||||
|
@ -91,7 +91,7 @@ public class CryptoAESWrapHandler extends ChannelOutboundHandlerAdapter {
|
|||
@Override
|
||||
public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
|
||||
if (!queue.isEmpty()) {
|
||||
queue.releaseAndFailAll(new IOException("Connection closed"));
|
||||
queue.releaseAndFailAll(new ConnectionClosedException("Connection closed"));
|
||||
}
|
||||
ctx.close(promise);
|
||||
}
|
||||
|
|
|
@ -17,6 +17,7 @@
|
|||
*/
|
||||
package org.apache.hadoop.hbase.security;
|
||||
|
||||
import org.apache.hadoop.hbase.exceptions.ConnectionClosedException;
|
||||
import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf;
|
||||
import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandlerContext;
|
||||
import org.apache.hbase.thirdparty.io.netty.channel.SimpleChannelInboundHandler;
|
||||
|
@ -150,7 +151,7 @@ public class NettyHBaseSaslRpcClientHandler extends SimpleChannelInboundHandler<
|
|||
@Override
|
||||
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
|
||||
saslRpcClient.dispose();
|
||||
saslPromise.tryFailure(new IOException("Connection closed"));
|
||||
saslPromise.tryFailure(new ConnectionClosedException("Connection closed"));
|
||||
ctx.fireChannelInactive();
|
||||
}
|
||||
|
||||
|
|
|
@ -17,6 +17,11 @@
|
|||
*/
|
||||
package org.apache.hadoop.hbase.security;
|
||||
|
||||
import javax.security.sasl.SaslClient;
|
||||
|
||||
import org.apache.hadoop.hbase.exceptions.ConnectionClosedException;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
|
||||
import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf;
|
||||
import org.apache.hbase.thirdparty.io.netty.buffer.Unpooled;
|
||||
import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandlerContext;
|
||||
|
@ -26,11 +31,6 @@ import org.apache.hbase.thirdparty.io.netty.channel.CoalescingBufferQueue;
|
|||
import org.apache.hbase.thirdparty.io.netty.util.ReferenceCountUtil;
|
||||
import org.apache.hbase.thirdparty.io.netty.util.concurrent.PromiseCombiner;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import javax.security.sasl.SaslClient;
|
||||
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
|
||||
/**
|
||||
* wrap sasl messages.
|
||||
|
@ -92,7 +92,7 @@ public class SaslWrapHandler extends ChannelOutboundHandlerAdapter {
|
|||
@Override
|
||||
public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
|
||||
if (!queue.isEmpty()) {
|
||||
queue.releaseAndFailAll(new IOException("Connection closed"));
|
||||
queue.releaseAndFailAll(new ConnectionClosedException("Connection closed"));
|
||||
}
|
||||
ctx.close(promise);
|
||||
}
|
||||
|
|
|
@ -217,7 +217,7 @@ public abstract class RegionTransitionProcedure
|
|||
final ServerName serverName, final IOException exception) {
|
||||
final RegionStateNode regionNode = getRegionState(env);
|
||||
LOG.warn("Remote call failed {}; {}; {}; exception={}", serverName,
|
||||
this, regionNode.toShortString(), exception.getClass().getSimpleName());
|
||||
this, regionNode.toShortString(), exception.getClass().getSimpleName(), exception);
|
||||
if (remoteCallFailed(env, regionNode, exception)) {
|
||||
// NOTE: This call to wakeEvent puts this Procedure back on the scheduler.
|
||||
// Thereafter, another Worker can be in here so DO NOT MESS WITH STATE beyond
|
||||
|
|
|
@ -18,7 +18,6 @@
|
|||
package org.apache.hadoop.hbase.master.procedure;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.SocketTimeoutException;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.Callable;
|
||||
|
@ -26,6 +25,7 @@ import java.util.concurrent.TimeUnit;
|
|||
import org.apache.hadoop.hbase.DoNotRetryIOException;
|
||||
import org.apache.hadoop.hbase.ServerName;
|
||||
import org.apache.hadoop.hbase.client.RegionInfo;
|
||||
import org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil;
|
||||
import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
|
||||
import org.apache.hadoop.hbase.master.MasterServices;
|
||||
import org.apache.hadoop.hbase.master.ServerListener;
|
||||
|
@ -192,11 +192,11 @@ public class RSProcedureDispatcher
|
|||
return false;
|
||||
}
|
||||
|
||||
// In case socket is timed out and the region server is still online,
|
||||
// In case it is a connection exception and the region server is still online,
|
||||
// the openRegion RPC could have been accepted by the server and
|
||||
// just the response didn't go through. So we will retry to
|
||||
// open the region on the same server.
|
||||
final boolean retry = !hold && (e instanceof SocketTimeoutException
|
||||
final boolean retry = !hold && (ClientExceptionsUtil.isConnectionException(e)
|
||||
&& master.getServerManager().isServerOnline(serverName));
|
||||
if (retry) {
|
||||
// we want to retry as many times as needed as long as the RS is not dead.
|
||||
|
@ -204,10 +204,9 @@ public class RSProcedureDispatcher
|
|||
LOG.debug(String.format("Retrying to same RegionServer %s because: %s",
|
||||
serverName, e.getMessage()), e);
|
||||
}
|
||||
submitTask(this);
|
||||
submitTask(this, 100, TimeUnit.MILLISECONDS);
|
||||
return true;
|
||||
}
|
||||
|
||||
// trying to send the request elsewhere instead
|
||||
LOG.warn(String.format("Failed dispatch to server=%s try=%d",
|
||||
serverName, numberOfAttemptsSoFar), e);
|
||||
|
|
|
@ -0,0 +1,138 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.master;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Optional;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
|
||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.client.RegionInfo;
|
||||
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
|
||||
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
|
||||
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
|
||||
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
|
||||
import org.apache.hadoop.hbase.coprocessor.RegionObserver;
|
||||
import org.apache.hadoop.hbase.master.assignment.MoveRegionProcedure;
|
||||
import org.apache.hadoop.hbase.regionserver.HRegionServer;
|
||||
import org.apache.hadoop.hbase.testclassification.MasterTests;
|
||||
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.JVMClusterUtil;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.Assert;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.ClassRule;
|
||||
import org.junit.Test;
|
||||
import org.junit.experimental.categories.Category;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
|
||||
@Category({ MasterTests.class, MediumTests.class })
|
||||
public class TestMasterAbortAndRSGotKilled {
|
||||
private static Logger LOG = LoggerFactory
|
||||
.getLogger(TestMasterAbortAndRSGotKilled.class.getName());
|
||||
|
||||
@ClassRule
|
||||
public static final HBaseClassTestRule CLASS_RULE =
|
||||
HBaseClassTestRule.forClass(TestMasterAbortAndRSGotKilled.class);
|
||||
|
||||
private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
|
||||
|
||||
private static TableName TABLE_NAME = TableName.valueOf("test");
|
||||
|
||||
private static CountDownLatch countDownLatch = new CountDownLatch(1);
|
||||
|
||||
|
||||
|
||||
private static byte[] CF = Bytes.toBytes("cf");
|
||||
|
||||
@BeforeClass
|
||||
public static void setUp() throws Exception {
|
||||
UTIL.getConfiguration().setStrings(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
|
||||
DelayCloseCP.class.getName());
|
||||
UTIL.startMiniCluster(3);
|
||||
UTIL.getAdmin().balancerSwitch(false, true);
|
||||
UTIL.createTable(TABLE_NAME, CF);
|
||||
UTIL.waitTableAvailable(TABLE_NAME);
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void tearDown() throws Exception {
|
||||
UTIL.shutdownMiniCluster();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void test() throws Exception {
|
||||
JVMClusterUtil.RegionServerThread rsThread = null;
|
||||
for (JVMClusterUtil.RegionServerThread t : UTIL.getMiniHBaseCluster()
|
||||
.getRegionServerThreads()) {
|
||||
if (!t.getRegionServer().getRegions(TABLE_NAME).isEmpty()) {
|
||||
rsThread = t;
|
||||
break;
|
||||
}
|
||||
}
|
||||
//find the rs and hri of the table
|
||||
HRegionServer rs = rsThread.getRegionServer();
|
||||
RegionInfo hri = rs.getRegions(TABLE_NAME).get(0).getRegionInfo();
|
||||
MoveRegionProcedure moveRegionProcedure = new MoveRegionProcedure(
|
||||
UTIL.getMiniHBaseCluster().getMaster().getMasterProcedureExecutor()
|
||||
.getEnvironment(),
|
||||
new RegionPlan(hri, rs.getServerName(), rs.getServerName()), true);
|
||||
long procID = UTIL.getMiniHBaseCluster().getMaster()
|
||||
.getMasterProcedureExecutor().submitProcedure(moveRegionProcedure);
|
||||
countDownLatch.await();
|
||||
UTIL.getMiniHBaseCluster().stopMaster(0);
|
||||
UTIL.getMiniHBaseCluster().startMaster();
|
||||
//wait until master initialized
|
||||
UTIL.waitFor(30000,
|
||||
() -> UTIL.getMiniHBaseCluster().getMaster() != null && UTIL
|
||||
.getMiniHBaseCluster().getMaster().isInitialized());
|
||||
Assert.assertTrue("Should be 3 RS after master restart",
|
||||
UTIL.getMiniHBaseCluster().getLiveRegionServerThreads().size() == 3);
|
||||
|
||||
}
|
||||
|
||||
public static class DelayCloseCP implements RegionCoprocessor,
|
||||
RegionObserver {
|
||||
@Override
|
||||
public void preClose(ObserverContext<RegionCoprocessorEnvironment> c,
|
||||
boolean abortRequested) throws IOException {
|
||||
try {
|
||||
if (!c.getEnvironment().getRegion().getRegionInfo().getTable().isSystemTable()) {
|
||||
LOG.error("begin to sleep");
|
||||
countDownLatch.countDown();
|
||||
//Sleep here so we can stuck the RPC call
|
||||
Thread.sleep(10000);
|
||||
LOG.error("finish sleep");
|
||||
}
|
||||
} catch (Throwable t) {
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public Optional<RegionObserver> getRegionObserver() {
|
||||
return Optional.of(this);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
Loading…
Reference in New Issue