HBASE-21472 Should not persist the dispatched field for RegionRemoteProcedureBase
This commit is contained in:
parent
a8b642067d
commit
9e42a9e310
|
@ -541,7 +541,6 @@ message RegionStateTransitionStateData {
|
||||||
message RegionRemoteProcedureBaseStateData {
|
message RegionRemoteProcedureBaseStateData {
|
||||||
required RegionInfo region = 1;
|
required RegionInfo region = 1;
|
||||||
required ServerName target_server = 2;
|
required ServerName target_server = 2;
|
||||||
required bool dispatched = 3;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
message OpenRegionProcedureStateData {
|
message OpenRegionProcedureStateData {
|
||||||
|
|
|
@ -83,6 +83,6 @@ public class CloseRegionProcedure extends RegionRemoteProcedureBase {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected boolean shouldDispatch(RegionStateNode regionNode) {
|
protected boolean shouldDispatch(RegionStateNode regionNode) {
|
||||||
return !regionNode.isInState(RegionState.State.CLOSED, RegionState.State.ABNORMALLY_CLOSED);
|
return regionNode.isInState(RegionState.State.CLOSING);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -68,6 +68,6 @@ public class OpenRegionProcedure extends RegionRemoteProcedureBase {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected boolean shouldDispatch(RegionStateNode regionNode) {
|
protected boolean shouldDispatch(RegionStateNode regionNode) {
|
||||||
return !regionNode.isInState(RegionState.State.OPEN);
|
return regionNode.isInState(RegionState.State.OPENING);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -107,6 +107,17 @@ public abstract class RegionRemoteProcedureBase extends Procedure<MasterProcedur
|
||||||
return region.getTable();
|
return region.getTable();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected boolean waitInitialized(MasterProcedureEnv env) {
|
||||||
|
if (TableName.isMetaTableName(getTableName())) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
// First we need meta to be loaded, and second, if meta is not online then we will likely to
|
||||||
|
// fail when updating meta so we wait until it is assigned.
|
||||||
|
AssignmentManager am = env.getAssignmentManager();
|
||||||
|
return am.waitMetaLoaded(this) || am.waitMetaAssigned(this, region);
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void rollback(MasterProcedureEnv env) throws IOException, InterruptedException {
|
protected void rollback(MasterProcedureEnv env) throws IOException, InterruptedException {
|
||||||
throw new UnsupportedOperationException();
|
throw new UnsupportedOperationException();
|
||||||
|
@ -120,11 +131,13 @@ public abstract class RegionRemoteProcedureBase extends Procedure<MasterProcedur
|
||||||
/**
|
/**
|
||||||
* Check whether we still need to make the call to RS.
|
* Check whether we still need to make the call to RS.
|
||||||
* <p/>
|
* <p/>
|
||||||
* Usually this will not happen if we do not allow assigning a already onlined region. But if we
|
* This could happen when master restarts. Since we do not know whether a request has already been
|
||||||
* have something wrong in the RSProcedureDispatcher, where we have already sent the request to
|
* sent to the region server after we add a remote operation to the dispatcher, so the safe way is
|
||||||
* RS, but then we tell the upper layer the remote call is failed due to rpc timeout or connection
|
* to not persist the dispatched field and try to add the remote operation again. But it is
|
||||||
* closed or anything else, then this issue can still happen. So here we add a check to make it
|
* possible that we do have already sent the request to region server and it has also sent back
|
||||||
* more robust.
|
* the response, so here we need to check the region state, if it is not in the expecting state,
|
||||||
|
* we should give up, otherwise we may hang for ever, as the region server will just ignore
|
||||||
|
* redundant calls.
|
||||||
*/
|
*/
|
||||||
protected abstract boolean shouldDispatch(RegionStateNode regionNode);
|
protected abstract boolean shouldDispatch(RegionStateNode regionNode);
|
||||||
|
|
||||||
|
@ -165,7 +178,7 @@ public abstract class RegionRemoteProcedureBase extends Procedure<MasterProcedur
|
||||||
protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException {
|
protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException {
|
||||||
serializer.serialize(RegionRemoteProcedureBaseStateData.newBuilder()
|
serializer.serialize(RegionRemoteProcedureBaseStateData.newBuilder()
|
||||||
.setRegion(ProtobufUtil.toRegionInfo(region))
|
.setRegion(ProtobufUtil.toRegionInfo(region))
|
||||||
.setTargetServer(ProtobufUtil.toServerName(targetServer)).setDispatched(dispatched).build());
|
.setTargetServer(ProtobufUtil.toServerName(targetServer)).build());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -174,6 +187,5 @@ public abstract class RegionRemoteProcedureBase extends Procedure<MasterProcedur
|
||||||
serializer.deserialize(RegionRemoteProcedureBaseStateData.class);
|
serializer.deserialize(RegionRemoteProcedureBaseStateData.class);
|
||||||
region = ProtobufUtil.toRegionInfo(data.getRegion());
|
region = ProtobufUtil.toRegionInfo(data.getRegion());
|
||||||
targetServer = ProtobufUtil.toServerName(data.getTargetServer());
|
targetServer = ProtobufUtil.toServerName(data.getTargetServer());
|
||||||
dispatched = data.getDispatched();
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,180 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hbase.master.assignment;
|
||||||
|
|
||||||
|
import static org.junit.Assert.assertNotNull;
|
||||||
|
import static org.junit.Assert.assertNull;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.concurrent.CountDownLatch;
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||||
|
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||||
|
import org.apache.hadoop.hbase.HConstants;
|
||||||
|
import org.apache.hadoop.hbase.PleaseHoldException;
|
||||||
|
import org.apache.hadoop.hbase.ServerName;
|
||||||
|
import org.apache.hadoop.hbase.StartMiniClusterOption;
|
||||||
|
import org.apache.hadoop.hbase.TableName;
|
||||||
|
import org.apache.hadoop.hbase.client.RegionInfo;
|
||||||
|
import org.apache.hadoop.hbase.master.HMaster;
|
||||||
|
import org.apache.hadoop.hbase.master.MasterServices;
|
||||||
|
import org.apache.hadoop.hbase.master.RegionPlan;
|
||||||
|
import org.apache.hadoop.hbase.master.ServerManager;
|
||||||
|
import org.apache.hadoop.hbase.regionserver.HRegionServer;
|
||||||
|
import org.apache.hadoop.hbase.testclassification.MasterTests;
|
||||||
|
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||||
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
|
import org.apache.zookeeper.KeeperException;
|
||||||
|
import org.junit.AfterClass;
|
||||||
|
import org.junit.BeforeClass;
|
||||||
|
import org.junit.ClassRule;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.junit.experimental.categories.Category;
|
||||||
|
|
||||||
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
|
||||||
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionRequest;
|
||||||
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionResponse;
|
||||||
|
|
||||||
|
@Category({ MasterTests.class, MediumTests.class })
|
||||||
|
public class TestRegionAssignedToMultipleRegionServers {
|
||||||
|
|
||||||
|
@ClassRule
|
||||||
|
public static final HBaseClassTestRule CLASS_RULE =
|
||||||
|
HBaseClassTestRule.forClass(TestRegionAssignedToMultipleRegionServers.class);
|
||||||
|
|
||||||
|
private static final List<ServerName> EXCLUDE_SERVERS = new ArrayList<>();
|
||||||
|
|
||||||
|
private static boolean HALT = false;
|
||||||
|
|
||||||
|
private static boolean KILL = false;
|
||||||
|
|
||||||
|
private static CountDownLatch ARRIVE;
|
||||||
|
|
||||||
|
private static final class ServerManagerForTest extends ServerManager {
|
||||||
|
|
||||||
|
public ServerManagerForTest(MasterServices master) {
|
||||||
|
super(master);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public List<ServerName> createDestinationServersList() {
|
||||||
|
return super.createDestinationServersList(EXCLUDE_SERVERS);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private static final class AssignmentManagerForTest extends AssignmentManager {
|
||||||
|
|
||||||
|
public AssignmentManagerForTest(MasterServices master) {
|
||||||
|
super(master);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ReportRegionStateTransitionResponse reportRegionStateTransition(
|
||||||
|
ReportRegionStateTransitionRequest req) throws PleaseHoldException {
|
||||||
|
if (req.getTransition(0).getTransitionCode() == TransitionCode.OPENED) {
|
||||||
|
if (ARRIVE != null) {
|
||||||
|
ARRIVE.countDown();
|
||||||
|
ARRIVE = null;
|
||||||
|
}
|
||||||
|
while (HALT) {
|
||||||
|
try {
|
||||||
|
Thread.sleep(100);
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
throw new RuntimeException(e);
|
||||||
|
}
|
||||||
|
if (KILL) {
|
||||||
|
throw new PleaseHoldException("Inject error!");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return super.reportRegionStateTransition(req);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public static final class HMasterForTest extends HMaster {
|
||||||
|
|
||||||
|
public HMasterForTest(Configuration conf) throws IOException, KeeperException {
|
||||||
|
super(conf);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected AssignmentManager createAssignmentManager(MasterServices master) {
|
||||||
|
return new AssignmentManagerForTest(master);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected ServerManager createServerManager(MasterServices master) throws IOException {
|
||||||
|
setupClusterConnection();
|
||||||
|
return new ServerManagerForTest(master);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
|
||||||
|
|
||||||
|
private static TableName NAME = TableName.valueOf("Assign");
|
||||||
|
|
||||||
|
private static byte[] CF = Bytes.toBytes("cf");
|
||||||
|
|
||||||
|
@BeforeClass
|
||||||
|
public static void setUp() throws Exception {
|
||||||
|
UTIL.getConfiguration().setClass(HConstants.MASTER_IMPL, HMasterForTest.class, HMaster.class);
|
||||||
|
UTIL
|
||||||
|
.startMiniCluster(StartMiniClusterOption.builder().numMasters(2).numRegionServers(2).build());
|
||||||
|
UTIL.createTable(NAME, CF);
|
||||||
|
UTIL.waitTableAvailable(NAME);
|
||||||
|
UTIL.getAdmin().balancerSwitch(false, true);
|
||||||
|
}
|
||||||
|
|
||||||
|
@AfterClass
|
||||||
|
public static void tearDown() throws Exception {
|
||||||
|
UTIL.shutdownMiniCluster();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void test() throws Exception {
|
||||||
|
RegionInfo region = UTIL.getMiniHBaseCluster().getRegions(NAME).get(0).getRegionInfo();
|
||||||
|
AssignmentManager am = UTIL.getMiniHBaseCluster().getMaster().getAssignmentManager();
|
||||||
|
RegionStateNode rsn = am.getRegionStates().getRegionStateNode(region);
|
||||||
|
|
||||||
|
ServerName sn = rsn.getRegionLocation();
|
||||||
|
ARRIVE = new CountDownLatch(1);
|
||||||
|
HALT = true;
|
||||||
|
am.moveAsync(new RegionPlan(region, sn, sn));
|
||||||
|
ARRIVE.await();
|
||||||
|
|
||||||
|
// let's restart the master
|
||||||
|
EXCLUDE_SERVERS.add(rsn.getRegionLocation());
|
||||||
|
KILL = true;
|
||||||
|
HMaster activeMaster = UTIL.getMiniHBaseCluster().getMaster();
|
||||||
|
activeMaster.abort("For testing");
|
||||||
|
activeMaster.getThread().join();
|
||||||
|
KILL = false;
|
||||||
|
|
||||||
|
// sleep a while to reproduce the problem, as after the fix in HBASE-21472 the execution logic
|
||||||
|
// is changed so the old code to reproduce the problem can not compile...
|
||||||
|
Thread.sleep(10000);
|
||||||
|
HALT = false;
|
||||||
|
Thread.sleep(5000);
|
||||||
|
|
||||||
|
HRegionServer rs = UTIL.getMiniHBaseCluster().getRegionServer(sn);
|
||||||
|
assertNotNull(rs.getRegion(region.getEncodedName()));
|
||||||
|
assertNull(UTIL.getOtherRegionServer(rs).getRegion(region.getEncodedName()));
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue