HBASE-21811 region can be opened on two servers due to race condition with procedures and server reports
The original fix is provided by Sergey Shelukhin, the UT is added by Duo Zhang Amending-Author: Duo Zhang <zhangduo@apache.org> Signed-off-by: Duo Zhang <zhangduo@apache.org>
This commit is contained in:
parent
e8767ea495
commit
9c8aca73ed
|
@ -70,6 +70,22 @@ public class ProcedureEvent<T> {
|
|||
procedureScheduler.wakeEvents(new ProcedureEvent[]{this});
|
||||
}
|
||||
|
||||
/**
|
||||
* Wakes up the suspended procedures only if the given {@code proc} is waiting on this event.
|
||||
* <p/>
|
||||
* Mainly used by region assignment to reject stale OpenRegionProcedure/CloseRegionProcedure. Use
|
||||
* with caution as it will cause performance issue if there are lots of procedures waiting on the
|
||||
* event.
|
||||
*/
|
||||
public synchronized boolean wakeIfSuspended(AbstractProcedureScheduler procedureScheduler,
|
||||
Procedure<?> proc) {
|
||||
if (suspendedProcedures.stream().anyMatch(p -> p.getProcId() == proc.getProcId())) {
|
||||
wake(procedureScheduler);
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
/**
|
||||
* Wakes up all the given events and puts the procedures waiting on them back into
|
||||
* ProcedureScheduler queues.
|
||||
|
|
|
@ -86,17 +86,22 @@ public abstract class RegionRemoteProcedureBase extends Procedure<MasterProcedur
|
|||
RegionStateNode regionNode = getRegionNode(env);
|
||||
regionNode.lock();
|
||||
try {
|
||||
ProcedureEvent<?> event = regionNode.getProcedureEvent();
|
||||
if (event.isReady()) {
|
||||
LOG.warn(
|
||||
"The procedure event of procedure {} for region {} to server {} is not suspended, " +
|
||||
"usually this should not happen, but anyway let's skip the following wake up code, ",
|
||||
this, region, targetServer);
|
||||
return;
|
||||
}
|
||||
LOG.warn("The remote operation {} for region {} to server {} failed", this, region,
|
||||
targetServer, exception);
|
||||
event.wake(env.getProcedureScheduler());
|
||||
// This could happen as the RSProcedureDispatcher and dead server processor are executed in
|
||||
// different threads. It is possible that we have already scheduled SCP for the targetServer
|
||||
// and woken up this procedure, and assigned the region to another RS, and then the
|
||||
// RSProcedureDispatcher notices that the targetServer is dead so it can not send the request
|
||||
// out and call remoteCallFailed, which makes us arrive here, especially that if the target
|
||||
// machine is completely down, which means you can only receive a ConnectionTimeout after a
|
||||
// very long time(depends on the timeout settings and in HBase usually it will be at least 15
|
||||
// seconds, or even 1 minute). So here we need to check whether we are stilling waiting on the
|
||||
// given event, if not, this means that we have already been woken up so do not wake it up
|
||||
// again.
|
||||
if (!regionNode.getProcedureEvent().wakeIfSuspended(env.getProcedureScheduler(), this)) {
|
||||
LOG.warn("{} is not waiting on the event for region {}, targer server = {}, ignore.", this,
|
||||
region, targetServer);
|
||||
}
|
||||
} finally {
|
||||
regionNode.unlock();
|
||||
}
|
||||
|
@ -176,9 +181,9 @@ public abstract class RegionRemoteProcedureBase extends Procedure<MasterProcedur
|
|||
|
||||
@Override
|
||||
protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException {
|
||||
serializer.serialize(RegionRemoteProcedureBaseStateData.newBuilder()
|
||||
.setRegion(ProtobufUtil.toRegionInfo(region))
|
||||
.setTargetServer(ProtobufUtil.toServerName(targetServer)).build());
|
||||
serializer.serialize(
|
||||
RegionRemoteProcedureBaseStateData.newBuilder().setRegion(ProtobufUtil.toRegionInfo(region))
|
||||
.setTargetServer(ProtobufUtil.toServerName(targetServer)).build());
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -306,7 +306,8 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
|
|||
final RpcServerInterface rpcServer;
|
||||
final InetSocketAddress isa;
|
||||
|
||||
private final HRegionServer regionServer;
|
||||
@VisibleForTesting
|
||||
protected final HRegionServer regionServer;
|
||||
private final long maxScannerResultSize;
|
||||
|
||||
// The reference to the priority extraction function
|
||||
|
@ -1602,15 +1603,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
|
|||
|
||||
try {
|
||||
checkOpen();
|
||||
if (request.hasServerStartCode()) {
|
||||
// check that we are the same server that this RPC is intended for.
|
||||
long serverStartCode = request.getServerStartCode();
|
||||
if (regionServer.serverName.getStartcode() != serverStartCode) {
|
||||
throw new ServiceException(new DoNotRetryIOException("This RPC was intended for a " +
|
||||
"different server with startCode: " + serverStartCode + ", this server is: "
|
||||
+ regionServer.serverName));
|
||||
}
|
||||
}
|
||||
throwOnWrongStartCode(request);
|
||||
final String encodedRegionName = ProtobufUtil.getRegionEncodedName(request.getRegion());
|
||||
|
||||
requestCount.increment();
|
||||
|
@ -1919,6 +1912,44 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
|
|||
}
|
||||
}
|
||||
|
||||
private void throwOnWrongStartCode(OpenRegionRequest request) throws ServiceException {
|
||||
if (!request.hasServerStartCode()) {
|
||||
LOG.warn("OpenRegionRequest for {} does not have a start code", request.getOpenInfoList());
|
||||
return;
|
||||
}
|
||||
throwOnWrongStartCode(request.getServerStartCode());
|
||||
}
|
||||
|
||||
private void throwOnWrongStartCode(CloseRegionRequest request) throws ServiceException {
|
||||
if (!request.hasServerStartCode()) {
|
||||
LOG.warn("CloseRegionRequest for {} does not have a start code", request.getRegion());
|
||||
return;
|
||||
}
|
||||
throwOnWrongStartCode(request.getServerStartCode());
|
||||
}
|
||||
|
||||
private void throwOnWrongStartCode(long serverStartCode) throws ServiceException {
|
||||
// check that we are the same server that this RPC is intended for.
|
||||
if (regionServer.serverName.getStartcode() != serverStartCode) {
|
||||
throw new ServiceException(new DoNotRetryIOException(
|
||||
"This RPC was intended for a " + "different server with startCode: " + serverStartCode +
|
||||
", this server is: " + regionServer.serverName));
|
||||
}
|
||||
}
|
||||
|
||||
private void throwOnWrongStartCode(ExecuteProceduresRequest req) throws ServiceException {
|
||||
if (req.getOpenRegionCount() > 0) {
|
||||
for (OpenRegionRequest openReq : req.getOpenRegionList()) {
|
||||
throwOnWrongStartCode(openReq);
|
||||
}
|
||||
}
|
||||
if (req.getCloseRegionCount() > 0) {
|
||||
for (CloseRegionRequest closeReq : req.getCloseRegionList()) {
|
||||
throwOnWrongStartCode(closeReq);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Open asynchronously a region or a set of regions on the region server.
|
||||
*
|
||||
|
@ -1947,15 +1978,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
|
|||
public OpenRegionResponse openRegion(final RpcController controller,
|
||||
final OpenRegionRequest request) throws ServiceException {
|
||||
requestCount.increment();
|
||||
if (request.hasServerStartCode()) {
|
||||
// check that we are the same server that this RPC is intended for.
|
||||
long serverStartCode = request.getServerStartCode();
|
||||
if (regionServer.serverName.getStartcode() != serverStartCode) {
|
||||
throw new ServiceException(new DoNotRetryIOException("This RPC was intended for a " +
|
||||
"different server with startCode: " + serverStartCode + ", this server is: "
|
||||
+ regionServer.serverName));
|
||||
}
|
||||
}
|
||||
throwOnWrongStartCode(request);
|
||||
|
||||
OpenRegionResponse.Builder builder = OpenRegionResponse.newBuilder();
|
||||
final int regionCount = request.getOpenInfoCount();
|
||||
|
@ -3657,6 +3680,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
|
|||
ExecuteProceduresRequest request) throws ServiceException {
|
||||
try {
|
||||
checkOpen();
|
||||
throwOnWrongStartCode(request);
|
||||
regionServer.getRegionServerCoprocessorHost().preExecuteProcedures();
|
||||
if (request.getOpenRegionCount() > 0) {
|
||||
// Avoid reading from the TableDescritor every time(usually it will read from the file
|
||||
|
|
|
@ -0,0 +1,289 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.master.assignment;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.ConnectException;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.CopyOnWriteArrayList;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||
import org.apache.hadoop.hbase.MiniHBaseCluster.MiniHBaseClusterRegionServer;
|
||||
import org.apache.hadoop.hbase.PleaseHoldException;
|
||||
import org.apache.hadoop.hbase.ServerName;
|
||||
import org.apache.hadoop.hbase.StartMiniClusterOption;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.client.RegionInfo;
|
||||
import org.apache.hadoop.hbase.master.HMaster;
|
||||
import org.apache.hadoop.hbase.master.MasterServices;
|
||||
import org.apache.hadoop.hbase.master.RegionPlan;
|
||||
import org.apache.hadoop.hbase.master.RegionState;
|
||||
import org.apache.hadoop.hbase.master.ServerManager;
|
||||
import org.apache.hadoop.hbase.regionserver.HRegionServer;
|
||||
import org.apache.hadoop.hbase.regionserver.RSRpcServices;
|
||||
import org.apache.hadoop.hbase.testclassification.LargeTests;
|
||||
import org.apache.hadoop.hbase.testclassification.MasterTests;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
|
||||
import org.apache.zookeeper.KeeperException;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.ClassRule;
|
||||
import org.junit.Test;
|
||||
import org.junit.experimental.categories.Category;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
|
||||
import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
|
||||
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ExecuteProceduresRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ExecuteProceduresResponse;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionResponse;
|
||||
|
||||
/**
|
||||
* Testcase for HBASE-21811.
|
||||
*/
|
||||
@Category({ MasterTests.class, LargeTests.class })
|
||||
public class TestWakeUpUnexpectedProcedure {
|
||||
|
||||
@ClassRule
|
||||
public static final HBaseClassTestRule CLASS_RULE =
|
||||
HBaseClassTestRule.forClass(TestWakeUpUnexpectedProcedure.class);
|
||||
|
||||
private static final Logger LOG = LoggerFactory.getLogger(TestWakeUpUnexpectedProcedure.class);
|
||||
|
||||
private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
|
||||
|
||||
private static TableName NAME = TableName.valueOf("Assign");
|
||||
|
||||
private static final List<ServerName> EXCLUDE_SERVERS = new CopyOnWriteArrayList<>();
|
||||
|
||||
private static byte[] CF = Bytes.toBytes("cf");
|
||||
|
||||
private static volatile ServerName SERVER_TO_KILL;
|
||||
|
||||
private static volatile CountDownLatch ARRIVE_EXEC_PROC;
|
||||
|
||||
private static volatile CountDownLatch RESUME_EXEC_PROC;
|
||||
|
||||
private static volatile CountDownLatch RESUME_IS_SERVER_ONLINE;
|
||||
|
||||
private static volatile CountDownLatch ARRIVE_REPORT;
|
||||
|
||||
private static volatile CountDownLatch RESUME_REPORT;
|
||||
|
||||
private static final class RSRpcServicesForTest extends RSRpcServices {
|
||||
|
||||
public RSRpcServicesForTest(HRegionServer rs) throws IOException {
|
||||
super(rs);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ExecuteProceduresResponse executeProcedures(RpcController controller,
|
||||
ExecuteProceduresRequest request) throws ServiceException {
|
||||
if (request.getOpenRegionCount() > 0) {
|
||||
if (ARRIVE_EXEC_PROC != null) {
|
||||
SERVER_TO_KILL = regionServer.getServerName();
|
||||
ARRIVE_EXEC_PROC.countDown();
|
||||
ARRIVE_EXEC_PROC = null;
|
||||
try {
|
||||
RESUME_EXEC_PROC.await();
|
||||
} catch (InterruptedException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
throw new ServiceException(new ConnectException("Inject error"));
|
||||
}
|
||||
}
|
||||
return super.executeProcedures(controller, request);
|
||||
}
|
||||
}
|
||||
|
||||
public static final class RSForTest extends MiniHBaseClusterRegionServer {
|
||||
|
||||
public RSForTest(Configuration conf) throws IOException, InterruptedException {
|
||||
super(conf);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected RSRpcServices createRpcServices() throws IOException {
|
||||
return new RSRpcServicesForTest(this);
|
||||
}
|
||||
}
|
||||
|
||||
private static final class AMForTest extends AssignmentManager {
|
||||
|
||||
public AMForTest(MasterServices master) {
|
||||
super(master);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ReportRegionStateTransitionResponse reportRegionStateTransition(
|
||||
ReportRegionStateTransitionRequest req) throws PleaseHoldException {
|
||||
RegionStateTransition rst = req.getTransition(0);
|
||||
if (rst.getTransitionCode() == TransitionCode.OPENED &&
|
||||
ProtobufUtil.toTableName(rst.getRegionInfo(0).getTableName()).equals(NAME)) {
|
||||
CountDownLatch arrive = ARRIVE_REPORT;
|
||||
if (ARRIVE_REPORT != null) {
|
||||
ARRIVE_REPORT = null;
|
||||
arrive.countDown();
|
||||
// so we will choose another rs next time
|
||||
EXCLUDE_SERVERS.add(ProtobufUtil.toServerName(req.getServer()));
|
||||
try {
|
||||
RESUME_REPORT.await();
|
||||
} catch (InterruptedException e) {
|
||||
throw new RuntimeException();
|
||||
}
|
||||
}
|
||||
}
|
||||
return super.reportRegionStateTransition(req);
|
||||
}
|
||||
}
|
||||
|
||||
private static final class SMForTest extends ServerManager {
|
||||
|
||||
public SMForTest(MasterServices master) {
|
||||
super(master);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isServerOnline(ServerName serverName) {
|
||||
ServerName toKill = SERVER_TO_KILL;
|
||||
if (toKill != null && toKill.equals(serverName)) {
|
||||
for (StackTraceElement ele : new Exception().getStackTrace()) {
|
||||
// halt it is called from RSProcedureDispatcher, to delay the remoteCallFailed.
|
||||
if ("scheduleForRetry".equals(ele.getMethodName())) {
|
||||
if (RESUME_IS_SERVER_ONLINE != null) {
|
||||
try {
|
||||
RESUME_IS_SERVER_ONLINE.await();
|
||||
} catch (InterruptedException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
return super.isServerOnline(serverName);
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<ServerName> createDestinationServersList() {
|
||||
return super.createDestinationServersList(EXCLUDE_SERVERS);
|
||||
}
|
||||
}
|
||||
|
||||
public static final class HMasterForTest extends HMaster {
|
||||
|
||||
public HMasterForTest(Configuration conf) throws IOException, KeeperException {
|
||||
super(conf);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected AssignmentManager createAssignmentManager(MasterServices master) {
|
||||
return new AMForTest(master);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected ServerManager createServerManager(MasterServices master) throws IOException {
|
||||
setupClusterConnection();
|
||||
return new SMForTest(master);
|
||||
}
|
||||
}
|
||||
|
||||
@BeforeClass
|
||||
public static void setUp() throws Exception {
|
||||
UTIL.startMiniCluster(StartMiniClusterOption.builder().numMasters(1)
|
||||
.masterClass(HMasterForTest.class).numRegionServers(3).rsClass(RSForTest.class).build());
|
||||
UTIL.createTable(NAME, CF);
|
||||
// Here the test region must not be hosted on the same rs with meta region.
|
||||
// We have 3 RSes and only two regions(meta and the test region), so they will not likely to be
|
||||
// hosted on the same RS.
|
||||
UTIL.waitTableAvailable(NAME);
|
||||
UTIL.getAdmin().balancerSwitch(false, true);
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void tearDown() throws Exception {
|
||||
UTIL.shutdownMiniCluster();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void test() throws Exception {
|
||||
RegionInfo region = UTIL.getMiniHBaseCluster().getRegions(NAME).get(0).getRegionInfo();
|
||||
AssignmentManager am = UTIL.getMiniHBaseCluster().getMaster().getAssignmentManager();
|
||||
RegionStateNode rsn = am.getRegionStates().getRegionStateNode(region);
|
||||
|
||||
ServerName sn = rsn.getRegionLocation();
|
||||
RESUME_EXEC_PROC = new CountDownLatch(1);
|
||||
ARRIVE_EXEC_PROC = new CountDownLatch(1);
|
||||
RESUME_IS_SERVER_ONLINE = new CountDownLatch(1);
|
||||
|
||||
// reopen the region, and halt the executeProcedures method at RS side
|
||||
am.moveAsync(new RegionPlan(region, sn, sn));
|
||||
ARRIVE_EXEC_PROC.await();
|
||||
|
||||
RESUME_REPORT = new CountDownLatch(1);
|
||||
ARRIVE_REPORT = new CountDownLatch(1);
|
||||
|
||||
// kill the region server
|
||||
ServerName serverToKill = SERVER_TO_KILL;
|
||||
UTIL.getMiniHBaseCluster().stopRegionServer(serverToKill);
|
||||
RESUME_EXEC_PROC.countDown();
|
||||
|
||||
// wait until we are going to open the region on a new rs
|
||||
ARRIVE_REPORT.await();
|
||||
|
||||
// resume the isServerOnline check, to let the rs procedure
|
||||
RESUME_IS_SERVER_ONLINE.countDown();
|
||||
|
||||
// before HBASE-20811 the state could become OPEN, and this is why later the region will be
|
||||
// assigned to two regionservers.
|
||||
for (int i = 0; i < 15; i++) {
|
||||
if (rsn.getState() == RegionState.State.OPEN) {
|
||||
break;
|
||||
}
|
||||
Thread.sleep(1000);
|
||||
}
|
||||
|
||||
// resume the old report
|
||||
RESUME_REPORT.countDown();
|
||||
|
||||
// wait a bit to let the region to be online, it is not easy to write a condition for this so
|
||||
// just sleep a while.
|
||||
Thread.sleep(10000);
|
||||
|
||||
// confirm that the region is only on one rs
|
||||
int count = 0;
|
||||
for (RegionServerThread t : UTIL.getMiniHBaseCluster().getRegionServerThreads()) {
|
||||
if (!t.getRegionServer().getRegions(NAME).isEmpty()) {
|
||||
LOG.info("{} is on {}", region, t.getRegionServer().getServerName());
|
||||
count++;
|
||||
}
|
||||
}
|
||||
assertEquals(1, count);
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue