HBASE-21811 region can be opened on two servers due to race condition with procedures and server reports

The original fix is provided by Sergey Shelukhin, the UT is added by Duo Zhang

Amending-Author: Duo Zhang <zhangduo@apache.org>
Signed-off-by: Duo Zhang <zhangduo@apache.org>
This commit is contained in:
Sergey Shelukhin 2019-02-02 11:00:53 +08:00 committed by Duo Zhang
parent 5784a09fff
commit 946bc19242
4 changed files with 365 additions and 31 deletions

View File

@ -70,6 +70,22 @@ public class ProcedureEvent<T> {
procedureScheduler.wakeEvents(new ProcedureEvent[]{this});
}
/**
* Wakes up the suspended procedures only if the given {@code proc} is waiting on this event.
* <p/>
* Mainly used by region assignment to reject stale OpenRegionProcedure/CloseRegionProcedure. Use
* with caution as it will cause performance issue if there are lots of procedures waiting on the
* event.
*/
public synchronized boolean wakeIfSuspended(AbstractProcedureScheduler procedureScheduler,
Procedure<?> proc) {
if (suspendedProcedures.stream().anyMatch(p -> p.getProcId() == proc.getProcId())) {
wake(procedureScheduler);
return true;
}
return false;
}
/**
* Wakes up all the given events and puts the procedures waiting on them back into
* ProcedureScheduler queues.

View File

@ -86,17 +86,22 @@ public abstract class RegionRemoteProcedureBase extends Procedure<MasterProcedur
RegionStateNode regionNode = getRegionNode(env);
regionNode.lock();
try {
ProcedureEvent<?> event = regionNode.getProcedureEvent();
if (event.isReady()) {
LOG.warn(
"The procedure event of procedure {} for region {} to server {} is not suspended, " +
"usually this should not happen, but anyway let's skip the following wake up code, ",
this, region, targetServer);
return;
}
LOG.warn("The remote operation {} for region {} to server {} failed", this, region,
targetServer, exception);
event.wake(env.getProcedureScheduler());
// This could happen as the RSProcedureDispatcher and dead server processor are executed in
// different threads. It is possible that we have already scheduled SCP for the targetServer
// and woken up this procedure, and assigned the region to another RS, and then the
// RSProcedureDispatcher notices that the targetServer is dead so it can not send the request
// out and call remoteCallFailed, which makes us arrive here, especially that if the target
// machine is completely down, which means you can only receive a ConnectionTimeout after a
// very long time(depends on the timeout settings and in HBase usually it will be at least 15
// seconds, or even 1 minute). So here we need to check whether we are stilling waiting on the
// given event, if not, this means that we have already been woken up so do not wake it up
// again.
if (!regionNode.getProcedureEvent().wakeIfSuspended(env.getProcedureScheduler(), this)) {
LOG.warn("{} is not waiting on the event for region {}, targer server = {}, ignore.", this,
region, targetServer);
}
} finally {
regionNode.unlock();
}
@ -176,8 +181,8 @@ public abstract class RegionRemoteProcedureBase extends Procedure<MasterProcedur
@Override
protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException {
serializer.serialize(RegionRemoteProcedureBaseStateData.newBuilder()
.setRegion(ProtobufUtil.toRegionInfo(region))
serializer.serialize(
RegionRemoteProcedureBaseStateData.newBuilder().setRegion(ProtobufUtil.toRegionInfo(region))
.setTargetServer(ProtobufUtil.toServerName(targetServer)).build());
}

View File

@ -309,7 +309,8 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
final RpcServerInterface rpcServer;
final InetSocketAddress isa;
private final HRegionServer regionServer;
@VisibleForTesting
protected final HRegionServer regionServer;
private final long maxScannerResultSize;
// The reference to the priority extraction function
@ -1605,15 +1606,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
try {
checkOpen();
if (request.hasServerStartCode()) {
// check that we are the same server that this RPC is intended for.
long serverStartCode = request.getServerStartCode();
if (regionServer.serverName.getStartcode() != serverStartCode) {
throw new ServiceException(new DoNotRetryIOException("This RPC was intended for a " +
"different server with startCode: " + serverStartCode + ", this server is: "
+ regionServer.serverName));
}
}
throwOnWrongStartCode(request);
final String encodedRegionName = ProtobufUtil.getRegionEncodedName(request.getRegion());
requestCount.increment();
@ -1922,6 +1915,44 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
}
}
private void throwOnWrongStartCode(OpenRegionRequest request) throws ServiceException {
if (!request.hasServerStartCode()) {
LOG.warn("OpenRegionRequest for {} does not have a start code", request.getOpenInfoList());
return;
}
throwOnWrongStartCode(request.getServerStartCode());
}
private void throwOnWrongStartCode(CloseRegionRequest request) throws ServiceException {
if (!request.hasServerStartCode()) {
LOG.warn("CloseRegionRequest for {} does not have a start code", request.getRegion());
return;
}
throwOnWrongStartCode(request.getServerStartCode());
}
private void throwOnWrongStartCode(long serverStartCode) throws ServiceException {
// check that we are the same server that this RPC is intended for.
if (regionServer.serverName.getStartcode() != serverStartCode) {
throw new ServiceException(new DoNotRetryIOException(
"This RPC was intended for a " + "different server with startCode: " + serverStartCode +
", this server is: " + regionServer.serverName));
}
}
private void throwOnWrongStartCode(ExecuteProceduresRequest req) throws ServiceException {
if (req.getOpenRegionCount() > 0) {
for (OpenRegionRequest openReq : req.getOpenRegionList()) {
throwOnWrongStartCode(openReq);
}
}
if (req.getCloseRegionCount() > 0) {
for (CloseRegionRequest closeReq : req.getCloseRegionList()) {
throwOnWrongStartCode(closeReq);
}
}
}
/**
* Open asynchronously a region or a set of regions on the region server.
*
@ -1950,15 +1981,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
public OpenRegionResponse openRegion(final RpcController controller,
final OpenRegionRequest request) throws ServiceException {
requestCount.increment();
if (request.hasServerStartCode()) {
// check that we are the same server that this RPC is intended for.
long serverStartCode = request.getServerStartCode();
if (regionServer.serverName.getStartcode() != serverStartCode) {
throw new ServiceException(new DoNotRetryIOException("This RPC was intended for a " +
"different server with startCode: " + serverStartCode + ", this server is: "
+ regionServer.serverName));
}
}
throwOnWrongStartCode(request);
OpenRegionResponse.Builder builder = OpenRegionResponse.newBuilder();
final int regionCount = request.getOpenInfoCount();
@ -3751,6 +3774,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
ExecuteProceduresRequest request) throws ServiceException {
try {
checkOpen();
throwOnWrongStartCode(request);
regionServer.getRegionServerCoprocessorHost().preExecuteProcedures();
if (request.getOpenRegionCount() > 0) {
// Avoid reading from the TableDescritor every time(usually it will read from the file

View File

@ -0,0 +1,289 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master.assignment;
import static org.junit.Assert.assertEquals;
import java.io.IOException;
import java.net.ConnectException;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.MiniHBaseCluster.MiniHBaseClusterRegionServer;
import org.apache.hadoop.hbase.PleaseHoldException;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.StartMiniClusterOption;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.master.RegionPlan;
import org.apache.hadoop.hbase.master.RegionState;
import org.apache.hadoop.hbase.master.ServerManager;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.RSRpcServices;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
import org.apache.zookeeper.KeeperException;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ExecuteProceduresRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ExecuteProceduresResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionResponse;
/**
* Testcase for HBASE-21811.
*/
@Category({ MasterTests.class, LargeTests.class })
public class TestWakeUpUnexpectedProcedure {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestWakeUpUnexpectedProcedure.class);
private static final Logger LOG = LoggerFactory.getLogger(TestWakeUpUnexpectedProcedure.class);
private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
private static TableName NAME = TableName.valueOf("Assign");
private static final List<ServerName> EXCLUDE_SERVERS = new CopyOnWriteArrayList<>();
private static byte[] CF = Bytes.toBytes("cf");
private static volatile ServerName SERVER_TO_KILL;
private static volatile CountDownLatch ARRIVE_EXEC_PROC;
private static volatile CountDownLatch RESUME_EXEC_PROC;
private static volatile CountDownLatch RESUME_IS_SERVER_ONLINE;
private static volatile CountDownLatch ARRIVE_REPORT;
private static volatile CountDownLatch RESUME_REPORT;
private static final class RSRpcServicesForTest extends RSRpcServices {
public RSRpcServicesForTest(HRegionServer rs) throws IOException {
super(rs);
}
@Override
public ExecuteProceduresResponse executeProcedures(RpcController controller,
ExecuteProceduresRequest request) throws ServiceException {
if (request.getOpenRegionCount() > 0) {
if (ARRIVE_EXEC_PROC != null) {
SERVER_TO_KILL = regionServer.getServerName();
ARRIVE_EXEC_PROC.countDown();
ARRIVE_EXEC_PROC = null;
try {
RESUME_EXEC_PROC.await();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
throw new ServiceException(new ConnectException("Inject error"));
}
}
return super.executeProcedures(controller, request);
}
}
public static final class RSForTest extends MiniHBaseClusterRegionServer {
public RSForTest(Configuration conf) throws IOException, InterruptedException {
super(conf);
}
@Override
protected RSRpcServices createRpcServices() throws IOException {
return new RSRpcServicesForTest(this);
}
}
private static final class AMForTest extends AssignmentManager {
public AMForTest(MasterServices master) {
super(master);
}
@Override
public ReportRegionStateTransitionResponse reportRegionStateTransition(
ReportRegionStateTransitionRequest req) throws PleaseHoldException {
RegionStateTransition rst = req.getTransition(0);
if (rst.getTransitionCode() == TransitionCode.OPENED &&
ProtobufUtil.toTableName(rst.getRegionInfo(0).getTableName()).equals(NAME)) {
CountDownLatch arrive = ARRIVE_REPORT;
if (ARRIVE_REPORT != null) {
ARRIVE_REPORT = null;
arrive.countDown();
// so we will choose another rs next time
EXCLUDE_SERVERS.add(ProtobufUtil.toServerName(req.getServer()));
try {
RESUME_REPORT.await();
} catch (InterruptedException e) {
throw new RuntimeException();
}
}
}
return super.reportRegionStateTransition(req);
}
}
private static final class SMForTest extends ServerManager {
public SMForTest(MasterServices master) {
super(master);
}
@Override
public boolean isServerOnline(ServerName serverName) {
ServerName toKill = SERVER_TO_KILL;
if (toKill != null && toKill.equals(serverName)) {
for (StackTraceElement ele : new Exception().getStackTrace()) {
// halt it is called from RSProcedureDispatcher, to delay the remoteCallFailed.
if ("scheduleForRetry".equals(ele.getMethodName())) {
if (RESUME_IS_SERVER_ONLINE != null) {
try {
RESUME_IS_SERVER_ONLINE.await();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
break;
}
}
}
return super.isServerOnline(serverName);
}
@Override
public List<ServerName> createDestinationServersList() {
return super.createDestinationServersList(EXCLUDE_SERVERS);
}
}
public static final class HMasterForTest extends HMaster {
public HMasterForTest(Configuration conf) throws IOException, KeeperException {
super(conf);
}
@Override
protected AssignmentManager createAssignmentManager(MasterServices master) {
return new AMForTest(master);
}
@Override
protected ServerManager createServerManager(MasterServices master) throws IOException {
setupClusterConnection();
return new SMForTest(master);
}
}
@BeforeClass
public static void setUp() throws Exception {
UTIL.startMiniCluster(StartMiniClusterOption.builder().numMasters(1)
.masterClass(HMasterForTest.class).numRegionServers(3).rsClass(RSForTest.class).build());
UTIL.createTable(NAME, CF);
// Here the test region must not be hosted on the same rs with meta region.
// We have 3 RSes and only two regions(meta and the test region), so they will not likely to be
// hosted on the same RS.
UTIL.waitTableAvailable(NAME);
UTIL.getAdmin().balancerSwitch(false, true);
}
@AfterClass
public static void tearDown() throws Exception {
UTIL.shutdownMiniCluster();
}
@Test
public void test() throws Exception {
RegionInfo region = UTIL.getMiniHBaseCluster().getRegions(NAME).get(0).getRegionInfo();
AssignmentManager am = UTIL.getMiniHBaseCluster().getMaster().getAssignmentManager();
RegionStateNode rsn = am.getRegionStates().getRegionStateNode(region);
ServerName sn = rsn.getRegionLocation();
RESUME_EXEC_PROC = new CountDownLatch(1);
ARRIVE_EXEC_PROC = new CountDownLatch(1);
RESUME_IS_SERVER_ONLINE = new CountDownLatch(1);
// reopen the region, and halt the executeProcedures method at RS side
am.moveAsync(new RegionPlan(region, sn, sn));
ARRIVE_EXEC_PROC.await();
RESUME_REPORT = new CountDownLatch(1);
ARRIVE_REPORT = new CountDownLatch(1);
// kill the region server
ServerName serverToKill = SERVER_TO_KILL;
UTIL.getMiniHBaseCluster().stopRegionServer(serverToKill);
RESUME_EXEC_PROC.countDown();
// wait until we are going to open the region on a new rs
ARRIVE_REPORT.await();
// resume the isServerOnline check, to let the rs procedure
RESUME_IS_SERVER_ONLINE.countDown();
// before HBASE-20811 the state could become OPEN, and this is why later the region will be
// assigned to two regionservers.
for (int i = 0; i < 15; i++) {
if (rsn.getState() == RegionState.State.OPEN) {
break;
}
Thread.sleep(1000);
}
// resume the old report
RESUME_REPORT.countDown();
// wait a bit to let the region to be online, it is not easy to write a condition for this so
// just sleep a while.
Thread.sleep(10000);
// confirm that the region is only on one rs
int count = 0;
for (RegionServerThread t : UTIL.getMiniHBaseCluster().getRegionServerThreads()) {
if (!t.getRegionServer().getRegions(NAME).isEmpty()) {
LOG.info("{} is on {}", region, t.getRegionServer().getServerName());
count++;
}
}
assertEquals(1, count);
}
}