HBASE-19367 Refactoring in RegionStates, and RSProcedureDispatcher
- Adding javadoc comments - Bug: ServerStateNode#regions is HashSet but there's no synchronization to prevent concurrent addRegion/removeRegion. Let's use concurrent set instead. - Use getRegionsInTransitionCount() directly to avoid instead of getRegionsInTransition().size() because the latter copies everything into a new array - what a waste for just the size. - There's mixed use of getRegionNode and getRegionStateNode for same return type - RegionStateNode. Changing everything to getRegionStateNode. Similarly rename other *RegionNode() fns to *RegionStateNode(). - RegionStateNode#transitionState() return value is useless since it always returns it's first param. - Other minor improvements
This commit is contained in:
parent
9419709995
commit
4f4aac77e1
|
@ -17,6 +17,7 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.hadoop.hbase.master;
|
package org.apache.hadoop.hbase.master;
|
||||||
|
|
||||||
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
import java.util.Date;
|
import java.util.Date;
|
||||||
|
|
||||||
import org.apache.hadoop.hbase.ServerName;
|
import org.apache.hadoop.hbase.ServerName;
|
||||||
|
@ -169,12 +170,12 @@ public class RegionState {
|
||||||
// The duration of region in transition
|
// The duration of region in transition
|
||||||
private long ritDuration;
|
private long ritDuration;
|
||||||
|
|
||||||
public RegionState(RegionInfo region, State state) {
|
@VisibleForTesting
|
||||||
this(region, state, System.currentTimeMillis(), null);
|
public static RegionState createForTesting(RegionInfo region, State state) {
|
||||||
|
return new RegionState(region, state, System.currentTimeMillis(), null);
|
||||||
}
|
}
|
||||||
|
|
||||||
public RegionState(RegionInfo region,
|
public RegionState(RegionInfo region, State state, ServerName serverName) {
|
||||||
State state, ServerName serverName) {
|
|
||||||
this(region, state, System.currentTimeMillis(), serverName);
|
this(region, state, System.currentTimeMillis(), serverName);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -21,9 +21,6 @@ import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.hbase.CategoryBasedTimeout;
|
import org.apache.hadoop.hbase.CategoryBasedTimeout;
|
||||||
import org.apache.hadoop.hbase.HRegionInfo;
|
import org.apache.hadoop.hbase.HRegionInfo;
|
||||||
import org.apache.hadoop.hbase.TableName;
|
import org.apache.hadoop.hbase.TableName;
|
||||||
import org.apache.hadoop.hbase.client.RegionInfo;
|
|
||||||
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
|
|
||||||
import org.apache.hadoop.hbase.client.RegionInfoDisplay;
|
|
||||||
import org.apache.hadoop.hbase.master.RegionState;
|
import org.apache.hadoop.hbase.master.RegionState;
|
||||||
import org.apache.hadoop.hbase.testclassification.MasterTests;
|
import org.apache.hadoop.hbase.testclassification.MasterTests;
|
||||||
import org.apache.hadoop.hbase.testclassification.SmallTests;
|
import org.apache.hadoop.hbase.testclassification.SmallTests;
|
||||||
|
@ -66,7 +63,7 @@ public class TestRegionInfoDisplay {
|
||||||
Assert.assertArrayEquals(RegionInfoDisplay.HIDDEN_START_KEY,
|
Assert.assertArrayEquals(RegionInfoDisplay.HIDDEN_START_KEY,
|
||||||
RegionInfoDisplay.getStartKeyForDisplay(ri, conf));
|
RegionInfoDisplay.getStartKeyForDisplay(ri, conf));
|
||||||
|
|
||||||
RegionState state = new RegionState(convert(ri), RegionState.State.OPEN);
|
RegionState state = RegionState.createForTesting(convert(ri), RegionState.State.OPEN);
|
||||||
String descriptiveNameForDisplay =
|
String descriptiveNameForDisplay =
|
||||||
RegionInfoDisplay.getDescriptiveNameFromRegionStateForDisplay(state, conf);
|
RegionInfoDisplay.getDescriptiveNameFromRegionStateForDisplay(state, conf);
|
||||||
checkDescriptiveNameEquality(descriptiveNameForDisplay,state.toDescriptiveString(), startKey);
|
checkDescriptiveNameEquality(descriptiveNameForDisplay,state.toDescriptiveString(), startKey);
|
||||||
|
|
|
@ -242,9 +242,9 @@ public abstract class RemoteProcedureDispatcher<TEnv, TRemote extends Comparable
|
||||||
}
|
}
|
||||||
|
|
||||||
protected ArrayListMultimap<Class<?>, RemoteOperation> buildAndGroupRequestByType(final TEnv env,
|
protected ArrayListMultimap<Class<?>, RemoteOperation> buildAndGroupRequestByType(final TEnv env,
|
||||||
final TRemote remote, final Set<RemoteProcedure> operations) {
|
final TRemote remote, final Set<RemoteProcedure> remoteProcedures) {
|
||||||
final ArrayListMultimap<Class<?>, RemoteOperation> requestByType = ArrayListMultimap.create();
|
final ArrayListMultimap<Class<?>, RemoteOperation> requestByType = ArrayListMultimap.create();
|
||||||
for (RemoteProcedure proc: operations) {
|
for (RemoteProcedure proc: remoteProcedures) {
|
||||||
RemoteOperation operation = proc.remoteCallBuild(env, remote);
|
RemoteOperation operation = proc.remoteCallBuild(env, remote);
|
||||||
requestByType.put(operation.getClass(), operation);
|
requestByType.put(operation.getClass(), operation);
|
||||||
}
|
}
|
||||||
|
|
|
@ -130,7 +130,7 @@ public class TestRSGroupsOfflineMode {
|
||||||
@Override
|
@Override
|
||||||
public boolean evaluate() throws Exception {
|
public boolean evaluate() throws Exception {
|
||||||
return groupRS.getNumberOfOnlineRegions() < 1 &&
|
return groupRS.getNumberOfOnlineRegions() < 1 &&
|
||||||
master.getAssignmentManager().getRegionStates().getRegionsInTransition().size() < 1;
|
master.getAssignmentManager().getRegionStates().getRegionsInTransitionCount() < 1;
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
// Move table to group and wait.
|
// Move table to group and wait.
|
||||||
|
|
|
@ -1337,7 +1337,7 @@ public class HMaster extends HRegionServer implements MasterServices {
|
||||||
// Throttling by max number regions in transition
|
// Throttling by max number regions in transition
|
||||||
while (!interrupted
|
while (!interrupted
|
||||||
&& maxRegionsInTransition > 0
|
&& maxRegionsInTransition > 0
|
||||||
&& this.assignmentManager.getRegionStates().getRegionsInTransition().size()
|
&& this.assignmentManager.getRegionStates().getRegionsInTransitionCount()
|
||||||
>= maxRegionsInTransition && System.currentTimeMillis() <= cutoffTime) {
|
>= maxRegionsInTransition && System.currentTimeMillis() <= cutoffTime) {
|
||||||
try {
|
try {
|
||||||
// sleep if the number of regions in transition exceeds the limit
|
// sleep if the number of regions in transition exceeds the limit
|
||||||
|
|
|
@ -352,7 +352,7 @@ public class AssignProcedure extends RegionTransitionProcedure {
|
||||||
@Override
|
@Override
|
||||||
public ServerName getServer(final MasterProcedureEnv env) {
|
public ServerName getServer(final MasterProcedureEnv env) {
|
||||||
RegionStateNode node =
|
RegionStateNode node =
|
||||||
env.getAssignmentManager().getRegionStates().getRegionNode(this.getRegionInfo());
|
env.getAssignmentManager().getRegionStates().getRegionStateNode(this.getRegionInfo());
|
||||||
if (node == null) return null;
|
if (node == null) return null;
|
||||||
return node.getRegionLocation();
|
return node.getRegionLocation();
|
||||||
}
|
}
|
||||||
|
|
|
@ -377,7 +377,7 @@ public class AssignmentManager implements ServerListener {
|
||||||
|
|
||||||
private boolean isCarryingRegion(final ServerName serverName, final RegionInfo regionInfo) {
|
private boolean isCarryingRegion(final ServerName serverName, final RegionInfo regionInfo) {
|
||||||
// TODO: check for state?
|
// TODO: check for state?
|
||||||
final RegionStateNode node = regionStates.getRegionNode(regionInfo);
|
final RegionStateNode node = regionStates.getRegionStateNode(regionInfo);
|
||||||
return(node != null && serverName.equals(node.getRegionLocation()));
|
return(node != null && serverName.equals(node.getRegionLocation()));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -538,7 +538,7 @@ public class AssignmentManager implements ServerListener {
|
||||||
public void unassign(final RegionInfo regionInfo, final boolean forceNewPlan)
|
public void unassign(final RegionInfo regionInfo, final boolean forceNewPlan)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
// TODO: rename this reassign
|
// TODO: rename this reassign
|
||||||
RegionStateNode node = this.regionStates.getRegionNode(regionInfo);
|
RegionStateNode node = this.regionStates.getRegionStateNode(regionInfo);
|
||||||
ServerName destinationServer = node.getRegionLocation();
|
ServerName destinationServer = node.getRegionLocation();
|
||||||
if (destinationServer == null) {
|
if (destinationServer == null) {
|
||||||
throw new UnexpectedStateException("DestinationServer is null; Assigned? " + node.toString());
|
throw new UnexpectedStateException("DestinationServer is null; Assigned? " + node.toString());
|
||||||
|
@ -549,7 +549,7 @@ public class AssignmentManager implements ServerListener {
|
||||||
}
|
}
|
||||||
|
|
||||||
public void move(final RegionInfo regionInfo) throws IOException {
|
public void move(final RegionInfo regionInfo) throws IOException {
|
||||||
RegionStateNode node = this.regionStates.getRegionNode(regionInfo);
|
RegionStateNode node = this.regionStates.getRegionStateNode(regionInfo);
|
||||||
ServerName sourceServer = node.getRegionLocation();
|
ServerName sourceServer = node.getRegionLocation();
|
||||||
RegionPlan plan = new RegionPlan(regionInfo, sourceServer, null);
|
RegionPlan plan = new RegionPlan(regionInfo, sourceServer, null);
|
||||||
MoveRegionProcedure proc = createMoveRegionProcedure(plan);
|
MoveRegionProcedure proc = createMoveRegionProcedure(plan);
|
||||||
|
@ -576,7 +576,7 @@ public class AssignmentManager implements ServerListener {
|
||||||
long startTime = System.currentTimeMillis();
|
long startTime = System.currentTimeMillis();
|
||||||
// Something badly wrong if takes ten seconds to register a region.
|
// Something badly wrong if takes ten seconds to register a region.
|
||||||
long endTime = startTime + 10000;
|
long endTime = startTime + 10000;
|
||||||
while ((node = regionStates.getRegionNode(regionInfo)) == null && isRunning() &&
|
while ((node = regionStates.getRegionStateNode(regionInfo)) == null && isRunning() &&
|
||||||
System.currentTimeMillis() < endTime) {
|
System.currentTimeMillis() < endTime) {
|
||||||
// Presume it not yet added but will be added soon. Let it spew a lot so we can tell if
|
// Presume it not yet added but will be added soon. Let it spew a lot so we can tell if
|
||||||
// we are waiting here alot.
|
// we are waiting here alot.
|
||||||
|
@ -796,7 +796,7 @@ public class AssignmentManager implements ServerListener {
|
||||||
throws PleaseHoldException, UnexpectedStateException {
|
throws PleaseHoldException, UnexpectedStateException {
|
||||||
checkFailoverCleanupCompleted(regionInfo);
|
checkFailoverCleanupCompleted(regionInfo);
|
||||||
|
|
||||||
final RegionStateNode regionNode = regionStates.getRegionNode(regionInfo);
|
final RegionStateNode regionNode = regionStates.getRegionStateNode(regionInfo);
|
||||||
if (regionNode == null) {
|
if (regionNode == null) {
|
||||||
// the table/region is gone. maybe a delete, split, merge
|
// the table/region is gone. maybe a delete, split, merge
|
||||||
throw new UnexpectedStateException(String.format(
|
throw new UnexpectedStateException(String.format(
|
||||||
|
@ -947,7 +947,7 @@ public class AssignmentManager implements ServerListener {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
final RegionStateNode regionNode = regionStates.getOrCreateRegionNode(hri);
|
final RegionStateNode regionNode = regionStates.getOrCreateRegionStateNode(hri);
|
||||||
LOG.info("META REPORTED: " + regionNode);
|
LOG.info("META REPORTED: " + regionNode);
|
||||||
if (!reportTransition(regionNode, serverNode, TransitionCode.OPENED, 0)) {
|
if (!reportTransition(regionNode, serverNode, TransitionCode.OPENED, 0)) {
|
||||||
LOG.warn("META REPORTED but no procedure found (complete?)");
|
LOG.warn("META REPORTED but no procedure found (complete?)");
|
||||||
|
@ -969,7 +969,7 @@ public class AssignmentManager implements ServerListener {
|
||||||
try {
|
try {
|
||||||
for (byte[] regionName: regionNames) {
|
for (byte[] regionName: regionNames) {
|
||||||
if (!isRunning()) return;
|
if (!isRunning()) return;
|
||||||
final RegionStateNode regionNode = regionStates.getRegionNodeFromName(regionName);
|
final RegionStateNode regionNode = regionStates.getRegionStateNodeFromName(regionName);
|
||||||
if (regionNode == null) {
|
if (regionNode == null) {
|
||||||
throw new UnexpectedStateException("Not online: " + Bytes.toStringBinary(regionName));
|
throw new UnexpectedStateException("Not online: " + Bytes.toStringBinary(regionName));
|
||||||
}
|
}
|
||||||
|
@ -1142,7 +1142,7 @@ public class AssignmentManager implements ServerListener {
|
||||||
}
|
}
|
||||||
|
|
||||||
private void handleRegionOverStuckWarningThreshold(final RegionInfo regionInfo) {
|
private void handleRegionOverStuckWarningThreshold(final RegionInfo regionInfo) {
|
||||||
final RegionStateNode regionNode = regionStates.getRegionNode(regionInfo);
|
final RegionStateNode regionNode = regionStates.getRegionStateNode(regionInfo);
|
||||||
//if (regionNode.isStuck()) {
|
//if (regionNode.isStuck()) {
|
||||||
LOG.warn("TODO Handle stuck in transition: " + regionNode);
|
LOG.warn("TODO Handle stuck in transition: " + regionNode);
|
||||||
}
|
}
|
||||||
|
@ -1181,7 +1181,7 @@ public class AssignmentManager implements ServerListener {
|
||||||
@Override
|
@Override
|
||||||
public void visitRegionState(final RegionInfo regionInfo, final State state,
|
public void visitRegionState(final RegionInfo regionInfo, final State state,
|
||||||
final ServerName regionLocation, final ServerName lastHost, final long openSeqNum) {
|
final ServerName regionLocation, final ServerName lastHost, final long openSeqNum) {
|
||||||
final RegionStateNode regionNode = regionStates.getOrCreateRegionNode(regionInfo);
|
final RegionStateNode regionNode = regionStates.getOrCreateRegionStateNode(regionInfo);
|
||||||
State localState = state;
|
State localState = state;
|
||||||
if (localState == null) {
|
if (localState == null) {
|
||||||
// No region state column data in hbase:meta table! Are I doing a rolling upgrade from
|
// No region state column data in hbase:meta table! Are I doing a rolling upgrade from
|
||||||
|
@ -1200,7 +1200,7 @@ public class AssignmentManager implements ServerListener {
|
||||||
|
|
||||||
if (localState == State.OPEN) {
|
if (localState == State.OPEN) {
|
||||||
assert regionLocation != null : "found null region location for " + regionNode;
|
assert regionLocation != null : "found null region location for " + regionNode;
|
||||||
regionStates.addRegionToServer(regionLocation, regionNode);
|
regionStates.addRegionToServer(regionNode);
|
||||||
} else if (localState == State.OFFLINE || regionInfo.isOffline()) {
|
} else if (localState == State.OFFLINE || regionInfo.isOffline()) {
|
||||||
regionStates.addToOfflineRegions(regionNode);
|
regionStates.addToOfflineRegions(regionNode);
|
||||||
} else {
|
} else {
|
||||||
|
@ -1227,7 +1227,7 @@ public class AssignmentManager implements ServerListener {
|
||||||
long st, et;
|
long st, et;
|
||||||
|
|
||||||
st = System.currentTimeMillis();
|
st = System.currentTimeMillis();
|
||||||
for (RegionStateNode regionNode: regionStates.getRegionNodes()) {
|
for (RegionStateNode regionNode: regionStates.getRegionStateNodes()) {
|
||||||
if (regionNode.getState() == State.OPEN) {
|
if (regionNode.getState() == State.OPEN) {
|
||||||
final ServerName serverName = regionNode.getRegionLocation();
|
final ServerName serverName = regionNode.getRegionLocation();
|
||||||
if (!master.getServerManager().isServerOnline(serverName)) {
|
if (!master.getServerManager().isServerOnline(serverName)) {
|
||||||
|
@ -1331,7 +1331,7 @@ public class AssignmentManager implements ServerListener {
|
||||||
|
|
||||||
public void offlineRegion(final RegionInfo regionInfo) {
|
public void offlineRegion(final RegionInfo regionInfo) {
|
||||||
// TODO used by MasterRpcServices ServerCrashProcedure
|
// TODO used by MasterRpcServices ServerCrashProcedure
|
||||||
final RegionStateNode node = regionStates.getRegionNode(regionInfo);
|
final RegionStateNode node = regionStates.getRegionStateNode(regionInfo);
|
||||||
if (node != null) node.offline();
|
if (node != null) node.offline();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1412,7 +1412,7 @@ public class AssignmentManager implements ServerListener {
|
||||||
}
|
}
|
||||||
|
|
||||||
public RegionInfo getRegionInfo(final byte[] regionName) {
|
public RegionInfo getRegionInfo(final byte[] regionName) {
|
||||||
final RegionStateNode regionState = regionStates.getRegionNodeFromName(regionName);
|
final RegionStateNode regionState = regionStates.getRegionStateNodeFromName(regionName);
|
||||||
return regionState != null ? regionState.getRegionInfo() : null;
|
return regionState != null ? regionState.getRegionInfo() : null;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1440,11 +1440,9 @@ public class AssignmentManager implements ServerListener {
|
||||||
|
|
||||||
public void markRegionAsOpening(final RegionStateNode regionNode) throws IOException {
|
public void markRegionAsOpening(final RegionStateNode regionNode) throws IOException {
|
||||||
synchronized (regionNode) {
|
synchronized (regionNode) {
|
||||||
State state = regionNode.transitionState(State.OPENING, RegionStates.STATES_EXPECTED_ON_OPEN);
|
regionNode.transitionState(State.OPENING, RegionStates.STATES_EXPECTED_ON_OPEN);
|
||||||
regionStates.addRegionToServer(regionNode.getRegionLocation(), regionNode);
|
regionStates.addRegionToServer(regionNode);
|
||||||
regionStateStore.updateRegionLocation(regionNode.getRegionInfo(), state,
|
regionStateStore.updateRegionLocation(regionNode);
|
||||||
regionNode.getRegionLocation(), regionNode.getLastHost(), HConstants.NO_SEQNUM,
|
|
||||||
regionNode.getProcedure().getProcId());
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// update the operation count metrics
|
// update the operation count metrics
|
||||||
|
@ -1468,18 +1466,16 @@ public class AssignmentManager implements ServerListener {
|
||||||
public void markRegionAsOpened(final RegionStateNode regionNode) throws IOException {
|
public void markRegionAsOpened(final RegionStateNode regionNode) throws IOException {
|
||||||
final RegionInfo hri = regionNode.getRegionInfo();
|
final RegionInfo hri = regionNode.getRegionInfo();
|
||||||
synchronized (regionNode) {
|
synchronized (regionNode) {
|
||||||
State state = regionNode.transitionState(State.OPEN, RegionStates.STATES_EXPECTED_ON_OPEN);
|
regionNode.transitionState(State.OPEN, RegionStates.STATES_EXPECTED_ON_OPEN);
|
||||||
if (isMetaRegion(hri)) {
|
if (isMetaRegion(hri)) {
|
||||||
master.getTableStateManager().setTableState(TableName.META_TABLE_NAME,
|
master.getTableStateManager().setTableState(TableName.META_TABLE_NAME,
|
||||||
TableState.State.ENABLED);
|
TableState.State.ENABLED);
|
||||||
setMetaInitialized(hri, true);
|
setMetaInitialized(hri, true);
|
||||||
}
|
}
|
||||||
regionStates.addRegionToServer(regionNode.getRegionLocation(), regionNode);
|
regionStates.addRegionToServer(regionNode);
|
||||||
// TODO: OPENING Updates hbase:meta too... we need to do both here and there?
|
// TODO: OPENING Updates hbase:meta too... we need to do both here and there?
|
||||||
// That is a lot of hbase:meta writing.
|
// That is a lot of hbase:meta writing.
|
||||||
regionStateStore.updateRegionLocation(regionNode.getRegionInfo(), state,
|
regionStateStore.updateRegionLocation(regionNode);
|
||||||
regionNode.getRegionLocation(), regionNode.getLastHost(), regionNode.getOpenSeqNum(),
|
|
||||||
regionNode.getProcedure().getProcId());
|
|
||||||
sendRegionOpenedNotification(hri, regionNode.getRegionLocation());
|
sendRegionOpenedNotification(hri, regionNode.getRegionLocation());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1487,15 +1483,13 @@ public class AssignmentManager implements ServerListener {
|
||||||
public void markRegionAsClosing(final RegionStateNode regionNode) throws IOException {
|
public void markRegionAsClosing(final RegionStateNode regionNode) throws IOException {
|
||||||
final RegionInfo hri = regionNode.getRegionInfo();
|
final RegionInfo hri = regionNode.getRegionInfo();
|
||||||
synchronized (regionNode) {
|
synchronized (regionNode) {
|
||||||
State state = regionNode.transitionState(State.CLOSING, RegionStates.STATES_EXPECTED_ON_CLOSE);
|
regionNode.transitionState(State.CLOSING, RegionStates.STATES_EXPECTED_ON_CLOSE);
|
||||||
// Set meta has not initialized early. so people trying to create/edit tables will wait
|
// Set meta has not initialized early. so people trying to create/edit tables will wait
|
||||||
if (isMetaRegion(hri)) {
|
if (isMetaRegion(hri)) {
|
||||||
setMetaInitialized(hri, false);
|
setMetaInitialized(hri, false);
|
||||||
}
|
}
|
||||||
regionStates.addRegionToServer(regionNode.getRegionLocation(), regionNode);
|
regionStates.addRegionToServer(regionNode);
|
||||||
regionStateStore.updateRegionLocation(regionNode.getRegionInfo(), state,
|
regionStateStore.updateRegionLocation(regionNode);
|
||||||
regionNode.getRegionLocation(), regionNode.getLastHost(), HConstants.NO_SEQNUM,
|
|
||||||
regionNode.getProcedure().getProcId());
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// update the operation count metrics
|
// update the operation count metrics
|
||||||
|
@ -1510,13 +1504,11 @@ public class AssignmentManager implements ServerListener {
|
||||||
public void markRegionAsClosed(final RegionStateNode regionNode) throws IOException {
|
public void markRegionAsClosed(final RegionStateNode regionNode) throws IOException {
|
||||||
final RegionInfo hri = regionNode.getRegionInfo();
|
final RegionInfo hri = regionNode.getRegionInfo();
|
||||||
synchronized (regionNode) {
|
synchronized (regionNode) {
|
||||||
State state = regionNode.transitionState(State.CLOSED, RegionStates.STATES_EXPECTED_ON_CLOSE);
|
regionNode.transitionState(State.CLOSED, RegionStates.STATES_EXPECTED_ON_CLOSE);
|
||||||
regionStates.removeRegionFromServer(regionNode.getRegionLocation(), regionNode);
|
regionStates.removeRegionFromServer(regionNode.getRegionLocation(), regionNode);
|
||||||
regionNode.setLastHost(regionNode.getRegionLocation());
|
regionNode.setLastHost(regionNode.getRegionLocation());
|
||||||
regionNode.setRegionLocation(null);
|
regionNode.setRegionLocation(null);
|
||||||
regionStateStore.updateRegionLocation(regionNode.getRegionInfo(), state,
|
regionStateStore.updateRegionLocation(regionNode);
|
||||||
regionNode.getRegionLocation()/*null*/, regionNode.getLastHost(),
|
|
||||||
HConstants.NO_SEQNUM, regionNode.getProcedure().getProcId());
|
|
||||||
sendRegionClosedNotification(hri);
|
sendRegionClosedNotification(hri);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1529,11 +1521,11 @@ public class AssignmentManager implements ServerListener {
|
||||||
// Update its state in regionStates to it shows as offline and split when read
|
// Update its state in regionStates to it shows as offline and split when read
|
||||||
// later figuring what regions are in a table and what are not: see
|
// later figuring what regions are in a table and what are not: see
|
||||||
// regionStates#getRegionsOfTable
|
// regionStates#getRegionsOfTable
|
||||||
final RegionStateNode node = regionStates.getOrCreateRegionNode(parent);
|
final RegionStateNode node = regionStates.getOrCreateRegionStateNode(parent);
|
||||||
node.setState(State.SPLIT);
|
node.setState(State.SPLIT);
|
||||||
final RegionStateNode nodeA = regionStates.getOrCreateRegionNode(daughterA);
|
final RegionStateNode nodeA = regionStates.getOrCreateRegionStateNode(daughterA);
|
||||||
nodeA.setState(State.SPLITTING_NEW);
|
nodeA.setState(State.SPLITTING_NEW);
|
||||||
final RegionStateNode nodeB = regionStates.getOrCreateRegionNode(daughterB);
|
final RegionStateNode nodeB = regionStates.getOrCreateRegionStateNode(daughterB);
|
||||||
nodeB.setState(State.SPLITTING_NEW);
|
nodeB.setState(State.SPLITTING_NEW);
|
||||||
|
|
||||||
regionStateStore.splitRegion(parent, daughterA, daughterB, serverName);
|
regionStateStore.splitRegion(parent, daughterA, daughterB, serverName);
|
||||||
|
@ -1554,7 +1546,7 @@ public class AssignmentManager implements ServerListener {
|
||||||
*/
|
*/
|
||||||
public void markRegionAsMerged(final RegionInfo child, final ServerName serverName,
|
public void markRegionAsMerged(final RegionInfo child, final ServerName serverName,
|
||||||
final RegionInfo mother, final RegionInfo father) throws IOException {
|
final RegionInfo mother, final RegionInfo father) throws IOException {
|
||||||
final RegionStateNode node = regionStates.getOrCreateRegionNode(child);
|
final RegionStateNode node = regionStates.getOrCreateRegionStateNode(child);
|
||||||
node.setState(State.MERGED);
|
node.setState(State.MERGED);
|
||||||
regionStates.deleteRegion(mother);
|
regionStates.deleteRegion(mother);
|
||||||
regionStates.deleteRegion(father);
|
regionStates.deleteRegion(father);
|
||||||
|
|
|
@ -556,8 +556,8 @@ public class MergeTableRegionsProcedure
|
||||||
public void setRegionStateToMerging(final MasterProcedureEnv env) throws IOException {
|
public void setRegionStateToMerging(final MasterProcedureEnv env) throws IOException {
|
||||||
// Set State.MERGING to regions to be merged
|
// Set State.MERGING to regions to be merged
|
||||||
RegionStates regionStates = env.getAssignmentManager().getRegionStates();
|
RegionStates regionStates = env.getAssignmentManager().getRegionStates();
|
||||||
regionStates.getRegionNode(regionsToMerge[0]).setState(State.MERGING);
|
regionStates.getRegionStateNode(regionsToMerge[0]).setState(State.MERGING);
|
||||||
regionStates.getRegionNode(regionsToMerge[1]).setState(State.MERGING);
|
regionStates.getRegionStateNode(regionsToMerge[1]).setState(State.MERGING);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -569,8 +569,8 @@ public class MergeTableRegionsProcedure
|
||||||
private void setRegionStateBackToOpen(final MasterProcedureEnv env) throws IOException {
|
private void setRegionStateBackToOpen(final MasterProcedureEnv env) throws IOException {
|
||||||
// revert region state to Open
|
// revert region state to Open
|
||||||
RegionStates regionStates = env.getAssignmentManager().getRegionStates();
|
RegionStates regionStates = env.getAssignmentManager().getRegionStates();
|
||||||
regionStates.getRegionNode(regionsToMerge[0]).setState(State.OPEN);
|
regionStates.getRegionStateNode(regionsToMerge[0]).setState(State.OPEN);
|
||||||
regionStates.getRegionNode(regionsToMerge[1]).setState(State.OPEN);
|
regionStates.getRegionStateNode(regionsToMerge[1]).setState(State.OPEN);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -595,7 +595,7 @@ public class MergeTableRegionsProcedure
|
||||||
|
|
||||||
//Prepare to create merged regions
|
//Prepare to create merged regions
|
||||||
env.getAssignmentManager().getRegionStates().
|
env.getAssignmentManager().getRegionStates().
|
||||||
getOrCreateRegionNode(mergedRegion).setState(State.MERGING_NEW);
|
getOrCreateRegionStateNode(mergedRegion).setState(State.MERGING_NEW);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -38,7 +38,6 @@ import org.apache.hadoop.hbase.client.RegionInfo;
|
||||||
import org.apache.hadoop.hbase.client.Result;
|
import org.apache.hadoop.hbase.client.Result;
|
||||||
import org.apache.hadoop.hbase.client.TableDescriptor;
|
import org.apache.hadoop.hbase.client.TableDescriptor;
|
||||||
import org.apache.hadoop.hbase.master.MasterServices;
|
import org.apache.hadoop.hbase.master.MasterServices;
|
||||||
import org.apache.hadoop.hbase.master.RegionState;
|
|
||||||
import org.apache.hadoop.hbase.master.RegionState.State;
|
import org.apache.hadoop.hbase.master.RegionState.State;
|
||||||
import org.apache.hadoop.hbase.procedure2.util.StringUtils;
|
import org.apache.hadoop.hbase.procedure2.util.StringUtils;
|
||||||
import org.apache.hadoop.hbase.util.Bytes;
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
|
@ -138,23 +137,19 @@ public class RegionStateStore {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public void updateRegionLocation(final RegionInfo regionInfo, final State state,
|
public void updateRegionLocation(RegionStates.RegionStateNode regionStateNode)
|
||||||
final ServerName regionLocation, final ServerName lastHost, final long openSeqNum,
|
|
||||||
final long pid)
|
|
||||||
throws IOException {
|
throws IOException {
|
||||||
if (regionInfo.isMetaRegion()) {
|
if (regionStateNode.getRegionInfo().isMetaRegion()) {
|
||||||
updateMetaLocation(regionInfo, regionLocation);
|
updateMetaLocation(regionStateNode.getRegionInfo(), regionStateNode.getRegionLocation());
|
||||||
} else {
|
} else {
|
||||||
updateUserRegionLocation(regionInfo, state, regionLocation, lastHost, openSeqNum, pid);
|
long openSeqNum = regionStateNode.getState() == State.OPEN ?
|
||||||
|
regionStateNode.getOpenSeqNum() : HConstants.NO_SEQNUM;
|
||||||
|
updateUserRegionLocation(regionStateNode.getRegionInfo(), regionStateNode.getState(),
|
||||||
|
regionStateNode.getRegionLocation(), regionStateNode.getLastHost(), openSeqNum,
|
||||||
|
regionStateNode.getProcedure().getProcId());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public void updateRegionState(final long openSeqNum, final long pid,
|
|
||||||
final RegionState newState, final RegionState oldState) throws IOException {
|
|
||||||
updateRegionLocation(newState.getRegion(), newState.getState(), newState.getServerName(),
|
|
||||||
oldState != null ? oldState.getServerName() : null, openSeqNum, pid);
|
|
||||||
}
|
|
||||||
|
|
||||||
protected void updateMetaLocation(final RegionInfo regionInfo, final ServerName serverName)
|
protected void updateMetaLocation(final RegionInfo regionInfo, final ServerName serverName)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
try {
|
try {
|
||||||
|
|
|
@ -25,7 +25,6 @@ import java.util.Collection;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.Comparator;
|
import java.util.Comparator;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.HashSet;
|
|
||||||
import java.util.Iterator;
|
import java.util.Iterator;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
@ -123,13 +122,18 @@ public class RegionStates {
|
||||||
this.event = new AssignmentProcedureEvent(regionInfo);
|
this.event = new AssignmentProcedureEvent(regionInfo);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @param update new region state this node should be assigned.
|
||||||
|
* @param expected current state should be in this given list of expected states
|
||||||
|
* @return true, if current state is in expected list; otherwise false.
|
||||||
|
*/
|
||||||
public boolean setState(final State update, final State... expected) {
|
public boolean setState(final State update, final State... expected) {
|
||||||
final boolean expectedState = isInState(expected);
|
if (!isInState(expected)) {
|
||||||
if (expectedState) {
|
return false;
|
||||||
|
}
|
||||||
this.state = update;
|
this.state = update;
|
||||||
this.lastUpdate = EnvironmentEdgeManager.currentTime();
|
this.lastUpdate = EnvironmentEdgeManager.currentTime();
|
||||||
}
|
return true;
|
||||||
return expectedState;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -145,13 +149,12 @@ public class RegionStates {
|
||||||
* Set new {@link State} but only if currently in <code>expected</code> State
|
* Set new {@link State} but only if currently in <code>expected</code> State
|
||||||
* (if not, throw {@link UnexpectedStateException}.
|
* (if not, throw {@link UnexpectedStateException}.
|
||||||
*/
|
*/
|
||||||
public State transitionState(final State update, final State... expected)
|
public void transitionState(final State update, final State... expected)
|
||||||
throws UnexpectedStateException {
|
throws UnexpectedStateException {
|
||||||
if (!setState(update, expected)) {
|
if (!setState(update, expected)) {
|
||||||
throw new UnexpectedStateException("Expected " + Arrays.toString(expected) +
|
throw new UnexpectedStateException("Expected " + Arrays.toString(expected) +
|
||||||
" so could move to " + update + " but current state=" + getState());
|
" so could move to " + update + " but current state=" + getState());
|
||||||
}
|
}
|
||||||
return update;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public boolean isInState(final State... expected) {
|
public boolean isInState(final State... expected) {
|
||||||
|
@ -253,6 +256,10 @@ public class RegionStates {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public RegionState toRegionState() {
|
||||||
|
return new RegionState(getRegionInfo(), getState(), getLastUpdate(), getRegionLocation());
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public int compareTo(final RegionStateNode other) {
|
public int compareTo(final RegionStateNode other) {
|
||||||
// NOTE: RegionInfo sort by table first, so we are relying on that.
|
// NOTE: RegionInfo sort by table first, so we are relying on that.
|
||||||
|
@ -311,7 +318,7 @@ public class RegionStates {
|
||||||
|
|
||||||
public ServerStateNode(final ServerName serverName) {
|
public ServerStateNode(final ServerName serverName) {
|
||||||
this.serverName = serverName;
|
this.serverName = serverName;
|
||||||
this.regions = new HashSet<RegionStateNode>();
|
this.regions = ConcurrentHashMap.newKeySet();
|
||||||
this.reportEvent = new ServerReportEvent(serverName);
|
this.reportEvent = new ServerReportEvent(serverName);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -440,33 +447,23 @@ public class RegionStates {
|
||||||
// ==========================================================================
|
// ==========================================================================
|
||||||
// RegionStateNode helpers
|
// RegionStateNode helpers
|
||||||
// ==========================================================================
|
// ==========================================================================
|
||||||
protected RegionStateNode createRegionNode(final RegionInfo regionInfo) {
|
protected RegionStateNode createRegionStateNode(final RegionInfo regionInfo) {
|
||||||
RegionStateNode newNode = new RegionStateNode(regionInfo);
|
RegionStateNode newNode = new RegionStateNode(regionInfo);
|
||||||
RegionStateNode oldNode = regionsMap.putIfAbsent(regionInfo.getRegionName(), newNode);
|
RegionStateNode oldNode = regionsMap.putIfAbsent(regionInfo.getRegionName(), newNode);
|
||||||
return oldNode != null ? oldNode : newNode;
|
return oldNode != null ? oldNode : newNode;
|
||||||
}
|
}
|
||||||
|
|
||||||
protected RegionStateNode getOrCreateRegionNode(final RegionInfo regionInfo) {
|
protected RegionStateNode getOrCreateRegionStateNode(final RegionInfo regionInfo) {
|
||||||
RegionStateNode node = regionsMap.get(regionInfo.getRegionName());
|
RegionStateNode node = regionsMap.get(regionInfo.getRegionName());
|
||||||
return node != null ? node : createRegionNode(regionInfo);
|
return node != null ? node : createRegionStateNode(regionInfo);
|
||||||
}
|
}
|
||||||
|
|
||||||
RegionStateNode getRegionNodeFromName(final byte[] regionName) {
|
RegionStateNode getRegionStateNodeFromName(final byte[] regionName) {
|
||||||
return regionsMap.get(regionName);
|
return regionsMap.get(regionName);
|
||||||
}
|
}
|
||||||
|
|
||||||
protected RegionStateNode getRegionNode(final RegionInfo regionInfo) {
|
protected RegionStateNode getRegionStateNode(final RegionInfo regionInfo) {
|
||||||
return getRegionNodeFromName(regionInfo.getRegionName());
|
return getRegionStateNodeFromName(regionInfo.getRegionName());
|
||||||
}
|
|
||||||
|
|
||||||
RegionStateNode getRegionNodeFromEncodedName(final String encodedRegionName) {
|
|
||||||
// TODO: Need a map <encodedName, ...> but it is just dispatch merge...
|
|
||||||
for (RegionStateNode node: regionsMap.values()) {
|
|
||||||
if (node.getRegionInfo().getEncodedName().equals(encodedRegionName)) {
|
|
||||||
return node;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return null;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public void deleteRegion(final RegionInfo regionInfo) {
|
public void deleteRegion(final RegionInfo regionInfo) {
|
||||||
|
@ -491,7 +488,7 @@ public class RegionStates {
|
||||||
final ArrayList<RegionState> regions = new ArrayList<RegionState>();
|
final ArrayList<RegionState> regions = new ArrayList<RegionState>();
|
||||||
for (RegionStateNode node: regionsMap.tailMap(tableName.getName()).values()) {
|
for (RegionStateNode node: regionsMap.tailMap(tableName.getName()).values()) {
|
||||||
if (!node.getTable().equals(tableName)) break;
|
if (!node.getTable().equals(tableName)) break;
|
||||||
regions.add(createRegionState(node));
|
regions.add(node.toRegionState());
|
||||||
}
|
}
|
||||||
return regions;
|
return regions;
|
||||||
}
|
}
|
||||||
|
@ -505,14 +502,14 @@ public class RegionStates {
|
||||||
return regions;
|
return regions;
|
||||||
}
|
}
|
||||||
|
|
||||||
Collection<RegionStateNode> getRegionNodes() {
|
Collection<RegionStateNode> getRegionStateNodes() {
|
||||||
return regionsMap.values();
|
return regionsMap.values();
|
||||||
}
|
}
|
||||||
|
|
||||||
public ArrayList<RegionState> getRegionStates() {
|
public ArrayList<RegionState> getRegionStates() {
|
||||||
final ArrayList<RegionState> regions = new ArrayList<RegionState>(regionsMap.size());
|
final ArrayList<RegionState> regions = new ArrayList<RegionState>(regionsMap.size());
|
||||||
for (RegionStateNode node: regionsMap.values()) {
|
for (RegionStateNode node: regionsMap.values()) {
|
||||||
regions.add(createRegionState(node));
|
regions.add(node.toRegionState());
|
||||||
}
|
}
|
||||||
return regions;
|
return regions;
|
||||||
}
|
}
|
||||||
|
@ -521,17 +518,18 @@ public class RegionStates {
|
||||||
// RegionState helpers
|
// RegionState helpers
|
||||||
// ==========================================================================
|
// ==========================================================================
|
||||||
public RegionState getRegionState(final RegionInfo regionInfo) {
|
public RegionState getRegionState(final RegionInfo regionInfo) {
|
||||||
return createRegionState(getRegionNode(regionInfo));
|
RegionStateNode regionStateNode = getRegionStateNode(regionInfo);
|
||||||
|
return regionStateNode == null ? null : regionStateNode.toRegionState();
|
||||||
}
|
}
|
||||||
|
|
||||||
public RegionState getRegionState(final String encodedRegionName) {
|
public RegionState getRegionState(final String encodedRegionName) {
|
||||||
return createRegionState(getRegionNodeFromEncodedName(encodedRegionName));
|
// TODO: Need a map <encodedName, ...> but it is just dispatch merge...
|
||||||
|
for (RegionStateNode node: regionsMap.values()) {
|
||||||
|
if (node.getRegionInfo().getEncodedName().equals(encodedRegionName)) {
|
||||||
|
return node.toRegionState();
|
||||||
}
|
}
|
||||||
|
}
|
||||||
private RegionState createRegionState(final RegionStateNode node) {
|
return null;
|
||||||
return node == null ? null :
|
|
||||||
new RegionState(node.getRegionInfo(), node.getState(),
|
|
||||||
node.getLastUpdate(), node.getRegionLocation());
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// ============================================================================================
|
// ============================================================================================
|
||||||
|
@ -612,7 +610,7 @@ public class RegionStates {
|
||||||
}
|
}
|
||||||
|
|
||||||
public void logSplit(final RegionInfo regionInfo) {
|
public void logSplit(final RegionInfo regionInfo) {
|
||||||
final RegionStateNode regionNode = getRegionNode(regionInfo);
|
final RegionStateNode regionNode = getRegionStateNode(regionInfo);
|
||||||
synchronized (regionNode) {
|
synchronized (regionNode) {
|
||||||
regionNode.setState(State.SPLIT);
|
regionNode.setState(State.SPLIT);
|
||||||
}
|
}
|
||||||
|
@ -620,7 +618,7 @@ public class RegionStates {
|
||||||
|
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
public void updateRegionState(final RegionInfo regionInfo, final State state) {
|
public void updateRegionState(final RegionInfo regionInfo, final State state) {
|
||||||
final RegionStateNode regionNode = getOrCreateRegionNode(regionInfo);
|
final RegionStateNode regionNode = getOrCreateRegionStateNode(regionInfo);
|
||||||
synchronized (regionNode) {
|
synchronized (regionNode) {
|
||||||
regionNode.setState(state);
|
regionNode.setState(state);
|
||||||
}
|
}
|
||||||
|
@ -640,7 +638,7 @@ public class RegionStates {
|
||||||
}
|
}
|
||||||
|
|
||||||
public boolean isRegionInState(final RegionInfo regionInfo, final State... state) {
|
public boolean isRegionInState(final RegionInfo regionInfo, final State... state) {
|
||||||
final RegionStateNode region = getRegionNode(regionInfo);
|
final RegionStateNode region = getRegionStateNode(regionInfo);
|
||||||
if (region != null) {
|
if (region != null) {
|
||||||
synchronized (region) {
|
synchronized (region) {
|
||||||
return region.isInState(state);
|
return region.isInState(state);
|
||||||
|
@ -664,7 +662,7 @@ public class RegionStates {
|
||||||
final Collection<RegionInfo> regions) {
|
final Collection<RegionInfo> regions) {
|
||||||
final Map<ServerName, List<RegionInfo>> result = new HashMap<ServerName, List<RegionInfo>>();
|
final Map<ServerName, List<RegionInfo>> result = new HashMap<ServerName, List<RegionInfo>>();
|
||||||
for (RegionInfo hri: regions) {
|
for (RegionInfo hri: regions) {
|
||||||
final RegionStateNode node = getRegionNode(hri);
|
final RegionStateNode node = getRegionStateNode(hri);
|
||||||
if (node == null) continue;
|
if (node == null) continue;
|
||||||
|
|
||||||
// TODO: State.OPEN
|
// TODO: State.OPEN
|
||||||
|
@ -707,7 +705,7 @@ public class RegionStates {
|
||||||
}
|
}
|
||||||
|
|
||||||
public ServerName getRegionServerOfRegion(final RegionInfo regionInfo) {
|
public ServerName getRegionServerOfRegion(final RegionInfo regionInfo) {
|
||||||
final RegionStateNode region = getRegionNode(regionInfo);
|
final RegionStateNode region = getRegionStateNode(regionInfo);
|
||||||
if (region != null) {
|
if (region != null) {
|
||||||
synchronized (region) {
|
synchronized (region) {
|
||||||
ServerName server = region.getRegionLocation();
|
ServerName server = region.getRegionLocation();
|
||||||
|
@ -815,7 +813,7 @@ public class RegionStates {
|
||||||
if (node == null) return null;
|
if (node == null) return null;
|
||||||
|
|
||||||
synchronized (node) {
|
synchronized (node) {
|
||||||
return node.isInTransition() ? createRegionState(node) : null;
|
return node.isInTransition() ? node.toRegionState() : null;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -833,7 +831,7 @@ public class RegionStates {
|
||||||
public List<RegionState> getRegionsStateInTransition() {
|
public List<RegionState> getRegionsStateInTransition() {
|
||||||
final List<RegionState> rit = new ArrayList<RegionState>(regionInTransition.size());
|
final List<RegionState> rit = new ArrayList<RegionState>(regionInTransition.size());
|
||||||
for (RegionStateNode node: regionInTransition.values()) {
|
for (RegionStateNode node: regionInTransition.values()) {
|
||||||
rit.add(createRegionState(node));
|
rit.add(node.toRegionState());
|
||||||
}
|
}
|
||||||
return rit;
|
return rit;
|
||||||
}
|
}
|
||||||
|
@ -841,7 +839,7 @@ public class RegionStates {
|
||||||
public SortedSet<RegionState> getRegionsInTransitionOrderedByTimestamp() {
|
public SortedSet<RegionState> getRegionsInTransitionOrderedByTimestamp() {
|
||||||
final SortedSet<RegionState> rit = new TreeSet<RegionState>(REGION_STATE_STAMP_COMPARATOR);
|
final SortedSet<RegionState> rit = new TreeSet<RegionState>(REGION_STATE_STAMP_COMPARATOR);
|
||||||
for (RegionStateNode node: regionInTransition.values()) {
|
for (RegionStateNode node: regionInTransition.values()) {
|
||||||
rit.add(createRegionState(node));
|
rit.add(node.toRegionState());
|
||||||
}
|
}
|
||||||
return rit;
|
return rit;
|
||||||
}
|
}
|
||||||
|
@ -873,7 +871,7 @@ public class RegionStates {
|
||||||
this.regionNode = regionNode;
|
this.regionNode = regionNode;
|
||||||
}
|
}
|
||||||
|
|
||||||
public RegionStateNode getRegionNode() {
|
public RegionStateNode getRegionStateNode() {
|
||||||
return regionNode;
|
return regionNode;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -922,7 +920,7 @@ public class RegionStates {
|
||||||
|
|
||||||
ArrayList<RegionState> regions = new ArrayList<RegionState>(regionFailedOpen.size());
|
ArrayList<RegionState> regions = new ArrayList<RegionState>(regionFailedOpen.size());
|
||||||
for (RegionFailedOpen r: regionFailedOpen.values()) {
|
for (RegionFailedOpen r: regionFailedOpen.values()) {
|
||||||
regions.add(createRegionState(r.getRegionNode()));
|
regions.add(r.getRegionStateNode().toRegionState());
|
||||||
}
|
}
|
||||||
return regions;
|
return regions;
|
||||||
}
|
}
|
||||||
|
@ -958,9 +956,8 @@ public class RegionStates {
|
||||||
return numServers == 0 ? 0.0: (double)totalLoad / (double)numServers;
|
return numServers == 0 ? 0.0: (double)totalLoad / (double)numServers;
|
||||||
}
|
}
|
||||||
|
|
||||||
public ServerStateNode addRegionToServer(final ServerName serverName,
|
public ServerStateNode addRegionToServer(final RegionStateNode regionNode) {
|
||||||
final RegionStateNode regionNode) {
|
ServerStateNode serverNode = getOrCreateServer(regionNode.getRegionLocation());
|
||||||
ServerStateNode serverNode = getOrCreateServer(serverName);
|
|
||||||
serverNode.addRegion(regionNode);
|
serverNode.addRegion(regionNode);
|
||||||
return serverNode;
|
return serverNode;
|
||||||
}
|
}
|
||||||
|
|
|
@ -42,19 +42,21 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProto
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Base class for the Assign and Unassign Procedure.
|
* Base class for the Assign and Unassign Procedure.
|
||||||
* There can only be one RegionTransitionProcedure per region running at a time
|
*
|
||||||
* since each procedure takes a lock on the region (see MasterProcedureScheduler).
|
* Locking:
|
||||||
|
* Takes exclusive lock on the region being assigned/unassigned. Thus, there can only be one
|
||||||
|
* RegionTransitionProcedure per region running at a time (see MasterProcedureScheduler).
|
||||||
*
|
*
|
||||||
* <p>This procedure is asynchronous and responds to external events.
|
* <p>This procedure is asynchronous and responds to external events.
|
||||||
* The AssignmentManager will notify this procedure when the RS completes
|
* The AssignmentManager will notify this procedure when the RS completes
|
||||||
* the operation and reports the transitioned state
|
* the operation and reports the transitioned state
|
||||||
* (see the Assign and Unassign class for more detail).
|
* (see the Assign and Unassign class for more detail).</p>
|
||||||
*
|
*
|
||||||
* <p>Procedures move from the REGION_TRANSITION_QUEUE state when they are
|
* <p>Procedures move from the REGION_TRANSITION_QUEUE state when they are
|
||||||
* first submitted, to the REGION_TRANSITION_DISPATCH state when the request
|
* first submitted, to the REGION_TRANSITION_DISPATCH state when the request
|
||||||
* to remote server is sent and the Procedure is suspended waiting on external
|
* to remote server is sent and the Procedure is suspended waiting on external
|
||||||
* event to be woken again. Once the external event is triggered, Procedure
|
* event to be woken again. Once the external event is triggered, Procedure
|
||||||
* moves to the REGION_TRANSITION_FINISH state.
|
* moves to the REGION_TRANSITION_FINISH state.</p>
|
||||||
*
|
*
|
||||||
* <p>NOTE: {@link AssignProcedure} and {@link UnassignProcedure} should not be thought of
|
* <p>NOTE: {@link AssignProcedure} and {@link UnassignProcedure} should not be thought of
|
||||||
* as being asymmetric, at least currently.
|
* as being asymmetric, at least currently.
|
||||||
|
@ -67,12 +69,13 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProto
|
||||||
* AssignProcedure#handleFailure(MasterProcedureEnv, RegionStateNode) re-attempts the
|
* AssignProcedure#handleFailure(MasterProcedureEnv, RegionStateNode) re-attempts the
|
||||||
* assignment by setting the procedure state to REGION_TRANSITION_QUEUE and forces
|
* assignment by setting the procedure state to REGION_TRANSITION_QUEUE and forces
|
||||||
* assignment to a different target server by setting {@link AssignProcedure#forceNewPlan}. When
|
* assignment to a different target server by setting {@link AssignProcedure#forceNewPlan}. When
|
||||||
* the number of attempts reach hreshold configuration 'hbase.assignment.maximum.attempts',
|
* the number of attempts reaches threshold configuration 'hbase.assignment.maximum.attempts',
|
||||||
* the procedure is aborted. For {@link UnassignProcedure}, similar re-attempts are
|
* the procedure is aborted. For {@link UnassignProcedure}, similar re-attempts are
|
||||||
* intentionally not implemented. It is a 'one shot' procedure. See its class doc for how it
|
* intentionally not implemented. It is a 'one shot' procedure. See its class doc for how it
|
||||||
* handles failure.
|
* handles failure.
|
||||||
* </li>
|
* </li>
|
||||||
* </ul>
|
* </ul>
|
||||||
|
* </p>
|
||||||
*
|
*
|
||||||
* <p>TODO: Considering it is a priority doing all we can to get make a region available as soon as possible,
|
* <p>TODO: Considering it is a priority doing all we can to get make a region available as soon as possible,
|
||||||
* re-attempting with any target makes sense if specified target fails in case of
|
* re-attempting with any target makes sense if specified target fails in case of
|
||||||
|
@ -88,21 +91,18 @@ public abstract class RegionTransitionProcedure
|
||||||
|
|
||||||
protected final AtomicBoolean aborted = new AtomicBoolean(false);
|
protected final AtomicBoolean aborted = new AtomicBoolean(false);
|
||||||
|
|
||||||
private RegionTransitionState transitionState =
|
private RegionTransitionState transitionState = RegionTransitionState.REGION_TRANSITION_QUEUE;
|
||||||
RegionTransitionState.REGION_TRANSITION_QUEUE;
|
|
||||||
private RegionInfo regionInfo;
|
private RegionInfo regionInfo;
|
||||||
private volatile boolean lock = false;
|
private volatile boolean lock = false;
|
||||||
|
|
||||||
public RegionTransitionProcedure() {
|
|
||||||
// Required by the Procedure framework to create the procedure on replay
|
// Required by the Procedure framework to create the procedure on replay
|
||||||
super();
|
public RegionTransitionProcedure() {}
|
||||||
}
|
|
||||||
|
|
||||||
public RegionTransitionProcedure(final RegionInfo regionInfo) {
|
public RegionTransitionProcedure(final RegionInfo regionInfo) {
|
||||||
this.regionInfo = regionInfo;
|
this.regionInfo = regionInfo;
|
||||||
}
|
}
|
||||||
|
|
||||||
public RegionInfo getRegionInfo() {
|
protected RegionInfo getRegionInfo() {
|
||||||
return regionInfo;
|
return regionInfo;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -131,15 +131,14 @@ public abstract class RegionTransitionProcedure
|
||||||
}
|
}
|
||||||
|
|
||||||
public RegionStateNode getRegionState(final MasterProcedureEnv env) {
|
public RegionStateNode getRegionState(final MasterProcedureEnv env) {
|
||||||
return env.getAssignmentManager().getRegionStates().
|
return env.getAssignmentManager().getRegionStates().getOrCreateRegionStateNode(getRegionInfo());
|
||||||
getOrCreateRegionNode(getRegionInfo());
|
|
||||||
}
|
}
|
||||||
|
|
||||||
protected void setTransitionState(final RegionTransitionState state) {
|
void setTransitionState(final RegionTransitionState state) {
|
||||||
this.transitionState = state;
|
this.transitionState = state;
|
||||||
}
|
}
|
||||||
|
|
||||||
protected RegionTransitionState getTransitionState() {
|
RegionTransitionState getTransitionState() {
|
||||||
return transitionState;
|
return transitionState;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -200,7 +199,7 @@ public abstract class RegionTransitionProcedure
|
||||||
* and this procedure has been set into a suspended state OR, we failed and
|
* and this procedure has been set into a suspended state OR, we failed and
|
||||||
* this procedure has been put back on the scheduler ready for another worker
|
* this procedure has been put back on the scheduler ready for another worker
|
||||||
* to pick it up. In both cases, we need to exit the current Worker processing
|
* to pick it up. In both cases, we need to exit the current Worker processing
|
||||||
* toute de suite!
|
* immediately!
|
||||||
* @return True if we successfully dispatched the call and false if we failed;
|
* @return True if we successfully dispatched the call and false if we failed;
|
||||||
* if failed, we need to roll back any setup done for the dispatch.
|
* if failed, we need to roll back any setup done for the dispatch.
|
||||||
*/
|
*/
|
||||||
|
@ -217,7 +216,7 @@ public abstract class RegionTransitionProcedure
|
||||||
getRegionState(env).getProcedureEvent().suspend();
|
getRegionState(env).getProcedureEvent().suspend();
|
||||||
|
|
||||||
// Tricky because the below call to addOperationToNode can fail. If it fails, we need to
|
// Tricky because the below call to addOperationToNode can fail. If it fails, we need to
|
||||||
// backtrack on stuff like the 'suspend' done above -- tricky as the 'wake' requeues us -- and
|
// backtrack on stuff like the 'suspend' done above -- tricky as the 'wake' requests us -- and
|
||||||
// ditto up in the caller; it needs to undo state changes. Inside in remoteCallFailed, it does
|
// ditto up in the caller; it needs to undo state changes. Inside in remoteCallFailed, it does
|
||||||
// wake to undo the above suspend.
|
// wake to undo the above suspend.
|
||||||
if (!env.getRemoteDispatcher().addOperationToNode(targetServer, this)) {
|
if (!env.getRemoteDispatcher().addOperationToNode(targetServer, this)) {
|
||||||
|
|
|
@ -133,7 +133,7 @@ public class SplitTableRegionProcedure
|
||||||
throw new IllegalArgumentException ("Can't invoke split on non-default regions directly");
|
throw new IllegalArgumentException ("Can't invoke split on non-default regions directly");
|
||||||
}
|
}
|
||||||
RegionStateNode node =
|
RegionStateNode node =
|
||||||
env.getAssignmentManager().getRegionStates().getRegionNode(getParentRegion());
|
env.getAssignmentManager().getRegionStates().getRegionStateNode(getParentRegion());
|
||||||
IOException splittableCheckIOE = null;
|
IOException splittableCheckIOE = null;
|
||||||
boolean splittable = false;
|
boolean splittable = false;
|
||||||
if (node != null) {
|
if (node != null) {
|
||||||
|
@ -407,7 +407,7 @@ public class SplitTableRegionProcedure
|
||||||
public boolean prepareSplitRegion(final MasterProcedureEnv env) throws IOException {
|
public boolean prepareSplitRegion(final MasterProcedureEnv env) throws IOException {
|
||||||
// Check whether the region is splittable
|
// Check whether the region is splittable
|
||||||
RegionStateNode node =
|
RegionStateNode node =
|
||||||
env.getAssignmentManager().getRegionStates().getRegionNode(getParentRegion());
|
env.getAssignmentManager().getRegionStates().getRegionStateNode(getParentRegion());
|
||||||
|
|
||||||
if (node == null) {
|
if (node == null) {
|
||||||
throw new UnknownRegionException(getParentRegion().getRegionNameAsString());
|
throw new UnknownRegionException(getParentRegion().getRegionNameAsString());
|
||||||
|
|
|
@ -38,6 +38,8 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
||||||
import org.apache.hadoop.ipc.RemoteException;
|
import org.apache.hadoop.ipc.RemoteException;
|
||||||
|
|
||||||
import org.apache.hadoop.hbase.shaded.com.google.common.collect.ArrayListMultimap;
|
import org.apache.hadoop.hbase.shaded.com.google.common.collect.ArrayListMultimap;
|
||||||
|
import org.apache.hadoop.hbase.shaded.com.google.protobuf.RpcCallback;
|
||||||
|
import org.apache.hadoop.hbase.shaded.com.google.protobuf.RpcController;
|
||||||
import org.apache.hadoop.hbase.shaded.com.google.protobuf.ServiceException;
|
import org.apache.hadoop.hbase.shaded.com.google.protobuf.ServiceException;
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
|
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
|
import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
|
||||||
|
@ -64,7 +66,8 @@ public class RSProcedureDispatcher
|
||||||
private static final int RS_VERSION_WITH_EXEC_PROCS = 0x0200000; // 2.0
|
private static final int RS_VERSION_WITH_EXEC_PROCS = 0x0200000; // 2.0
|
||||||
|
|
||||||
protected final MasterServices master;
|
protected final MasterServices master;
|
||||||
protected final long rsStartupWaitTime;
|
private final long rsStartupWaitTime;
|
||||||
|
private MasterProcedureEnv procedureEnv;
|
||||||
|
|
||||||
public RSProcedureDispatcher(final MasterServices master) {
|
public RSProcedureDispatcher(final MasterServices master) {
|
||||||
super(master.getConfiguration());
|
super(master.getConfiguration());
|
||||||
|
@ -81,6 +84,7 @@ public class RSProcedureDispatcher
|
||||||
}
|
}
|
||||||
|
|
||||||
master.getServerManager().registerListener(this);
|
master.getServerManager().registerListener(this);
|
||||||
|
procedureEnv = master.getMasterProcedureExecutor().getEnvironment();
|
||||||
for (ServerName serverName: master.getServerManager().getOnlineServersList()) {
|
for (ServerName serverName: master.getServerManager().getOnlineServersList()) {
|
||||||
addNode(serverName);
|
addNode(serverName);
|
||||||
}
|
}
|
||||||
|
@ -99,18 +103,18 @@ public class RSProcedureDispatcher
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void remoteDispatch(final ServerName serverName,
|
protected void remoteDispatch(final ServerName serverName,
|
||||||
final Set<RemoteProcedure> operations) {
|
final Set<RemoteProcedure> remoteProcedures) {
|
||||||
final int rsVersion = master.getAssignmentManager().getServerVersion(serverName);
|
final int rsVersion = master.getAssignmentManager().getServerVersion(serverName);
|
||||||
if (rsVersion >= RS_VERSION_WITH_EXEC_PROCS) {
|
if (rsVersion >= RS_VERSION_WITH_EXEC_PROCS) {
|
||||||
LOG.info(String.format(
|
LOG.info(String.format(
|
||||||
"Using procedure batch rpc execution for serverName=%s version=%s",
|
"Using procedure batch rpc execution for serverName=%s version=%s",
|
||||||
serverName, rsVersion));
|
serverName, rsVersion));
|
||||||
submitTask(new ExecuteProceduresRemoteCall(serverName, operations));
|
submitTask(new ExecuteProceduresRemoteCall(serverName, remoteProcedures));
|
||||||
} else {
|
} else {
|
||||||
LOG.info(String.format(
|
LOG.info(String.format(
|
||||||
"Fallback to compat rpc execution for serverName=%s version=%s",
|
"Fallback to compat rpc execution for serverName=%s version=%s",
|
||||||
serverName, rsVersion));
|
serverName, rsVersion));
|
||||||
submitTask(new CompatRemoteProcedureResolver(serverName, operations));
|
submitTask(new CompatRemoteProcedureResolver(serverName, remoteProcedures));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -118,9 +122,8 @@ public class RSProcedureDispatcher
|
||||||
final Set<RemoteProcedure> operations) {
|
final Set<RemoteProcedure> operations) {
|
||||||
// TODO: Replace with a ServerNotOnlineException()
|
// TODO: Replace with a ServerNotOnlineException()
|
||||||
final IOException e = new DoNotRetryIOException("server not online " + serverName);
|
final IOException e = new DoNotRetryIOException("server not online " + serverName);
|
||||||
final MasterProcedureEnv env = master.getMasterProcedureExecutor().getEnvironment();
|
|
||||||
for (RemoteProcedure proc: operations) {
|
for (RemoteProcedure proc: operations) {
|
||||||
proc.remoteCallFailed(env, serverName, e);
|
proc.remoteCallFailed(procedureEnv, serverName, e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -136,6 +139,8 @@ public class RSProcedureDispatcher
|
||||||
* Base remote call
|
* Base remote call
|
||||||
*/
|
*/
|
||||||
protected abstract class AbstractRSRemoteCall implements Callable<Void> {
|
protected abstract class AbstractRSRemoteCall implements Callable<Void> {
|
||||||
|
public abstract Void call();
|
||||||
|
|
||||||
private final ServerName serverName;
|
private final ServerName serverName;
|
||||||
|
|
||||||
private int numberOfAttemptsSoFar = 0;
|
private int numberOfAttemptsSoFar = 0;
|
||||||
|
@ -145,8 +150,6 @@ public class RSProcedureDispatcher
|
||||||
this.serverName = serverName;
|
this.serverName = serverName;
|
||||||
}
|
}
|
||||||
|
|
||||||
public abstract Void call();
|
|
||||||
|
|
||||||
protected AdminService.BlockingInterface getRsAdmin() throws IOException {
|
protected AdminService.BlockingInterface getRsAdmin() throws IOException {
|
||||||
final AdminService.BlockingInterface admin = master.getServerManager().getRsAdmin(serverName);
|
final AdminService.BlockingInterface admin = master.getServerManager().getRsAdmin(serverName);
|
||||||
if (admin == null) {
|
if (admin == null) {
|
||||||
|
@ -223,17 +226,29 @@ public class RSProcedureDispatcher
|
||||||
void dispatchCloseRequests(MasterProcedureEnv env, List<RegionCloseOperation> operations);
|
void dispatchCloseRequests(MasterProcedureEnv env, List<RegionCloseOperation> operations);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Fetches {@link org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher.RemoteOperation}s
|
||||||
|
* from the given {@code remoteProcedures} and groups them by class of the returned operation.
|
||||||
|
* Then {@code resolver} is used to dispatch {@link RegionOpenOperation}s and
|
||||||
|
* {@link RegionCloseOperation}s.
|
||||||
|
* @param serverName RegionServer to which the remote operations are sent
|
||||||
|
* @param remoteProcedures Remote procedures which are dispatched to the given server
|
||||||
|
* @param resolver Used to dispatch remote procedures to given server.
|
||||||
|
*/
|
||||||
public void splitAndResolveOperation(final ServerName serverName,
|
public void splitAndResolveOperation(final ServerName serverName,
|
||||||
final Set<RemoteProcedure> operations, final RemoteProcedureResolver resolver) {
|
final Set<RemoteProcedure> remoteProcedures, final RemoteProcedureResolver resolver) {
|
||||||
final MasterProcedureEnv env = master.getMasterProcedureExecutor().getEnvironment();
|
|
||||||
final ArrayListMultimap<Class<?>, RemoteOperation> reqsByType =
|
final ArrayListMultimap<Class<?>, RemoteOperation> reqsByType =
|
||||||
buildAndGroupRequestByType(env, serverName, operations);
|
buildAndGroupRequestByType(procedureEnv, serverName, remoteProcedures);
|
||||||
|
|
||||||
final List<RegionOpenOperation> openOps = fetchType(reqsByType, RegionOpenOperation.class);
|
final List<RegionOpenOperation> openOps = fetchType(reqsByType, RegionOpenOperation.class);
|
||||||
if (!openOps.isEmpty()) resolver.dispatchOpenRequests(env, openOps);
|
if (!openOps.isEmpty()) {
|
||||||
|
resolver.dispatchOpenRequests(procedureEnv, openOps);
|
||||||
|
}
|
||||||
|
|
||||||
final List<RegionCloseOperation> closeOps = fetchType(reqsByType, RegionCloseOperation.class);
|
final List<RegionCloseOperation> closeOps = fetchType(reqsByType, RegionCloseOperation.class);
|
||||||
if (!closeOps.isEmpty()) resolver.dispatchCloseRequests(env, closeOps);
|
if (!closeOps.isEmpty()) {
|
||||||
|
resolver.dispatchCloseRequests(procedureEnv, closeOps);
|
||||||
|
}
|
||||||
|
|
||||||
if (!reqsByType.isEmpty()) {
|
if (!reqsByType.isEmpty()) {
|
||||||
LOG.warn("unknown request type in the queue: " + reqsByType);
|
LOG.warn("unknown request type in the queue: " + reqsByType);
|
||||||
|
@ -245,34 +260,32 @@ public class RSProcedureDispatcher
|
||||||
// ==========================================================================
|
// ==========================================================================
|
||||||
protected class ExecuteProceduresRemoteCall extends AbstractRSRemoteCall
|
protected class ExecuteProceduresRemoteCall extends AbstractRSRemoteCall
|
||||||
implements RemoteProcedureResolver {
|
implements RemoteProcedureResolver {
|
||||||
private final Set<RemoteProcedure> operations;
|
private final Set<RemoteProcedure> remoteProcedures;
|
||||||
|
|
||||||
private ExecuteProceduresRequest.Builder request = null;
|
private ExecuteProceduresRequest.Builder request = null;
|
||||||
|
|
||||||
public ExecuteProceduresRemoteCall(final ServerName serverName,
|
public ExecuteProceduresRemoteCall(final ServerName serverName,
|
||||||
final Set<RemoteProcedure> operations) {
|
final Set<RemoteProcedure> remoteProcedures) {
|
||||||
super(serverName);
|
super(serverName);
|
||||||
this.operations = operations;
|
this.remoteProcedures = remoteProcedures;
|
||||||
}
|
}
|
||||||
|
|
||||||
public Void call() {
|
public Void call() {
|
||||||
final MasterProcedureEnv env = master.getMasterProcedureExecutor().getEnvironment();
|
|
||||||
|
|
||||||
request = ExecuteProceduresRequest.newBuilder();
|
request = ExecuteProceduresRequest.newBuilder();
|
||||||
if (LOG.isTraceEnabled()) {
|
if (LOG.isTraceEnabled()) {
|
||||||
LOG.trace("Building request with operations count=" + operations.size());
|
LOG.trace("Building request with operations count=" + remoteProcedures.size());
|
||||||
}
|
}
|
||||||
splitAndResolveOperation(getServerName(), operations, this);
|
splitAndResolveOperation(getServerName(), remoteProcedures, this);
|
||||||
|
|
||||||
try {
|
try {
|
||||||
final ExecuteProceduresResponse response = sendRequest(getServerName(), request.build());
|
final ExecuteProceduresResponse response = sendRequest(getServerName(), request.build());
|
||||||
remoteCallCompleted(env, response);
|
remoteCallCompleted(procedureEnv, response);
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
e = unwrapException(e);
|
e = unwrapException(e);
|
||||||
// TODO: In the future some operation may want to bail out early.
|
// TODO: In the future some operation may want to bail out early.
|
||||||
// TODO: How many times should we retry (use numberOfAttemptsSoFar)
|
// TODO: How many times should we retry (use numberOfAttemptsSoFar)
|
||||||
if (!scheduleForRetry(e)) {
|
if (!scheduleForRetry(e)) {
|
||||||
remoteCallFailed(env, e);
|
remoteCallFailed(procedureEnv, e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return null;
|
return null;
|
||||||
|
@ -309,17 +322,12 @@ public class RSProcedureDispatcher
|
||||||
}
|
}
|
||||||
|
|
||||||
private void remoteCallFailed(final MasterProcedureEnv env, final IOException e) {
|
private void remoteCallFailed(final MasterProcedureEnv env, final IOException e) {
|
||||||
for (RemoteProcedure proc: operations) {
|
for (RemoteProcedure proc: remoteProcedures) {
|
||||||
proc.remoteCallFailed(env, getServerName(), e);
|
proc.remoteCallFailed(env, getServerName(), e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// ==========================================================================
|
|
||||||
// Compatibility calls
|
|
||||||
// Since we don't have a "batch proc-exec" request on the target RS
|
|
||||||
// we have to chunk the requests by type and dispatch the specific request.
|
|
||||||
// ==========================================================================
|
|
||||||
private static OpenRegionRequest buildOpenRegionRequest(final MasterProcedureEnv env,
|
private static OpenRegionRequest buildOpenRegionRequest(final MasterProcedureEnv env,
|
||||||
final ServerName serverName, final List<RegionOpenOperation> operations) {
|
final ServerName serverName, final List<RegionOpenOperation> operations) {
|
||||||
final OpenRegionRequest.Builder builder = OpenRegionRequest.newBuilder();
|
final OpenRegionRequest.Builder builder = OpenRegionRequest.newBuilder();
|
||||||
|
@ -331,6 +339,15 @@ public class RSProcedureDispatcher
|
||||||
return builder.build();
|
return builder.build();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ==========================================================================
|
||||||
|
// Compatibility calls
|
||||||
|
// Since we don't have a "batch proc-exec" request on the target RS
|
||||||
|
// we have to chunk the requests by type and dispatch the specific request.
|
||||||
|
// ==========================================================================
|
||||||
|
/**
|
||||||
|
* Compatibility class used by {@link CompatRemoteProcedureResolver} to open regions using old
|
||||||
|
* {@link AdminService#openRegion(RpcController, OpenRegionRequest, RpcCallback)} rpc.
|
||||||
|
*/
|
||||||
private final class OpenRegionRemoteCall extends AbstractRSRemoteCall {
|
private final class OpenRegionRemoteCall extends AbstractRSRemoteCall {
|
||||||
private final List<RegionOpenOperation> operations;
|
private final List<RegionOpenOperation> operations;
|
||||||
|
|
||||||
|
@ -342,18 +359,18 @@ public class RSProcedureDispatcher
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Void call() {
|
public Void call() {
|
||||||
final MasterProcedureEnv env = master.getMasterProcedureExecutor().getEnvironment();
|
final OpenRegionRequest request =
|
||||||
final OpenRegionRequest request = buildOpenRegionRequest(env, getServerName(), operations);
|
buildOpenRegionRequest(procedureEnv, getServerName(), operations);
|
||||||
|
|
||||||
try {
|
try {
|
||||||
OpenRegionResponse response = sendRequest(getServerName(), request);
|
OpenRegionResponse response = sendRequest(getServerName(), request);
|
||||||
remoteCallCompleted(env, response);
|
remoteCallCompleted(procedureEnv, response);
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
e = unwrapException(e);
|
e = unwrapException(e);
|
||||||
// TODO: In the future some operation may want to bail out early.
|
// TODO: In the future some operation may want to bail out early.
|
||||||
// TODO: How many times should we retry (use numberOfAttemptsSoFar)
|
// TODO: How many times should we retry (use numberOfAttemptsSoFar)
|
||||||
if (!scheduleForRetry(e)) {
|
if (!scheduleForRetry(e)) {
|
||||||
remoteCallFailed(env, e);
|
remoteCallFailed(procedureEnv, e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return null;
|
return null;
|
||||||
|
@ -385,6 +402,10 @@ public class RSProcedureDispatcher
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Compatibility class used by {@link CompatRemoteProcedureResolver} to close regions using old
|
||||||
|
* {@link AdminService#closeRegion(RpcController, CloseRegionRequest, RpcCallback)} rpc.
|
||||||
|
*/
|
||||||
private final class CloseRegionRemoteCall extends AbstractRSRemoteCall {
|
private final class CloseRegionRemoteCall extends AbstractRSRemoteCall {
|
||||||
private final RegionCloseOperation operation;
|
private final RegionCloseOperation operation;
|
||||||
|
|
||||||
|
@ -396,17 +417,16 @@ public class RSProcedureDispatcher
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Void call() {
|
public Void call() {
|
||||||
final MasterProcedureEnv env = master.getMasterProcedureExecutor().getEnvironment();
|
|
||||||
final CloseRegionRequest request = operation.buildCloseRegionRequest(getServerName());
|
final CloseRegionRequest request = operation.buildCloseRegionRequest(getServerName());
|
||||||
try {
|
try {
|
||||||
CloseRegionResponse response = sendRequest(getServerName(), request);
|
CloseRegionResponse response = sendRequest(getServerName(), request);
|
||||||
remoteCallCompleted(env, response);
|
remoteCallCompleted(procedureEnv, response);
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
e = unwrapException(e);
|
e = unwrapException(e);
|
||||||
// TODO: In the future some operation may want to bail out early.
|
// TODO: In the future some operation may want to bail out early.
|
||||||
// TODO: How many times should we retry (use numberOfAttemptsSoFar)
|
// TODO: How many times should we retry (use numberOfAttemptsSoFar)
|
||||||
if (!scheduleForRetry(e)) {
|
if (!scheduleForRetry(e)) {
|
||||||
remoteCallFailed(env, e);
|
remoteCallFailed(procedureEnv, e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return null;
|
return null;
|
||||||
|
@ -432,6 +452,10 @@ public class RSProcedureDispatcher
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Compatibility class to open and close regions using old endpoints (openRegion/closeRegion) in
|
||||||
|
* {@link AdminService}.
|
||||||
|
*/
|
||||||
protected class CompatRemoteProcedureResolver implements Callable<Void>, RemoteProcedureResolver {
|
protected class CompatRemoteProcedureResolver implements Callable<Void>, RemoteProcedureResolver {
|
||||||
private final Set<RemoteProcedure> operations;
|
private final Set<RemoteProcedure> operations;
|
||||||
private final ServerName serverName;
|
private final ServerName serverName;
|
||||||
|
@ -463,14 +487,16 @@ public class RSProcedureDispatcher
|
||||||
|
|
||||||
// ==========================================================================
|
// ==========================================================================
|
||||||
// RPC Messages
|
// RPC Messages
|
||||||
// - ServerOperation: refreshConfig, grant, revoke, ...
|
// - ServerOperation: refreshConfig, grant, revoke, ... (TODO)
|
||||||
// - RegionOperation: open, close, flush, snapshot, ...
|
// - RegionOperation: open, close, flush, snapshot, ...
|
||||||
// ==========================================================================
|
// ==========================================================================
|
||||||
|
/* Currently unused
|
||||||
public static abstract class ServerOperation extends RemoteOperation {
|
public static abstract class ServerOperation extends RemoteOperation {
|
||||||
protected ServerOperation(final RemoteProcedure remoteProcedure) {
|
protected ServerOperation(final RemoteProcedure remoteProcedure) {
|
||||||
super(remoteProcedure);
|
super(remoteProcedure);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
*/
|
||||||
|
|
||||||
public static abstract class RegionOperation extends RemoteOperation {
|
public static abstract class RegionOperation extends RemoteOperation {
|
||||||
private final RegionInfo regionInfo;
|
private final RegionInfo regionInfo;
|
||||||
|
|
|
@ -123,8 +123,7 @@ public class TestMasterBalanceThrottling {
|
||||||
public void run() {
|
public void run() {
|
||||||
while (!stop.get()) {
|
while (!stop.get()) {
|
||||||
maxCount.set(Math.max(maxCount.get(),
|
maxCount.set(Math.max(maxCount.get(),
|
||||||
master.getAssignmentManager().getRegionStates()
|
master.getAssignmentManager().getRegionStates().getRegionsInTransitionCount()));
|
||||||
.getRegionsInTransition().size()));
|
|
||||||
try {
|
try {
|
||||||
Thread.sleep(10);
|
Thread.sleep(10);
|
||||||
} catch (InterruptedException e) {
|
} catch (InterruptedException e) {
|
||||||
|
@ -139,7 +138,7 @@ public class TestMasterBalanceThrottling {
|
||||||
}
|
}
|
||||||
|
|
||||||
private void unbalance(HMaster master, TableName tableName) throws Exception {
|
private void unbalance(HMaster master, TableName tableName) throws Exception {
|
||||||
while (master.getAssignmentManager().getRegionStates().getRegionsInTransition().size() > 0) {
|
while (master.getAssignmentManager().getRegionStates().getRegionsInTransitionCount() > 0) {
|
||||||
Thread.sleep(100);
|
Thread.sleep(100);
|
||||||
}
|
}
|
||||||
HRegionServer biasedServer = TEST_UTIL.getMiniHBaseCluster().getRegionServer(0);
|
HRegionServer biasedServer = TEST_UTIL.getMiniHBaseCluster().getRegionServer(0);
|
||||||
|
@ -147,7 +146,7 @@ public class TestMasterBalanceThrottling {
|
||||||
master.move(regionInfo.getEncodedNameAsBytes(),
|
master.move(regionInfo.getEncodedNameAsBytes(),
|
||||||
Bytes.toBytes(biasedServer.getServerName().getServerName()));
|
Bytes.toBytes(biasedServer.getServerName().getServerName()));
|
||||||
}
|
}
|
||||||
while (master.getAssignmentManager().getRegionStates().getRegionsInTransition().size() > 0) {
|
while (master.getAssignmentManager().getRegionStates().getRegionsInTransitionCount() > 0) {
|
||||||
Thread.sleep(100);
|
Thread.sleep(100);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -43,7 +43,7 @@ public class TestRegionState {
|
||||||
}
|
}
|
||||||
|
|
||||||
private void testSerializeDeserialize(final TableName tableName, final RegionState.State state) {
|
private void testSerializeDeserialize(final TableName tableName, final RegionState.State state) {
|
||||||
RegionState state1 = new RegionState(new HRegionInfo(tableName), state);
|
RegionState state1 = RegionState.createForTesting(new HRegionInfo(tableName), state);
|
||||||
ClusterStatusProtos.RegionState protobuf1 = state1.convert();
|
ClusterStatusProtos.RegionState protobuf1 = state1.convert();
|
||||||
RegionState state2 = RegionState.convert(protobuf1);
|
RegionState state2 = RegionState.convert(protobuf1);
|
||||||
ClusterStatusProtos.RegionState protobuf2 = state1.convert();
|
ClusterStatusProtos.RegionState protobuf2 = state1.convert();
|
||||||
|
|
|
@ -36,7 +36,6 @@ import org.apache.hadoop.hbase.YouAreDeadException;
|
||||||
import org.apache.hadoop.hbase.client.ClusterConnection;
|
import org.apache.hadoop.hbase.client.ClusterConnection;
|
||||||
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
|
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
|
||||||
import org.apache.hadoop.hbase.client.HConnectionTestingUtility;
|
import org.apache.hadoop.hbase.client.HConnectionTestingUtility;
|
||||||
import org.apache.hadoop.hbase.client.RegionInfo;
|
|
||||||
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
|
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
|
||||||
import org.apache.hadoop.hbase.client.TableDescriptor;
|
import org.apache.hadoop.hbase.client.TableDescriptor;
|
||||||
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
|
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
|
||||||
|
@ -45,7 +44,6 @@ import org.apache.hadoop.hbase.master.MasterFileSystem;
|
||||||
import org.apache.hadoop.hbase.master.MasterServices;
|
import org.apache.hadoop.hbase.master.MasterServices;
|
||||||
import org.apache.hadoop.hbase.master.MasterWalManager;
|
import org.apache.hadoop.hbase.master.MasterWalManager;
|
||||||
import org.apache.hadoop.hbase.master.MockNoopMasterServices;
|
import org.apache.hadoop.hbase.master.MockNoopMasterServices;
|
||||||
import org.apache.hadoop.hbase.master.RegionState.State;
|
|
||||||
import org.apache.hadoop.hbase.master.ServerManager;
|
import org.apache.hadoop.hbase.master.ServerManager;
|
||||||
import org.apache.hadoop.hbase.master.balancer.LoadBalancerFactory;
|
import org.apache.hadoop.hbase.master.balancer.LoadBalancerFactory;
|
||||||
import org.apache.hadoop.hbase.master.procedure.MasterProcedureConstants;
|
import org.apache.hadoop.hbase.master.procedure.MasterProcedureConstants;
|
||||||
|
@ -290,8 +288,7 @@ public class MockMasterServices extends MockNoopMasterServices {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void updateRegionLocation(RegionInfo regionInfo, State state, ServerName regionLocation,
|
public void updateRegionLocation(RegionStates.RegionStateNode regionNode) throws IOException {
|
||||||
ServerName lastHost, long openSeqNum, long pid) throws IOException {
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -349,7 +349,7 @@ public class TestAssignmentManager {
|
||||||
fail("unexpected assign completion");
|
fail("unexpected assign completion");
|
||||||
} catch (RetriesExhaustedException e) {
|
} catch (RetriesExhaustedException e) {
|
||||||
// expected exception
|
// expected exception
|
||||||
LOG.info("REGION STATE " + am.getRegionStates().getRegionNode(hri));
|
LOG.info("REGION STATE " + am.getRegionStates().getRegionStateNode(hri));
|
||||||
LOG.info("expected exception from assign operation: " + e.getMessage(), e);
|
LOG.info("expected exception from assign operation: " + e.getMessage(), e);
|
||||||
assertEquals(true, am.getRegionStates().getRegionState(hri).isFailedOpen());
|
assertEquals(true, am.getRegionStates().getRegionState(hri).isFailedOpen());
|
||||||
}
|
}
|
||||||
|
@ -780,8 +780,8 @@ public class TestAssignmentManager {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void remoteDispatch(ServerName serverName, Set<RemoteProcedure> operations) {
|
protected void remoteDispatch(ServerName serverName, Set<RemoteProcedure> remoteProcedures) {
|
||||||
submitTask(new MockRemoteCall(serverName, operations));
|
submitTask(new MockRemoteCall(serverName, remoteProcedures));
|
||||||
}
|
}
|
||||||
|
|
||||||
private class MockRemoteCall extends ExecuteProceduresRemoteCall {
|
private class MockRemoteCall extends ExecuteProceduresRemoteCall {
|
||||||
|
|
|
@ -144,7 +144,7 @@ public class TestRegionStates {
|
||||||
executorService.submit(new Callable<Object>() {
|
executorService.submit(new Callable<Object>() {
|
||||||
@Override
|
@Override
|
||||||
public Object call() {
|
public Object call() {
|
||||||
return stateMap.getOrCreateRegionNode(RegionInfoBuilder.newBuilder(tableName)
|
return stateMap.getOrCreateRegionStateNode(RegionInfoBuilder.newBuilder(tableName)
|
||||||
.setStartKey(Bytes.toBytes(regionId))
|
.setStartKey(Bytes.toBytes(regionId))
|
||||||
.setEndKey(Bytes.toBytes(regionId + 1))
|
.setEndKey(Bytes.toBytes(regionId + 1))
|
||||||
.setSplit(false)
|
.setSplit(false)
|
||||||
|
@ -156,7 +156,7 @@ public class TestRegionStates {
|
||||||
|
|
||||||
private Object createRegionNode(final RegionStates stateMap,
|
private Object createRegionNode(final RegionStates stateMap,
|
||||||
final TableName tableName, final long regionId) {
|
final TableName tableName, final long regionId) {
|
||||||
return stateMap.getOrCreateRegionNode(createRegionInfo(tableName, regionId));
|
return stateMap.getOrCreateRegionStateNode(createRegionInfo(tableName, regionId));
|
||||||
}
|
}
|
||||||
|
|
||||||
private RegionInfo createRegionInfo(final TableName tableName, final long regionId) {
|
private RegionInfo createRegionInfo(final TableName tableName, final long regionId) {
|
||||||
|
@ -181,7 +181,7 @@ public class TestRegionStates {
|
||||||
@Override
|
@Override
|
||||||
public Object call() {
|
public Object call() {
|
||||||
RegionInfo hri = createRegionInfo(TABLE_NAME, regionId);
|
RegionInfo hri = createRegionInfo(TABLE_NAME, regionId);
|
||||||
return stateMap.getOrCreateRegionNode(hri);
|
return stateMap.getOrCreateRegionStateNode(hri);
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
@ -218,7 +218,7 @@ public class TestRegionStates {
|
||||||
final RegionStates stateMap = new RegionStates();
|
final RegionStates stateMap = new RegionStates();
|
||||||
long st = System.currentTimeMillis();
|
long st = System.currentTimeMillis();
|
||||||
for (int i = 0; i < NRUNS; ++i) {
|
for (int i = 0; i < NRUNS; ++i) {
|
||||||
stateMap.createRegionNode(createRegionInfo(TABLE_NAME, i));
|
stateMap.createRegionStateNode(createRegionInfo(TABLE_NAME, i));
|
||||||
}
|
}
|
||||||
long et = System.currentTimeMillis();
|
long et = System.currentTimeMillis();
|
||||||
LOG.info(String.format("PERF SingleThread: %s %s/sec",
|
LOG.info(String.format("PERF SingleThread: %s %s/sec",
|
||||||
|
|
|
@ -306,7 +306,7 @@ public class TestHRegionInfo {
|
||||||
Assert.assertArrayEquals(HRegionInfo.HIDDEN_START_KEY,
|
Assert.assertArrayEquals(HRegionInfo.HIDDEN_START_KEY,
|
||||||
HRegionInfo.getStartKeyForDisplay(h, conf));
|
HRegionInfo.getStartKeyForDisplay(h, conf));
|
||||||
|
|
||||||
RegionState state = new RegionState(h, RegionState.State.OPEN);
|
RegionState state = RegionState.createForTesting(h, RegionState.State.OPEN);
|
||||||
String descriptiveNameForDisplay =
|
String descriptiveNameForDisplay =
|
||||||
HRegionInfo.getDescriptiveNameFromRegionStateForDisplay(state, conf);
|
HRegionInfo.getDescriptiveNameFromRegionStateForDisplay(state, conf);
|
||||||
checkDescriptiveNameEquality(descriptiveNameForDisplay,state.toDescriptiveString(), startKey);
|
checkDescriptiveNameEquality(descriptiveNameForDisplay,state.toDescriptiveString(), startKey);
|
||||||
|
|
|
@ -515,7 +515,8 @@ public class MetaTableLocator {
|
||||||
state = RegionState.State.OFFLINE;
|
state = RegionState.State.OFFLINE;
|
||||||
}
|
}
|
||||||
return new RegionState(
|
return new RegionState(
|
||||||
RegionReplicaUtil.getRegionInfoForReplica(RegionInfoBuilder.FIRST_META_REGIONINFO, replicaId),
|
RegionReplicaUtil.getRegionInfoForReplica(
|
||||||
|
RegionInfoBuilder.FIRST_META_REGIONINFO, replicaId),
|
||||||
state, serverName);
|
state, serverName);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue