HBASE-19367 Refactoring in RegionStates, and RSProcedureDispatcher

- Adding javadoc comments
- Bug: ServerStateNode#regions is HashSet but there's no synchronization to prevent concurrent addRegion/removeRegion. Let's use concurrent set instead.
- Use getRegionsInTransitionCount() directly to avoid instead of getRegionsInTransition().size() because the latter copies everything into a new array - what a waste for just the size.
- There's mixed use of getRegionNode and getRegionStateNode for same return type - RegionStateNode. Changing everything to getRegionStateNode. Similarly rename other *RegionNode() fns to *RegionStateNode().
- RegionStateNode#transitionState() return value is useless since it always returns it's first param.
- Other minor improvements
This commit is contained in:
Apekshit Sharma 2017-11-27 16:17:39 -08:00
parent 5b7f9c2535
commit 81b95afbee
20 changed files with 195 additions and 191 deletions

View File

@ -17,6 +17,7 @@
*/ */
package org.apache.hadoop.hbase.master; package org.apache.hadoop.hbase.master;
import com.google.common.annotations.VisibleForTesting;
import java.util.Date; import java.util.Date;
import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.ServerName;
@ -169,12 +170,12 @@ public class RegionState {
// The duration of region in transition // The duration of region in transition
private long ritDuration; private long ritDuration;
public RegionState(RegionInfo region, State state) { @VisibleForTesting
this(region, state, System.currentTimeMillis(), null); public static RegionState createForTesting(RegionInfo region, State state) {
return new RegionState(region, state, System.currentTimeMillis(), null);
} }
public RegionState(RegionInfo region, public RegionState(RegionInfo region, State state, ServerName serverName) {
State state, ServerName serverName) {
this(region, state, System.currentTimeMillis(), serverName); this(region, state, System.currentTimeMillis(), serverName);
} }

View File

@ -21,9 +21,6 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CategoryBasedTimeout; import org.apache.hadoop.hbase.CategoryBasedTimeout;
import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.client.RegionInfoDisplay;
import org.apache.hadoop.hbase.master.RegionState; import org.apache.hadoop.hbase.master.RegionState;
import org.apache.hadoop.hbase.testclassification.MasterTests; import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.apache.hadoop.hbase.testclassification.SmallTests; import org.apache.hadoop.hbase.testclassification.SmallTests;
@ -66,7 +63,7 @@ public class TestRegionInfoDisplay {
Assert.assertArrayEquals(RegionInfoDisplay.HIDDEN_START_KEY, Assert.assertArrayEquals(RegionInfoDisplay.HIDDEN_START_KEY,
RegionInfoDisplay.getStartKeyForDisplay(ri, conf)); RegionInfoDisplay.getStartKeyForDisplay(ri, conf));
RegionState state = new RegionState(convert(ri), RegionState.State.OPEN); RegionState state = RegionState.createForTesting(convert(ri), RegionState.State.OPEN);
String descriptiveNameForDisplay = String descriptiveNameForDisplay =
RegionInfoDisplay.getDescriptiveNameFromRegionStateForDisplay(state, conf); RegionInfoDisplay.getDescriptiveNameFromRegionStateForDisplay(state, conf);
checkDescriptiveNameEquality(descriptiveNameForDisplay,state.toDescriptiveString(), startKey); checkDescriptiveNameEquality(descriptiveNameForDisplay,state.toDescriptiveString(), startKey);

View File

@ -242,9 +242,9 @@ public abstract class RemoteProcedureDispatcher<TEnv, TRemote extends Comparable
} }
protected ArrayListMultimap<Class<?>, RemoteOperation> buildAndGroupRequestByType(final TEnv env, protected ArrayListMultimap<Class<?>, RemoteOperation> buildAndGroupRequestByType(final TEnv env,
final TRemote remote, final Set<RemoteProcedure> operations) { final TRemote remote, final Set<RemoteProcedure> remoteProcedures) {
final ArrayListMultimap<Class<?>, RemoteOperation> requestByType = ArrayListMultimap.create(); final ArrayListMultimap<Class<?>, RemoteOperation> requestByType = ArrayListMultimap.create();
for (RemoteProcedure proc: operations) { for (RemoteProcedure proc: remoteProcedures) {
RemoteOperation operation = proc.remoteCallBuild(env, remote); RemoteOperation operation = proc.remoteCallBuild(env, remote);
requestByType.put(operation.getClass(), operation); requestByType.put(operation.getClass(), operation);
} }

View File

@ -130,7 +130,7 @@ public class TestRSGroupsOfflineMode {
@Override @Override
public boolean evaluate() throws Exception { public boolean evaluate() throws Exception {
return groupRS.getNumberOfOnlineRegions() < 1 && return groupRS.getNumberOfOnlineRegions() < 1 &&
master.getAssignmentManager().getRegionStates().getRegionsInTransition().size() < 1; master.getAssignmentManager().getRegionStates().getRegionsInTransitionCount() < 1;
} }
}); });
// Move table to group and wait. // Move table to group and wait.

View File

@ -1337,7 +1337,7 @@ public class HMaster extends HRegionServer implements MasterServices {
// Throttling by max number regions in transition // Throttling by max number regions in transition
while (!interrupted while (!interrupted
&& maxRegionsInTransition > 0 && maxRegionsInTransition > 0
&& this.assignmentManager.getRegionStates().getRegionsInTransition().size() && this.assignmentManager.getRegionStates().getRegionsInTransitionCount()
>= maxRegionsInTransition && System.currentTimeMillis() <= cutoffTime) { >= maxRegionsInTransition && System.currentTimeMillis() <= cutoffTime) {
try { try {
// sleep if the number of regions in transition exceeds the limit // sleep if the number of regions in transition exceeds the limit

View File

@ -352,7 +352,7 @@ public class AssignProcedure extends RegionTransitionProcedure {
@Override @Override
public ServerName getServer(final MasterProcedureEnv env) { public ServerName getServer(final MasterProcedureEnv env) {
RegionStateNode node = RegionStateNode node =
env.getAssignmentManager().getRegionStates().getRegionNode(this.getRegionInfo()); env.getAssignmentManager().getRegionStates().getRegionStateNode(this.getRegionInfo());
if (node == null) return null; if (node == null) return null;
return node.getRegionLocation(); return node.getRegionLocation();
} }

View File

@ -377,7 +377,7 @@ public class AssignmentManager implements ServerListener {
private boolean isCarryingRegion(final ServerName serverName, final RegionInfo regionInfo) { private boolean isCarryingRegion(final ServerName serverName, final RegionInfo regionInfo) {
// TODO: check for state? // TODO: check for state?
final RegionStateNode node = regionStates.getRegionNode(regionInfo); final RegionStateNode node = regionStates.getRegionStateNode(regionInfo);
return(node != null && serverName.equals(node.getRegionLocation())); return(node != null && serverName.equals(node.getRegionLocation()));
} }
@ -538,7 +538,7 @@ public class AssignmentManager implements ServerListener {
public void unassign(final RegionInfo regionInfo, final boolean forceNewPlan) public void unassign(final RegionInfo regionInfo, final boolean forceNewPlan)
throws IOException { throws IOException {
// TODO: rename this reassign // TODO: rename this reassign
RegionStateNode node = this.regionStates.getRegionNode(regionInfo); RegionStateNode node = this.regionStates.getRegionStateNode(regionInfo);
ServerName destinationServer = node.getRegionLocation(); ServerName destinationServer = node.getRegionLocation();
if (destinationServer == null) { if (destinationServer == null) {
throw new UnexpectedStateException("DestinationServer is null; Assigned? " + node.toString()); throw new UnexpectedStateException("DestinationServer is null; Assigned? " + node.toString());
@ -549,7 +549,7 @@ public class AssignmentManager implements ServerListener {
} }
public void move(final RegionInfo regionInfo) throws IOException { public void move(final RegionInfo regionInfo) throws IOException {
RegionStateNode node = this.regionStates.getRegionNode(regionInfo); RegionStateNode node = this.regionStates.getRegionStateNode(regionInfo);
ServerName sourceServer = node.getRegionLocation(); ServerName sourceServer = node.getRegionLocation();
RegionPlan plan = new RegionPlan(regionInfo, sourceServer, null); RegionPlan plan = new RegionPlan(regionInfo, sourceServer, null);
MoveRegionProcedure proc = createMoveRegionProcedure(plan); MoveRegionProcedure proc = createMoveRegionProcedure(plan);
@ -576,7 +576,7 @@ public class AssignmentManager implements ServerListener {
long startTime = System.currentTimeMillis(); long startTime = System.currentTimeMillis();
// Something badly wrong if takes ten seconds to register a region. // Something badly wrong if takes ten seconds to register a region.
long endTime = startTime + 10000; long endTime = startTime + 10000;
while ((node = regionStates.getRegionNode(regionInfo)) == null && isRunning() && while ((node = regionStates.getRegionStateNode(regionInfo)) == null && isRunning() &&
System.currentTimeMillis() < endTime) { System.currentTimeMillis() < endTime) {
// Presume it not yet added but will be added soon. Let it spew a lot so we can tell if // Presume it not yet added but will be added soon. Let it spew a lot so we can tell if
// we are waiting here alot. // we are waiting here alot.
@ -796,7 +796,7 @@ public class AssignmentManager implements ServerListener {
throws PleaseHoldException, UnexpectedStateException { throws PleaseHoldException, UnexpectedStateException {
checkFailoverCleanupCompleted(regionInfo); checkFailoverCleanupCompleted(regionInfo);
final RegionStateNode regionNode = regionStates.getRegionNode(regionInfo); final RegionStateNode regionNode = regionStates.getRegionStateNode(regionInfo);
if (regionNode == null) { if (regionNode == null) {
// the table/region is gone. maybe a delete, split, merge // the table/region is gone. maybe a delete, split, merge
throw new UnexpectedStateException(String.format( throw new UnexpectedStateException(String.format(
@ -947,7 +947,7 @@ public class AssignmentManager implements ServerListener {
continue; continue;
} }
final RegionStateNode regionNode = regionStates.getOrCreateRegionNode(hri); final RegionStateNode regionNode = regionStates.getOrCreateRegionStateNode(hri);
LOG.info("META REPORTED: " + regionNode); LOG.info("META REPORTED: " + regionNode);
if (!reportTransition(regionNode, serverNode, TransitionCode.OPENED, 0)) { if (!reportTransition(regionNode, serverNode, TransitionCode.OPENED, 0)) {
LOG.warn("META REPORTED but no procedure found (complete?)"); LOG.warn("META REPORTED but no procedure found (complete?)");
@ -969,7 +969,7 @@ public class AssignmentManager implements ServerListener {
try { try {
for (byte[] regionName: regionNames) { for (byte[] regionName: regionNames) {
if (!isRunning()) return; if (!isRunning()) return;
final RegionStateNode regionNode = regionStates.getRegionNodeFromName(regionName); final RegionStateNode regionNode = regionStates.getRegionStateNodeFromName(regionName);
if (regionNode == null) { if (regionNode == null) {
throw new UnexpectedStateException("Not online: " + Bytes.toStringBinary(regionName)); throw new UnexpectedStateException("Not online: " + Bytes.toStringBinary(regionName));
} }
@ -1142,7 +1142,7 @@ public class AssignmentManager implements ServerListener {
} }
private void handleRegionOverStuckWarningThreshold(final RegionInfo regionInfo) { private void handleRegionOverStuckWarningThreshold(final RegionInfo regionInfo) {
final RegionStateNode regionNode = regionStates.getRegionNode(regionInfo); final RegionStateNode regionNode = regionStates.getRegionStateNode(regionInfo);
//if (regionNode.isStuck()) { //if (regionNode.isStuck()) {
LOG.warn("TODO Handle stuck in transition: " + regionNode); LOG.warn("TODO Handle stuck in transition: " + regionNode);
} }
@ -1181,7 +1181,7 @@ public class AssignmentManager implements ServerListener {
@Override @Override
public void visitRegionState(final RegionInfo regionInfo, final State state, public void visitRegionState(final RegionInfo regionInfo, final State state,
final ServerName regionLocation, final ServerName lastHost, final long openSeqNum) { final ServerName regionLocation, final ServerName lastHost, final long openSeqNum) {
final RegionStateNode regionNode = regionStates.getOrCreateRegionNode(regionInfo); final RegionStateNode regionNode = regionStates.getOrCreateRegionStateNode(regionInfo);
State localState = state; State localState = state;
if (localState == null) { if (localState == null) {
// No region state column data in hbase:meta table! Are I doing a rolling upgrade from // No region state column data in hbase:meta table! Are I doing a rolling upgrade from
@ -1200,7 +1200,7 @@ public class AssignmentManager implements ServerListener {
if (localState == State.OPEN) { if (localState == State.OPEN) {
assert regionLocation != null : "found null region location for " + regionNode; assert regionLocation != null : "found null region location for " + regionNode;
regionStates.addRegionToServer(regionLocation, regionNode); regionStates.addRegionToServer(regionNode);
} else if (localState == State.OFFLINE || regionInfo.isOffline()) { } else if (localState == State.OFFLINE || regionInfo.isOffline()) {
regionStates.addToOfflineRegions(regionNode); regionStates.addToOfflineRegions(regionNode);
} else { } else {
@ -1227,7 +1227,7 @@ public class AssignmentManager implements ServerListener {
long st, et; long st, et;
st = System.currentTimeMillis(); st = System.currentTimeMillis();
for (RegionStateNode regionNode: regionStates.getRegionNodes()) { for (RegionStateNode regionNode: regionStates.getRegionStateNodes()) {
if (regionNode.getState() == State.OPEN) { if (regionNode.getState() == State.OPEN) {
final ServerName serverName = regionNode.getRegionLocation(); final ServerName serverName = regionNode.getRegionLocation();
if (!master.getServerManager().isServerOnline(serverName)) { if (!master.getServerManager().isServerOnline(serverName)) {
@ -1331,7 +1331,7 @@ public class AssignmentManager implements ServerListener {
public void offlineRegion(final RegionInfo regionInfo) { public void offlineRegion(final RegionInfo regionInfo) {
// TODO used by MasterRpcServices ServerCrashProcedure // TODO used by MasterRpcServices ServerCrashProcedure
final RegionStateNode node = regionStates.getRegionNode(regionInfo); final RegionStateNode node = regionStates.getRegionStateNode(regionInfo);
if (node != null) node.offline(); if (node != null) node.offline();
} }
@ -1412,7 +1412,7 @@ public class AssignmentManager implements ServerListener {
} }
public RegionInfo getRegionInfo(final byte[] regionName) { public RegionInfo getRegionInfo(final byte[] regionName) {
final RegionStateNode regionState = regionStates.getRegionNodeFromName(regionName); final RegionStateNode regionState = regionStates.getRegionStateNodeFromName(regionName);
return regionState != null ? regionState.getRegionInfo() : null; return regionState != null ? regionState.getRegionInfo() : null;
} }
@ -1440,11 +1440,9 @@ public class AssignmentManager implements ServerListener {
public void markRegionAsOpening(final RegionStateNode regionNode) throws IOException { public void markRegionAsOpening(final RegionStateNode regionNode) throws IOException {
synchronized (regionNode) { synchronized (regionNode) {
State state = regionNode.transitionState(State.OPENING, RegionStates.STATES_EXPECTED_ON_OPEN); regionNode.transitionState(State.OPENING, RegionStates.STATES_EXPECTED_ON_OPEN);
regionStates.addRegionToServer(regionNode.getRegionLocation(), regionNode); regionStates.addRegionToServer(regionNode);
regionStateStore.updateRegionLocation(regionNode.getRegionInfo(), state, regionStateStore.updateRegionLocation(regionNode);
regionNode.getRegionLocation(), regionNode.getLastHost(), HConstants.NO_SEQNUM,
regionNode.getProcedure().getProcId());
} }
// update the operation count metrics // update the operation count metrics
@ -1468,18 +1466,16 @@ public class AssignmentManager implements ServerListener {
public void markRegionAsOpened(final RegionStateNode regionNode) throws IOException { public void markRegionAsOpened(final RegionStateNode regionNode) throws IOException {
final RegionInfo hri = regionNode.getRegionInfo(); final RegionInfo hri = regionNode.getRegionInfo();
synchronized (regionNode) { synchronized (regionNode) {
State state = regionNode.transitionState(State.OPEN, RegionStates.STATES_EXPECTED_ON_OPEN); regionNode.transitionState(State.OPEN, RegionStates.STATES_EXPECTED_ON_OPEN);
if (isMetaRegion(hri)) { if (isMetaRegion(hri)) {
master.getTableStateManager().setTableState(TableName.META_TABLE_NAME, master.getTableStateManager().setTableState(TableName.META_TABLE_NAME,
TableState.State.ENABLED); TableState.State.ENABLED);
setMetaInitialized(hri, true); setMetaInitialized(hri, true);
} }
regionStates.addRegionToServer(regionNode.getRegionLocation(), regionNode); regionStates.addRegionToServer(regionNode);
// TODO: OPENING Updates hbase:meta too... we need to do both here and there? // TODO: OPENING Updates hbase:meta too... we need to do both here and there?
// That is a lot of hbase:meta writing. // That is a lot of hbase:meta writing.
regionStateStore.updateRegionLocation(regionNode.getRegionInfo(), state, regionStateStore.updateRegionLocation(regionNode);
regionNode.getRegionLocation(), regionNode.getLastHost(), regionNode.getOpenSeqNum(),
regionNode.getProcedure().getProcId());
sendRegionOpenedNotification(hri, regionNode.getRegionLocation()); sendRegionOpenedNotification(hri, regionNode.getRegionLocation());
} }
} }
@ -1487,15 +1483,13 @@ public class AssignmentManager implements ServerListener {
public void markRegionAsClosing(final RegionStateNode regionNode) throws IOException { public void markRegionAsClosing(final RegionStateNode regionNode) throws IOException {
final RegionInfo hri = regionNode.getRegionInfo(); final RegionInfo hri = regionNode.getRegionInfo();
synchronized (regionNode) { synchronized (regionNode) {
State state = regionNode.transitionState(State.CLOSING, RegionStates.STATES_EXPECTED_ON_CLOSE); regionNode.transitionState(State.CLOSING, RegionStates.STATES_EXPECTED_ON_CLOSE);
// Set meta has not initialized early. so people trying to create/edit tables will wait // Set meta has not initialized early. so people trying to create/edit tables will wait
if (isMetaRegion(hri)) { if (isMetaRegion(hri)) {
setMetaInitialized(hri, false); setMetaInitialized(hri, false);
} }
regionStates.addRegionToServer(regionNode.getRegionLocation(), regionNode); regionStates.addRegionToServer(regionNode);
regionStateStore.updateRegionLocation(regionNode.getRegionInfo(), state, regionStateStore.updateRegionLocation(regionNode);
regionNode.getRegionLocation(), regionNode.getLastHost(), HConstants.NO_SEQNUM,
regionNode.getProcedure().getProcId());
} }
// update the operation count metrics // update the operation count metrics
@ -1510,13 +1504,11 @@ public class AssignmentManager implements ServerListener {
public void markRegionAsClosed(final RegionStateNode regionNode) throws IOException { public void markRegionAsClosed(final RegionStateNode regionNode) throws IOException {
final RegionInfo hri = regionNode.getRegionInfo(); final RegionInfo hri = regionNode.getRegionInfo();
synchronized (regionNode) { synchronized (regionNode) {
State state = regionNode.transitionState(State.CLOSED, RegionStates.STATES_EXPECTED_ON_CLOSE); regionNode.transitionState(State.CLOSED, RegionStates.STATES_EXPECTED_ON_CLOSE);
regionStates.removeRegionFromServer(regionNode.getRegionLocation(), regionNode); regionStates.removeRegionFromServer(regionNode.getRegionLocation(), regionNode);
regionNode.setLastHost(regionNode.getRegionLocation()); regionNode.setLastHost(regionNode.getRegionLocation());
regionNode.setRegionLocation(null); regionNode.setRegionLocation(null);
regionStateStore.updateRegionLocation(regionNode.getRegionInfo(), state, regionStateStore.updateRegionLocation(regionNode);
regionNode.getRegionLocation()/*null*/, regionNode.getLastHost(),
HConstants.NO_SEQNUM, regionNode.getProcedure().getProcId());
sendRegionClosedNotification(hri); sendRegionClosedNotification(hri);
} }
} }
@ -1529,11 +1521,11 @@ public class AssignmentManager implements ServerListener {
// Update its state in regionStates to it shows as offline and split when read // Update its state in regionStates to it shows as offline and split when read
// later figuring what regions are in a table and what are not: see // later figuring what regions are in a table and what are not: see
// regionStates#getRegionsOfTable // regionStates#getRegionsOfTable
final RegionStateNode node = regionStates.getOrCreateRegionNode(parent); final RegionStateNode node = regionStates.getOrCreateRegionStateNode(parent);
node.setState(State.SPLIT); node.setState(State.SPLIT);
final RegionStateNode nodeA = regionStates.getOrCreateRegionNode(daughterA); final RegionStateNode nodeA = regionStates.getOrCreateRegionStateNode(daughterA);
nodeA.setState(State.SPLITTING_NEW); nodeA.setState(State.SPLITTING_NEW);
final RegionStateNode nodeB = regionStates.getOrCreateRegionNode(daughterB); final RegionStateNode nodeB = regionStates.getOrCreateRegionStateNode(daughterB);
nodeB.setState(State.SPLITTING_NEW); nodeB.setState(State.SPLITTING_NEW);
regionStateStore.splitRegion(parent, daughterA, daughterB, serverName); regionStateStore.splitRegion(parent, daughterA, daughterB, serverName);
@ -1554,7 +1546,7 @@ public class AssignmentManager implements ServerListener {
*/ */
public void markRegionAsMerged(final RegionInfo child, final ServerName serverName, public void markRegionAsMerged(final RegionInfo child, final ServerName serverName,
final RegionInfo mother, final RegionInfo father) throws IOException { final RegionInfo mother, final RegionInfo father) throws IOException {
final RegionStateNode node = regionStates.getOrCreateRegionNode(child); final RegionStateNode node = regionStates.getOrCreateRegionStateNode(child);
node.setState(State.MERGED); node.setState(State.MERGED);
regionStates.deleteRegion(mother); regionStates.deleteRegion(mother);
regionStates.deleteRegion(father); regionStates.deleteRegion(father);

View File

@ -556,8 +556,8 @@ public class MergeTableRegionsProcedure
public void setRegionStateToMerging(final MasterProcedureEnv env) throws IOException { public void setRegionStateToMerging(final MasterProcedureEnv env) throws IOException {
// Set State.MERGING to regions to be merged // Set State.MERGING to regions to be merged
RegionStates regionStates = env.getAssignmentManager().getRegionStates(); RegionStates regionStates = env.getAssignmentManager().getRegionStates();
regionStates.getRegionNode(regionsToMerge[0]).setState(State.MERGING); regionStates.getRegionStateNode(regionsToMerge[0]).setState(State.MERGING);
regionStates.getRegionNode(regionsToMerge[1]).setState(State.MERGING); regionStates.getRegionStateNode(regionsToMerge[1]).setState(State.MERGING);
} }
/** /**
@ -569,8 +569,8 @@ public class MergeTableRegionsProcedure
private void setRegionStateBackToOpen(final MasterProcedureEnv env) throws IOException { private void setRegionStateBackToOpen(final MasterProcedureEnv env) throws IOException {
// revert region state to Open // revert region state to Open
RegionStates regionStates = env.getAssignmentManager().getRegionStates(); RegionStates regionStates = env.getAssignmentManager().getRegionStates();
regionStates.getRegionNode(regionsToMerge[0]).setState(State.OPEN); regionStates.getRegionStateNode(regionsToMerge[0]).setState(State.OPEN);
regionStates.getRegionNode(regionsToMerge[1]).setState(State.OPEN); regionStates.getRegionStateNode(regionsToMerge[1]).setState(State.OPEN);
} }
/** /**
@ -595,7 +595,7 @@ public class MergeTableRegionsProcedure
//Prepare to create merged regions //Prepare to create merged regions
env.getAssignmentManager().getRegionStates(). env.getAssignmentManager().getRegionStates().
getOrCreateRegionNode(mergedRegion).setState(State.MERGING_NEW); getOrCreateRegionStateNode(mergedRegion).setState(State.MERGING_NEW);
} }
/** /**

View File

@ -38,7 +38,6 @@ import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.TableDescriptor; import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.master.MasterServices; import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.master.RegionState;
import org.apache.hadoop.hbase.master.RegionState.State; import org.apache.hadoop.hbase.master.RegionState.State;
import org.apache.hadoop.hbase.procedure2.util.StringUtils; import org.apache.hadoop.hbase.procedure2.util.StringUtils;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
@ -138,23 +137,19 @@ public class RegionStateStore {
} }
} }
public void updateRegionLocation(final RegionInfo regionInfo, final State state, public void updateRegionLocation(RegionStates.RegionStateNode regionStateNode)
final ServerName regionLocation, final ServerName lastHost, final long openSeqNum,
final long pid)
throws IOException { throws IOException {
if (regionInfo.isMetaRegion()) { if (regionStateNode.getRegionInfo().isMetaRegion()) {
updateMetaLocation(regionInfo, regionLocation); updateMetaLocation(regionStateNode.getRegionInfo(), regionStateNode.getRegionLocation());
} else { } else {
updateUserRegionLocation(regionInfo, state, regionLocation, lastHost, openSeqNum, pid); long openSeqNum = regionStateNode.getState() == State.OPEN ?
regionStateNode.getOpenSeqNum() : HConstants.NO_SEQNUM;
updateUserRegionLocation(regionStateNode.getRegionInfo(), regionStateNode.getState(),
regionStateNode.getRegionLocation(), regionStateNode.getLastHost(), openSeqNum,
regionStateNode.getProcedure().getProcId());
} }
} }
public void updateRegionState(final long openSeqNum, final long pid,
final RegionState newState, final RegionState oldState) throws IOException {
updateRegionLocation(newState.getRegion(), newState.getState(), newState.getServerName(),
oldState != null ? oldState.getServerName() : null, openSeqNum, pid);
}
protected void updateMetaLocation(final RegionInfo regionInfo, final ServerName serverName) protected void updateMetaLocation(final RegionInfo regionInfo, final ServerName serverName)
throws IOException { throws IOException {
try { try {

View File

@ -25,7 +25,6 @@ import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.Comparator; import java.util.Comparator;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
@ -123,13 +122,18 @@ public class RegionStates {
this.event = new AssignmentProcedureEvent(regionInfo); this.event = new AssignmentProcedureEvent(regionInfo);
} }
/**
* @param update new region state this node should be assigned.
* @param expected current state should be in this given list of expected states
* @return true, if current state is in expected list; otherwise false.
*/
public boolean setState(final State update, final State... expected) { public boolean setState(final State update, final State... expected) {
final boolean expectedState = isInState(expected); if (!isInState(expected)) {
if (expectedState) { return false;
this.state = update;
this.lastUpdate = EnvironmentEdgeManager.currentTime();
} }
return expectedState; this.state = update;
this.lastUpdate = EnvironmentEdgeManager.currentTime();
return true;
} }
/** /**
@ -145,13 +149,12 @@ public class RegionStates {
* Set new {@link State} but only if currently in <code>expected</code> State * Set new {@link State} but only if currently in <code>expected</code> State
* (if not, throw {@link UnexpectedStateException}. * (if not, throw {@link UnexpectedStateException}.
*/ */
public State transitionState(final State update, final State... expected) public void transitionState(final State update, final State... expected)
throws UnexpectedStateException { throws UnexpectedStateException {
if (!setState(update, expected)) { if (!setState(update, expected)) {
throw new UnexpectedStateException("Expected " + Arrays.toString(expected) + throw new UnexpectedStateException("Expected " + Arrays.toString(expected) +
" so could move to " + update + " but current state=" + getState()); " so could move to " + update + " but current state=" + getState());
} }
return update;
} }
public boolean isInState(final State... expected) { public boolean isInState(final State... expected) {
@ -253,6 +256,10 @@ public class RegionStates {
return 0; return 0;
} }
public RegionState toRegionState() {
return new RegionState(getRegionInfo(), getState(), getLastUpdate(), getRegionLocation());
}
@Override @Override
public int compareTo(final RegionStateNode other) { public int compareTo(final RegionStateNode other) {
// NOTE: RegionInfo sort by table first, so we are relying on that. // NOTE: RegionInfo sort by table first, so we are relying on that.
@ -311,7 +318,7 @@ public class RegionStates {
public ServerStateNode(final ServerName serverName) { public ServerStateNode(final ServerName serverName) {
this.serverName = serverName; this.serverName = serverName;
this.regions = new HashSet<RegionStateNode>(); this.regions = ConcurrentHashMap.newKeySet();
this.reportEvent = new ServerReportEvent(serverName); this.reportEvent = new ServerReportEvent(serverName);
} }
@ -440,33 +447,23 @@ public class RegionStates {
// ========================================================================== // ==========================================================================
// RegionStateNode helpers // RegionStateNode helpers
// ========================================================================== // ==========================================================================
protected RegionStateNode createRegionNode(final RegionInfo regionInfo) { protected RegionStateNode createRegionStateNode(final RegionInfo regionInfo) {
RegionStateNode newNode = new RegionStateNode(regionInfo); RegionStateNode newNode = new RegionStateNode(regionInfo);
RegionStateNode oldNode = regionsMap.putIfAbsent(regionInfo.getRegionName(), newNode); RegionStateNode oldNode = regionsMap.putIfAbsent(regionInfo.getRegionName(), newNode);
return oldNode != null ? oldNode : newNode; return oldNode != null ? oldNode : newNode;
} }
protected RegionStateNode getOrCreateRegionNode(final RegionInfo regionInfo) { protected RegionStateNode getOrCreateRegionStateNode(final RegionInfo regionInfo) {
RegionStateNode node = regionsMap.get(regionInfo.getRegionName()); RegionStateNode node = regionsMap.get(regionInfo.getRegionName());
return node != null ? node : createRegionNode(regionInfo); return node != null ? node : createRegionStateNode(regionInfo);
} }
RegionStateNode getRegionNodeFromName(final byte[] regionName) { RegionStateNode getRegionStateNodeFromName(final byte[] regionName) {
return regionsMap.get(regionName); return regionsMap.get(regionName);
} }
protected RegionStateNode getRegionNode(final RegionInfo regionInfo) { protected RegionStateNode getRegionStateNode(final RegionInfo regionInfo) {
return getRegionNodeFromName(regionInfo.getRegionName()); return getRegionStateNodeFromName(regionInfo.getRegionName());
}
RegionStateNode getRegionNodeFromEncodedName(final String encodedRegionName) {
// TODO: Need a map <encodedName, ...> but it is just dispatch merge...
for (RegionStateNode node: regionsMap.values()) {
if (node.getRegionInfo().getEncodedName().equals(encodedRegionName)) {
return node;
}
}
return null;
} }
public void deleteRegion(final RegionInfo regionInfo) { public void deleteRegion(final RegionInfo regionInfo) {
@ -491,7 +488,7 @@ public class RegionStates {
final ArrayList<RegionState> regions = new ArrayList<RegionState>(); final ArrayList<RegionState> regions = new ArrayList<RegionState>();
for (RegionStateNode node: regionsMap.tailMap(tableName.getName()).values()) { for (RegionStateNode node: regionsMap.tailMap(tableName.getName()).values()) {
if (!node.getTable().equals(tableName)) break; if (!node.getTable().equals(tableName)) break;
regions.add(createRegionState(node)); regions.add(node.toRegionState());
} }
return regions; return regions;
} }
@ -505,14 +502,14 @@ public class RegionStates {
return regions; return regions;
} }
Collection<RegionStateNode> getRegionNodes() { Collection<RegionStateNode> getRegionStateNodes() {
return regionsMap.values(); return regionsMap.values();
} }
public ArrayList<RegionState> getRegionStates() { public ArrayList<RegionState> getRegionStates() {
final ArrayList<RegionState> regions = new ArrayList<RegionState>(regionsMap.size()); final ArrayList<RegionState> regions = new ArrayList<RegionState>(regionsMap.size());
for (RegionStateNode node: regionsMap.values()) { for (RegionStateNode node: regionsMap.values()) {
regions.add(createRegionState(node)); regions.add(node.toRegionState());
} }
return regions; return regions;
} }
@ -521,17 +518,18 @@ public class RegionStates {
// RegionState helpers // RegionState helpers
// ========================================================================== // ==========================================================================
public RegionState getRegionState(final RegionInfo regionInfo) { public RegionState getRegionState(final RegionInfo regionInfo) {
return createRegionState(getRegionNode(regionInfo)); RegionStateNode regionStateNode = getRegionStateNode(regionInfo);
return regionStateNode == null ? null : regionStateNode.toRegionState();
} }
public RegionState getRegionState(final String encodedRegionName) { public RegionState getRegionState(final String encodedRegionName) {
return createRegionState(getRegionNodeFromEncodedName(encodedRegionName)); // TODO: Need a map <encodedName, ...> but it is just dispatch merge...
} for (RegionStateNode node: regionsMap.values()) {
if (node.getRegionInfo().getEncodedName().equals(encodedRegionName)) {
private RegionState createRegionState(final RegionStateNode node) { return node.toRegionState();
return node == null ? null : }
new RegionState(node.getRegionInfo(), node.getState(), }
node.getLastUpdate(), node.getRegionLocation()); return null;
} }
// ============================================================================================ // ============================================================================================
@ -612,7 +610,7 @@ public class RegionStates {
} }
public void logSplit(final RegionInfo regionInfo) { public void logSplit(final RegionInfo regionInfo) {
final RegionStateNode regionNode = getRegionNode(regionInfo); final RegionStateNode regionNode = getRegionStateNode(regionInfo);
synchronized (regionNode) { synchronized (regionNode) {
regionNode.setState(State.SPLIT); regionNode.setState(State.SPLIT);
} }
@ -620,7 +618,7 @@ public class RegionStates {
@VisibleForTesting @VisibleForTesting
public void updateRegionState(final RegionInfo regionInfo, final State state) { public void updateRegionState(final RegionInfo regionInfo, final State state) {
final RegionStateNode regionNode = getOrCreateRegionNode(regionInfo); final RegionStateNode regionNode = getOrCreateRegionStateNode(regionInfo);
synchronized (regionNode) { synchronized (regionNode) {
regionNode.setState(state); regionNode.setState(state);
} }
@ -640,7 +638,7 @@ public class RegionStates {
} }
public boolean isRegionInState(final RegionInfo regionInfo, final State... state) { public boolean isRegionInState(final RegionInfo regionInfo, final State... state) {
final RegionStateNode region = getRegionNode(regionInfo); final RegionStateNode region = getRegionStateNode(regionInfo);
if (region != null) { if (region != null) {
synchronized (region) { synchronized (region) {
return region.isInState(state); return region.isInState(state);
@ -664,7 +662,7 @@ public class RegionStates {
final Collection<RegionInfo> regions) { final Collection<RegionInfo> regions) {
final Map<ServerName, List<RegionInfo>> result = new HashMap<ServerName, List<RegionInfo>>(); final Map<ServerName, List<RegionInfo>> result = new HashMap<ServerName, List<RegionInfo>>();
for (RegionInfo hri: regions) { for (RegionInfo hri: regions) {
final RegionStateNode node = getRegionNode(hri); final RegionStateNode node = getRegionStateNode(hri);
if (node == null) continue; if (node == null) continue;
// TODO: State.OPEN // TODO: State.OPEN
@ -707,7 +705,7 @@ public class RegionStates {
} }
public ServerName getRegionServerOfRegion(final RegionInfo regionInfo) { public ServerName getRegionServerOfRegion(final RegionInfo regionInfo) {
final RegionStateNode region = getRegionNode(regionInfo); final RegionStateNode region = getRegionStateNode(regionInfo);
if (region != null) { if (region != null) {
synchronized (region) { synchronized (region) {
ServerName server = region.getRegionLocation(); ServerName server = region.getRegionLocation();
@ -815,7 +813,7 @@ public class RegionStates {
if (node == null) return null; if (node == null) return null;
synchronized (node) { synchronized (node) {
return node.isInTransition() ? createRegionState(node) : null; return node.isInTransition() ? node.toRegionState() : null;
} }
} }
@ -833,7 +831,7 @@ public class RegionStates {
public List<RegionState> getRegionsStateInTransition() { public List<RegionState> getRegionsStateInTransition() {
final List<RegionState> rit = new ArrayList<RegionState>(regionInTransition.size()); final List<RegionState> rit = new ArrayList<RegionState>(regionInTransition.size());
for (RegionStateNode node: regionInTransition.values()) { for (RegionStateNode node: regionInTransition.values()) {
rit.add(createRegionState(node)); rit.add(node.toRegionState());
} }
return rit; return rit;
} }
@ -841,7 +839,7 @@ public class RegionStates {
public SortedSet<RegionState> getRegionsInTransitionOrderedByTimestamp() { public SortedSet<RegionState> getRegionsInTransitionOrderedByTimestamp() {
final SortedSet<RegionState> rit = new TreeSet<RegionState>(REGION_STATE_STAMP_COMPARATOR); final SortedSet<RegionState> rit = new TreeSet<RegionState>(REGION_STATE_STAMP_COMPARATOR);
for (RegionStateNode node: regionInTransition.values()) { for (RegionStateNode node: regionInTransition.values()) {
rit.add(createRegionState(node)); rit.add(node.toRegionState());
} }
return rit; return rit;
} }
@ -873,7 +871,7 @@ public class RegionStates {
this.regionNode = regionNode; this.regionNode = regionNode;
} }
public RegionStateNode getRegionNode() { public RegionStateNode getRegionStateNode() {
return regionNode; return regionNode;
} }
@ -922,7 +920,7 @@ public class RegionStates {
ArrayList<RegionState> regions = new ArrayList<RegionState>(regionFailedOpen.size()); ArrayList<RegionState> regions = new ArrayList<RegionState>(regionFailedOpen.size());
for (RegionFailedOpen r: regionFailedOpen.values()) { for (RegionFailedOpen r: regionFailedOpen.values()) {
regions.add(createRegionState(r.getRegionNode())); regions.add(r.getRegionStateNode().toRegionState());
} }
return regions; return regions;
} }
@ -958,9 +956,8 @@ public class RegionStates {
return numServers == 0 ? 0.0: (double)totalLoad / (double)numServers; return numServers == 0 ? 0.0: (double)totalLoad / (double)numServers;
} }
public ServerStateNode addRegionToServer(final ServerName serverName, public ServerStateNode addRegionToServer(final RegionStateNode regionNode) {
final RegionStateNode regionNode) { ServerStateNode serverNode = getOrCreateServer(regionNode.getRegionLocation());
ServerStateNode serverNode = getOrCreateServer(serverName);
serverNode.addRegion(regionNode); serverNode.addRegion(regionNode);
return serverNode; return serverNode;
} }

View File

@ -42,19 +42,21 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProto
/** /**
* Base class for the Assign and Unassign Procedure. * Base class for the Assign and Unassign Procedure.
* There can only be one RegionTransitionProcedure per region running at a time *
* since each procedure takes a lock on the region (see MasterProcedureScheduler). * Locking:
* Takes exclusive lock on the region being assigned/unassigned. Thus, there can only be one
* RegionTransitionProcedure per region running at a time (see MasterProcedureScheduler).
* *
* <p>This procedure is asynchronous and responds to external events. * <p>This procedure is asynchronous and responds to external events.
* The AssignmentManager will notify this procedure when the RS completes * The AssignmentManager will notify this procedure when the RS completes
* the operation and reports the transitioned state * the operation and reports the transitioned state
* (see the Assign and Unassign class for more detail). * (see the Assign and Unassign class for more detail).</p>
* *
* <p>Procedures move from the REGION_TRANSITION_QUEUE state when they are * <p>Procedures move from the REGION_TRANSITION_QUEUE state when they are
* first submitted, to the REGION_TRANSITION_DISPATCH state when the request * first submitted, to the REGION_TRANSITION_DISPATCH state when the request
* to remote server is sent and the Procedure is suspended waiting on external * to remote server is sent and the Procedure is suspended waiting on external
* event to be woken again. Once the external event is triggered, Procedure * event to be woken again. Once the external event is triggered, Procedure
* moves to the REGION_TRANSITION_FINISH state. * moves to the REGION_TRANSITION_FINISH state.</p>
* *
* <p>NOTE: {@link AssignProcedure} and {@link UnassignProcedure} should not be thought of * <p>NOTE: {@link AssignProcedure} and {@link UnassignProcedure} should not be thought of
* as being asymmetric, at least currently. * as being asymmetric, at least currently.
@ -67,12 +69,13 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProto
* AssignProcedure#handleFailure(MasterProcedureEnv, RegionStateNode) re-attempts the * AssignProcedure#handleFailure(MasterProcedureEnv, RegionStateNode) re-attempts the
* assignment by setting the procedure state to REGION_TRANSITION_QUEUE and forces * assignment by setting the procedure state to REGION_TRANSITION_QUEUE and forces
* assignment to a different target server by setting {@link AssignProcedure#forceNewPlan}. When * assignment to a different target server by setting {@link AssignProcedure#forceNewPlan}. When
* the number of attempts reach hreshold configuration 'hbase.assignment.maximum.attempts', * the number of attempts reaches threshold configuration 'hbase.assignment.maximum.attempts',
* the procedure is aborted. For {@link UnassignProcedure}, similar re-attempts are * the procedure is aborted. For {@link UnassignProcedure}, similar re-attempts are
* intentionally not implemented. It is a 'one shot' procedure. See its class doc for how it * intentionally not implemented. It is a 'one shot' procedure. See its class doc for how it
* handles failure. * handles failure.
* </li> * </li>
* </ul> * </ul>
* </p>
* *
* <p>TODO: Considering it is a priority doing all we can to get make a region available as soon as possible, * <p>TODO: Considering it is a priority doing all we can to get make a region available as soon as possible,
* re-attempting with any target makes sense if specified target fails in case of * re-attempting with any target makes sense if specified target fails in case of
@ -88,21 +91,18 @@ public abstract class RegionTransitionProcedure
protected final AtomicBoolean aborted = new AtomicBoolean(false); protected final AtomicBoolean aborted = new AtomicBoolean(false);
private RegionTransitionState transitionState = private RegionTransitionState transitionState = RegionTransitionState.REGION_TRANSITION_QUEUE;
RegionTransitionState.REGION_TRANSITION_QUEUE;
private RegionInfo regionInfo; private RegionInfo regionInfo;
private volatile boolean lock = false; private volatile boolean lock = false;
public RegionTransitionProcedure() { // Required by the Procedure framework to create the procedure on replay
// Required by the Procedure framework to create the procedure on replay public RegionTransitionProcedure() {}
super();
}
public RegionTransitionProcedure(final RegionInfo regionInfo) { public RegionTransitionProcedure(final RegionInfo regionInfo) {
this.regionInfo = regionInfo; this.regionInfo = regionInfo;
} }
public RegionInfo getRegionInfo() { protected RegionInfo getRegionInfo() {
return regionInfo; return regionInfo;
} }
@ -131,15 +131,14 @@ public abstract class RegionTransitionProcedure
} }
public RegionStateNode getRegionState(final MasterProcedureEnv env) { public RegionStateNode getRegionState(final MasterProcedureEnv env) {
return env.getAssignmentManager().getRegionStates(). return env.getAssignmentManager().getRegionStates().getOrCreateRegionStateNode(getRegionInfo());
getOrCreateRegionNode(getRegionInfo());
} }
protected void setTransitionState(final RegionTransitionState state) { void setTransitionState(final RegionTransitionState state) {
this.transitionState = state; this.transitionState = state;
} }
protected RegionTransitionState getTransitionState() { RegionTransitionState getTransitionState() {
return transitionState; return transitionState;
} }
@ -200,7 +199,7 @@ public abstract class RegionTransitionProcedure
* and this procedure has been set into a suspended state OR, we failed and * and this procedure has been set into a suspended state OR, we failed and
* this procedure has been put back on the scheduler ready for another worker * this procedure has been put back on the scheduler ready for another worker
* to pick it up. In both cases, we need to exit the current Worker processing * to pick it up. In both cases, we need to exit the current Worker processing
* toute de suite! * immediately!
* @return True if we successfully dispatched the call and false if we failed; * @return True if we successfully dispatched the call and false if we failed;
* if failed, we need to roll back any setup done for the dispatch. * if failed, we need to roll back any setup done for the dispatch.
*/ */
@ -217,7 +216,7 @@ public abstract class RegionTransitionProcedure
getRegionState(env).getProcedureEvent().suspend(); getRegionState(env).getProcedureEvent().suspend();
// Tricky because the below call to addOperationToNode can fail. If it fails, we need to // Tricky because the below call to addOperationToNode can fail. If it fails, we need to
// backtrack on stuff like the 'suspend' done above -- tricky as the 'wake' requeues us -- and // backtrack on stuff like the 'suspend' done above -- tricky as the 'wake' requests us -- and
// ditto up in the caller; it needs to undo state changes. Inside in remoteCallFailed, it does // ditto up in the caller; it needs to undo state changes. Inside in remoteCallFailed, it does
// wake to undo the above suspend. // wake to undo the above suspend.
if (!env.getRemoteDispatcher().addOperationToNode(targetServer, this)) { if (!env.getRemoteDispatcher().addOperationToNode(targetServer, this)) {

View File

@ -133,7 +133,7 @@ public class SplitTableRegionProcedure
throw new IllegalArgumentException ("Can't invoke split on non-default regions directly"); throw new IllegalArgumentException ("Can't invoke split on non-default regions directly");
} }
RegionStateNode node = RegionStateNode node =
env.getAssignmentManager().getRegionStates().getRegionNode(getParentRegion()); env.getAssignmentManager().getRegionStates().getRegionStateNode(getParentRegion());
IOException splittableCheckIOE = null; IOException splittableCheckIOE = null;
boolean splittable = false; boolean splittable = false;
if (node != null) { if (node != null) {
@ -407,7 +407,7 @@ public class SplitTableRegionProcedure
public boolean prepareSplitRegion(final MasterProcedureEnv env) throws IOException { public boolean prepareSplitRegion(final MasterProcedureEnv env) throws IOException {
// Check whether the region is splittable // Check whether the region is splittable
RegionStateNode node = RegionStateNode node =
env.getAssignmentManager().getRegionStates().getRegionNode(getParentRegion()); env.getAssignmentManager().getRegionStates().getRegionStateNode(getParentRegion());
if (node == null) { if (node == null) {
throw new UnknownRegionException(getParentRegion().getRegionNameAsString()); throw new UnknownRegionException(getParentRegion().getRegionNameAsString());

View File

@ -38,6 +38,8 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.hbase.shaded.com.google.common.collect.ArrayListMultimap; import org.apache.hadoop.hbase.shaded.com.google.common.collect.ArrayListMultimap;
import org.apache.hadoop.hbase.shaded.com.google.protobuf.RpcCallback;
import org.apache.hadoop.hbase.shaded.com.google.protobuf.RpcController;
import org.apache.hadoop.hbase.shaded.com.google.protobuf.ServiceException; import org.apache.hadoop.hbase.shaded.com.google.protobuf.ServiceException;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter; import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
@ -64,7 +66,8 @@ public class RSProcedureDispatcher
private static final int RS_VERSION_WITH_EXEC_PROCS = 0x0200000; // 2.0 private static final int RS_VERSION_WITH_EXEC_PROCS = 0x0200000; // 2.0
protected final MasterServices master; protected final MasterServices master;
protected final long rsStartupWaitTime; private final long rsStartupWaitTime;
private MasterProcedureEnv procedureEnv;
public RSProcedureDispatcher(final MasterServices master) { public RSProcedureDispatcher(final MasterServices master) {
super(master.getConfiguration()); super(master.getConfiguration());
@ -81,6 +84,7 @@ public class RSProcedureDispatcher
} }
master.getServerManager().registerListener(this); master.getServerManager().registerListener(this);
procedureEnv = master.getMasterProcedureExecutor().getEnvironment();
for (ServerName serverName: master.getServerManager().getOnlineServersList()) { for (ServerName serverName: master.getServerManager().getOnlineServersList()) {
addNode(serverName); addNode(serverName);
} }
@ -99,18 +103,18 @@ public class RSProcedureDispatcher
@Override @Override
protected void remoteDispatch(final ServerName serverName, protected void remoteDispatch(final ServerName serverName,
final Set<RemoteProcedure> operations) { final Set<RemoteProcedure> remoteProcedures) {
final int rsVersion = master.getAssignmentManager().getServerVersion(serverName); final int rsVersion = master.getAssignmentManager().getServerVersion(serverName);
if (rsVersion >= RS_VERSION_WITH_EXEC_PROCS) { if (rsVersion >= RS_VERSION_WITH_EXEC_PROCS) {
LOG.info(String.format( LOG.info(String.format(
"Using procedure batch rpc execution for serverName=%s version=%s", "Using procedure batch rpc execution for serverName=%s version=%s",
serverName, rsVersion)); serverName, rsVersion));
submitTask(new ExecuteProceduresRemoteCall(serverName, operations)); submitTask(new ExecuteProceduresRemoteCall(serverName, remoteProcedures));
} else { } else {
LOG.info(String.format( LOG.info(String.format(
"Fallback to compat rpc execution for serverName=%s version=%s", "Fallback to compat rpc execution for serverName=%s version=%s",
serverName, rsVersion)); serverName, rsVersion));
submitTask(new CompatRemoteProcedureResolver(serverName, operations)); submitTask(new CompatRemoteProcedureResolver(serverName, remoteProcedures));
} }
} }
@ -118,9 +122,8 @@ public class RSProcedureDispatcher
final Set<RemoteProcedure> operations) { final Set<RemoteProcedure> operations) {
// TODO: Replace with a ServerNotOnlineException() // TODO: Replace with a ServerNotOnlineException()
final IOException e = new DoNotRetryIOException("server not online " + serverName); final IOException e = new DoNotRetryIOException("server not online " + serverName);
final MasterProcedureEnv env = master.getMasterProcedureExecutor().getEnvironment();
for (RemoteProcedure proc: operations) { for (RemoteProcedure proc: operations) {
proc.remoteCallFailed(env, serverName, e); proc.remoteCallFailed(procedureEnv, serverName, e);
} }
} }
@ -136,6 +139,8 @@ public class RSProcedureDispatcher
* Base remote call * Base remote call
*/ */
protected abstract class AbstractRSRemoteCall implements Callable<Void> { protected abstract class AbstractRSRemoteCall implements Callable<Void> {
public abstract Void call();
private final ServerName serverName; private final ServerName serverName;
private int numberOfAttemptsSoFar = 0; private int numberOfAttemptsSoFar = 0;
@ -145,8 +150,6 @@ public class RSProcedureDispatcher
this.serverName = serverName; this.serverName = serverName;
} }
public abstract Void call();
protected AdminService.BlockingInterface getRsAdmin() throws IOException { protected AdminService.BlockingInterface getRsAdmin() throws IOException {
final AdminService.BlockingInterface admin = master.getServerManager().getRsAdmin(serverName); final AdminService.BlockingInterface admin = master.getServerManager().getRsAdmin(serverName);
if (admin == null) { if (admin == null) {
@ -223,17 +226,29 @@ public class RSProcedureDispatcher
void dispatchCloseRequests(MasterProcedureEnv env, List<RegionCloseOperation> operations); void dispatchCloseRequests(MasterProcedureEnv env, List<RegionCloseOperation> operations);
} }
/**
* Fetches {@link org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher.RemoteOperation}s
* from the given {@code remoteProcedures} and groups them by class of the returned operation.
* Then {@code resolver} is used to dispatch {@link RegionOpenOperation}s and
* {@link RegionCloseOperation}s.
* @param serverName RegionServer to which the remote operations are sent
* @param remoteProcedures Remote procedures which are dispatched to the given server
* @param resolver Used to dispatch remote procedures to given server.
*/
public void splitAndResolveOperation(final ServerName serverName, public void splitAndResolveOperation(final ServerName serverName,
final Set<RemoteProcedure> operations, final RemoteProcedureResolver resolver) { final Set<RemoteProcedure> remoteProcedures, final RemoteProcedureResolver resolver) {
final MasterProcedureEnv env = master.getMasterProcedureExecutor().getEnvironment();
final ArrayListMultimap<Class<?>, RemoteOperation> reqsByType = final ArrayListMultimap<Class<?>, RemoteOperation> reqsByType =
buildAndGroupRequestByType(env, serverName, operations); buildAndGroupRequestByType(procedureEnv, serverName, remoteProcedures);
final List<RegionOpenOperation> openOps = fetchType(reqsByType, RegionOpenOperation.class); final List<RegionOpenOperation> openOps = fetchType(reqsByType, RegionOpenOperation.class);
if (!openOps.isEmpty()) resolver.dispatchOpenRequests(env, openOps); if (!openOps.isEmpty()) {
resolver.dispatchOpenRequests(procedureEnv, openOps);
}
final List<RegionCloseOperation> closeOps = fetchType(reqsByType, RegionCloseOperation.class); final List<RegionCloseOperation> closeOps = fetchType(reqsByType, RegionCloseOperation.class);
if (!closeOps.isEmpty()) resolver.dispatchCloseRequests(env, closeOps); if (!closeOps.isEmpty()) {
resolver.dispatchCloseRequests(procedureEnv, closeOps);
}
if (!reqsByType.isEmpty()) { if (!reqsByType.isEmpty()) {
LOG.warn("unknown request type in the queue: " + reqsByType); LOG.warn("unknown request type in the queue: " + reqsByType);
@ -245,34 +260,32 @@ public class RSProcedureDispatcher
// ========================================================================== // ==========================================================================
protected class ExecuteProceduresRemoteCall extends AbstractRSRemoteCall protected class ExecuteProceduresRemoteCall extends AbstractRSRemoteCall
implements RemoteProcedureResolver { implements RemoteProcedureResolver {
private final Set<RemoteProcedure> operations; private final Set<RemoteProcedure> remoteProcedures;
private ExecuteProceduresRequest.Builder request = null; private ExecuteProceduresRequest.Builder request = null;
public ExecuteProceduresRemoteCall(final ServerName serverName, public ExecuteProceduresRemoteCall(final ServerName serverName,
final Set<RemoteProcedure> operations) { final Set<RemoteProcedure> remoteProcedures) {
super(serverName); super(serverName);
this.operations = operations; this.remoteProcedures = remoteProcedures;
} }
public Void call() { public Void call() {
final MasterProcedureEnv env = master.getMasterProcedureExecutor().getEnvironment();
request = ExecuteProceduresRequest.newBuilder(); request = ExecuteProceduresRequest.newBuilder();
if (LOG.isTraceEnabled()) { if (LOG.isTraceEnabled()) {
LOG.trace("Building request with operations count=" + operations.size()); LOG.trace("Building request with operations count=" + remoteProcedures.size());
} }
splitAndResolveOperation(getServerName(), operations, this); splitAndResolveOperation(getServerName(), remoteProcedures, this);
try { try {
final ExecuteProceduresResponse response = sendRequest(getServerName(), request.build()); final ExecuteProceduresResponse response = sendRequest(getServerName(), request.build());
remoteCallCompleted(env, response); remoteCallCompleted(procedureEnv, response);
} catch (IOException e) { } catch (IOException e) {
e = unwrapException(e); e = unwrapException(e);
// TODO: In the future some operation may want to bail out early. // TODO: In the future some operation may want to bail out early.
// TODO: How many times should we retry (use numberOfAttemptsSoFar) // TODO: How many times should we retry (use numberOfAttemptsSoFar)
if (!scheduleForRetry(e)) { if (!scheduleForRetry(e)) {
remoteCallFailed(env, e); remoteCallFailed(procedureEnv, e);
} }
} }
return null; return null;
@ -309,17 +322,12 @@ public class RSProcedureDispatcher
} }
private void remoteCallFailed(final MasterProcedureEnv env, final IOException e) { private void remoteCallFailed(final MasterProcedureEnv env, final IOException e) {
for (RemoteProcedure proc: operations) { for (RemoteProcedure proc: remoteProcedures) {
proc.remoteCallFailed(env, getServerName(), e); proc.remoteCallFailed(env, getServerName(), e);
} }
} }
} }
// ==========================================================================
// Compatibility calls
// Since we don't have a "batch proc-exec" request on the target RS
// we have to chunk the requests by type and dispatch the specific request.
// ==========================================================================
private static OpenRegionRequest buildOpenRegionRequest(final MasterProcedureEnv env, private static OpenRegionRequest buildOpenRegionRequest(final MasterProcedureEnv env,
final ServerName serverName, final List<RegionOpenOperation> operations) { final ServerName serverName, final List<RegionOpenOperation> operations) {
final OpenRegionRequest.Builder builder = OpenRegionRequest.newBuilder(); final OpenRegionRequest.Builder builder = OpenRegionRequest.newBuilder();
@ -331,6 +339,15 @@ public class RSProcedureDispatcher
return builder.build(); return builder.build();
} }
// ==========================================================================
// Compatibility calls
// Since we don't have a "batch proc-exec" request on the target RS
// we have to chunk the requests by type and dispatch the specific request.
// ==========================================================================
/**
* Compatibility class used by {@link CompatRemoteProcedureResolver} to open regions using old
* {@link AdminService#openRegion(RpcController, OpenRegionRequest, RpcCallback)} rpc.
*/
private final class OpenRegionRemoteCall extends AbstractRSRemoteCall { private final class OpenRegionRemoteCall extends AbstractRSRemoteCall {
private final List<RegionOpenOperation> operations; private final List<RegionOpenOperation> operations;
@ -342,18 +359,18 @@ public class RSProcedureDispatcher
@Override @Override
public Void call() { public Void call() {
final MasterProcedureEnv env = master.getMasterProcedureExecutor().getEnvironment(); final OpenRegionRequest request =
final OpenRegionRequest request = buildOpenRegionRequest(env, getServerName(), operations); buildOpenRegionRequest(procedureEnv, getServerName(), operations);
try { try {
OpenRegionResponse response = sendRequest(getServerName(), request); OpenRegionResponse response = sendRequest(getServerName(), request);
remoteCallCompleted(env, response); remoteCallCompleted(procedureEnv, response);
} catch (IOException e) { } catch (IOException e) {
e = unwrapException(e); e = unwrapException(e);
// TODO: In the future some operation may want to bail out early. // TODO: In the future some operation may want to bail out early.
// TODO: How many times should we retry (use numberOfAttemptsSoFar) // TODO: How many times should we retry (use numberOfAttemptsSoFar)
if (!scheduleForRetry(e)) { if (!scheduleForRetry(e)) {
remoteCallFailed(env, e); remoteCallFailed(procedureEnv, e);
} }
} }
return null; return null;
@ -385,6 +402,10 @@ public class RSProcedureDispatcher
} }
} }
/**
* Compatibility class used by {@link CompatRemoteProcedureResolver} to close regions using old
* {@link AdminService#closeRegion(RpcController, CloseRegionRequest, RpcCallback)} rpc.
*/
private final class CloseRegionRemoteCall extends AbstractRSRemoteCall { private final class CloseRegionRemoteCall extends AbstractRSRemoteCall {
private final RegionCloseOperation operation; private final RegionCloseOperation operation;
@ -396,17 +417,16 @@ public class RSProcedureDispatcher
@Override @Override
public Void call() { public Void call() {
final MasterProcedureEnv env = master.getMasterProcedureExecutor().getEnvironment();
final CloseRegionRequest request = operation.buildCloseRegionRequest(getServerName()); final CloseRegionRequest request = operation.buildCloseRegionRequest(getServerName());
try { try {
CloseRegionResponse response = sendRequest(getServerName(), request); CloseRegionResponse response = sendRequest(getServerName(), request);
remoteCallCompleted(env, response); remoteCallCompleted(procedureEnv, response);
} catch (IOException e) { } catch (IOException e) {
e = unwrapException(e); e = unwrapException(e);
// TODO: In the future some operation may want to bail out early. // TODO: In the future some operation may want to bail out early.
// TODO: How many times should we retry (use numberOfAttemptsSoFar) // TODO: How many times should we retry (use numberOfAttemptsSoFar)
if (!scheduleForRetry(e)) { if (!scheduleForRetry(e)) {
remoteCallFailed(env, e); remoteCallFailed(procedureEnv, e);
} }
} }
return null; return null;
@ -432,6 +452,10 @@ public class RSProcedureDispatcher
} }
} }
/**
* Compatibility class to open and close regions using old endpoints (openRegion/closeRegion) in
* {@link AdminService}.
*/
protected class CompatRemoteProcedureResolver implements Callable<Void>, RemoteProcedureResolver { protected class CompatRemoteProcedureResolver implements Callable<Void>, RemoteProcedureResolver {
private final Set<RemoteProcedure> operations; private final Set<RemoteProcedure> operations;
private final ServerName serverName; private final ServerName serverName;
@ -463,14 +487,16 @@ public class RSProcedureDispatcher
// ========================================================================== // ==========================================================================
// RPC Messages // RPC Messages
// - ServerOperation: refreshConfig, grant, revoke, ... // - ServerOperation: refreshConfig, grant, revoke, ... (TODO)
// - RegionOperation: open, close, flush, snapshot, ... // - RegionOperation: open, close, flush, snapshot, ...
// ========================================================================== // ==========================================================================
/* Currently unused
public static abstract class ServerOperation extends RemoteOperation { public static abstract class ServerOperation extends RemoteOperation {
protected ServerOperation(final RemoteProcedure remoteProcedure) { protected ServerOperation(final RemoteProcedure remoteProcedure) {
super(remoteProcedure); super(remoteProcedure);
} }
} }
*/
public static abstract class RegionOperation extends RemoteOperation { public static abstract class RegionOperation extends RemoteOperation {
private final RegionInfo regionInfo; private final RegionInfo regionInfo;

View File

@ -123,8 +123,7 @@ public class TestMasterBalanceThrottling {
public void run() { public void run() {
while (!stop.get()) { while (!stop.get()) {
maxCount.set(Math.max(maxCount.get(), maxCount.set(Math.max(maxCount.get(),
master.getAssignmentManager().getRegionStates() master.getAssignmentManager().getRegionStates().getRegionsInTransitionCount()));
.getRegionsInTransition().size()));
try { try {
Thread.sleep(10); Thread.sleep(10);
} catch (InterruptedException e) { } catch (InterruptedException e) {
@ -139,7 +138,7 @@ public class TestMasterBalanceThrottling {
} }
private void unbalance(HMaster master, TableName tableName) throws Exception { private void unbalance(HMaster master, TableName tableName) throws Exception {
while (master.getAssignmentManager().getRegionStates().getRegionsInTransition().size() > 0) { while (master.getAssignmentManager().getRegionStates().getRegionsInTransitionCount() > 0) {
Thread.sleep(100); Thread.sleep(100);
} }
HRegionServer biasedServer = TEST_UTIL.getMiniHBaseCluster().getRegionServer(0); HRegionServer biasedServer = TEST_UTIL.getMiniHBaseCluster().getRegionServer(0);
@ -147,7 +146,7 @@ public class TestMasterBalanceThrottling {
master.move(regionInfo.getEncodedNameAsBytes(), master.move(regionInfo.getEncodedNameAsBytes(),
Bytes.toBytes(biasedServer.getServerName().getServerName())); Bytes.toBytes(biasedServer.getServerName().getServerName()));
} }
while (master.getAssignmentManager().getRegionStates().getRegionsInTransition().size() > 0) { while (master.getAssignmentManager().getRegionStates().getRegionsInTransitionCount() > 0) {
Thread.sleep(100); Thread.sleep(100);
} }
} }

View File

@ -43,7 +43,7 @@ public class TestRegionState {
} }
private void testSerializeDeserialize(final TableName tableName, final RegionState.State state) { private void testSerializeDeserialize(final TableName tableName, final RegionState.State state) {
RegionState state1 = new RegionState(new HRegionInfo(tableName), state); RegionState state1 = RegionState.createForTesting(new HRegionInfo(tableName), state);
ClusterStatusProtos.RegionState protobuf1 = state1.convert(); ClusterStatusProtos.RegionState protobuf1 = state1.convert();
RegionState state2 = RegionState.convert(protobuf1); RegionState state2 = RegionState.convert(protobuf1);
ClusterStatusProtos.RegionState protobuf2 = state1.convert(); ClusterStatusProtos.RegionState protobuf2 = state1.convert();

View File

@ -36,7 +36,6 @@ import org.apache.hadoop.hbase.YouAreDeadException;
import org.apache.hadoop.hbase.client.ClusterConnection; import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.HConnectionTestingUtility; import org.apache.hadoop.hbase.client.HConnectionTestingUtility;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder; import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.client.TableDescriptor; import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder; import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
@ -45,7 +44,6 @@ import org.apache.hadoop.hbase.master.MasterFileSystem;
import org.apache.hadoop.hbase.master.MasterServices; import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.master.MasterWalManager; import org.apache.hadoop.hbase.master.MasterWalManager;
import org.apache.hadoop.hbase.master.MockNoopMasterServices; import org.apache.hadoop.hbase.master.MockNoopMasterServices;
import org.apache.hadoop.hbase.master.RegionState.State;
import org.apache.hadoop.hbase.master.ServerManager; import org.apache.hadoop.hbase.master.ServerManager;
import org.apache.hadoop.hbase.master.balancer.LoadBalancerFactory; import org.apache.hadoop.hbase.master.balancer.LoadBalancerFactory;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureConstants; import org.apache.hadoop.hbase.master.procedure.MasterProcedureConstants;
@ -290,8 +288,7 @@ public class MockMasterServices extends MockNoopMasterServices {
} }
@Override @Override
public void updateRegionLocation(RegionInfo regionInfo, State state, ServerName regionLocation, public void updateRegionLocation(RegionStates.RegionStateNode regionNode) throws IOException {
ServerName lastHost, long openSeqNum, long pid) throws IOException {
} }
} }

View File

@ -349,7 +349,7 @@ public class TestAssignmentManager {
fail("unexpected assign completion"); fail("unexpected assign completion");
} catch (RetriesExhaustedException e) { } catch (RetriesExhaustedException e) {
// expected exception // expected exception
LOG.info("REGION STATE " + am.getRegionStates().getRegionNode(hri)); LOG.info("REGION STATE " + am.getRegionStates().getRegionStateNode(hri));
LOG.info("expected exception from assign operation: " + e.getMessage(), e); LOG.info("expected exception from assign operation: " + e.getMessage(), e);
assertEquals(true, am.getRegionStates().getRegionState(hri).isFailedOpen()); assertEquals(true, am.getRegionStates().getRegionState(hri).isFailedOpen());
} }
@ -780,8 +780,8 @@ public class TestAssignmentManager {
} }
@Override @Override
protected void remoteDispatch(ServerName serverName, Set<RemoteProcedure> operations) { protected void remoteDispatch(ServerName serverName, Set<RemoteProcedure> remoteProcedures) {
submitTask(new MockRemoteCall(serverName, operations)); submitTask(new MockRemoteCall(serverName, remoteProcedures));
} }
private class MockRemoteCall extends ExecuteProceduresRemoteCall { private class MockRemoteCall extends ExecuteProceduresRemoteCall {

View File

@ -144,7 +144,7 @@ public class TestRegionStates {
executorService.submit(new Callable<Object>() { executorService.submit(new Callable<Object>() {
@Override @Override
public Object call() { public Object call() {
return stateMap.getOrCreateRegionNode(RegionInfoBuilder.newBuilder(tableName) return stateMap.getOrCreateRegionStateNode(RegionInfoBuilder.newBuilder(tableName)
.setStartKey(Bytes.toBytes(regionId)) .setStartKey(Bytes.toBytes(regionId))
.setEndKey(Bytes.toBytes(regionId + 1)) .setEndKey(Bytes.toBytes(regionId + 1))
.setSplit(false) .setSplit(false)
@ -156,7 +156,7 @@ public class TestRegionStates {
private Object createRegionNode(final RegionStates stateMap, private Object createRegionNode(final RegionStates stateMap,
final TableName tableName, final long regionId) { final TableName tableName, final long regionId) {
return stateMap.getOrCreateRegionNode(createRegionInfo(tableName, regionId)); return stateMap.getOrCreateRegionStateNode(createRegionInfo(tableName, regionId));
} }
private RegionInfo createRegionInfo(final TableName tableName, final long regionId) { private RegionInfo createRegionInfo(final TableName tableName, final long regionId) {
@ -181,7 +181,7 @@ public class TestRegionStates {
@Override @Override
public Object call() { public Object call() {
RegionInfo hri = createRegionInfo(TABLE_NAME, regionId); RegionInfo hri = createRegionInfo(TABLE_NAME, regionId);
return stateMap.getOrCreateRegionNode(hri); return stateMap.getOrCreateRegionStateNode(hri);
} }
}); });
} }
@ -218,7 +218,7 @@ public class TestRegionStates {
final RegionStates stateMap = new RegionStates(); final RegionStates stateMap = new RegionStates();
long st = System.currentTimeMillis(); long st = System.currentTimeMillis();
for (int i = 0; i < NRUNS; ++i) { for (int i = 0; i < NRUNS; ++i) {
stateMap.createRegionNode(createRegionInfo(TABLE_NAME, i)); stateMap.createRegionStateNode(createRegionInfo(TABLE_NAME, i));
} }
long et = System.currentTimeMillis(); long et = System.currentTimeMillis();
LOG.info(String.format("PERF SingleThread: %s %s/sec", LOG.info(String.format("PERF SingleThread: %s %s/sec",

View File

@ -306,7 +306,7 @@ public class TestHRegionInfo {
Assert.assertArrayEquals(HRegionInfo.HIDDEN_START_KEY, Assert.assertArrayEquals(HRegionInfo.HIDDEN_START_KEY,
HRegionInfo.getStartKeyForDisplay(h, conf)); HRegionInfo.getStartKeyForDisplay(h, conf));
RegionState state = new RegionState(h, RegionState.State.OPEN); RegionState state = RegionState.createForTesting(h, RegionState.State.OPEN);
String descriptiveNameForDisplay = String descriptiveNameForDisplay =
HRegionInfo.getDescriptiveNameFromRegionStateForDisplay(state, conf); HRegionInfo.getDescriptiveNameFromRegionStateForDisplay(state, conf);
checkDescriptiveNameEquality(descriptiveNameForDisplay,state.toDescriptiveString(), startKey); checkDescriptiveNameEquality(descriptiveNameForDisplay,state.toDescriptiveString(), startKey);

View File

@ -515,8 +515,9 @@ public class MetaTableLocator {
state = RegionState.State.OFFLINE; state = RegionState.State.OFFLINE;
} }
return new RegionState( return new RegionState(
RegionReplicaUtil.getRegionInfoForReplica(RegionInfoBuilder.FIRST_META_REGIONINFO, replicaId), RegionReplicaUtil.getRegionInfoForReplica(
state, serverName); RegionInfoBuilder.FIRST_META_REGIONINFO, replicaId),
state, serverName);
} }
/** /**