HBASE-21463 The checkOnlineRegionsReport can accidentally complete a TRSP
This commit is contained in:
parent
f770081129
commit
55fa8f4b33
|
@ -1915,6 +1915,11 @@ public class ProcedureExecutor<TEnvironment> {
|
||||||
return completed.size();
|
return completed.size();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
public IdLock getProcExecutionLock() {
|
||||||
|
return procExecutionLock;
|
||||||
|
}
|
||||||
|
|
||||||
// ==========================================================================
|
// ==========================================================================
|
||||||
// Worker Thread
|
// Worker Thread
|
||||||
// ==========================================================================
|
// ==========================================================================
|
||||||
|
|
|
@ -161,7 +161,8 @@ enum EnableTableState {
|
||||||
message EnableTableStateData {
|
message EnableTableStateData {
|
||||||
required UserInformation user_info = 1;
|
required UserInformation user_info = 1;
|
||||||
required TableName table_name = 2;
|
required TableName table_name = 2;
|
||||||
required bool skip_table_state_check = 3;
|
// not used any more, always false
|
||||||
|
required bool skip_table_state_check = 3[deprecated=true];
|
||||||
}
|
}
|
||||||
|
|
||||||
enum DisableTableState {
|
enum DisableTableState {
|
||||||
|
|
|
@ -2570,7 +2570,7 @@ public class HMaster extends HRegionServer implements MasterServices {
|
||||||
// Note: if the procedure throws exception, we will catch it and rethrow.
|
// Note: if the procedure throws exception, we will catch it and rethrow.
|
||||||
final ProcedurePrepareLatch prepareLatch = ProcedurePrepareLatch.createLatch();
|
final ProcedurePrepareLatch prepareLatch = ProcedurePrepareLatch.createLatch();
|
||||||
submitProcedure(new EnableTableProcedure(procedureExecutor.getEnvironment(),
|
submitProcedure(new EnableTableProcedure(procedureExecutor.getEnvironment(),
|
||||||
tableName, false, prepareLatch));
|
tableName, prepareLatch));
|
||||||
prepareLatch.await();
|
prepareLatch.await();
|
||||||
|
|
||||||
getMaster().getMasterCoprocessorHost().postEnableTable(tableName);
|
getMaster().getMasterCoprocessorHost().postEnableTable(tableName);
|
||||||
|
|
|
@ -39,7 +39,6 @@ import org.apache.hadoop.hbase.PleaseHoldException;
|
||||||
import org.apache.hadoop.hbase.ServerName;
|
import org.apache.hadoop.hbase.ServerName;
|
||||||
import org.apache.hadoop.hbase.TableName;
|
import org.apache.hadoop.hbase.TableName;
|
||||||
import org.apache.hadoop.hbase.UnknownRegionException;
|
import org.apache.hadoop.hbase.UnknownRegionException;
|
||||||
import org.apache.hadoop.hbase.YouAreDeadException;
|
|
||||||
import org.apache.hadoop.hbase.client.DoNotRetryRegionException;
|
import org.apache.hadoop.hbase.client.DoNotRetryRegionException;
|
||||||
import org.apache.hadoop.hbase.client.RegionInfo;
|
import org.apache.hadoop.hbase.client.RegionInfo;
|
||||||
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
|
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
|
||||||
|
@ -162,7 +161,8 @@ public class AssignmentManager implements ServerListener {
|
||||||
this(master, new RegionStateStore(master));
|
this(master, new RegionStateStore(master));
|
||||||
}
|
}
|
||||||
|
|
||||||
public AssignmentManager(final MasterServices master, final RegionStateStore stateStore) {
|
@VisibleForTesting
|
||||||
|
AssignmentManager(final MasterServices master, final RegionStateStore stateStore) {
|
||||||
this.master = master;
|
this.master = master;
|
||||||
this.regionStateStore = stateStore;
|
this.regionStateStore = stateStore;
|
||||||
this.metrics = new MetricsAssignmentManager();
|
this.metrics = new MetricsAssignmentManager();
|
||||||
|
@ -979,23 +979,26 @@ public class AssignmentManager implements ServerListener {
|
||||||
// RS Status update (report online regions) helpers
|
// RS Status update (report online regions) helpers
|
||||||
// ============================================================================================
|
// ============================================================================================
|
||||||
/**
|
/**
|
||||||
* the master will call this method when the RS send the regionServerReport().
|
* The master will call this method when the RS send the regionServerReport(). The report will
|
||||||
* the report will contains the "online regions".
|
* contains the "online regions". This method will check the the online regions against the
|
||||||
* this method will check the the online regions against the in-memory state of the AM,
|
* in-memory state of the AM, and we will log a warn message if there is a mismatch. This is
|
||||||
* if there is a mismatch we will try to fence out the RS with the assumption
|
* because that there is no fencing between the reportRegionStateTransition method and
|
||||||
* that something went wrong on the RS side.
|
* regionServerReport method, so there could be race and introduce inconsistency here, but
|
||||||
|
* actually there is no problem.
|
||||||
|
* <p/>
|
||||||
|
* Please see HBASE-21421 and HBASE-21463 for more details.
|
||||||
*/
|
*/
|
||||||
public void reportOnlineRegions(final ServerName serverName, final Set<byte[]> regionNames)
|
public void reportOnlineRegions(ServerName serverName, Set<byte[]> regionNames) {
|
||||||
throws YouAreDeadException {
|
if (!isRunning()) {
|
||||||
if (!isRunning()) return;
|
return;
|
||||||
|
}
|
||||||
if (LOG.isTraceEnabled()) {
|
if (LOG.isTraceEnabled()) {
|
||||||
LOG.trace("ReportOnlineRegions " + serverName + " regionCount=" + regionNames.size() +
|
LOG.trace("ReportOnlineRegions {} regionCount={}, metaLoaded={} {}", serverName,
|
||||||
", metaLoaded=" + isMetaLoaded() + " " +
|
regionNames.size(), isMetaLoaded(),
|
||||||
regionNames.stream().map(element -> Bytes.toStringBinary(element)).
|
regionNames.stream().map(Bytes::toStringBinary).collect(Collectors.toList()));
|
||||||
collect(Collectors.toList()));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
final ServerStateNode serverNode = regionStates.getOrCreateServer(serverName);
|
ServerStateNode serverNode = regionStates.getOrCreateServer(serverName);
|
||||||
|
|
||||||
synchronized (serverNode) {
|
synchronized (serverNode) {
|
||||||
if (!serverNode.isInState(ServerState.ONLINE)) {
|
if (!serverNode.isInState(ServerState.ONLINE)) {
|
||||||
|
@ -1003,104 +1006,59 @@ public class AssignmentManager implements ServerListener {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (regionNames.isEmpty()) {
|
if (regionNames.isEmpty()) {
|
||||||
// nothing to do if we don't have regions
|
// nothing to do if we don't have regions
|
||||||
LOG.trace("no online region found on " + serverName);
|
LOG.trace("no online region found on {}", serverName);
|
||||||
} else if (!isMetaLoaded()) {
|
return;
|
||||||
// if we are still on startup, discard the report unless is from someone holding meta
|
|
||||||
checkOnlineRegionsReportForMeta(serverNode, regionNames);
|
|
||||||
} else {
|
|
||||||
// The Heartbeat updates us of what regions are only. check and verify the state.
|
|
||||||
checkOnlineRegionsReport(serverNode, regionNames);
|
|
||||||
}
|
}
|
||||||
|
if (!isMetaLoaded()) {
|
||||||
|
// we are still on startup, skip checking
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
// The Heartbeat tells us of what regions are on the region serve, check the state.
|
||||||
|
checkOnlineRegionsReport(serverNode, regionNames);
|
||||||
|
|
||||||
// wake report event
|
// wake report event
|
||||||
wakeServerReportEvent(serverNode);
|
wakeServerReportEvent(serverNode);
|
||||||
}
|
}
|
||||||
|
|
||||||
void checkOnlineRegionsReportForMeta(ServerStateNode serverNode, Set<byte[]> regionNames) {
|
// just check and output possible inconsistency, without actually doing anything
|
||||||
try {
|
private void checkOnlineRegionsReport(ServerStateNode serverNode, Set<byte[]> regionNames) {
|
||||||
for (byte[] regionName : regionNames) {
|
|
||||||
final RegionInfo hri = getMetaRegionFromName(regionName);
|
|
||||||
if (hri == null) {
|
|
||||||
if (LOG.isTraceEnabled()) {
|
|
||||||
LOG.trace("Skip online report for region=" + Bytes.toStringBinary(regionName) +
|
|
||||||
" while meta is loading");
|
|
||||||
}
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
RegionStateNode regionNode = regionStates.getOrCreateRegionStateNode(hri);
|
|
||||||
LOG.info("META REPORTED: " + regionNode);
|
|
||||||
regionNode.lock();
|
|
||||||
try {
|
|
||||||
if (!reportTransition(regionNode, serverNode, TransitionCode.OPENED, 0)) {
|
|
||||||
LOG.warn("META REPORTED but no procedure found (complete?); set location=" +
|
|
||||||
serverNode.getServerName());
|
|
||||||
regionNode.setRegionLocation(serverNode.getServerName());
|
|
||||||
} else if (LOG.isTraceEnabled()) {
|
|
||||||
LOG.trace("META REPORTED: " + regionNode);
|
|
||||||
}
|
|
||||||
} finally {
|
|
||||||
regionNode.unlock();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} catch (IOException e) {
|
|
||||||
ServerName serverName = serverNode.getServerName();
|
ServerName serverName = serverNode.getServerName();
|
||||||
LOG.warn("KILLING " + serverName + ": " + e.getMessage());
|
|
||||||
killRegionServer(serverNode);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
void checkOnlineRegionsReport(final ServerStateNode serverNode, final Set<byte[]> regionNames) {
|
|
||||||
final ServerName serverName = serverNode.getServerName();
|
|
||||||
try {
|
|
||||||
for (byte[] regionName : regionNames) {
|
for (byte[] regionName : regionNames) {
|
||||||
if (!isRunning()) {
|
if (!isRunning()) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
final RegionStateNode regionNode = regionStates.getRegionStateNodeFromName(regionName);
|
RegionStateNode regionNode = regionStates.getRegionStateNodeFromName(regionName);
|
||||||
if (regionNode == null) {
|
if (regionNode == null) {
|
||||||
throw new UnexpectedStateException("Not online: " + Bytes.toStringBinary(regionName));
|
LOG.warn("No region state node for {}, it should already be on {}",
|
||||||
|
Bytes.toStringBinary(regionName), serverName);
|
||||||
|
continue;
|
||||||
}
|
}
|
||||||
regionNode.lock();
|
regionNode.lock();
|
||||||
try {
|
try {
|
||||||
|
long diff = EnvironmentEdgeManager.currentTime() - regionNode.getLastUpdate();
|
||||||
if (regionNode.isInState(State.OPENING, State.OPEN)) {
|
if (regionNode.isInState(State.OPENING, State.OPEN)) {
|
||||||
if (!regionNode.getRegionLocation().equals(serverName)) {
|
// This is possible as a region server has just closed a region but the region server
|
||||||
throw new UnexpectedStateException(regionNode.toString() +
|
// report is generated before the closing, but arrive after the closing. Make sure there
|
||||||
" reported OPEN on server=" + serverName +
|
// is some elapsed time so less false alarms.
|
||||||
" but state has otherwise.");
|
if (!regionNode.getRegionLocation().equals(serverName) && diff > 1000) {
|
||||||
} else if (regionNode.isInState(State.OPENING)) {
|
LOG.warn("{} reported OPEN on server={} but state has otherwise", regionNode,
|
||||||
try {
|
serverName);
|
||||||
if (!reportTransition(regionNode, serverNode, TransitionCode.OPENED, 0)) {
|
|
||||||
LOG.warn(regionNode.toString() + " reported OPEN on server=" + serverName +
|
|
||||||
" but state has otherwise AND NO procedure is running");
|
|
||||||
}
|
|
||||||
} catch (UnexpectedStateException e) {
|
|
||||||
LOG.warn(regionNode.toString() + " reported unexpteced OPEN: " + e.getMessage(), e);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
} else if (!regionNode.isInState(State.CLOSING, State.SPLITTING)) {
|
} else if (!regionNode.isInState(State.CLOSING, State.SPLITTING)) {
|
||||||
long diff = regionNode.getLastUpdate() - EnvironmentEdgeManager.currentTime();
|
|
||||||
if (diff > 1000/*One Second... make configurable if an issue*/) {
|
|
||||||
// So, we can get report that a region is CLOSED or SPLIT because a heartbeat
|
// So, we can get report that a region is CLOSED or SPLIT because a heartbeat
|
||||||
// came in at about same time as a region transition. Make sure there is some
|
// came in at about same time as a region transition. Make sure there is some
|
||||||
// elapsed time between killing remote server.
|
// elapsed time so less false alarms.
|
||||||
throw new UnexpectedStateException(regionNode.toString() +
|
if (diff > 1000) {
|
||||||
" reported an unexpected OPEN; time since last update=" + diff);
|
LOG.warn("{} reported an unexpected OPEN on {}; time since last update={}ms",
|
||||||
|
regionNode, serverName, diff);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} finally {
|
} finally {
|
||||||
regionNode.unlock();
|
regionNode.unlock();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} catch (IOException e) {
|
|
||||||
//See HBASE-21421, we can count on reportRegionStateTransition calls
|
|
||||||
//We only log a warming here. It could be a network lag.
|
|
||||||
LOG.warn("Failed to checkOnlineRegionsReport, maybe due to network lag, "
|
|
||||||
+ "if this message continues, be careful of double assign", e);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
protected boolean waitServerReportEvent(ServerName serverName, Procedure<?> proc) {
|
protected boolean waitServerReportEvent(ServerName serverName, Procedure<?> proc) {
|
||||||
|
@ -1905,10 +1863,6 @@ public class AssignmentManager implements ServerListener {
|
||||||
wakeServerReportEvent(serverNode);
|
wakeServerReportEvent(serverNode);
|
||||||
}
|
}
|
||||||
|
|
||||||
private void killRegionServer(final ServerStateNode serverNode) {
|
|
||||||
master.getServerManager().expireServer(serverNode.getServerName());
|
|
||||||
}
|
|
||||||
|
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
MasterServices getMaster() {
|
MasterServices getMaster() {
|
||||||
return master;
|
return master;
|
||||||
|
|
|
@ -1,4 +1,4 @@
|
||||||
/*
|
/**
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
* or more contributor license agreements. See the NOTICE file
|
* or more contributor license agreements. See the NOTICE file
|
||||||
* distributed with this work for additional information
|
* distributed with this work for additional information
|
||||||
|
@ -15,7 +15,6 @@
|
||||||
* See the License for the specific language governing permissions and
|
* See the License for the specific language governing permissions and
|
||||||
* limitations under the License.
|
* limitations under the License.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
package org.apache.hadoop.hbase.master.procedure;
|
package org.apache.hadoop.hbase.master.procedure;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
@ -52,36 +51,30 @@ public class EnableTableProcedure
|
||||||
private static final Logger LOG = LoggerFactory.getLogger(EnableTableProcedure.class);
|
private static final Logger LOG = LoggerFactory.getLogger(EnableTableProcedure.class);
|
||||||
|
|
||||||
private TableName tableName;
|
private TableName tableName;
|
||||||
private boolean skipTableStateCheck;
|
|
||||||
|
|
||||||
private Boolean traceEnabled = null;
|
private Boolean traceEnabled = null;
|
||||||
|
|
||||||
public EnableTableProcedure() {
|
public EnableTableProcedure() {
|
||||||
super();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Constructor
|
* Constructor
|
||||||
* @param env MasterProcedureEnv
|
* @param env MasterProcedureEnv
|
||||||
* @param tableName the table to operate on
|
* @param tableName the table to operate on
|
||||||
* @param skipTableStateCheck whether to check table state
|
|
||||||
*/
|
*/
|
||||||
public EnableTableProcedure(final MasterProcedureEnv env, final TableName tableName,
|
public EnableTableProcedure(MasterProcedureEnv env, TableName tableName) {
|
||||||
final boolean skipTableStateCheck) {
|
this(env, tableName, null);
|
||||||
this(env, tableName, skipTableStateCheck, null);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Constructor
|
* Constructor
|
||||||
* @param env MasterProcedureEnv
|
* @param env MasterProcedureEnv
|
||||||
* @param tableName the table to operate on
|
* @param tableName the table to operate on
|
||||||
* @param skipTableStateCheck whether to check table state
|
|
||||||
*/
|
*/
|
||||||
public EnableTableProcedure(final MasterProcedureEnv env, final TableName tableName,
|
public EnableTableProcedure(MasterProcedureEnv env, TableName tableName,
|
||||||
final boolean skipTableStateCheck, final ProcedurePrepareLatch syncLatch) {
|
ProcedurePrepareLatch syncLatch) {
|
||||||
super(env, syncLatch);
|
super(env, syncLatch);
|
||||||
this.tableName = tableName;
|
this.tableName = tableName;
|
||||||
this.skipTableStateCheck = skipTableStateCheck;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -268,29 +261,27 @@ public class EnableTableProcedure
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void serializeStateData(ProcedureStateSerializer serializer)
|
protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException {
|
||||||
throws IOException {
|
|
||||||
super.serializeStateData(serializer);
|
super.serializeStateData(serializer);
|
||||||
|
|
||||||
|
// the skipTableStateCheck is false so we still need to set it...
|
||||||
|
@SuppressWarnings("deprecation")
|
||||||
MasterProcedureProtos.EnableTableStateData.Builder enableTableMsg =
|
MasterProcedureProtos.EnableTableStateData.Builder enableTableMsg =
|
||||||
MasterProcedureProtos.EnableTableStateData.newBuilder()
|
MasterProcedureProtos.EnableTableStateData.newBuilder()
|
||||||
.setUserInfo(MasterProcedureUtil.toProtoUserInfo(getUser()))
|
.setUserInfo(MasterProcedureUtil.toProtoUserInfo(getUser()))
|
||||||
.setTableName(ProtobufUtil.toProtoTableName(tableName))
|
.setTableName(ProtobufUtil.toProtoTableName(tableName)).setSkipTableStateCheck(false);
|
||||||
.setSkipTableStateCheck(skipTableStateCheck);
|
|
||||||
|
|
||||||
serializer.serialize(enableTableMsg.build());
|
serializer.serialize(enableTableMsg.build());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void deserializeStateData(ProcedureStateSerializer serializer)
|
protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException {
|
||||||
throws IOException {
|
|
||||||
super.deserializeStateData(serializer);
|
super.deserializeStateData(serializer);
|
||||||
|
|
||||||
MasterProcedureProtos.EnableTableStateData enableTableMsg =
|
MasterProcedureProtos.EnableTableStateData enableTableMsg =
|
||||||
serializer.deserialize(MasterProcedureProtos.EnableTableStateData.class);
|
serializer.deserialize(MasterProcedureProtos.EnableTableStateData.class);
|
||||||
setUser(MasterProcedureUtil.toUserInfo(enableTableMsg.getUserInfo()));
|
setUser(MasterProcedureUtil.toUserInfo(enableTableMsg.getUserInfo()));
|
||||||
tableName = ProtobufUtil.toTableName(enableTableMsg.getTableName());
|
tableName = ProtobufUtil.toTableName(enableTableMsg.getTableName());
|
||||||
skipTableStateCheck = enableTableMsg.getSkipTableStateCheck();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -318,7 +309,7 @@ public class EnableTableProcedure
|
||||||
if (!MetaTableAccessor.tableExists(env.getMasterServices().getConnection(), tableName)) {
|
if (!MetaTableAccessor.tableExists(env.getMasterServices().getConnection(), tableName)) {
|
||||||
setFailure("master-enable-table", new TableNotFoundException(tableName));
|
setFailure("master-enable-table", new TableNotFoundException(tableName));
|
||||||
canTableBeEnabled = false;
|
canTableBeEnabled = false;
|
||||||
} else if (!skipTableStateCheck) {
|
} else {
|
||||||
// There could be multiple client requests trying to disable or enable
|
// There could be multiple client requests trying to disable or enable
|
||||||
// the table at the same time. Ensure only the first request is honored
|
// the table at the same time. Ensure only the first request is honored
|
||||||
// After that, no other requests can be accepted until the table reaches
|
// After that, no other requests can be accepted until the table reaches
|
||||||
|
|
|
@ -33,7 +33,6 @@ import org.apache.hadoop.hbase.ServerMetricsBuilder;
|
||||||
import org.apache.hadoop.hbase.ServerName;
|
import org.apache.hadoop.hbase.ServerName;
|
||||||
import org.apache.hadoop.hbase.TableDescriptors;
|
import org.apache.hadoop.hbase.TableDescriptors;
|
||||||
import org.apache.hadoop.hbase.TableName;
|
import org.apache.hadoop.hbase.TableName;
|
||||||
import org.apache.hadoop.hbase.YouAreDeadException;
|
|
||||||
import org.apache.hadoop.hbase.client.ClusterConnection;
|
import org.apache.hadoop.hbase.client.ClusterConnection;
|
||||||
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
|
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
|
||||||
import org.apache.hadoop.hbase.client.HConnectionTestingUtility;
|
import org.apache.hadoop.hbase.client.HConnectionTestingUtility;
|
||||||
|
@ -78,7 +77,6 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.RegionActi
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.RegionActionResult;
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.RegionActionResult;
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ResultOrException;
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ResultOrException;
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A mocked master services.
|
* A mocked master services.
|
||||||
* Tries to fake it. May not always work.
|
* Tries to fake it. May not always work.
|
||||||
|
@ -127,12 +125,8 @@ public class MockMasterServices extends MockNoopMasterServices {
|
||||||
protected boolean waitServerReportEvent(ServerName serverName, Procedure proc) {
|
protected boolean waitServerReportEvent(ServerName serverName, Procedure proc) {
|
||||||
// Make a report with current state of the server 'serverName' before we call wait..
|
// Make a report with current state of the server 'serverName' before we call wait..
|
||||||
SortedSet<byte[]> regions = regionsToRegionServers.get(serverName);
|
SortedSet<byte[]> regions = regionsToRegionServers.get(serverName);
|
||||||
try {
|
|
||||||
getAssignmentManager().reportOnlineRegions(serverName,
|
getAssignmentManager().reportOnlineRegions(serverName,
|
||||||
regions == null ? new HashSet<byte[]>() : regions);
|
regions == null ? new HashSet<byte[]>() : regions);
|
||||||
} catch (YouAreDeadException e) {
|
|
||||||
throw new RuntimeException(e);
|
|
||||||
}
|
|
||||||
return super.waitServerReportEvent(serverName, proc);
|
return super.waitServerReportEvent(serverName, proc);
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
|
@ -0,0 +1,188 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hbase.master.assignment;
|
||||||
|
|
||||||
|
import static org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.RegionStateTransitionState.REGION_STATE_TRANSITION_CONFIRM_OPENED_VALUE;
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.Set;
|
||||||
|
import java.util.concurrent.CountDownLatch;
|
||||||
|
import java.util.concurrent.Future;
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||||
|
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||||
|
import org.apache.hadoop.hbase.HConstants;
|
||||||
|
import org.apache.hadoop.hbase.PleaseHoldException;
|
||||||
|
import org.apache.hadoop.hbase.ServerName;
|
||||||
|
import org.apache.hadoop.hbase.TableName;
|
||||||
|
import org.apache.hadoop.hbase.client.Put;
|
||||||
|
import org.apache.hadoop.hbase.client.RegionInfo;
|
||||||
|
import org.apache.hadoop.hbase.client.Table;
|
||||||
|
import org.apache.hadoop.hbase.master.HMaster;
|
||||||
|
import org.apache.hadoop.hbase.master.MasterServices;
|
||||||
|
import org.apache.hadoop.hbase.master.RegionPlan;
|
||||||
|
import org.apache.hadoop.hbase.master.RegionState;
|
||||||
|
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
|
||||||
|
import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
|
||||||
|
import org.apache.hadoop.hbase.testclassification.MasterTests;
|
||||||
|
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||||
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
|
import org.apache.hadoop.hbase.util.IdLock;
|
||||||
|
import org.apache.zookeeper.KeeperException;
|
||||||
|
import org.junit.AfterClass;
|
||||||
|
import org.junit.BeforeClass;
|
||||||
|
import org.junit.ClassRule;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.junit.experimental.categories.Category;
|
||||||
|
|
||||||
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionRequest;
|
||||||
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionResponse;
|
||||||
|
|
||||||
|
@Category({ MasterTests.class, MediumTests.class })
|
||||||
|
public class TestReportOnlineRegionsRace {
|
||||||
|
|
||||||
|
@ClassRule
|
||||||
|
public static final HBaseClassTestRule CLASS_RULE =
|
||||||
|
HBaseClassTestRule.forClass(TestReportOnlineRegionsRace.class);
|
||||||
|
|
||||||
|
private static volatile CountDownLatch ARRIVE_RS_REPORT;
|
||||||
|
private static volatile CountDownLatch RESUME_RS_REPORT;
|
||||||
|
private static volatile CountDownLatch FINISH_RS_REPORT;
|
||||||
|
|
||||||
|
private static volatile CountDownLatch RESUME_REPORT_STATE;
|
||||||
|
|
||||||
|
private static final class AssignmentManagerForTest extends AssignmentManager {
|
||||||
|
|
||||||
|
public AssignmentManagerForTest(MasterServices master) {
|
||||||
|
super(master);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void reportOnlineRegions(ServerName serverName, Set<byte[]> regionNames) {
|
||||||
|
if (ARRIVE_RS_REPORT != null) {
|
||||||
|
ARRIVE_RS_REPORT.countDown();
|
||||||
|
try {
|
||||||
|
RESUME_RS_REPORT.await();
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
throw new RuntimeException(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
super.reportOnlineRegions(serverName, regionNames);
|
||||||
|
if (FINISH_RS_REPORT != null) {
|
||||||
|
FINISH_RS_REPORT.countDown();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ReportRegionStateTransitionResponse reportRegionStateTransition(
|
||||||
|
ReportRegionStateTransitionRequest req) throws PleaseHoldException {
|
||||||
|
if (RESUME_REPORT_STATE != null) {
|
||||||
|
try {
|
||||||
|
RESUME_REPORT_STATE.await();
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
throw new RuntimeException(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return super.reportRegionStateTransition(req);
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
public static final class HMasterForTest extends HMaster {
|
||||||
|
|
||||||
|
public HMasterForTest(Configuration conf) throws IOException, KeeperException {
|
||||||
|
super(conf);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected AssignmentManager createAssignmentManager(MasterServices master) {
|
||||||
|
return new AssignmentManagerForTest(master);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
|
||||||
|
|
||||||
|
private static TableName NAME = TableName.valueOf("Race");
|
||||||
|
|
||||||
|
private static byte[] CF = Bytes.toBytes("cf");
|
||||||
|
|
||||||
|
@BeforeClass
|
||||||
|
public static void setUp() throws Exception {
|
||||||
|
UTIL.getConfiguration().setClass(HConstants.MASTER_IMPL, HMasterForTest.class, HMaster.class);
|
||||||
|
UTIL.getConfiguration().setInt("hbase.regionserver.msginterval", 1000);
|
||||||
|
UTIL.startMiniCluster(1);
|
||||||
|
UTIL.createTable(NAME, CF);
|
||||||
|
UTIL.waitTableAvailable(NAME);
|
||||||
|
}
|
||||||
|
|
||||||
|
@AfterClass
|
||||||
|
public static void tearDown() throws Exception {
|
||||||
|
UTIL.shutdownMiniCluster();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testRace() throws Exception {
|
||||||
|
RegionInfo region = UTIL.getMiniHBaseCluster().getRegions(NAME).get(0).getRegionInfo();
|
||||||
|
ProcedureExecutor<MasterProcedureEnv> procExec =
|
||||||
|
UTIL.getMiniHBaseCluster().getMaster().getMasterProcedureExecutor();
|
||||||
|
AssignmentManager am = UTIL.getMiniHBaseCluster().getMaster().getAssignmentManager();
|
||||||
|
RegionStateNode rsn = am.getRegionStates().getRegionStateNode(region);
|
||||||
|
|
||||||
|
// halt a regionServerReport
|
||||||
|
RESUME_RS_REPORT = new CountDownLatch(1);
|
||||||
|
ARRIVE_RS_REPORT = new CountDownLatch(1);
|
||||||
|
FINISH_RS_REPORT = new CountDownLatch(1);
|
||||||
|
|
||||||
|
ARRIVE_RS_REPORT.await();
|
||||||
|
|
||||||
|
// schedule a TRSP to REOPEN the region
|
||||||
|
RESUME_REPORT_STATE = new CountDownLatch(1);
|
||||||
|
Future<byte[]> future =
|
||||||
|
am.moveAsync(new RegionPlan(region, rsn.getRegionLocation(), rsn.getRegionLocation()));
|
||||||
|
TransitRegionStateProcedure proc =
|
||||||
|
procExec.getProcedures().stream().filter(p -> p instanceof TransitRegionStateProcedure)
|
||||||
|
.filter(p -> !p.isFinished()).map(p -> (TransitRegionStateProcedure) p).findAny().get();
|
||||||
|
IdLock procExecLock = procExec.getProcExecutionLock();
|
||||||
|
// a CloseRegionProcedure and then the OpenRegionProcedure we want to block
|
||||||
|
IdLock.Entry lockEntry = procExecLock.getLockEntry(proc.getProcId() + 2);
|
||||||
|
// resume the reportRegionStateTransition to finish the CloseRegionProcedure
|
||||||
|
RESUME_REPORT_STATE.countDown();
|
||||||
|
// wait until we schedule the OpenRegionProcedure
|
||||||
|
UTIL.waitFor(10000,
|
||||||
|
() -> proc.getCurrentStateId() == REGION_STATE_TRANSITION_CONFIRM_OPENED_VALUE);
|
||||||
|
// the region should be in OPENING state
|
||||||
|
assertEquals(RegionState.State.OPENING, rsn.getState());
|
||||||
|
// resume the region server report
|
||||||
|
RESUME_RS_REPORT.countDown();
|
||||||
|
// wait until it finishes, it will find that the region is opened on the rs
|
||||||
|
FINISH_RS_REPORT.await();
|
||||||
|
// let the OpenRegionProcedure go
|
||||||
|
procExecLock.releaseLockEntry(lockEntry);
|
||||||
|
// wait until the TRSP is done
|
||||||
|
future.get();
|
||||||
|
|
||||||
|
// confirm that the region can still be write, i.e, the regionServerReport method should not
|
||||||
|
// change the region state to OPEN
|
||||||
|
try (Table table = UTIL.getConnection().getTableBuilder(NAME, null).setWriteRpcTimeout(1000)
|
||||||
|
.setOperationTimeout(2000).build()) {
|
||||||
|
table.put(
|
||||||
|
new Put(Bytes.toBytes("key")).addColumn(CF, Bytes.toBytes("cq"), Bytes.toBytes("val")));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -28,7 +28,6 @@ import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility;
|
||||||
import org.apache.hadoop.hbase.testclassification.MasterTests;
|
import org.apache.hadoop.hbase.testclassification.MasterTests;
|
||||||
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||||
import org.apache.hadoop.hbase.util.Bytes;
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
import org.junit.Assert;
|
|
||||||
import org.junit.ClassRule;
|
import org.junit.ClassRule;
|
||||||
import org.junit.Rule;
|
import org.junit.Rule;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
@ -45,7 +44,9 @@ public class TestEnableTableProcedure extends TestTableDDLProcedureBase {
|
||||||
HBaseClassTestRule.forClass(TestEnableTableProcedure.class);
|
HBaseClassTestRule.forClass(TestEnableTableProcedure.class);
|
||||||
|
|
||||||
private static final Logger LOG = LoggerFactory.getLogger(TestEnableTableProcedure.class);
|
private static final Logger LOG = LoggerFactory.getLogger(TestEnableTableProcedure.class);
|
||||||
@Rule public TestName name = new TestName();
|
|
||||||
|
@Rule
|
||||||
|
public TestName name = new TestName();
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testEnableTable() throws Exception {
|
public void testEnableTable() throws Exception {
|
||||||
|
@ -56,8 +57,8 @@ public class TestEnableTableProcedure extends TestTableDDLProcedureBase {
|
||||||
UTIL.getAdmin().disableTable(tableName);
|
UTIL.getAdmin().disableTable(tableName);
|
||||||
|
|
||||||
// Enable the table
|
// Enable the table
|
||||||
long procId = procExec.submitProcedure(
|
long procId =
|
||||||
new EnableTableProcedure(procExec.getEnvironment(), tableName, false));
|
procExec.submitProcedure(new EnableTableProcedure(procExec.getEnvironment(), tableName));
|
||||||
// Wait the completion
|
// Wait the completion
|
||||||
ProcedureTestingUtility.waitProcedure(procExec, procId);
|
ProcedureTestingUtility.waitProcedure(procExec, procId);
|
||||||
ProcedureTestingUtility.assertProcNotFailed(procExec, procId);
|
ProcedureTestingUtility.assertProcNotFailed(procExec, procId);
|
||||||
|
@ -72,8 +73,8 @@ public class TestEnableTableProcedure extends TestTableDDLProcedureBase {
|
||||||
MasterProcedureTestingUtility.createTable(procExec, tableName, null, "f1", "f2");
|
MasterProcedureTestingUtility.createTable(procExec, tableName, null, "f1", "f2");
|
||||||
|
|
||||||
// Enable the table - expect failure
|
// Enable the table - expect failure
|
||||||
long procId1 = procExec.submitProcedure(
|
long procId1 =
|
||||||
new EnableTableProcedure(procExec.getEnvironment(), tableName, false));
|
procExec.submitProcedure(new EnableTableProcedure(procExec.getEnvironment(), tableName));
|
||||||
ProcedureTestingUtility.waitProcedure(procExec, procId1);
|
ProcedureTestingUtility.waitProcedure(procExec, procId1);
|
||||||
|
|
||||||
Procedure<?> result = procExec.getResult(procId1);
|
Procedure<?> result = procExec.getResult(procId1);
|
||||||
|
@ -82,19 +83,11 @@ public class TestEnableTableProcedure extends TestTableDDLProcedureBase {
|
||||||
assertTrue(
|
assertTrue(
|
||||||
ProcedureTestingUtility.getExceptionCause(result) instanceof TableNotDisabledException);
|
ProcedureTestingUtility.getExceptionCause(result) instanceof TableNotDisabledException);
|
||||||
|
|
||||||
// Enable the table with skipping table state check flag (simulate recovery scenario)
|
|
||||||
long procId2 = procExec.submitProcedure(
|
|
||||||
new EnableTableProcedure(procExec.getEnvironment(), tableName, true));
|
|
||||||
// Wait the completion
|
|
||||||
ProcedureTestingUtility.waitProcedure(procExec, procId2);
|
|
||||||
ProcedureTestingUtility.assertProcNotFailed(procExec, procId2);
|
|
||||||
|
|
||||||
// Enable the table - expect failure from ProcedurePrepareLatch
|
// Enable the table - expect failure from ProcedurePrepareLatch
|
||||||
final ProcedurePrepareLatch prepareLatch = new ProcedurePrepareLatch.CompatibilityLatch();
|
final ProcedurePrepareLatch prepareLatch = new ProcedurePrepareLatch.CompatibilityLatch();
|
||||||
procExec.submitProcedure(
|
procExec.submitProcedure(
|
||||||
new EnableTableProcedure(procExec.getEnvironment(), tableName, false, prepareLatch));
|
new EnableTableProcedure(procExec.getEnvironment(), tableName, prepareLatch));
|
||||||
prepareLatch.await();
|
prepareLatch.await();
|
||||||
Assert.fail("Enable should throw exception through latch.");
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
@ -102,9 +95,8 @@ public class TestEnableTableProcedure extends TestTableDDLProcedureBase {
|
||||||
final TableName tableName = TableName.valueOf(name.getMethodName());
|
final TableName tableName = TableName.valueOf(name.getMethodName());
|
||||||
final ProcedureExecutor<MasterProcedureEnv> procExec = getMasterProcedureExecutor();
|
final ProcedureExecutor<MasterProcedureEnv> procExec = getMasterProcedureExecutor();
|
||||||
|
|
||||||
final byte[][] splitKeys = new byte[][] {
|
final byte[][] splitKeys =
|
||||||
Bytes.toBytes("a"), Bytes.toBytes("b"), Bytes.toBytes("c")
|
new byte[][] { Bytes.toBytes("a"), Bytes.toBytes("b"), Bytes.toBytes("c") };
|
||||||
};
|
|
||||||
MasterProcedureTestingUtility.createTable(procExec, tableName, splitKeys, "f1", "f2");
|
MasterProcedureTestingUtility.createTable(procExec, tableName, splitKeys, "f1", "f2");
|
||||||
UTIL.getAdmin().disableTable(tableName);
|
UTIL.getAdmin().disableTable(tableName);
|
||||||
ProcedureTestingUtility.waitNoProcedureRunning(procExec);
|
ProcedureTestingUtility.waitNoProcedureRunning(procExec);
|
||||||
|
@ -112,8 +104,8 @@ public class TestEnableTableProcedure extends TestTableDDLProcedureBase {
|
||||||
ProcedureTestingUtility.setKillAndToggleBeforeStoreUpdate(procExec, true);
|
ProcedureTestingUtility.setKillAndToggleBeforeStoreUpdate(procExec, true);
|
||||||
|
|
||||||
// Start the Enable procedure && kill the executor
|
// Start the Enable procedure && kill the executor
|
||||||
long procId = procExec.submitProcedure(
|
long procId =
|
||||||
new EnableTableProcedure(procExec.getEnvironment(), tableName, false));
|
procExec.submitProcedure(new EnableTableProcedure(procExec.getEnvironment(), tableName));
|
||||||
|
|
||||||
// Restart the executor and execute the step twice
|
// Restart the executor and execute the step twice
|
||||||
MasterProcedureTestingUtility.testRecoveryAndDoubleExecution(procExec, procId);
|
MasterProcedureTestingUtility.testRecoveryAndDoubleExecution(procExec, procId);
|
||||||
|
@ -126,17 +118,16 @@ public class TestEnableTableProcedure extends TestTableDDLProcedureBase {
|
||||||
final TableName tableName = TableName.valueOf(name.getMethodName());
|
final TableName tableName = TableName.valueOf(name.getMethodName());
|
||||||
final ProcedureExecutor<MasterProcedureEnv> procExec = getMasterProcedureExecutor();
|
final ProcedureExecutor<MasterProcedureEnv> procExec = getMasterProcedureExecutor();
|
||||||
|
|
||||||
final byte[][] splitKeys = new byte[][] {
|
final byte[][] splitKeys =
|
||||||
Bytes.toBytes("a"), Bytes.toBytes("b"), Bytes.toBytes("c")
|
new byte[][] { Bytes.toBytes("a"), Bytes.toBytes("b"), Bytes.toBytes("c") };
|
||||||
};
|
|
||||||
MasterProcedureTestingUtility.createTable(procExec, tableName, splitKeys, "f1", "f2");
|
MasterProcedureTestingUtility.createTable(procExec, tableName, splitKeys, "f1", "f2");
|
||||||
UTIL.getAdmin().disableTable(tableName);
|
UTIL.getAdmin().disableTable(tableName);
|
||||||
ProcedureTestingUtility.waitNoProcedureRunning(procExec);
|
ProcedureTestingUtility.waitNoProcedureRunning(procExec);
|
||||||
ProcedureTestingUtility.setKillAndToggleBeforeStoreUpdate(procExec, true);
|
ProcedureTestingUtility.setKillAndToggleBeforeStoreUpdate(procExec, true);
|
||||||
|
|
||||||
// Start the Enable procedure && kill the executor
|
// Start the Enable procedure && kill the executor
|
||||||
long procId = procExec.submitProcedure(
|
long procId =
|
||||||
new EnableTableProcedure(procExec.getEnvironment(), tableName, false));
|
procExec.submitProcedure(new EnableTableProcedure(procExec.getEnvironment(), tableName));
|
||||||
|
|
||||||
int lastStep = 3; // fail before ENABLE_TABLE_SET_ENABLING_TABLE_STATE
|
int lastStep = 3; // fail before ENABLE_TABLE_SET_ENABLING_TABLE_STATE
|
||||||
MasterProcedureTestingUtility.testRollbackAndDoubleExecution(procExec, procId, lastStep);
|
MasterProcedureTestingUtility.testRollbackAndDoubleExecution(procExec, procId, lastStep);
|
||||||
|
|
|
@ -289,7 +289,7 @@ public class TestMasterFailoverWithProcedures {
|
||||||
|
|
||||||
// Start the Delete procedure && kill the executor
|
// Start the Delete procedure && kill the executor
|
||||||
long procId = procExec.submitProcedure(
|
long procId = procExec.submitProcedure(
|
||||||
new EnableTableProcedure(procExec.getEnvironment(), tableName, false));
|
new EnableTableProcedure(procExec.getEnvironment(), tableName));
|
||||||
testRecoveryAndDoubleExecution(UTIL, procId, step);
|
testRecoveryAndDoubleExecution(UTIL, procId, step);
|
||||||
|
|
||||||
MasterProcedureTestingUtility.validateTableIsEnabled(
|
MasterProcedureTestingUtility.validateTableIsEnabled(
|
||||||
|
|
Loading…
Reference in New Issue