HBASE-6012 Handling RegionOpeningState for bulk assign (Chunhui)
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1349377 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
6610703576
commit
b8d3979505
|
@ -1475,7 +1475,20 @@ public class AssignmentManager extends ZooKeeperListener {
|
|||
getLong("hbase.regionserver.rpc.startup.waittime", 60000);
|
||||
while (!this.master.isStopped()) {
|
||||
try {
|
||||
this.serverManager.sendRegionOpen(destination, regions);
|
||||
List<RegionOpeningState> regionOpeningStateList = this.serverManager
|
||||
.sendRegionOpen(destination, regions);
|
||||
if (regionOpeningStateList == null) {
|
||||
// Failed getting RPC connection to this server
|
||||
return false;
|
||||
}
|
||||
for (int i = 0; i < regionOpeningStateList.size(); i++) {
|
||||
if (regionOpeningStateList.get(i) == RegionOpeningState.ALREADY_OPENED) {
|
||||
processAlreadyOpenedRegion(regions.get(i), destination);
|
||||
} else if (regionOpeningStateList.get(i) == RegionOpeningState.FAILED_OPENING) {
|
||||
// Failed opening this region, reassign it
|
||||
assign(regions.get(i), true, true);
|
||||
}
|
||||
}
|
||||
break;
|
||||
} catch (RemoteException e) {
|
||||
IOException decodedException = e.unwrapRemoteException();
|
||||
|
@ -1534,6 +1547,9 @@ public class AssignmentManager extends ZooKeeperListener {
|
|||
failedPlans.put(e.getKey(), e.getValue());
|
||||
}
|
||||
} catch (Throwable t) {
|
||||
LOG.warn("Failed bulking assigning " + e.getValue().size()
|
||||
+ " region(s) to " + e.getKey().getServerName()
|
||||
+ ", and continue to bulk assign others", t);
|
||||
failedPlans.put(e.getKey(), e.getValue());
|
||||
}
|
||||
}
|
||||
|
@ -1545,7 +1561,9 @@ public class AssignmentManager extends ZooKeeperListener {
|
|||
+ " regions to server " + e.getKey() + ", reassigning them");
|
||||
reassigningRegions.addAll(e.getValue());
|
||||
}
|
||||
assign(reassigningRegions, servers);
|
||||
for (HRegionInfo region : reassigningRegions) {
|
||||
assign(region, true, true);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1723,31 +1741,10 @@ public class AssignmentManager extends ZooKeeperListener {
|
|||
RegionOpeningState regionOpenState = serverManager.sendRegionOpen(plan
|
||||
.getDestination(), state.getRegion(), versionOfOfflineNode);
|
||||
if (regionOpenState == RegionOpeningState.ALREADY_OPENED) {
|
||||
// Remove region from in-memory transition and unassigned node from ZK
|
||||
// While trying to enable the table the regions of the table were
|
||||
// already enabled.
|
||||
LOG.debug("ALREADY_OPENED region " + state.getRegion().getRegionNameAsString() +
|
||||
" to " + plan.getDestination().toString());
|
||||
String encodedRegionName = state.getRegion()
|
||||
.getEncodedName();
|
||||
try {
|
||||
ZKAssign.deleteOfflineNode(master.getZooKeeper(), encodedRegionName);
|
||||
} catch (KeeperException.NoNodeException e) {
|
||||
if(LOG.isDebugEnabled()){
|
||||
LOG.debug("The unassigned node "+encodedRegionName+" doesnot exist.");
|
||||
}
|
||||
} catch (KeeperException e) {
|
||||
master.abort(
|
||||
"Error deleting OFFLINED node in ZK for transition ZK node ("
|
||||
+ encodedRegionName + ")", e);
|
||||
}
|
||||
// no lock concurrent ok -> sequentially consistent
|
||||
this.regionsInTransition.remove(plan.getRegionInfo().getEncodedName());
|
||||
|
||||
synchronized (this.regions) {
|
||||
this.regions.put(plan.getRegionInfo(), plan.getDestination());
|
||||
addToServers(plan.getDestination(), plan.getRegionInfo());
|
||||
}
|
||||
processAlreadyOpenedRegion(state.getRegion(), plan.getDestination());
|
||||
} else if (regionOpenState == RegionOpeningState.FAILED_OPENING) {
|
||||
// Failed opening this region
|
||||
throw new Exception("Get regionOpeningState=" + regionOpenState);
|
||||
}
|
||||
break;
|
||||
} catch (Throwable t) {
|
||||
|
@ -1779,6 +1776,36 @@ public class AssignmentManager extends ZooKeeperListener {
|
|||
}
|
||||
}
|
||||
|
||||
private void processAlreadyOpenedRegion(HRegionInfo region, ServerName sn) {
|
||||
|
||||
// Remove region from in-memory transition and unassigned node from ZK
|
||||
// While trying to enable the table the regions of the table were
|
||||
// already enabled.
|
||||
LOG.debug("ALREADY_OPENED region " + region.getRegionNameAsString()
|
||||
+ " to " + sn);
|
||||
String encodedRegionName = region.getEncodedName();
|
||||
try {
|
||||
ZKAssign.deleteOfflineNode(master.getZooKeeper(), encodedRegionName);
|
||||
} catch (KeeperException.NoNodeException e) {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("The unassigned node " + encodedRegionName
|
||||
+ " doesnot exist.");
|
||||
}
|
||||
} catch (KeeperException e) {
|
||||
master.abort(
|
||||
"Error deleting OFFLINED node in ZK for transition ZK node ("
|
||||
+ encodedRegionName + ")", e);
|
||||
}
|
||||
// no lock concurrent ok -> sequentially consistent
|
||||
this.regionsInTransition.remove(region.getEncodedName());
|
||||
|
||||
synchronized (this.regions) {
|
||||
this.regions.put(region, sn);
|
||||
addToServers(sn, region);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
private boolean isDisabledorDisablingRegionInRIT(final HRegionInfo region) {
|
||||
String tableName = region.getTableNameAsString();
|
||||
boolean disabled = this.zkTable.isDisabledTable(tableName);
|
||||
|
|
|
@ -511,16 +511,20 @@ public class ServerManager {
|
|||
* <p>
|
||||
* @param server server to open a region
|
||||
* @param regions regions to open
|
||||
* @return a list of region opening states
|
||||
*/
|
||||
public void sendRegionOpen(ServerName server, List<HRegionInfo> regions)
|
||||
public List<RegionOpeningState> sendRegionOpen(ServerName server,
|
||||
List<HRegionInfo> regions)
|
||||
throws IOException {
|
||||
AdminProtocol admin = getServerConnection(server);
|
||||
if (admin == null) {
|
||||
LOG.warn("Attempting to send OPEN RPC to server " + server.toString() +
|
||||
" failed because no RPC connection found to this server");
|
||||
return;
|
||||
return null;
|
||||
}
|
||||
ProtobufUtil.openRegion(admin, regions);
|
||||
|
||||
OpenRegionResponse response = ProtobufUtil.openRegion(admin, regions);
|
||||
return ResponseConverter.getRegionOpeningStateList(response);
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -43,6 +43,7 @@ import org.apache.hadoop.hbase.master.DeadServer;
|
|||
import org.apache.hadoop.hbase.master.MasterServices;
|
||||
import org.apache.hadoop.hbase.master.ServerManager;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZKAssign;
|
||||
import org.apache.zookeeper.KeeperException;
|
||||
|
||||
/**
|
||||
|
@ -305,6 +306,16 @@ public class ServerShutdownHandler extends EventHandler {
|
|||
+ " because it has been opened in "
|
||||
+ addressFromAM.getServerName());
|
||||
} else {
|
||||
if (rit != null) {
|
||||
//clean zk node
|
||||
try{
|
||||
LOG.info("Reassigning region with rs =" + rit + " and deleting zk node if exists");
|
||||
ZKAssign.deleteNodeFailSilent(services.getZooKeeper(), e.getKey());
|
||||
}catch (KeeperException ke) {
|
||||
this.server.abort("Unexpected ZK exception deleting unassigned node " + e.getKey(), ke);
|
||||
return;
|
||||
}
|
||||
}
|
||||
toAssignRegions.add(e.getKey());
|
||||
}
|
||||
} else if (rit != null && (rit.isSplitting() || rit.isSplit())) {
|
||||
|
|
|
@ -78,6 +78,7 @@ import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetServerInfoRespo
|
|||
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetStoreFileRequest;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetStoreFileResponse;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.OpenRegionRequest;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.OpenRegionResponse;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ReplicateWALEntryRequest;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ServerInfo;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.SplitRegionRequest;
|
||||
|
@ -1286,7 +1287,6 @@ public final class ProtobufUtil {
|
|||
|
||||
/**
|
||||
* A helper to open a region using admin protocol.
|
||||
*
|
||||
* @param admin
|
||||
* @param region
|
||||
* @throws IOException
|
||||
|
@ -1307,14 +1307,15 @@ public final class ProtobufUtil {
|
|||
*
|
||||
* @param admin
|
||||
* @param regions
|
||||
* @return OpenRegionResponse
|
||||
* @throws IOException
|
||||
*/
|
||||
public static void openRegion(final AdminProtocol admin,
|
||||
public static OpenRegionResponse openRegion(final AdminProtocol admin,
|
||||
final List<HRegionInfo> regions) throws IOException {
|
||||
OpenRegionRequest request =
|
||||
RequestConverter.buildOpenRegionRequest(regions);
|
||||
try {
|
||||
admin.openRegion(null, request);
|
||||
return admin.openRegion(null, request);
|
||||
} catch (ServiceException se) {
|
||||
throw getRemoteException(se);
|
||||
}
|
||||
|
|
|
@ -156,6 +156,23 @@ public final class ResponseConverter {
|
|||
proto.getOpeningState(0).name());
|
||||
}
|
||||
|
||||
/**
|
||||
* Get a list of region opening state from a OpenRegionResponse
|
||||
*
|
||||
* @param proto the OpenRegionResponse
|
||||
* @return the list of region opening state
|
||||
*/
|
||||
public static List<RegionOpeningState> getRegionOpeningStateList(
|
||||
final OpenRegionResponse proto) {
|
||||
if (proto == null) return null;
|
||||
List<RegionOpeningState> regionOpeningStates = new ArrayList<RegionOpeningState>();
|
||||
for (int i = 0; i < proto.getOpeningStateCount(); i++) {
|
||||
regionOpeningStates.add(RegionOpeningState.valueOf(
|
||||
proto.getOpeningState(i).name()));
|
||||
}
|
||||
return regionOpeningStates;
|
||||
}
|
||||
|
||||
/**
|
||||
* Check if the region is closed from a CloseRegionResponse
|
||||
*
|
||||
|
|
|
@ -3363,18 +3363,22 @@ public class HRegionServer implements ClientProtocol,
|
|||
}
|
||||
try {
|
||||
checkOpen();
|
||||
} catch (IOException ie) {
|
||||
throw new ServiceException(ie);
|
||||
}
|
||||
requestCount.incrementAndGet();
|
||||
OpenRegionResponse.Builder builder = OpenRegionResponse.newBuilder();
|
||||
Map<String, HTableDescriptor> htds =
|
||||
new HashMap<String, HTableDescriptor>(request.getRegionList().size());
|
||||
Map<String, HTableDescriptor> htds = new HashMap<String, HTableDescriptor>(
|
||||
request.getRegionList().size());
|
||||
boolean isBulkAssign = request.getRegionList().size() > 1;
|
||||
for (RegionInfo regionInfo : request.getRegionList()) {
|
||||
HRegionInfo region = HRegionInfo.convert(regionInfo);
|
||||
try {
|
||||
checkIfRegionInTransition(region, OPEN);
|
||||
|
||||
HRegion onlineRegion = getFromOnlineRegions(region.getEncodedName());
|
||||
if (null != onlineRegion) {
|
||||
// See HBASE-5094. Cross check with META if still this RS is owning the
|
||||
// region.
|
||||
// See HBASE-5094. Cross check with META if still this RS is owning
|
||||
// the region.
|
||||
Pair<HRegionInfo, ServerName> p = MetaReader.getRegion(
|
||||
this.catalogTracker, region.getRegionName());
|
||||
if (this.getServerName().equals(p.getSecond())) {
|
||||
|
@ -3388,14 +3392,15 @@ public class HRegionServer implements ClientProtocol,
|
|||
removeFromOnlineRegions(region.getEncodedName(), null);
|
||||
}
|
||||
}
|
||||
LOG.info("Received request to open region: "
|
||||
+ region.getRegionNameAsString() + " on "+this.serverNameFromMasterPOV);
|
||||
LOG.info("Received request to open region: " + region.getRegionNameAsString() + " on "
|
||||
+ this.serverNameFromMasterPOV);
|
||||
HTableDescriptor htd = htds.get(region.getTableNameAsString());
|
||||
if (htd == null) {
|
||||
htd = this.tableDescriptors.get(region.getTableName());
|
||||
htds.put(region.getTableNameAsString(), htd);
|
||||
}
|
||||
this.regionsInTransitionInRS.putIfAbsent(region.getEncodedNameAsBytes(), true);
|
||||
this.regionsInTransitionInRS.putIfAbsent(
|
||||
region.getEncodedNameAsBytes(), true);
|
||||
// Need to pass the expected version in the constructor.
|
||||
if (region.isRootRegion()) {
|
||||
this.service.submit(new OpenRootHandler(this, this, region, htd,
|
||||
|
@ -3408,12 +3413,25 @@ public class HRegionServer implements ClientProtocol,
|
|||
versionOfOfflineNode));
|
||||
}
|
||||
builder.addOpeningState(RegionOpeningState.OPENED);
|
||||
} catch (RegionAlreadyInTransitionException rie) {
|
||||
LOG.warn("Region is already in transition", rie);
|
||||
if (isBulkAssign) {
|
||||
builder.addOpeningState(RegionOpeningState.OPENED);
|
||||
} else {
|
||||
throw new ServiceException(rie);
|
||||
}
|
||||
return builder.build();
|
||||
} catch (IOException ie) {
|
||||
LOG.warn("Failed opening region " + region.getRegionNameAsString(), ie);
|
||||
if (isBulkAssign) {
|
||||
builder.addOpeningState(RegionOpeningState.FAILED_OPENING);
|
||||
} else {
|
||||
throw new ServiceException(ie);
|
||||
}
|
||||
}
|
||||
}
|
||||
return builder.build();
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* Close a region on the region server.
|
||||
|
|
|
@ -580,6 +580,7 @@ public class TestAssignmentManager {
|
|||
MasterServices services = Mockito.mock(MasterServices.class);
|
||||
Mockito.when(services.getAssignmentManager()).thenReturn(am);
|
||||
Mockito.when(services.getServerManager()).thenReturn(this.serverManager);
|
||||
Mockito.when(services.getZooKeeper()).thenReturn(this.watcher);
|
||||
ServerShutdownHandler handler = new ServerShutdownHandler(this.server,
|
||||
services, deadServers, SERVERNAME_A, false);
|
||||
handler.process();
|
||||
|
|
Loading…
Reference in New Issue