HBASE-10915 Decouple region closing (HM and HRS) from ZK (Mikhail Antonov)
This commit is contained in:
parent
d20feaf1e7
commit
ec9c12edff
|
@ -52,8 +52,14 @@ public abstract class BaseCoordinatedStateManager implements CoordinatedStateMan
|
|||
@Override
|
||||
public abstract TableStateManager getTableStateManager() throws InterruptedException,
|
||||
CoordinatedStateException;
|
||||
|
||||
/**
|
||||
* Method to retrieve coordination for split transaction
|
||||
* Method to retrieve coordination for split transaction.
|
||||
*/
|
||||
abstract public SplitTransactionCoordination getSplitTransactionCoordination();
|
||||
|
||||
/**
|
||||
* Method to retrieve coordination for closing region operations.
|
||||
*/
|
||||
public abstract CloseRegionCoordination getCloseRegionCoordination();
|
||||
}
|
||||
|
|
|
@ -0,0 +1,69 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.coordination;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.hbase.HRegionInfo;
|
||||
import org.apache.hadoop.hbase.ServerName;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos;
|
||||
import org.apache.hadoop.hbase.regionserver.HRegion;
|
||||
|
||||
/**
|
||||
* Coordinated operations for close region handlers.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public interface CloseRegionCoordination {
|
||||
|
||||
/**
|
||||
* Called before actual region closing to check that we can do close operation
|
||||
* on this region.
|
||||
* @param regionInfo region being closed
|
||||
* @param crd details about closing operation
|
||||
* @return true if caller shall proceed and close, false if need to abort closing.
|
||||
*/
|
||||
boolean checkClosingState(HRegionInfo regionInfo, CloseRegionDetails crd);
|
||||
|
||||
/**
|
||||
* Called after region is closed to notify all interesting parties / "register"
|
||||
* region as finally closed.
|
||||
* @param region region being closed
|
||||
* @param sn ServerName on which task runs
|
||||
* @param crd details about closing operation
|
||||
*/
|
||||
void setClosedState(HRegion region, ServerName sn, CloseRegionDetails crd);
|
||||
|
||||
/**
|
||||
* Construct CloseRegionDetails instance from CloseRegionRequest.
|
||||
* @return instance of CloseRegionDetails
|
||||
*/
|
||||
CloseRegionDetails parseFromProtoRequest(AdminProtos.CloseRegionRequest request);
|
||||
|
||||
/**
|
||||
* Get details object with params for case when we're closing on
|
||||
* regionserver side internally (not because of RPC call from master),
|
||||
* so we don't parse details from protobuf request.
|
||||
*/
|
||||
CloseRegionDetails getDetaultDetails();
|
||||
|
||||
/**
|
||||
* Marker interface for region closing tasks. Used to carry implementation details in
|
||||
* encapsulated way through Handlers to the consensus API.
|
||||
*/
|
||||
static interface CloseRegionDetails {
|
||||
}
|
||||
}
|
|
@ -0,0 +1,197 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.coordination;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.hbase.CoordinatedStateManager;
|
||||
import org.apache.hadoop.hbase.HRegionInfo;
|
||||
import org.apache.hadoop.hbase.ServerName;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos;
|
||||
import org.apache.hadoop.hbase.regionserver.HRegion;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZKAssign;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
|
||||
import org.apache.zookeeper.KeeperException;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
/**
|
||||
* ZK-based implementation of {@link CloseRegionCoordination}.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public class ZkCloseRegionCoordination implements CloseRegionCoordination {
|
||||
private static final Log LOG = LogFactory.getLog(ZkCloseRegionCoordination.class);
|
||||
|
||||
private final static int FAILED_VERSION = -1;
|
||||
|
||||
private CoordinatedStateManager csm;
|
||||
private final ZooKeeperWatcher watcher;
|
||||
|
||||
public ZkCloseRegionCoordination(CoordinatedStateManager csm, ZooKeeperWatcher watcher) {
|
||||
this.csm = csm;
|
||||
this.watcher = watcher;
|
||||
}
|
||||
|
||||
/**
|
||||
* In ZK-based version we're checking for bad znode state, e.g. if we're
|
||||
* trying to delete the znode, and it's not ours (version doesn't match).
|
||||
*/
|
||||
@Override
|
||||
public boolean checkClosingState(HRegionInfo regionInfo, CloseRegionDetails crd) {
|
||||
ZkCloseRegionDetails zkCrd = (ZkCloseRegionDetails) crd;
|
||||
|
||||
try {
|
||||
return zkCrd.isPublishStatusInZk() && !ZKAssign.checkClosingState(watcher,
|
||||
regionInfo, ((ZkCloseRegionDetails) crd).getExpectedVersion());
|
||||
} catch (KeeperException ke) {
|
||||
csm.getServer().abort("Unrecoverable exception while checking state with zk " +
|
||||
regionInfo.getRegionNameAsString() + ", still finishing close", ke);
|
||||
throw new RuntimeException(ke);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* In ZK-based version we do some znodes transitioning.
|
||||
*/
|
||||
@Override
|
||||
public void setClosedState(HRegion region, ServerName sn, CloseRegionDetails crd) {
|
||||
ZkCloseRegionDetails zkCrd = (ZkCloseRegionDetails) crd;
|
||||
String name = region.getRegionInfo().getRegionNameAsString();
|
||||
|
||||
if (zkCrd.isPublishStatusInZk()) {
|
||||
if (setClosedState(region,sn, zkCrd)) {
|
||||
LOG.debug("Set closed state in zk for " + name + " on " + sn);
|
||||
} else {
|
||||
LOG.debug("Set closed state in zk UNSUCCESSFUL for " + name + " on " + sn);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Parse ZK-related fields from request.
|
||||
*/
|
||||
@Override
|
||||
public CloseRegionDetails parseFromProtoRequest(AdminProtos.CloseRegionRequest request) {
|
||||
ZkCloseRegionCoordination.ZkCloseRegionDetails zkCrd =
|
||||
new ZkCloseRegionCoordination.ZkCloseRegionDetails();
|
||||
zkCrd.setPublishStatusInZk(request.getTransitionInZK());
|
||||
int versionOfClosingNode = -1;
|
||||
if (request.hasVersionOfClosingNode()) {
|
||||
versionOfClosingNode = request.getVersionOfClosingNode();
|
||||
}
|
||||
zkCrd.setExpectedVersion(versionOfClosingNode);
|
||||
|
||||
return zkCrd;
|
||||
}
|
||||
|
||||
/**
|
||||
* No ZK tracking will be performed for that case.
|
||||
* This method should be used when we want to construct CloseRegionDetails,
|
||||
* but don't want any coordination on that (when it's initiated by regionserver),
|
||||
* so no znode state transitions will be performed.
|
||||
*/
|
||||
@Override
|
||||
public CloseRegionDetails getDetaultDetails() {
|
||||
ZkCloseRegionCoordination.ZkCloseRegionDetails zkCrd =
|
||||
new ZkCloseRegionCoordination.ZkCloseRegionDetails();
|
||||
zkCrd.setPublishStatusInZk(false);
|
||||
zkCrd.setExpectedVersion(FAILED_VERSION);
|
||||
|
||||
return zkCrd;
|
||||
}
|
||||
|
||||
/**
|
||||
* Transition ZK node to CLOSED
|
||||
* @param region HRegion instance being closed
|
||||
* @param sn ServerName on which task runs
|
||||
* @param zkCrd details about region closing operation.
|
||||
* @return If the state is set successfully
|
||||
*/
|
||||
private boolean setClosedState(final HRegion region,
|
||||
ServerName sn,
|
||||
ZkCloseRegionDetails zkCrd) {
|
||||
final int expectedVersion = zkCrd.getExpectedVersion();
|
||||
|
||||
try {
|
||||
if (ZKAssign.transitionNodeClosed(watcher, region.getRegionInfo(),
|
||||
sn, expectedVersion) == FAILED_VERSION) {
|
||||
LOG.warn("Completed the CLOSE of a region but when transitioning from " +
|
||||
" CLOSING to CLOSED got a version mismatch, someone else clashed " +
|
||||
"so now unassigning");
|
||||
region.close();
|
||||
return false;
|
||||
}
|
||||
} catch (NullPointerException e) {
|
||||
// I've seen NPE when table was deleted while close was running in unit tests.
|
||||
LOG.warn("NPE during close -- catching and continuing...", e);
|
||||
return false;
|
||||
} catch (KeeperException e) {
|
||||
LOG.error("Failed transitioning node from CLOSING to CLOSED", e);
|
||||
return false;
|
||||
} catch (IOException e) {
|
||||
LOG.error("Failed to close region after failing to transition", e);
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* ZK-based implementation. Has details about whether the state transition should be
|
||||
* reflected in ZK, as well as expected version of znode.
|
||||
*/
|
||||
public static class ZkCloseRegionDetails implements CloseRegionCoordination.CloseRegionDetails {
|
||||
|
||||
/**
|
||||
* True if we are to update zk about the region close; if the close
|
||||
* was orchestrated by master, then update zk. If the close is being run by
|
||||
* the regionserver because its going down, don't update zk.
|
||||
* */
|
||||
private boolean publishStatusInZk;
|
||||
|
||||
/**
|
||||
* The version of znode to compare when RS transitions the znode from
|
||||
* CLOSING state.
|
||||
*/
|
||||
private int expectedVersion = FAILED_VERSION;
|
||||
|
||||
public ZkCloseRegionDetails() {
|
||||
}
|
||||
|
||||
public ZkCloseRegionDetails(boolean publishStatusInZk, int expectedVersion) {
|
||||
this.publishStatusInZk = publishStatusInZk;
|
||||
this.expectedVersion = expectedVersion;
|
||||
}
|
||||
|
||||
public boolean isPublishStatusInZk() {
|
||||
return publishStatusInZk;
|
||||
}
|
||||
|
||||
public void setPublishStatusInZk(boolean publishStatusInZk) {
|
||||
this.publishStatusInZk = publishStatusInZk;
|
||||
}
|
||||
|
||||
public int getExpectedVersion() {
|
||||
return expectedVersion;
|
||||
}
|
||||
|
||||
public void setExpectedVersion(int expectedVersion) {
|
||||
this.expectedVersion = expectedVersion;
|
||||
}
|
||||
}
|
||||
}
|
|
@ -36,6 +36,7 @@ public class ZkCoordinatedStateManager extends BaseCoordinatedStateManager {
|
|||
protected Server server;
|
||||
protected ZooKeeperWatcher watcher;
|
||||
protected SplitTransactionCoordination splitTransactionCoordination;
|
||||
protected CloseRegionCoordination closeRegionCoordination;
|
||||
|
||||
@Override
|
||||
public void initialize(Server server) {
|
||||
|
@ -43,6 +44,7 @@ public class ZkCoordinatedStateManager extends BaseCoordinatedStateManager {
|
|||
this.watcher = server.getZooKeeper();
|
||||
|
||||
splitTransactionCoordination = new ZKSplitTransactionCoordination(this, watcher);
|
||||
closeRegionCoordination = new ZkCloseRegionCoordination(this, watcher);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -64,4 +66,9 @@ public class ZkCoordinatedStateManager extends BaseCoordinatedStateManager {
|
|||
public SplitTransactionCoordination getSplitTransactionCoordination() {
|
||||
return splitTransactionCoordination;
|
||||
}
|
||||
|
||||
@Override
|
||||
public CloseRegionCoordination getCloseRegionCoordination() {
|
||||
return closeRegionCoordination;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -77,6 +77,8 @@ import org.apache.hadoop.hbase.catalog.MetaEditor;
|
|||
import org.apache.hadoop.hbase.client.ConnectionUtils;
|
||||
import org.apache.hadoop.hbase.client.HConnection;
|
||||
import org.apache.hadoop.hbase.client.HConnectionManager;
|
||||
import org.apache.hadoop.hbase.coordination.BaseCoordinatedStateManager;
|
||||
import org.apache.hadoop.hbase.coordination.CloseRegionCoordination;
|
||||
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
|
||||
import org.apache.hadoop.hbase.exceptions.RegionMovedException;
|
||||
import org.apache.hadoop.hbase.exceptions.RegionOpeningException;
|
||||
|
@ -392,7 +394,7 @@ public class HRegionServer extends HasThread implements
|
|||
|
||||
protected final RSRpcServices rpcServices;
|
||||
|
||||
protected CoordinatedStateManager csm;
|
||||
protected BaseCoordinatedStateManager csm;
|
||||
|
||||
/**
|
||||
* Starts a HRegionServer at the default location.
|
||||
|
@ -483,7 +485,7 @@ public class HRegionServer extends HasThread implements
|
|||
zooKeeper = new ZooKeeperWatcher(conf, getProcessName() + ":" +
|
||||
rpcServices.isa.getPort(), this, canCreateBaseZNode());
|
||||
|
||||
this.csm = csm;
|
||||
this.csm = (BaseCoordinatedStateManager) csm;
|
||||
this.csm.initialize(this);
|
||||
this.csm.start();
|
||||
|
||||
|
@ -2157,7 +2159,7 @@ public class HRegionServer extends HasThread implements
|
|||
}
|
||||
|
||||
@Override
|
||||
public CoordinatedStateManager getCoordinatedStateManager() {
|
||||
public BaseCoordinatedStateManager getCoordinatedStateManager() {
|
||||
return csm;
|
||||
}
|
||||
|
||||
|
@ -2320,7 +2322,9 @@ public class HRegionServer extends HasThread implements
|
|||
*/
|
||||
private void closeRegionIgnoreErrors(HRegionInfo region, final boolean abort) {
|
||||
try {
|
||||
if (!closeRegion(region.getEncodedName(), abort, false, -1, null)) {
|
||||
CloseRegionCoordination.CloseRegionDetails details =
|
||||
csm.getCloseRegionCoordination().getDetaultDetails();
|
||||
if (!closeRegion(region.getEncodedName(), abort, details, null)) {
|
||||
LOG.warn("Failed to close " + region.getRegionNameAsString() +
|
||||
" - ignoring and continuing");
|
||||
}
|
||||
|
@ -2345,17 +2349,13 @@ public class HRegionServer extends HasThread implements
|
|||
*
|
||||
* @param encodedName Region to close
|
||||
* @param abort True if we are aborting
|
||||
* @param zk True if we are to update zk about the region close; if the close
|
||||
* was orchestrated by master, then update zk. If the close is being run by
|
||||
* the regionserver because its going down, don't update zk.
|
||||
* @param versionOfClosingNode the version of znode to compare when RS transitions the znode from
|
||||
* CLOSING state.
|
||||
* @param crd details about closing region coordination-coordinated task
|
||||
* @return True if closed a region.
|
||||
* @throws NotServingRegionException if the region is not online
|
||||
* @throws RegionAlreadyInTransitionException if the region is already closing
|
||||
*/
|
||||
protected boolean closeRegion(String encodedName, final boolean abort,
|
||||
final boolean zk, final int versionOfClosingNode, final ServerName sn)
|
||||
CloseRegionCoordination.CloseRegionDetails crd, final ServerName sn)
|
||||
throws NotServingRegionException, RegionAlreadyInTransitionException {
|
||||
//Check for permissions to close.
|
||||
HRegion actualRegion = this.getFromOnlineRegions(encodedName);
|
||||
|
@ -2379,7 +2379,7 @@ public class HRegionServer extends HasThread implements
|
|||
// We're going to try to do a standard close then.
|
||||
LOG.warn("The opening for region " + encodedName + " was done before we could cancel it." +
|
||||
" Doing a standard close now");
|
||||
return closeRegion(encodedName, abort, zk, versionOfClosingNode, sn);
|
||||
return closeRegion(encodedName, abort, crd, sn);
|
||||
}
|
||||
// Let's get the region from the online region list again
|
||||
actualRegion = this.getFromOnlineRegions(encodedName);
|
||||
|
@ -2413,9 +2413,11 @@ public class HRegionServer extends HasThread implements
|
|||
CloseRegionHandler crh;
|
||||
final HRegionInfo hri = actualRegion.getRegionInfo();
|
||||
if (hri.isMetaRegion()) {
|
||||
crh = new CloseMetaHandler(this, this, hri, abort, zk, versionOfClosingNode);
|
||||
crh = new CloseMetaHandler(this, this, hri, abort,
|
||||
csm.getCloseRegionCoordination(), crd);
|
||||
} else {
|
||||
crh = new CloseRegionHandler(this, this, hri, abort, zk, versionOfClosingNode, sn);
|
||||
crh = new CloseRegionHandler(this, this, hri, abort,
|
||||
csm.getCloseRegionCoordination(), crd, sn);
|
||||
}
|
||||
this.service.submit(crh);
|
||||
return true;
|
||||
|
|
|
@ -63,6 +63,7 @@ import org.apache.hadoop.hbase.client.Put;
|
|||
import org.apache.hadoop.hbase.client.Result;
|
||||
import org.apache.hadoop.hbase.client.RowMutations;
|
||||
import org.apache.hadoop.hbase.client.Scan;
|
||||
import org.apache.hadoop.hbase.coordination.CloseRegionCoordination;
|
||||
import org.apache.hadoop.hbase.exceptions.FailedSanityCheckException;
|
||||
import org.apache.hadoop.hbase.exceptions.OperationConflictException;
|
||||
import org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException;
|
||||
|
@ -883,11 +884,6 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
|
|||
@QosPriority(priority=HConstants.HIGH_QOS)
|
||||
public CloseRegionResponse closeRegion(final RpcController controller,
|
||||
final CloseRegionRequest request) throws ServiceException {
|
||||
int versionOfClosingNode = -1;
|
||||
if (request.hasVersionOfClosingNode()) {
|
||||
versionOfClosingNode = request.getVersionOfClosingNode();
|
||||
}
|
||||
boolean zk = request.getTransitionInZK();
|
||||
final ServerName sn = (request.hasDestinationServer() ?
|
||||
ProtobufUtil.toServerName(request.getDestinationServer()) : null);
|
||||
|
||||
|
@ -911,10 +907,11 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
|
|||
}
|
||||
|
||||
requestCount.increment();
|
||||
LOG.info("Close " + encodedRegionName + ", via zk=" + (zk ? "yes" : "no")
|
||||
+ ", znode version=" + versionOfClosingNode + ", on " + sn);
|
||||
LOG.info("Close " + encodedRegionName + ", on " + sn);
|
||||
CloseRegionCoordination.CloseRegionDetails crd = regionServer.getCoordinatedStateManager()
|
||||
.getCloseRegionCoordination().parseFromProtoRequest(request);
|
||||
|
||||
boolean closed = regionServer.closeRegion(encodedRegionName, false, zk, versionOfClosingNode, sn);
|
||||
boolean closed = regionServer.closeRegion(encodedRegionName, false, crd, sn);
|
||||
CloseRegionResponse.Builder builder = CloseRegionResponse.newBuilder().setClosed(closed);
|
||||
return builder.build();
|
||||
} catch (IOException ie) {
|
||||
|
|
|
@ -23,24 +23,21 @@ import org.apache.hadoop.hbase.HRegionInfo;
|
|||
import org.apache.hadoop.hbase.Server;
|
||||
import org.apache.hadoop.hbase.executor.EventType;
|
||||
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
|
||||
import org.apache.hadoop.hbase.coordination.CloseRegionCoordination;
|
||||
|
||||
/**
|
||||
* Handles closing of the root region on a region server.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public class CloseMetaHandler extends CloseRegionHandler {
|
||||
// Called when master tells us shutdown a region via close rpc
|
||||
public CloseMetaHandler(final Server server,
|
||||
final RegionServerServices rsServices, final HRegionInfo regionInfo) {
|
||||
this(server, rsServices, regionInfo, false, true, -1);
|
||||
}
|
||||
|
||||
// Called when regionserver determines its to go down; not master orchestrated
|
||||
public CloseMetaHandler(final Server server,
|
||||
final RegionServerServices rsServices,
|
||||
final HRegionInfo regionInfo,
|
||||
final boolean abort, final boolean zk, final int versionOfClosingNode) {
|
||||
super(server, rsServices, regionInfo, abort, zk, versionOfClosingNode,
|
||||
EventType.M_RS_CLOSE_META);
|
||||
final boolean abort, CloseRegionCoordination closeRegionCoordination,
|
||||
CloseRegionCoordination.CloseRegionDetails crd) {
|
||||
super(server, rsServices, regionInfo, abort, closeRegionCoordination,
|
||||
crd, EventType.M_RS_CLOSE_META);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -26,12 +26,11 @@ import org.apache.hadoop.classification.InterfaceAudience;
|
|||
import org.apache.hadoop.hbase.HRegionInfo;
|
||||
import org.apache.hadoop.hbase.Server;
|
||||
import org.apache.hadoop.hbase.ServerName;
|
||||
import org.apache.hadoop.hbase.coordination.CloseRegionCoordination;
|
||||
import org.apache.hadoop.hbase.executor.EventHandler;
|
||||
import org.apache.hadoop.hbase.executor.EventType;
|
||||
import org.apache.hadoop.hbase.regionserver.HRegion;
|
||||
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZKAssign;
|
||||
import org.apache.zookeeper.KeeperException;
|
||||
|
||||
/**
|
||||
* Handles closing of a region on a region server.
|
||||
|
@ -45,29 +44,15 @@ public class CloseRegionHandler extends EventHandler {
|
|||
// have a running queue of user regions to close?
|
||||
private static final Log LOG = LogFactory.getLog(CloseRegionHandler.class);
|
||||
|
||||
private final int FAILED = -1;
|
||||
int expectedVersion = FAILED;
|
||||
|
||||
private final RegionServerServices rsServices;
|
||||
|
||||
private final HRegionInfo regionInfo;
|
||||
|
||||
// If true, the hosting server is aborting. Region close process is different
|
||||
// when we are aborting.
|
||||
private final boolean abort;
|
||||
|
||||
// Update zk on closing transitions. Usually true. Its false if cluster
|
||||
// is going down. In this case, its the rs that initiates the region
|
||||
// close -- not the master process so state up in zk will unlikely be
|
||||
// CLOSING.
|
||||
private final boolean zk;
|
||||
private ServerName destination;
|
||||
|
||||
// This is executed after receiving an CLOSE RPC from the master.
|
||||
public CloseRegionHandler(final Server server,
|
||||
final RegionServerServices rsServices, HRegionInfo regionInfo) {
|
||||
this(server, rsServices, regionInfo, false, true, -1, EventType.M_RS_CLOSE_REGION, null);
|
||||
}
|
||||
private CloseRegionCoordination closeRegionCoordination;
|
||||
private CloseRegionCoordination.CloseRegionDetails closeRegionDetails;
|
||||
|
||||
/**
|
||||
* This method used internally by the RegionServer to close out regions.
|
||||
|
@ -75,43 +60,48 @@ public class CloseRegionHandler extends EventHandler {
|
|||
* @param rsServices
|
||||
* @param regionInfo
|
||||
* @param abort If the regionserver is aborting.
|
||||
* @param zk If the close should be noted out in zookeeper.
|
||||
* @param closeRegionCoordination consensus for closing regions
|
||||
* @param crd object carrying details about region close task.
|
||||
*/
|
||||
public CloseRegionHandler(final Server server,
|
||||
final RegionServerServices rsServices,
|
||||
final HRegionInfo regionInfo, final boolean abort, final boolean zk,
|
||||
final int versionOfClosingNode) {
|
||||
this(server, rsServices, regionInfo, abort, zk, versionOfClosingNode,
|
||||
final HRegionInfo regionInfo, final boolean abort,
|
||||
CloseRegionCoordination closeRegionCoordination,
|
||||
CloseRegionCoordination.CloseRegionDetails crd) {
|
||||
this(server, rsServices, regionInfo, abort, closeRegionCoordination, crd,
|
||||
EventType.M_RS_CLOSE_REGION, null);
|
||||
}
|
||||
|
||||
public CloseRegionHandler(final Server server,
|
||||
final RegionServerServices rsServices,
|
||||
final HRegionInfo regionInfo, final boolean abort, final boolean zk,
|
||||
final int versionOfClosingNode, ServerName destination) {
|
||||
this(server, rsServices, regionInfo, abort, zk, versionOfClosingNode,
|
||||
final HRegionInfo regionInfo, final boolean abort,
|
||||
CloseRegionCoordination closeRegionCoordination,
|
||||
CloseRegionCoordination.CloseRegionDetails crd,
|
||||
ServerName destination) {
|
||||
this(server, rsServices, regionInfo, abort, closeRegionCoordination, crd,
|
||||
EventType.M_RS_CLOSE_REGION, destination);
|
||||
}
|
||||
|
||||
public CloseRegionHandler(final Server server,
|
||||
final RegionServerServices rsServices, HRegionInfo regionInfo,
|
||||
boolean abort, final boolean zk, final int versionOfClosingNode,
|
||||
EventType eventType) {
|
||||
this(server, rsServices, regionInfo, abort, zk, versionOfClosingNode, eventType, null);
|
||||
boolean abort, CloseRegionCoordination closeRegionCoordination,
|
||||
CloseRegionCoordination.CloseRegionDetails crd, EventType eventType) {
|
||||
this(server, rsServices, regionInfo, abort, closeRegionCoordination, crd, eventType, null);
|
||||
}
|
||||
|
||||
protected CloseRegionHandler(final Server server,
|
||||
final RegionServerServices rsServices, HRegionInfo regionInfo,
|
||||
boolean abort, final boolean zk, final int versionOfClosingNode,
|
||||
boolean abort, CloseRegionCoordination closeRegionCoordination,
|
||||
CloseRegionCoordination.CloseRegionDetails crd,
|
||||
EventType eventType, ServerName destination) {
|
||||
super(server, eventType);
|
||||
this.server = server;
|
||||
this.rsServices = rsServices;
|
||||
this.regionInfo = regionInfo;
|
||||
this.abort = abort;
|
||||
this.zk = zk;
|
||||
this.expectedVersion = versionOfClosingNode;
|
||||
this.destination = destination;
|
||||
this.closeRegionCoordination = closeRegionCoordination;
|
||||
this.closeRegionDetails = crd;
|
||||
}
|
||||
|
||||
public HRegionInfo getRegionInfo() {
|
||||
|
@ -128,18 +118,14 @@ public class CloseRegionHandler extends EventHandler {
|
|||
HRegion region = this.rsServices.getFromOnlineRegions(encodedRegionName);
|
||||
if (region == null) {
|
||||
LOG.warn("Received CLOSE for region " + name + " but currently not serving - ignoring");
|
||||
if (zk){
|
||||
LOG.error("The znode is not modified as we are not serving " + name);
|
||||
}
|
||||
// TODO: do better than a simple warning
|
||||
return;
|
||||
}
|
||||
|
||||
// Close the region
|
||||
try {
|
||||
if (zk && !ZKAssign.checkClosingState(server.getZooKeeper(), regionInfo, expectedVersion)){
|
||||
// bad znode state
|
||||
return; // We're node deleting the znode, but it's not ours...
|
||||
if (closeRegionCoordination.checkClosingState(regionInfo, closeRegionDetails)) {
|
||||
return;
|
||||
}
|
||||
|
||||
// TODO: If we need to keep updating CLOSING stamp to prevent against
|
||||
|
@ -152,10 +138,6 @@ public class CloseRegionHandler extends EventHandler {
|
|||
regionInfo.getRegionNameAsString());
|
||||
return;
|
||||
}
|
||||
} catch (KeeperException ke) {
|
||||
server.abort("Unrecoverable exception while checking state with zk " +
|
||||
regionInfo.getRegionNameAsString() + ", still finishing close", ke);
|
||||
throw new RuntimeException(ke);
|
||||
} catch (IOException ioe) {
|
||||
// An IOException here indicates that we couldn't successfully flush the
|
||||
// memstore before closing. So, we need to abort the server and allow
|
||||
|
@ -166,15 +148,8 @@ public class CloseRegionHandler extends EventHandler {
|
|||
}
|
||||
|
||||
this.rsServices.removeFromOnlineRegions(region, destination);
|
||||
|
||||
if (this.zk) {
|
||||
if (setClosedState(this.expectedVersion, region)) {
|
||||
LOG.debug("Set closed state in zk for " + name + " on " + this.server.getServerName());
|
||||
} else {
|
||||
LOG.debug("Set closed state in zk UNSUCCESSFUL for " + name + " on " +
|
||||
this.server.getServerName());
|
||||
}
|
||||
}
|
||||
closeRegionCoordination.setClosedState(region, this.server.getServerName(),
|
||||
closeRegionDetails);
|
||||
|
||||
// Done! Region is closed on this RS
|
||||
LOG.debug("Closed " + region.getRegionNameAsString());
|
||||
|
@ -183,33 +158,4 @@ public class CloseRegionHandler extends EventHandler {
|
|||
remove(this.regionInfo.getEncodedNameAsBytes());
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Transition ZK node to CLOSED
|
||||
* @param expectedVersion
|
||||
* @return If the state is set successfully
|
||||
*/
|
||||
private boolean setClosedState(final int expectedVersion, final HRegion region) {
|
||||
try {
|
||||
if (ZKAssign.transitionNodeClosed(server.getZooKeeper(), regionInfo,
|
||||
server.getServerName(), expectedVersion) == FAILED) {
|
||||
LOG.warn("Completed the CLOSE of a region but when transitioning from " +
|
||||
" CLOSING to CLOSED got a version mismatch, someone else clashed " +
|
||||
"so now unassigning");
|
||||
region.close();
|
||||
return false;
|
||||
}
|
||||
} catch (NullPointerException e) {
|
||||
// I've seen NPE when table was deleted while close was running in unit tests.
|
||||
LOG.warn("NPE during close -- catching and continuing...", e);
|
||||
return false;
|
||||
} catch (KeeperException e) {
|
||||
LOG.error("Failed transitioning node from CLOSING to CLOSED", e);
|
||||
return false;
|
||||
} catch (IOException e) {
|
||||
LOG.error("Failed to close region after failing to transition", e);
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -67,6 +67,7 @@ import org.apache.hadoop.hbase.client.ResultScanner;
|
|||
import org.apache.hadoop.hbase.client.Scan;
|
||||
import org.apache.hadoop.hbase.CoordinatedStateManager;
|
||||
import org.apache.hadoop.hbase.coordination.ZKSplitTransactionCoordination;
|
||||
import org.apache.hadoop.hbase.coordination.ZkCloseRegionCoordination;
|
||||
import org.apache.hadoop.hbase.coordination.ZkCoordinatedStateManager;
|
||||
import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
|
||||
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
|
||||
|
@ -1095,7 +1096,7 @@ public class TestSplitTransactionOnCluster {
|
|||
this.server = server;
|
||||
this.watcher = server.getZooKeeper();
|
||||
splitTransactionCoordination = new MockedSplitTransactionCoordination(this, watcher, region);
|
||||
|
||||
closeRegionCoordination = new ZkCloseRegionCoordination(this, watcher);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -33,10 +33,12 @@ import org.apache.hadoop.hbase.MediumTests;
|
|||
import org.apache.hadoop.hbase.RegionTransition;
|
||||
import org.apache.hadoop.hbase.Server;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.coordination.ZkCoordinatedStateManager;
|
||||
import org.apache.hadoop.hbase.exceptions.DeserializationException;
|
||||
import org.apache.hadoop.hbase.executor.EventType;
|
||||
import org.apache.hadoop.hbase.regionserver.HRegion;
|
||||
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
|
||||
import org.apache.hadoop.hbase.coordination.ZkCloseRegionCoordination;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.MockServer;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZKAssign;
|
||||
|
@ -110,8 +112,18 @@ public class TestCloseRegionHandler {
|
|||
rss.addToOnlineRegions(spy);
|
||||
// Assert the Server is NOT stopped before we call close region.
|
||||
assertFalse(server.isStopped());
|
||||
CloseRegionHandler handler =
|
||||
new CloseRegionHandler(server, rss, hri, false, false, -1);
|
||||
|
||||
ZkCoordinatedStateManager consensusProvider = new ZkCoordinatedStateManager();
|
||||
consensusProvider.initialize(server);
|
||||
consensusProvider.start();
|
||||
|
||||
ZkCloseRegionCoordination.ZkCloseRegionDetails zkCrd =
|
||||
new ZkCloseRegionCoordination.ZkCloseRegionDetails();
|
||||
zkCrd.setPublishStatusInZk(false);
|
||||
zkCrd.setExpectedVersion(-1);
|
||||
|
||||
CloseRegionHandler handler = new CloseRegionHandler(server, rss, hri, false,
|
||||
consensusProvider.getCloseRegionCoordination(), zkCrd);
|
||||
boolean throwable = false;
|
||||
try {
|
||||
handler.process();
|
||||
|
@ -153,9 +165,18 @@ public class TestCloseRegionHandler {
|
|||
// The CloseRegionHandler will validate the expected version
|
||||
// Given it is set to invalid versionOfClosingNode+1,
|
||||
// CloseRegionHandler should be M_ZK_REGION_CLOSING
|
||||
CloseRegionHandler handler =
|
||||
new CloseRegionHandler(server, rss, hri, false, true,
|
||||
versionOfClosingNode+1);
|
||||
|
||||
ZkCoordinatedStateManager consensusProvider = new ZkCoordinatedStateManager();
|
||||
consensusProvider.initialize(server);
|
||||
consensusProvider.start();
|
||||
|
||||
ZkCloseRegionCoordination.ZkCloseRegionDetails zkCrd =
|
||||
new ZkCloseRegionCoordination.ZkCloseRegionDetails();
|
||||
zkCrd.setPublishStatusInZk(true);
|
||||
zkCrd.setExpectedVersion(versionOfClosingNode+1);
|
||||
|
||||
CloseRegionHandler handler = new CloseRegionHandler(server, rss, hri, false,
|
||||
consensusProvider.getCloseRegionCoordination(), zkCrd);
|
||||
handler.process();
|
||||
|
||||
// Handler should remain in M_ZK_REGION_CLOSING
|
||||
|
@ -190,9 +211,18 @@ public class TestCloseRegionHandler {
|
|||
// The CloseRegionHandler will validate the expected version
|
||||
// Given it is set to correct versionOfClosingNode,
|
||||
// CloseRegionHandlerit should be RS_ZK_REGION_CLOSED
|
||||
CloseRegionHandler handler =
|
||||
new CloseRegionHandler(server, rss, hri, false, true,
|
||||
versionOfClosingNode);
|
||||
|
||||
ZkCoordinatedStateManager consensusProvider = new ZkCoordinatedStateManager();
|
||||
consensusProvider.initialize(server);
|
||||
consensusProvider.start();
|
||||
|
||||
ZkCloseRegionCoordination.ZkCloseRegionDetails zkCrd =
|
||||
new ZkCloseRegionCoordination.ZkCloseRegionDetails();
|
||||
zkCrd.setPublishStatusInZk(true);
|
||||
zkCrd.setExpectedVersion(versionOfClosingNode);
|
||||
|
||||
CloseRegionHandler handler = new CloseRegionHandler(server, rss, hri, false,
|
||||
consensusProvider.getCloseRegionCoordination(), zkCrd);
|
||||
handler.process();
|
||||
// Handler should have transitioned it to RS_ZK_REGION_CLOSED
|
||||
RegionTransition rt = RegionTransition.parseFrom(
|
||||
|
|
Loading…
Reference in New Issue