HBASE-7390 Add extra test cases for assignement on the region server and fix the related issues
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1427064 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
8d70de20e0
commit
deeb84ae5a
|
@ -241,7 +241,6 @@ public class HTable implements HTableInterface {
|
|||
|
||||
/**
|
||||
* setup this HTable's parameter based on the passed configuration
|
||||
* @param conf
|
||||
*/
|
||||
private void finishSetup() throws IOException {
|
||||
this.connection.locateRegion(tableName, HConstants.EMPTY_START_ROW);
|
||||
|
@ -343,11 +342,10 @@ public class HTable implements HTableInterface {
|
|||
}
|
||||
|
||||
/**
|
||||
* Finds the region on which the given row is being served.
|
||||
* Finds the region on which the given row is being served. Does not reload the cache.
|
||||
* @param row Row to find.
|
||||
* @return Location of the row.
|
||||
* @throws IOException if a remote or network exception occurs
|
||||
* @deprecated use {@link #getRegionLocation(byte [], boolean)} instead
|
||||
*/
|
||||
public HRegionLocation getRegionLocation(final byte [] row)
|
||||
throws IOException {
|
||||
|
@ -357,8 +355,7 @@ public class HTable implements HTableInterface {
|
|||
/**
|
||||
* Finds the region on which the given row is being served.
|
||||
* @param row Row to find.
|
||||
* @param reload whether or not to reload information or just use cached
|
||||
* information
|
||||
* @param reload true to reload information or false to use cached information
|
||||
* @return Location of the row.
|
||||
* @throws IOException if a remote or network exception occurs
|
||||
*/
|
||||
|
|
|
@ -162,8 +162,10 @@ public class AssignmentManager extends ZooKeeperListener {
|
|||
* that ServerShutdownHandler can be fully enabled and re-assign regions
|
||||
* of dead servers. So that when re-assignment happens, AssignmentManager
|
||||
* has proper region states.
|
||||
*
|
||||
* Protected to ease testing.
|
||||
*/
|
||||
final AtomicBoolean failoverCleanupDone = new AtomicBoolean(false);
|
||||
protected final AtomicBoolean failoverCleanupDone = new AtomicBoolean(false);
|
||||
|
||||
/**
|
||||
* Constructs a new assignment manager.
|
||||
|
@ -1461,6 +1463,7 @@ public class AssignmentManager extends ZooKeeperListener {
|
|||
return;
|
||||
}
|
||||
// This never happens. Currently regionserver close always return true.
|
||||
// Todo; this can now happen (0.96) if there is an exception in a coprocessor
|
||||
LOG.warn("Server " + server + " region CLOSE RPC returned false for " +
|
||||
region.getRegionNameAsString());
|
||||
} catch (Throwable t) {
|
||||
|
|
|
@ -52,6 +52,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
|
|||
|
||||
import javax.management.ObjectName;
|
||||
|
||||
import com.google.protobuf.Message;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
|
@ -213,7 +214,6 @@ import org.cliffc.high_scale_lib.Counter;
|
|||
|
||||
import com.google.common.base.Function;
|
||||
import com.google.protobuf.ByteString;
|
||||
import com.google.protobuf.Message;
|
||||
import com.google.protobuf.RpcController;
|
||||
import com.google.protobuf.ServiceException;
|
||||
|
||||
|
@ -240,7 +240,7 @@ public class HRegionServer implements ClientProtocol,
|
|||
//RegionName vs current action in progress
|
||||
//true - if open region action in progress
|
||||
//false - if close region action in progress
|
||||
protected final ConcurrentSkipListMap<byte[], Boolean> regionsInTransitionInRS =
|
||||
protected final ConcurrentMap<byte[], Boolean> regionsInTransitionInRS =
|
||||
new ConcurrentSkipListMap<byte[], Boolean>(Bytes.BYTES_COMPARATOR);
|
||||
|
||||
protected long maxScannerResultSize;
|
||||
|
@ -443,7 +443,7 @@ public class HRegionServer implements ClientProtocol,
|
|||
this.isOnline = false;
|
||||
checkCodecs(this.conf);
|
||||
|
||||
// do we use checksum verfication in the hbase? If hbase checksum verification
|
||||
// do we use checksum verification in the hbase? If hbase checksum verification
|
||||
// is enabled, then we automatically switch off hdfs checksum verification.
|
||||
this.useHBaseChecksum = conf.getBoolean(
|
||||
HConstants.HBASE_CHECKSUM_VERIFICATION, false);
|
||||
|
@ -665,7 +665,7 @@ public class HRegionServer implements ClientProtocol,
|
|||
if (rpcArgClass == null || from.getRequest().isEmpty()) {
|
||||
return HConstants.NORMAL_QOS;
|
||||
}
|
||||
Object deserializedRequestObj = null;
|
||||
Object deserializedRequestObj;
|
||||
//check whether the request has reference to Meta region
|
||||
try {
|
||||
Method parseFrom = methodMap.get("parseFrom").get(rpcArgClass);
|
||||
|
@ -960,7 +960,7 @@ public class HRegionServer implements ClientProtocol,
|
|||
|
||||
//fsOk flag may be changed when closing regions throws exception.
|
||||
if (!this.killed && this.fsOk) {
|
||||
closeWAL(abortRequested ? false : true);
|
||||
closeWAL(!abortRequested);
|
||||
}
|
||||
|
||||
// Make sure the proxy is down.
|
||||
|
@ -1106,7 +1106,7 @@ public class HRegionServer implements ClientProtocol,
|
|||
&& !closedRegions.contains(hri.getEncodedName())) {
|
||||
closedRegions.add(hri.getEncodedName());
|
||||
// Don't update zk with this close transition; pass false.
|
||||
closeRegion(hri, abort, false);
|
||||
closeRegionIgnoreErrors(hri, abort);
|
||||
}
|
||||
}
|
||||
// No regions in RIT, we could stop waiting now.
|
||||
|
@ -1167,7 +1167,7 @@ public class HRegionServer implements ClientProtocol,
|
|||
this.serverNameFromMasterPOV.getHostname());
|
||||
continue;
|
||||
}
|
||||
String value = e.getValue().toString();
|
||||
String value = e.getValue();
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Config from master: " + key + "=" + value);
|
||||
}
|
||||
|
@ -1440,7 +1440,7 @@ public class HRegionServer implements ClientProtocol,
|
|||
}
|
||||
|
||||
/*
|
||||
* Start maintanence Threads, Server, Worker and lease checker threads.
|
||||
* Start maintenance Threads, Server, Worker and lease checker threads.
|
||||
* Install an UncaughtExceptionHandler that calls abort of RegionServer if we
|
||||
* get an unhandled exception. We cannot set the handler on all threads.
|
||||
* Server's internal Listener thread is off limits. For Server, if an OOME, it
|
||||
|
@ -1892,8 +1892,8 @@ public class HRegionServer implements ClientProtocol,
|
|||
} finally {
|
||||
this.lock.writeLock().unlock();
|
||||
}
|
||||
if (meta != null) closeRegion(meta.getRegionInfo(), abort, false);
|
||||
if (root != null) closeRegion(root.getRegionInfo(), abort, false);
|
||||
if (meta != null) closeRegionIgnoreErrors(meta.getRegionInfo(), abort);
|
||||
if (root != null) closeRegionIgnoreErrors(root.getRegionInfo(), abort);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -1909,7 +1909,7 @@ public class HRegionServer implements ClientProtocol,
|
|||
HRegion r = e.getValue();
|
||||
if (!r.getRegionInfo().isMetaTable() && r.isAvailable()) {
|
||||
// Don't update zk with this close transition; pass false.
|
||||
closeRegion(r.getRegionInfo(), abort, false);
|
||||
closeRegionIgnoreErrors(r.getRegionInfo(), abort);
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
|
@ -1984,7 +1984,7 @@ public class HRegionServer implements ClientProtocol,
|
|||
});
|
||||
// Copy over all regions. Regions are sorted by size with biggest first.
|
||||
for (HRegion region : this.onlineRegions.values()) {
|
||||
sortedRegions.put(Long.valueOf(region.memstoreSize.get()), region);
|
||||
sortedRegions.put(region.memstoreSize.get(), region);
|
||||
}
|
||||
return sortedRegions;
|
||||
}
|
||||
|
@ -2106,7 +2106,7 @@ public class HRegionServer implements ClientProtocol,
|
|||
}
|
||||
|
||||
|
||||
public ConcurrentSkipListMap<byte[], Boolean> getRegionsInTransitionInRS() {
|
||||
public ConcurrentMap<byte[], Boolean> getRegionsInTransitionInRS() {
|
||||
return this.regionsInTransitionInRS;
|
||||
}
|
||||
|
||||
|
@ -2267,7 +2267,7 @@ public class HRegionServer implements ClientProtocol,
|
|||
for (HRegion region: regions) {
|
||||
coprocessors.addAll(region.getCoprocessorHost().getCoprocessors());
|
||||
}
|
||||
return coprocessors.toArray(new String[0]);
|
||||
return coprocessors.toArray(new String[coprocessors.size()]);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -2367,70 +2367,103 @@ public class HRegionServer implements ClientProtocol,
|
|||
}
|
||||
}
|
||||
|
||||
protected void checkIfRegionInTransition(byte[] regionEncodedName,
|
||||
String currentAction) throws RegionAlreadyInTransitionException {
|
||||
if (this.regionsInTransitionInRS.containsKey(regionEncodedName)) {
|
||||
boolean openAction = this.regionsInTransitionInRS.get(regionEncodedName);
|
||||
// The below exception message will be used in master.
|
||||
throw new RegionAlreadyInTransitionException("Received:" + currentAction +
|
||||
" for the region:" + Bytes.toString(regionEncodedName) +
|
||||
" ,which we are already trying to " +
|
||||
(openAction ? OPEN : CLOSE)+ ".");
|
||||
|
||||
/**
|
||||
* Try to close the region, logs a warning on failure but continues.
|
||||
* @param region Region to close
|
||||
*/
|
||||
private void closeRegionIgnoreErrors(HRegionInfo region, final boolean abort) {
|
||||
try {
|
||||
if (!closeRegion(region.getEncodedName(), abort, false, -1, null)) {
|
||||
LOG.warn("Failed to close " + region.getRegionNameAsString() +
|
||||
" - ignoring and continuing");
|
||||
}
|
||||
} catch (NotServingRegionException e) {
|
||||
LOG.warn("Failed to close " + region.getRegionNameAsString() +
|
||||
" - ignoring and continuing", e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @param region Region to close
|
||||
* @param abort True if we are aborting
|
||||
* @param zk True if we are to update zk about the region close; if the close
|
||||
* was orchestrated by master, then update zk. If the close is being run by
|
||||
* the regionserver because its going down, don't update zk.
|
||||
* @return True if closed a region.
|
||||
*/
|
||||
protected boolean closeRegion(HRegionInfo region, final boolean abort,
|
||||
final boolean zk) {
|
||||
return closeRegion(region, abort, zk, -1, null);
|
||||
}
|
||||
* Close asynchronously a region, can be called from the master or internally by the regionserver
|
||||
* when stopping. If called from the master, the region will update the znode status.
|
||||
*
|
||||
* <p>
|
||||
* If an opening was in progress, this method will cancel it, but will not start a new close. The
|
||||
* coprocessors are not called in this case. A NotServingRegionException exception is thrown.
|
||||
* </p>
|
||||
|
||||
/**
|
||||
* @param region Region to close
|
||||
* <p>
|
||||
* If a close was in progress, this new request will be ignored, and an exception thrown.
|
||||
* </p>
|
||||
*
|
||||
* @param encodedName Region to close
|
||||
* @param abort True if we are aborting
|
||||
* @param zk True if we are to update zk about the region close; if the close
|
||||
* was orchestrated by master, then update zk. If the close is being run by
|
||||
* the regionserver because its going down, don't update zk.
|
||||
* @param versionOfClosingNode
|
||||
* the version of znode to compare when RS transitions the znode from
|
||||
* @param versionOfClosingNode the version of znode to compare when RS transitions the znode from
|
||||
* CLOSING state.
|
||||
* @return True if closed a region.
|
||||
* @throws NotServingRegionException if the region is not online or if a close
|
||||
* request in in progress.
|
||||
*/
|
||||
protected boolean closeRegion(HRegionInfo region, final boolean abort,
|
||||
final boolean zk, final int versionOfClosingNode, ServerName sn) {
|
||||
protected boolean closeRegion(String encodedName, final boolean abort,
|
||||
final boolean zk, final int versionOfClosingNode, final ServerName sn)
|
||||
throws NotServingRegionException {
|
||||
//Check for permissions to close.
|
||||
HRegion actualRegion = this.getFromOnlineRegions(region.getEncodedName());
|
||||
final HRegion actualRegion = this.getFromOnlineRegions(encodedName);
|
||||
if ((actualRegion != null) && (actualRegion.getCoprocessorHost() != null)) {
|
||||
try {
|
||||
actualRegion.getCoprocessorHost().preClose(false);
|
||||
} catch (IOException exp) {
|
||||
LOG.warn("Unable to close region", exp);
|
||||
LOG.warn("Unable to close region: the coprocessor launched an error ", exp);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
if (this.regionsInTransitionInRS.containsKey(region.getEncodedNameAsBytes())) {
|
||||
LOG.warn("Received close for region we are already opening or closing; " +
|
||||
region.getEncodedName());
|
||||
return false;
|
||||
final Boolean previous = this.regionsInTransitionInRS.putIfAbsent(encodedName.getBytes(),
|
||||
Boolean.FALSE);
|
||||
|
||||
if (Boolean.TRUE.equals(previous)) {
|
||||
LOG.info("Received CLOSE for the region:" + encodedName + " , which we are already " +
|
||||
"trying to OPEN. Cancelling OPENING.");
|
||||
if (!regionsInTransitionInRS.replace(encodedName.getBytes(), previous, Boolean.FALSE)){
|
||||
// The replace failed. That should be an exceptional case, but theoretically it can happen.
|
||||
// We're going to try to do a standard close then.
|
||||
LOG.warn("The opening for region " + encodedName + " was done before we could cancel it." +
|
||||
" Doing a standard close now");
|
||||
return closeRegion(encodedName, abort, zk, versionOfClosingNode, sn);
|
||||
} else {
|
||||
LOG.info("The opening previously in progress has been cancelled by a CLOSE request.");
|
||||
// The master deletes the znode when it receives this exception.
|
||||
throw new NotServingRegionException("The region " + encodedName +
|
||||
" was opening but not yet served. Opening is cancelled.");
|
||||
}
|
||||
} else if (Boolean.FALSE.equals(previous)) {
|
||||
LOG.info("Received CLOSE for the region: " + encodedName +
|
||||
" ,which we are already trying to CLOSE");
|
||||
// The master deletes the znode when it receives this exception.
|
||||
throw new NotServingRegionException("The region " + encodedName +
|
||||
" was already closing. New CLOSE request is ignored.");
|
||||
}
|
||||
this.regionsInTransitionInRS.putIfAbsent(region.getEncodedNameAsBytes(), false);
|
||||
CloseRegionHandler crh = null;
|
||||
if (region.isRootRegion()) {
|
||||
crh = new CloseRootHandler(this, this, region, abort, zk,
|
||||
versionOfClosingNode);
|
||||
} else if (region.isMetaRegion()) {
|
||||
crh = new CloseMetaHandler(this, this, region, abort, zk,
|
||||
versionOfClosingNode);
|
||||
|
||||
if (actualRegion == null){
|
||||
LOG.error("Received CLOSE for a region which is not online, and we're not opening.");
|
||||
this.regionsInTransitionInRS.remove(encodedName.getBytes());
|
||||
// The master deletes the znode when it receives this exception.
|
||||
throw new NotServingRegionException("The region " + encodedName +
|
||||
" is not online, and is not opening.");
|
||||
}
|
||||
|
||||
CloseRegionHandler crh;
|
||||
final HRegionInfo hri = actualRegion.getRegionInfo();
|
||||
if (hri.isRootRegion()) {
|
||||
crh = new CloseRootHandler(this, this, hri, abort, zk, versionOfClosingNode);
|
||||
} else if (hri.isMetaRegion()) {
|
||||
crh = new CloseMetaHandler(this, this, hri, abort, zk, versionOfClosingNode);
|
||||
} else {
|
||||
crh = new CloseRegionHandler(this, this, region, abort, zk, versionOfClosingNode, sn);
|
||||
crh = new CloseRegionHandler(this, this, hri, abort, zk, versionOfClosingNode, sn);
|
||||
}
|
||||
this.service.submit(crh);
|
||||
return true;
|
||||
|
@ -2517,7 +2550,7 @@ public class HRegionServer implements ClientProtocol,
|
|||
protected Throwable cleanup(final Throwable t, final String msg) {
|
||||
// Don't log as error if NSRE; NSRE is 'normal' operation.
|
||||
if (t instanceof NotServingRegionException) {
|
||||
LOG.debug("NotServingRegionException; " + t.getMessage());
|
||||
LOG.debug("NotServingRegionException; " + t.getMessage());
|
||||
return t;
|
||||
}
|
||||
if (msg == null) {
|
||||
|
@ -2682,7 +2715,7 @@ public class HRegionServer implements ClientProtocol,
|
|||
if (region.getCoprocessorHost() != null) {
|
||||
exists = region.getCoprocessorHost().postExists(clientGet, exists);
|
||||
}
|
||||
existence = Boolean.valueOf(exists);
|
||||
existence = exists;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -2717,7 +2750,7 @@ public class HRegionServer implements ClientProtocol,
|
|||
if (!region.getRegionInfo().isMetaTable()) {
|
||||
cacheFlusher.reclaimMemStoreMemory();
|
||||
}
|
||||
Integer lock = null;
|
||||
Integer lock;
|
||||
Result r = null;
|
||||
Boolean processed = null;
|
||||
MutateType type = mutate.getMutateType();
|
||||
|
@ -2750,7 +2783,7 @@ public class HRegionServer implements ClientProtocol,
|
|||
result = region.getCoprocessorHost().postCheckAndPut(row, family,
|
||||
qualifier, compareOp, comparator, put, result);
|
||||
}
|
||||
processed = Boolean.valueOf(result);
|
||||
processed = result;
|
||||
}
|
||||
} else {
|
||||
region.put(put, lock);
|
||||
|
@ -2779,7 +2812,7 @@ public class HRegionServer implements ClientProtocol,
|
|||
result = region.getCoprocessorHost().postCheckAndDelete(row, family,
|
||||
qualifier, compareOp, comparator, delete, result);
|
||||
}
|
||||
processed = Boolean.valueOf(result);
|
||||
processed = result;
|
||||
}
|
||||
} else {
|
||||
region.delete(delete, lock, delete.getWriteToWAL());
|
||||
|
@ -3088,7 +3121,7 @@ public class HRegionServer implements ClientProtocol,
|
|||
region.releaseRowLock(r);
|
||||
this.leases.cancelLease(lockName);
|
||||
LOG.debug("Row lock " + lockId
|
||||
+ " has been explicitly released by client");
|
||||
+ " has been explicitly released by client");
|
||||
return UnlockRowResponse.newBuilder().build();
|
||||
} catch (Throwable t) {
|
||||
throw convertThrowableToIOE(cleanup(t));
|
||||
|
@ -3286,7 +3319,7 @@ public class HRegionServer implements ClientProtocol,
|
|||
try {
|
||||
HRegion region = getRegion(request.getRegion());
|
||||
requestCount.increment();
|
||||
Set<byte[]> columnFamilies = null;
|
||||
Set<byte[]> columnFamilies;
|
||||
if (request.getFamilyCount() == 0) {
|
||||
columnFamilies = region.getStores().keySet();
|
||||
} else {
|
||||
|
@ -3328,8 +3361,24 @@ public class HRegionServer implements ClientProtocol,
|
|||
// Region open/close direct RPCs
|
||||
|
||||
/**
|
||||
* Open a region on the region server.
|
||||
* Open asynchronously a region or a set of regions on the region server.
|
||||
*
|
||||
* The opening is coordinated by ZooKeeper, and this method requires the znode to be created
|
||||
* before being called. As a consequence, this method should be called only from the master.
|
||||
* <p>
|
||||
* Different manages states for the region are:<ul>
|
||||
* <li>region not opened: the region opening will start asynchronously.</li>
|
||||
* <li>a close is already in progress: this is considered as an error.</li>
|
||||
* <li>an open is already in progress: this new open request will be ignored. This is important
|
||||
* because the Master can do multiple requests if it crashes.</li>
|
||||
* <li>the region is already opened: this new open request will be ignored./li>
|
||||
* </ul>
|
||||
* </p>
|
||||
* <p>
|
||||
* Bulk assign: If there are more than 1 region to open, it will be considered as a bulk assign.
|
||||
* For a single region opening, errors are sent through a ServiceException. For bulk assign,
|
||||
* errors are put in the response as FAILED_OPENING.
|
||||
* </p>
|
||||
* @param controller the RPC controller
|
||||
* @param request the request
|
||||
* @throws ServiceException
|
||||
|
@ -3345,22 +3394,22 @@ public class HRegionServer implements ClientProtocol,
|
|||
}
|
||||
requestCount.increment();
|
||||
OpenRegionResponse.Builder builder = OpenRegionResponse.newBuilder();
|
||||
int regionCount = request.getOpenInfoCount();
|
||||
Map<String, HTableDescriptor> htds =
|
||||
new HashMap<String, HTableDescriptor>(regionCount);
|
||||
boolean isBulkAssign = regionCount > 1;
|
||||
final int regionCount = request.getOpenInfoCount();
|
||||
final Map<String, HTableDescriptor> htds = new HashMap<String, HTableDescriptor>(regionCount);
|
||||
final boolean isBulkAssign = regionCount > 1;
|
||||
for (RegionOpenInfo regionOpenInfo : request.getOpenInfoList()) {
|
||||
HRegionInfo region = HRegionInfo.convert(regionOpenInfo.getRegion());
|
||||
final HRegionInfo region = HRegionInfo.convert(regionOpenInfo.getRegion());
|
||||
|
||||
int versionOfOfflineNode = -1;
|
||||
if (regionOpenInfo.hasVersionOfOfflineNode()) {
|
||||
versionOfOfflineNode = regionOpenInfo.getVersionOfOfflineNode();
|
||||
}
|
||||
HTableDescriptor htd;
|
||||
try {
|
||||
checkIfRegionInTransition(region.getEncodedNameAsBytes(), OPEN);
|
||||
HRegion onlineRegion = getFromOnlineRegions(region.getEncodedName());
|
||||
if (null != onlineRegion) {
|
||||
final HRegion onlineRegion = getFromOnlineRegions(region.getEncodedName());
|
||||
if (onlineRegion != null) {
|
||||
//Check if the region can actually be opened.
|
||||
if( onlineRegion.getCoprocessorHost() != null){
|
||||
if (onlineRegion.getCoprocessorHost() != null) {
|
||||
onlineRegion.getCoprocessorHost().preOpen();
|
||||
}
|
||||
// See HBASE-5094. Cross check with META if still this RS is owning
|
||||
|
@ -3373,39 +3422,55 @@ public class HRegionServer implements ClientProtocol,
|
|||
builder.addOpeningState(RegionOpeningState.ALREADY_OPENED);
|
||||
continue;
|
||||
} else {
|
||||
LOG.warn("The region " + region.getEncodedName()
|
||||
+ " is online on this server but META does not have this server.");
|
||||
LOG.warn("The region " + region.getEncodedName() + " is online on this server" +
|
||||
" but META does not have this server - continue opening.");
|
||||
removeFromOnlineRegions(region.getEncodedName(), null);
|
||||
}
|
||||
}
|
||||
LOG.info("Received request to open region: " + region.getRegionNameAsString() + " on "
|
||||
+ this.serverNameFromMasterPOV);
|
||||
HTableDescriptor htd = htds.get(region.getTableNameAsString());
|
||||
htd = htds.get(region.getTableNameAsString());
|
||||
if (htd == null) {
|
||||
htd = this.tableDescriptors.get(region.getTableName());
|
||||
htds.put(region.getTableNameAsString(), htd);
|
||||
}
|
||||
this.regionsInTransitionInRS.putIfAbsent(
|
||||
region.getEncodedNameAsBytes(), true);
|
||||
// Need to pass the expected version in the constructor.
|
||||
if (region.isRootRegion()) {
|
||||
this.service.submit(new OpenRootHandler(this, this, region, htd,
|
||||
versionOfOfflineNode));
|
||||
} else if (region.isMetaRegion()) {
|
||||
this.service.submit(new OpenMetaHandler(this, this, region, htd,
|
||||
versionOfOfflineNode));
|
||||
} else {
|
||||
this.service.submit(new OpenRegionHandler(this, this, region, htd,
|
||||
versionOfOfflineNode));
|
||||
|
||||
final Boolean previous = this.regionsInTransitionInRS.putIfAbsent(
|
||||
region.getEncodedNameAsBytes(), Boolean.TRUE);
|
||||
|
||||
if (Boolean.FALSE.equals(previous)) {
|
||||
// There is a close in progress. We need to mark this open as failed in ZK.
|
||||
OpenRegionHandler.
|
||||
tryTransitionFromOfflineToFailedOpen(this, region, versionOfOfflineNode);
|
||||
|
||||
throw new RegionAlreadyInTransitionException("Received OPEN for the region:" +
|
||||
region.getRegionNameAsString() + " , which we are already trying to CLOSE ");
|
||||
}
|
||||
|
||||
if (Boolean.TRUE.equals(previous)) {
|
||||
// An open is in progress. This is supported, but let's log this.
|
||||
LOG.info("Receiving OPEN for the region:" +
|
||||
region.getRegionNameAsString() + " , which we are already trying to OPEN" +
|
||||
" - ignoring this new request for this region.");
|
||||
}
|
||||
|
||||
if (previous == null) {
|
||||
// If there is no action in progress, we can submit a specific handler.
|
||||
// Need to pass the expected version in the constructor.
|
||||
if (region.isRootRegion()) {
|
||||
this.service.submit(new OpenRootHandler(this, this, region, htd,
|
||||
versionOfOfflineNode));
|
||||
} else if (region.isMetaRegion()) {
|
||||
this.service.submit(new OpenMetaHandler(this, this, region, htd,
|
||||
versionOfOfflineNode));
|
||||
} else {
|
||||
this.service.submit(new OpenRegionHandler(this, this, region, htd,
|
||||
versionOfOfflineNode));
|
||||
}
|
||||
}
|
||||
|
||||
builder.addOpeningState(RegionOpeningState.OPENED);
|
||||
} catch (RegionAlreadyInTransitionException rie) {
|
||||
LOG.warn("Region is already in transition", rie);
|
||||
if (isBulkAssign) {
|
||||
builder.addOpeningState(RegionOpeningState.OPENED);
|
||||
} else {
|
||||
throw new ServiceException(rie);
|
||||
}
|
||||
|
||||
} catch (IOException ie) {
|
||||
LOG.warn("Failed opening region " + region.getRegionNameAsString(), ie);
|
||||
if (isBulkAssign) {
|
||||
|
@ -3415,6 +3480,7 @@ public class HRegionServer implements ClientProtocol,
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
return builder.build();
|
||||
}
|
||||
|
||||
|
@ -3439,31 +3505,22 @@ public class HRegionServer implements ClientProtocol,
|
|||
|
||||
try {
|
||||
checkOpen();
|
||||
String encodedRegionName =
|
||||
ProtobufUtil.getRegionEncodedName(request.getRegion());
|
||||
byte[] encodedName = Bytes.toBytes(encodedRegionName);
|
||||
HRegion region = getRegionByEncodedName(encodedRegionName);
|
||||
if(region.getCoprocessorHost() != null){
|
||||
final String encodedRegionName = ProtobufUtil.getRegionEncodedName(request.getRegion());
|
||||
|
||||
// Can be null if we're calling close on a region that's not online
|
||||
final HRegion region = this.getFromOnlineRegions(encodedRegionName);
|
||||
if ((region != null) && (region .getCoprocessorHost() != null)) {
|
||||
region.getCoprocessorHost().preClose(false);
|
||||
}
|
||||
Boolean openAction = regionsInTransitionInRS.get(encodedName);
|
||||
if (openAction != null) {
|
||||
if (openAction.booleanValue()) {
|
||||
regionsInTransitionInRS.replace(encodedName, openAction, Boolean.FALSE);
|
||||
}
|
||||
checkIfRegionInTransition(encodedName, CLOSE);
|
||||
}
|
||||
|
||||
requestCount.increment();
|
||||
LOG.info("Received close region: " + region.getRegionNameAsString() +
|
||||
". Version of ZK closing node:" + versionOfClosingNode +
|
||||
LOG.info("Received close region: " + encodedRegionName +
|
||||
"Transitioning in ZK: " + (zk ? "yes" : "no") +
|
||||
". Version of ZK closing node:" + versionOfClosingNode +
|
||||
". Destination server:" + sn);
|
||||
HRegionInfo regionInfo = region.getRegionInfo();
|
||||
checkIfRegionInTransition(encodedName, CLOSE);
|
||||
boolean closed = closeRegion(
|
||||
regionInfo, false, zk, versionOfClosingNode, sn);
|
||||
CloseRegionResponse.Builder builder =
|
||||
CloseRegionResponse.newBuilder().setClosed(closed);
|
||||
|
||||
boolean closed = closeRegion(encodedRegionName, false, zk, versionOfClosingNode, sn);
|
||||
CloseRegionResponse.Builder builder = CloseRegionResponse.newBuilder().setClosed(closed);
|
||||
return builder.build();
|
||||
} catch (IOException ie) {
|
||||
throw new ServiceException(ie);
|
||||
|
@ -3554,7 +3611,7 @@ public class HRegionServer implements ClientProtocol,
|
|||
store = region.getStore(family);
|
||||
if (store == null) {
|
||||
throw new ServiceException(new IOException("column family " + Bytes.toString(family) +
|
||||
" does not exist in region " + new String(region.getRegionNameAsString())));
|
||||
" does not exist in region " + region.getRegionNameAsString()));
|
||||
}
|
||||
}
|
||||
if (request.hasMajor()) {
|
||||
|
@ -3681,7 +3738,7 @@ public class HRegionServer implements ClientProtocol,
|
|||
protected HRegion getRegion(
|
||||
final RegionSpecifier regionSpecifier) throws IOException {
|
||||
return getRegionByEncodedName(
|
||||
ProtobufUtil.getRegionEncodedName(regionSpecifier));
|
||||
ProtobufUtil.getRegionEncodedName(regionSpecifier));
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -3757,7 +3814,7 @@ public class HRegionServer implements ClientProtocol,
|
|||
ActionResult result = resultBuilder.build();
|
||||
int i = 0;
|
||||
for (Mutate m : mutates) {
|
||||
Mutation mutation = null;
|
||||
Mutation mutation;
|
||||
if (m.getMutateType() == MutateType.PUT) {
|
||||
mutation = ProtobufUtil.toPut(m);
|
||||
batchContainsPuts = true;
|
||||
|
@ -3850,10 +3907,10 @@ public class HRegionServer implements ClientProtocol,
|
|||
}
|
||||
|
||||
|
||||
// This map will containsall the regions that we closed for a move.
|
||||
// This map will contains all the regions that we closed for a move.
|
||||
// We add the time it was moved as we don't want to keep too old information
|
||||
protected Map<String, Pair<Long, ServerName>> movedRegions =
|
||||
new ConcurrentHashMap<String, Pair<Long, ServerName>>(3000);
|
||||
new ConcurrentHashMap<String, Pair<Long, ServerName>>(3000);
|
||||
|
||||
// We need a timeout. If not there is a risk of giving a wrong information: this would double
|
||||
// the number of network calls instead of reducing them.
|
||||
|
@ -3867,8 +3924,8 @@ public class HRegionServer implements ClientProtocol,
|
|||
final Long time = System.currentTimeMillis();
|
||||
|
||||
movedRegions.put(
|
||||
encodedName,
|
||||
new Pair<Long, ServerName>(time, destination));
|
||||
encodedName,
|
||||
new Pair<Long, ServerName>(time, destination));
|
||||
}
|
||||
|
||||
private ServerName getMovedRegion(final String encodedRegionName) {
|
||||
|
|
|
@ -19,7 +19,7 @@
|
|||
package org.apache.hadoop.hbase.regionserver;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
|
@ -79,7 +79,7 @@ public interface RegionServerServices extends OnlineRegions {
|
|||
* Get the regions that are currently being opened or closed in the RS
|
||||
* @return map of regions in transition in this RS
|
||||
*/
|
||||
public Map<byte[], Boolean> getRegionsInTransitionInRS();
|
||||
public ConcurrentMap<byte[], Boolean> getRegionsInTransitionInRS();
|
||||
|
||||
/**
|
||||
* @return Return the FileSystem object used by the regionserver
|
||||
|
|
|
@ -126,13 +126,21 @@ public class CloseRegionHandler extends EventHandler {
|
|||
// Check that this region is being served here
|
||||
HRegion region = this.rsServices.getFromOnlineRegions(encodedRegionName);
|
||||
if (region == null) {
|
||||
LOG.warn("Received CLOSE for region " + name +
|
||||
" but currently not serving");
|
||||
LOG.warn("Received CLOSE for region " + name + " but currently not serving - ignoring");
|
||||
if (zk){
|
||||
LOG.error("The znode is not modified as we are not serving " + name);
|
||||
}
|
||||
// TODO: do better than a simple warning
|
||||
return;
|
||||
}
|
||||
|
||||
// Close the region
|
||||
try {
|
||||
if (zk && !ZKAssign.checkClosingState(server.getZooKeeper(), regionInfo, expectedVersion)){
|
||||
// bad znode state
|
||||
return; // We're node deleting the znode, but it's not ours...
|
||||
}
|
||||
|
||||
// TODO: If we need to keep updating CLOSING stamp to prevent against
|
||||
// a timeout if this is long-running, need to spin up a thread?
|
||||
if (region.close(abort) == null) {
|
||||
|
|
|
@ -85,30 +85,50 @@ public class OpenRegionHandler extends EventHandler {
|
|||
|
||||
@Override
|
||||
public void process() throws IOException {
|
||||
boolean openSuccessful = false;
|
||||
final String regionName = regionInfo.getRegionNameAsString();
|
||||
|
||||
try {
|
||||
final String name = regionInfo.getRegionNameAsString();
|
||||
if (this.server.isStopped() || this.rsServices.isStopping()) {
|
||||
return;
|
||||
}
|
||||
final String encodedName = regionInfo.getEncodedName();
|
||||
|
||||
// Check that this region is not already online
|
||||
HRegion region = this.rsServices.getFromOnlineRegions(encodedName);
|
||||
// 3 different difficult situations can occur
|
||||
// 1) The opening was cancelled. This is an expected situation
|
||||
// 2) The region was hijacked, we no longer have the znode
|
||||
// 3) The region is now marked as online while we're suppose to open. This would be a bug.
|
||||
|
||||
// Check that this region is not already online
|
||||
if (this.rsServices.getFromOnlineRegions(encodedName) != null) {
|
||||
LOG.error("Region " + encodedName +
|
||||
" was already online when we started processing the opening. " +
|
||||
"Marking this new attempt as failed");
|
||||
tryTransitionFromOfflineToFailedOpen(this.rsServices, regionInfo, this.version);
|
||||
return;
|
||||
}
|
||||
|
||||
// Check that we're still supposed to open the region and transition.
|
||||
// If fails, just return. Someone stole the region from under us.
|
||||
// Calling transitionZookeeperOfflineToOpening initalizes this.version.
|
||||
if (!transitionZookeeperOfflineToOpening(encodedName,
|
||||
versionOfOfflineNode)) {
|
||||
LOG.warn("Region was hijacked? It no longer exists, encodedName=" +
|
||||
encodedName);
|
||||
// Calling transitionZookeeperOfflineToOpening initializes this.version.
|
||||
if (!isRegionStillOpening()){
|
||||
LOG.error("Region " + encodedName + " opening cancelled");
|
||||
tryTransitionFromOfflineToFailedOpen(this.rsServices, regionInfo, this.version);
|
||||
return;
|
||||
}
|
||||
|
||||
if (!transitionZookeeperOfflineToOpening(encodedName, versionOfOfflineNode)) {
|
||||
LOG.warn("Region was hijacked? Opening cancelled for encodedName=" + encodedName);
|
||||
// This is a desperate attempt: the znode is unlikely to be ours. But we can't do more.
|
||||
tryTransitionFromOfflineToFailedOpen(this.rsServices, regionInfo, this.version);
|
||||
return;
|
||||
}
|
||||
|
||||
// Open region. After a successful open, failures in subsequent
|
||||
// processing needs to do a close as part of cleanup.
|
||||
region = openRegion();
|
||||
HRegion region = openRegion();
|
||||
if (region == null) {
|
||||
tryTransitionToFailedOpen(regionInfo);
|
||||
tryTransitionFromOpeningToFailedOpen(regionInfo);
|
||||
return;
|
||||
}
|
||||
boolean failed = true;
|
||||
|
@ -120,37 +140,63 @@ public class OpenRegionHandler extends EventHandler {
|
|||
if (failed || this.server.isStopped() ||
|
||||
this.rsServices.isStopping()) {
|
||||
cleanupFailedOpen(region);
|
||||
tryTransitionToFailedOpen(regionInfo);
|
||||
tryTransitionFromOpeningToFailedOpen(regionInfo);
|
||||
return;
|
||||
}
|
||||
|
||||
if (!transitionToOpened(region)) {
|
||||
|
||||
if (!isRegionStillOpening() || !transitionToOpened(region)) {
|
||||
// If we fail to transition to opened, it's because of one of two cases:
|
||||
// (a) we lost our ZK lease
|
||||
// OR (b) someone else opened the region before us
|
||||
// In either case, we don't need to transition to FAILED_OPEN state.
|
||||
// In case (a), the Master will process us as a dead server. In case
|
||||
// (b) the region is already being handled elsewhere anyway.
|
||||
// OR (c) someone cancelled the open
|
||||
// In all cases, we try to transition to failed_open to be safe.
|
||||
cleanupFailedOpen(region);
|
||||
tryTransitionFromOpeningToFailedOpen(regionInfo);
|
||||
return;
|
||||
}
|
||||
|
||||
// One more check to make sure we are opening instead of closing
|
||||
if (!isRegionStillOpening()) {
|
||||
LOG.warn("Open region aborted since it isn't opening any more");
|
||||
cleanupFailedOpen(region);
|
||||
return;
|
||||
}
|
||||
// We have a znode in the opened state now. We can't really delete it as the master job.
|
||||
// Transitioning to failed open would create a race condition if the master has already
|
||||
// acted the transition to opened.
|
||||
// Cancelling the open is dangerous, because we would have a state where the master thinks
|
||||
// the region is opened while the region is actually closed. It is a dangerous state
|
||||
// to be in. For this reason, from now on, we're not going back. There is a message in the
|
||||
// finally close to let the admin knows where we stand.
|
||||
|
||||
|
||||
// Successful region open, and add it to OnlineRegions
|
||||
this.rsServices.addToOnlineRegions(region);
|
||||
openSuccessful = true;
|
||||
|
||||
// Done! Successful region open
|
||||
LOG.debug("Opened " + name + " on server:" +
|
||||
LOG.debug("Opened " + regionName + " on server:" +
|
||||
this.server.getServerName());
|
||||
|
||||
|
||||
} finally {
|
||||
this.rsServices.getRegionsInTransitionInRS().
|
||||
final Boolean current = this.rsServices.getRegionsInTransitionInRS().
|
||||
remove(this.regionInfo.getEncodedNameAsBytes());
|
||||
|
||||
// Let's check if we have met a race condition on open cancellation....
|
||||
// A better solution would be to not have any race condition.
|
||||
// this.rsServices.getRegionsInTransitionInRS().remove(
|
||||
// this.regionInfo.getEncodedNameAsBytes(), Boolean.TRUE);
|
||||
// would help, but we would still have a consistency issue to manage with
|
||||
// 1) this.rsServices.addToOnlineRegions(region);
|
||||
// 2) the ZK state.
|
||||
if (openSuccessful) {
|
||||
if (current == null) { // Should NEVER happen, but let's be paranoid.
|
||||
LOG.error("Bad state: we've just opened a region that was NOT in transition. Region=" +
|
||||
regionName
|
||||
);
|
||||
} else if (Boolean.FALSE.equals(current)) { // Can happen, if we're really unlucky.
|
||||
LOG.error("Race condition: we've finished to open a region, while a close was requested "
|
||||
+ " on region=" + regionName + ". It can be a critical error, as a region that" +
|
||||
" should be closed is now opened."
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -226,7 +272,8 @@ public class OpenRegionHandler extends EventHandler {
|
|||
/**
|
||||
* Thread to run region post open tasks. Call {@link #getException()} after
|
||||
* the thread finishes to check for exceptions running
|
||||
* {@link RegionServerServices#postOpenDeployTasks(HRegion, org.apache.hadoop.hbase.catalog.CatalogTracker, boolean)}
|
||||
* {@link RegionServerServices#postOpenDeployTasks(
|
||||
* HRegion, org.apache.hadoop.hbase.catalog.CatalogTracker, boolean)}
|
||||
* .
|
||||
*/
|
||||
static class PostOpenDeployTasksThread extends Thread {
|
||||
|
@ -277,10 +324,6 @@ public class OpenRegionHandler extends EventHandler {
|
|||
* @throws IOException
|
||||
*/
|
||||
private boolean transitionToOpened(final HRegion r) throws IOException {
|
||||
if (!isRegionStillOpening()) {
|
||||
LOG.warn("Open region aborted since it isn't opening any more");
|
||||
return false;
|
||||
}
|
||||
boolean result = false;
|
||||
HRegionInfo hri = r.getRegionInfo();
|
||||
final String name = hri.getRegionNameAsString();
|
||||
|
@ -310,11 +353,12 @@ public class OpenRegionHandler extends EventHandler {
|
|||
* @param hri Region we're working on.
|
||||
* @return whether znode is successfully transitioned to FAILED_OPEN state.
|
||||
*/
|
||||
private boolean tryTransitionToFailedOpen(final HRegionInfo hri) {
|
||||
private boolean tryTransitionFromOpeningToFailedOpen(final HRegionInfo hri) {
|
||||
boolean result = false;
|
||||
final String name = hri.getRegionNameAsString();
|
||||
try {
|
||||
LOG.info("Opening of region " + hri + " failed, marking as FAILED_OPEN in ZK");
|
||||
LOG.info("Opening of region " + hri + " failed, transitioning" +
|
||||
" from OPENING to FAILED_OPEN in ZK, expecting version " + this.version);
|
||||
if (ZKAssign.transitionNode(
|
||||
this.server.getZooKeeper(), hri,
|
||||
this.server.getServerName(),
|
||||
|
@ -334,6 +378,43 @@ public class OpenRegionHandler extends EventHandler {
|
|||
return result;
|
||||
}
|
||||
|
||||
/**
|
||||
* Try to transition to open. This function is static to make it usable before creating the
|
||||
* handler.
|
||||
*
|
||||
* This is not guaranteed to succeed, we just do our best.
|
||||
*
|
||||
* @param rsServices
|
||||
* @param hri Region we're working on.
|
||||
* @param versionOfOfflineNode version to checked.
|
||||
* @return whether znode is successfully transitioned to FAILED_OPEN state.
|
||||
*/
|
||||
public static boolean tryTransitionFromOfflineToFailedOpen(RegionServerServices rsServices,
|
||||
final HRegionInfo hri, final int versionOfOfflineNode) {
|
||||
boolean result = false;
|
||||
final String name = hri.getRegionNameAsString();
|
||||
try {
|
||||
LOG.info("Opening of region " + hri + " failed, transitioning" +
|
||||
" from OFFLINE to FAILED_OPEN in ZK, expecting version " + versionOfOfflineNode);
|
||||
if (ZKAssign.transitionNode(
|
||||
rsServices.getZooKeeper(), hri,
|
||||
rsServices.getServerName(),
|
||||
EventType.M_ZK_REGION_OFFLINE,
|
||||
EventType.RS_ZK_REGION_FAILED_OPEN,
|
||||
versionOfOfflineNode) == -1) {
|
||||
LOG.warn("Unable to mark region " + hri + " as FAILED_OPEN. " +
|
||||
"It's likely that the master already timed out this open " +
|
||||
"attempt, and thus another RS already has the region.");
|
||||
} else {
|
||||
result = true;
|
||||
}
|
||||
} catch (KeeperException e) {
|
||||
LOG.error("Failed transitioning node " + name + " from OFFLINE to FAILED_OPEN", e);
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* @return Instance of HRegion if successful open else null.
|
||||
*/
|
||||
|
@ -379,7 +460,7 @@ public class OpenRegionHandler extends EventHandler {
|
|||
private boolean isRegionStillOpening() {
|
||||
byte[] encodedName = regionInfo.getEncodedNameAsBytes();
|
||||
Boolean action = rsServices.getRegionsInTransitionInRS().get(encodedName);
|
||||
return action != null && action.booleanValue();
|
||||
return Boolean.TRUE.equals(action); // true means opening for RIT
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -392,10 +473,6 @@ public class OpenRegionHandler extends EventHandler {
|
|||
*/
|
||||
boolean transitionZookeeperOfflineToOpening(final String encodedName,
|
||||
int versionOfOfflineNode) {
|
||||
if (!isRegionStillOpening()) {
|
||||
LOG.warn("Open region aborted since it isn't opening any more");
|
||||
return false;
|
||||
}
|
||||
// TODO: should also handle transition from CLOSED?
|
||||
try {
|
||||
// Initialize the znode version.
|
||||
|
@ -405,6 +482,8 @@ public class OpenRegionHandler extends EventHandler {
|
|||
} catch (KeeperException e) {
|
||||
LOG.error("Error transition from OFFLINE to OPENING for region=" +
|
||||
encodedName, e);
|
||||
this.version = -1;
|
||||
return false;
|
||||
}
|
||||
boolean b = isGoodVersion();
|
||||
if (!b) {
|
||||
|
@ -436,6 +515,7 @@ public class OpenRegionHandler extends EventHandler {
|
|||
server.abort("Exception refreshing OPENING; region=" + encodedName +
|
||||
", context=" + context, e);
|
||||
this.version = -1;
|
||||
return false;
|
||||
}
|
||||
boolean b = isGoodVersion();
|
||||
if (!b) {
|
||||
|
|
|
@ -176,7 +176,8 @@ public class ZKAssign {
|
|||
LOG.debug(zkw.prefix("Async create of unassigned node for " +
|
||||
region.getEncodedName() + " with OFFLINE state"));
|
||||
RegionTransition rt =
|
||||
RegionTransition.createRegionTransition(EventType.M_ZK_REGION_OFFLINE, region.getRegionName(), serverName);
|
||||
RegionTransition.createRegionTransition(
|
||||
EventType.M_ZK_REGION_OFFLINE, region.getRegionName(), serverName);
|
||||
String node = getNodeName(zkw, region.getEncodedName());
|
||||
ZKUtil.asyncCreate(zkw, node, rt.toByteArray(), cb, ctx);
|
||||
}
|
||||
|
@ -258,14 +259,14 @@ public class ZKAssign {
|
|||
* of the specified regions transition.
|
||||
*
|
||||
* @param zkw zk reference
|
||||
* @param regionName opened region to be deleted from zk
|
||||
* @param encodedRegionName opened region to be deleted from zk
|
||||
* @throws KeeperException if unexpected zookeeper exception
|
||||
* @throws KeeperException.NoNodeException if node does not exist
|
||||
*/
|
||||
public static boolean deleteOpenedNode(ZooKeeperWatcher zkw,
|
||||
String regionName)
|
||||
String encodedRegionName)
|
||||
throws KeeperException, KeeperException.NoNodeException {
|
||||
return deleteNode(zkw, regionName, EventType.RS_ZK_REGION_OPENED);
|
||||
return deleteNode(zkw, encodedRegionName, EventType.RS_ZK_REGION_OPENED);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -283,14 +284,14 @@ public class ZKAssign {
|
|||
* that has died are all set to OFFLINE before being processed.
|
||||
*
|
||||
* @param zkw zk reference
|
||||
* @param regionName closed region to be deleted from zk
|
||||
* @param encodedRegionName closed region to be deleted from zk
|
||||
* @throws KeeperException if unexpected zookeeper exception
|
||||
* @throws KeeperException.NoNodeException if node does not exist
|
||||
*/
|
||||
public static boolean deleteOfflineNode(ZooKeeperWatcher zkw,
|
||||
String regionName)
|
||||
String encodedRegionName)
|
||||
throws KeeperException, KeeperException.NoNodeException {
|
||||
return deleteNode(zkw, regionName, EventType.M_ZK_REGION_OFFLINE);
|
||||
return deleteNode(zkw, encodedRegionName, EventType.M_ZK_REGION_OFFLINE);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -309,14 +310,14 @@ public class ZKAssign {
|
|||
* of the specified regions transition to being closed.
|
||||
*
|
||||
* @param zkw zk reference
|
||||
* @param regionName closed region to be deleted from zk
|
||||
* @param encodedRegionName closed region to be deleted from zk
|
||||
* @throws KeeperException if unexpected zookeeper exception
|
||||
* @throws KeeperException.NoNodeException if node does not exist
|
||||
*/
|
||||
public static boolean deleteClosedNode(ZooKeeperWatcher zkw,
|
||||
String regionName)
|
||||
String encodedRegionName)
|
||||
throws KeeperException, KeeperException.NoNodeException {
|
||||
return deleteNode(zkw, regionName, EventType.RS_ZK_REGION_CLOSED);
|
||||
return deleteNode(zkw, encodedRegionName, EventType.RS_ZK_REGION_CLOSED);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -342,8 +343,8 @@ public class ZKAssign {
|
|||
public static boolean deleteClosingNode(ZooKeeperWatcher zkw,
|
||||
HRegionInfo region)
|
||||
throws KeeperException, KeeperException.NoNodeException {
|
||||
String regionName = region.getEncodedName();
|
||||
return deleteNode(zkw, regionName, EventType.M_ZK_REGION_CLOSING);
|
||||
String encodedRegionName = region.getEncodedName();
|
||||
return deleteNode(zkw, encodedRegionName, EventType.M_ZK_REGION_CLOSING);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -362,15 +363,15 @@ public class ZKAssign {
|
|||
* of the specified regions transition to being closed/opened.
|
||||
*
|
||||
* @param zkw zk reference
|
||||
* @param regionName region to be deleted from zk
|
||||
* @param encodedRegionName region to be deleted from zk
|
||||
* @param expectedState state region must be in for delete to complete
|
||||
* @throws KeeperException if unexpected zookeeper exception
|
||||
* @throws KeeperException.NoNodeException if node does not exist
|
||||
*/
|
||||
public static boolean deleteNode(ZooKeeperWatcher zkw, String regionName,
|
||||
public static boolean deleteNode(ZooKeeperWatcher zkw, String encodedRegionName,
|
||||
EventType expectedState)
|
||||
throws KeeperException, KeeperException.NoNodeException {
|
||||
return deleteNode(zkw, regionName, expectedState, -1);
|
||||
return deleteNode(zkw, encodedRegionName, expectedState, -1);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -389,7 +390,7 @@ public class ZKAssign {
|
|||
* of the specified regions transition to being closed/opened.
|
||||
*
|
||||
* @param zkw zk reference
|
||||
* @param regionName region to be deleted from zk
|
||||
* @param encodedRegionName region to be deleted from zk
|
||||
* @param expectedState state region must be in for delete to complete
|
||||
* @param expectedVersion of the znode that is to be deleted.
|
||||
* If expectedVersion need not be compared while deleting the znode
|
||||
|
@ -397,12 +398,12 @@ public class ZKAssign {
|
|||
* @throws KeeperException if unexpected zookeeper exception
|
||||
* @throws KeeperException.NoNodeException if node does not exist
|
||||
*/
|
||||
public static boolean deleteNode(ZooKeeperWatcher zkw, String regionName,
|
||||
public static boolean deleteNode(ZooKeeperWatcher zkw, String encodedRegionName,
|
||||
EventType expectedState, int expectedVersion)
|
||||
throws KeeperException, KeeperException.NoNodeException {
|
||||
LOG.debug(zkw.prefix("Deleting existing unassigned " +
|
||||
"node for " + regionName + " that is in expected state " + expectedState));
|
||||
String node = getNodeName(zkw, regionName);
|
||||
"node for " + encodedRegionName + " that is in expected state " + expectedState));
|
||||
String node = getNodeName(zkw, encodedRegionName);
|
||||
zkw.sync(node);
|
||||
Stat stat = new Stat();
|
||||
byte [] bytes = ZKUtil.getDataNoWatch(zkw, node, stat);
|
||||
|
@ -413,24 +414,24 @@ public class ZKAssign {
|
|||
RegionTransition rt = getRegionTransition(bytes);
|
||||
EventType et = rt.getEventType();
|
||||
if (!et.equals(expectedState)) {
|
||||
LOG.warn(zkw.prefix("Attempting to delete unassigned node " + regionName + " in " +
|
||||
LOG.warn(zkw.prefix("Attempting to delete unassigned node " + encodedRegionName + " in " +
|
||||
expectedState + " state but node is in " + et + " state"));
|
||||
return false;
|
||||
}
|
||||
if (expectedVersion != -1
|
||||
&& stat.getVersion() != expectedVersion) {
|
||||
LOG.warn("The node " + regionName + " we are trying to delete is not" +
|
||||
LOG.warn("The node " + encodedRegionName + " we are trying to delete is not" +
|
||||
" the expected one. Got a version mismatch");
|
||||
return false;
|
||||
}
|
||||
if(!ZKUtil.deleteNode(zkw, node, stat.getVersion())) {
|
||||
LOG.warn(zkw.prefix("Attempting to delete " +
|
||||
"unassigned node " + regionName + " in " + expectedState +
|
||||
"unassigned node " + encodedRegionName + " in " + expectedState +
|
||||
" state but after verifying state, we got a version mismatch"));
|
||||
return false;
|
||||
}
|
||||
LOG.debug(zkw.prefix("Successfully deleted unassigned node for region " +
|
||||
regionName + " in expected state " + expectedState));
|
||||
encodedRegionName + " in expected state " + expectedState));
|
||||
return true;
|
||||
}
|
||||
|
||||
|
@ -630,6 +631,51 @@ public class ZKAssign {
|
|||
EventType.RS_ZK_REGION_OPENED, expectedVersion);
|
||||
}
|
||||
|
||||
/**
|
||||
*
|
||||
* @param zkw zk reference
|
||||
* @param region region to be closed
|
||||
* @param expectedVersion expected version of the znode
|
||||
* @return true if the znode exists, has the right version and the right state. False otherwise.
|
||||
* @throws KeeperException
|
||||
*/
|
||||
public static boolean checkClosingState(ZooKeeperWatcher zkw, HRegionInfo region,
|
||||
int expectedVersion) throws KeeperException {
|
||||
|
||||
final String encoded = getNodeName(zkw, region.getEncodedName());
|
||||
zkw.sync(encoded);
|
||||
|
||||
// Read existing data of the node
|
||||
Stat stat = new Stat();
|
||||
byte[] existingBytes = ZKUtil.getDataNoWatch(zkw, encoded, stat);
|
||||
|
||||
if (existingBytes == null) {
|
||||
LOG.warn(zkw.prefix("Attempt to check the " +
|
||||
"closing node for " + encoded +
|
||||
". The node does not exist"));
|
||||
return false;
|
||||
}
|
||||
|
||||
if (expectedVersion != -1 && stat.getVersion() != expectedVersion) {
|
||||
LOG.warn(zkw.prefix("Attempt to check the " +
|
||||
"closing node for " + encoded +
|
||||
". The node existed but was version " + stat.getVersion() +
|
||||
" not the expected version " + expectedVersion));
|
||||
return false;
|
||||
}
|
||||
|
||||
RegionTransition rt = getRegionTransition(existingBytes);
|
||||
|
||||
if (!EventType.M_ZK_REGION_CLOSING.equals(rt.getEventType())) {
|
||||
LOG.warn(zkw.prefix("Attempt to check the " +
|
||||
"closing node for " + encoded +
|
||||
". The node existed but was in an unexpected state: " + rt.getEventType()));
|
||||
return false;
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* Method that actually performs unassigned node transitions.
|
||||
*
|
||||
|
@ -722,7 +768,8 @@ public class ZKAssign {
|
|||
|
||||
// Write new data, ensuring data has not changed since we last read it
|
||||
try {
|
||||
rt = RegionTransition.createRegionTransition(endState, region.getRegionName(), serverName, payload);
|
||||
rt = RegionTransition.createRegionTransition(
|
||||
endState, region.getRegionName(), serverName, payload);
|
||||
if(!ZKUtil.setData(zkw, node, rt.toByteArray(), stat.getVersion())) {
|
||||
LOG.warn(zkw.prefix("Attempt to transition the " +
|
||||
"unassigned node for " + encoded +
|
||||
|
|
|
@ -1035,7 +1035,13 @@ public class ZKUtil {
|
|||
waitForZKConnectionIfAuthenticating(zkw);
|
||||
zkw.getRecoverableZooKeeper().create(znode, data, createACL(zkw, znode),
|
||||
CreateMode.PERSISTENT);
|
||||
return zkw.getRecoverableZooKeeper().exists(znode, zkw).getVersion();
|
||||
Stat stat = zkw.getRecoverableZooKeeper().exists(znode, zkw);
|
||||
if (stat == null){
|
||||
// Likely a race condition. Someone deleted the znode.
|
||||
throw KeeperException.create(KeeperException.Code.SYSTEMERROR,
|
||||
"ZK.exists returned null (i.e.: znode does not exist) for znode=" + znode);
|
||||
}
|
||||
return stat.getVersion();
|
||||
} catch (InterruptedException e) {
|
||||
zkw.interruptedException(e);
|
||||
return -1;
|
||||
|
|
|
@ -23,6 +23,7 @@ import java.util.List;
|
|||
import java.util.Map;
|
||||
import java.util.Random;
|
||||
import java.util.TreeMap;
|
||||
import java.util.concurrent.ConcurrentSkipListMap;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
|
@ -324,7 +325,7 @@ class MockRegionServer implements AdminProtocol, ClientProtocol, RegionServerSer
|
|||
}
|
||||
|
||||
@Override
|
||||
public Map<byte[], Boolean> getRegionsInTransitionInRS() {
|
||||
public ConcurrentSkipListMap<byte[], Boolean> getRegionsInTransitionInRS() {
|
||||
// TODO Auto-generated method stub
|
||||
return null;
|
||||
}
|
||||
|
|
|
@ -0,0 +1,356 @@
|
|||
/**
|
||||
*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.hbase.regionserver;
|
||||
|
||||
import com.google.protobuf.ServiceException;
|
||||
import junit.framework.Assert;
|
||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.HRegionInfo;
|
||||
import org.apache.hadoop.hbase.HTableDescriptor;
|
||||
import org.apache.hadoop.hbase.MediumTests;
|
||||
import org.apache.hadoop.hbase.NotServingRegionException;
|
||||
import org.apache.hadoop.hbase.client.HTable;
|
||||
import org.apache.hadoop.hbase.client.Put;
|
||||
import org.apache.hadoop.hbase.executor.EventHandler;
|
||||
import org.apache.hadoop.hbase.protobuf.RequestConverter;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos;
|
||||
import org.apache.hadoop.hbase.regionserver.handler.OpenRegionHandler;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZKAssign;
|
||||
import org.junit.After;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
import org.junit.experimental.categories.Category;
|
||||
|
||||
|
||||
/**
|
||||
* Tests on the region server, without the master.
|
||||
*/
|
||||
@Category(MediumTests.class)
|
||||
public class TestRegionServerNoMaster {
|
||||
|
||||
private static final int NB_SERVERS = 1;
|
||||
private static HTable table;
|
||||
private static final byte[] row = "ee".getBytes();
|
||||
|
||||
private static HRegionInfo hri;
|
||||
|
||||
private static byte[] regionName;
|
||||
private static final HBaseTestingUtility HTU = new HBaseTestingUtility();
|
||||
|
||||
|
||||
@BeforeClass
|
||||
public static void before() throws Exception {
|
||||
HTU.startMiniCluster(NB_SERVERS);
|
||||
final byte[] tableName = Bytes.toBytes(TestRegionServerNoMaster.class.getName());
|
||||
|
||||
// Create table then get the single region for our new table.
|
||||
table = HTU.createTable(tableName, HConstants.CATALOG_FAMILY);
|
||||
Put p = new Put(row);
|
||||
p.add(HConstants.CATALOG_FAMILY, row, row);
|
||||
table.put(p);
|
||||
|
||||
hri = table.getRegionLocation(row, false).getRegionInfo();
|
||||
regionName = hri.getRegionName();
|
||||
|
||||
// No master
|
||||
HTU.getHBaseCluster().getMaster().stopMaster();
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void afterClass() throws Exception {
|
||||
table.close();
|
||||
HTU.shutdownMiniCluster();
|
||||
}
|
||||
|
||||
@After
|
||||
public void after() throws Exception {
|
||||
// Clean the state if the test failed before cleaning the znode
|
||||
// It does not manage all bad failures, so if there are multiple failures, only
|
||||
// the first one should be looked at.
|
||||
ZKAssign.deleteNodeFailSilent(HTU.getZooKeeperWatcher(), hri);
|
||||
}
|
||||
|
||||
|
||||
private static HRegionServer getRS() {
|
||||
return HTU.getHBaseCluster().getLiveRegionServerThreads().get(0).getRegionServer();
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Reopen the region. Reused in multiple tests as we always leave the region open after a test.
|
||||
*/
|
||||
private void reopenRegion() throws Exception {
|
||||
// We reopen. We need a ZK node here, as a open is always triggered by a master.
|
||||
ZKAssign.createNodeOffline(HTU.getZooKeeperWatcher(), hri, getRS().getServerName());
|
||||
// first version is '0'
|
||||
AdminProtos.OpenRegionRequest orr = RequestConverter.buildOpenRegionRequest(hri, 0);
|
||||
AdminProtos.OpenRegionResponse responseOpen = getRS().openRegion(null, orr);
|
||||
Assert.assertTrue(responseOpen.getOpeningStateCount() == 1);
|
||||
Assert.assertTrue(responseOpen.getOpeningState(0).
|
||||
equals(AdminProtos.OpenRegionResponse.RegionOpeningState.OPENED));
|
||||
|
||||
|
||||
checkRegionIsOpened();
|
||||
}
|
||||
|
||||
private void checkRegionIsOpened() throws Exception {
|
||||
|
||||
while (!getRS().getRegionsInTransitionInRS().isEmpty()) {
|
||||
Thread.sleep(1);
|
||||
}
|
||||
|
||||
Assert.assertTrue(getRS().getRegion(regionName).isAvailable());
|
||||
|
||||
Assert.assertTrue(
|
||||
ZKAssign.deleteOpenedNode(HTU.getZooKeeperWatcher(), hri.getEncodedName()));
|
||||
}
|
||||
|
||||
|
||||
private void checkRegionIsClosed() throws Exception {
|
||||
|
||||
while (!getRS().getRegionsInTransitionInRS().isEmpty()) {
|
||||
Thread.sleep(1);
|
||||
}
|
||||
|
||||
try {
|
||||
Assert.assertFalse(getRS().getRegion(regionName).isAvailable());
|
||||
} catch (NotServingRegionException expected) {
|
||||
// That's how it work: if the region is closed we have an exception.
|
||||
}
|
||||
|
||||
// We don't delete the znode here, because there is not always a znode.
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Close the region without using ZK
|
||||
*/
|
||||
private void closeNoZK() throws Exception {
|
||||
// no transition in ZK
|
||||
AdminProtos.CloseRegionRequest crr =
|
||||
RequestConverter.buildCloseRegionRequest(regionName, false);
|
||||
AdminProtos.CloseRegionResponse responseClose = getRS().closeRegion(null, crr);
|
||||
Assert.assertTrue(responseClose.getClosed());
|
||||
|
||||
// now waiting & checking. After a while, the transition should be done and the region closed
|
||||
checkRegionIsClosed();
|
||||
}
|
||||
|
||||
|
||||
@Test(timeout = 20000)
|
||||
public void testCloseByRegionServer() throws Exception {
|
||||
closeNoZK();
|
||||
reopenRegion();
|
||||
}
|
||||
|
||||
@Test(timeout = 20000)
|
||||
public void testCloseByMasterWithoutZNode() throws Exception {
|
||||
|
||||
// Transition in ZK on. This should fail, as there is no znode
|
||||
AdminProtos.CloseRegionRequest crr = RequestConverter.buildCloseRegionRequest(
|
||||
regionName, true);
|
||||
AdminProtos.CloseRegionResponse responseClose = getRS().closeRegion(null, crr);
|
||||
Assert.assertTrue(responseClose.getClosed());
|
||||
|
||||
// now waiting. After a while, the transition should be done
|
||||
while (!getRS().getRegionsInTransitionInRS().isEmpty()) {
|
||||
Thread.sleep(1);
|
||||
}
|
||||
|
||||
// the region is still available, the close got rejected at the end
|
||||
Assert.assertTrue("The close should have failed", getRS().getRegion(regionName).isAvailable());
|
||||
}
|
||||
|
||||
@Test(timeout = 20000)
|
||||
public void testOpenCloseByMasterWithZNode() throws Exception {
|
||||
|
||||
ZKAssign.createNodeClosing(HTU.getZooKeeperWatcher(), hri, getRS().getServerName());
|
||||
|
||||
AdminProtos.CloseRegionRequest crr = RequestConverter.buildCloseRegionRequest(
|
||||
regionName, true);
|
||||
AdminProtos.CloseRegionResponse responseClose = getRS().closeRegion(null, crr);
|
||||
Assert.assertTrue(responseClose.getClosed());
|
||||
|
||||
checkRegionIsClosed();
|
||||
|
||||
ZKAssign.deleteClosedNode(HTU.getZooKeeperWatcher(), hri.getEncodedName());
|
||||
|
||||
reopenRegion();
|
||||
}
|
||||
|
||||
/**
|
||||
* Test that we can send multiple openRegion to the region server.
|
||||
* This is used when:
|
||||
* - there is a SocketTimeout: in this case, the master does not know if the region server
|
||||
* received the request before the timeout.
|
||||
* - We have a socket error during the operation: same stuff: we don't know
|
||||
* - a master failover: if we find a znode in thz M_ZK_REGION_OFFLINE, we don't know if
|
||||
* the region server has received the query or not. Only solution to be efficient: re-ask
|
||||
* immediately.
|
||||
*/
|
||||
@Test(timeout = 20000)
|
||||
public void testMultipleOpen() throws Exception {
|
||||
|
||||
// We close
|
||||
closeNoZK();
|
||||
checkRegionIsClosed();
|
||||
|
||||
// We reopen. We need a ZK node here, as a open is always triggered by a master.
|
||||
ZKAssign.createNodeOffline(HTU.getZooKeeperWatcher(), hri, getRS().getServerName());
|
||||
|
||||
// We're sending multiple requests in a row. The region server must handle this nicely.
|
||||
for (int i = 0; i < 10; i++) {
|
||||
AdminProtos.OpenRegionRequest orr = RequestConverter.buildOpenRegionRequest(hri, 0);
|
||||
AdminProtos.OpenRegionResponse responseOpen = getRS().openRegion(null, orr);
|
||||
Assert.assertTrue(responseOpen.getOpeningStateCount() == 1);
|
||||
|
||||
AdminProtos.OpenRegionResponse.RegionOpeningState ors = responseOpen.getOpeningState(0);
|
||||
Assert.assertTrue("request " + i + " failed",
|
||||
ors.equals(AdminProtos.OpenRegionResponse.RegionOpeningState.OPENED) ||
|
||||
ors.equals(AdminProtos.OpenRegionResponse.RegionOpeningState.ALREADY_OPENED)
|
||||
);
|
||||
}
|
||||
|
||||
checkRegionIsOpened();
|
||||
}
|
||||
|
||||
@Test(timeout = 20000)
|
||||
public void testMultipleCloseFromMaster() throws Exception {
|
||||
|
||||
// As opening, we must support multiple requests on the same region
|
||||
ZKAssign.createNodeClosing(HTU.getZooKeeperWatcher(), hri, getRS().getServerName());
|
||||
for (int i = 0; i < 10; i++) {
|
||||
AdminProtos.CloseRegionRequest crr =
|
||||
RequestConverter.buildCloseRegionRequest(regionName, 0, null, true);
|
||||
try {
|
||||
AdminProtos.CloseRegionResponse responseClose = getRS().closeRegion(null, crr);
|
||||
Assert.assertEquals("The first request should succeeds", 0, i);
|
||||
Assert.assertTrue("request " + i + " failed",
|
||||
responseClose.getClosed() || responseClose.hasClosed());
|
||||
} catch (ServiceException se) {
|
||||
Assert.assertTrue("The next queries should throw an exception.", i > 0);
|
||||
}
|
||||
}
|
||||
|
||||
checkRegionIsClosed();
|
||||
|
||||
Assert.assertTrue(
|
||||
ZKAssign.deleteClosedNode(HTU.getZooKeeperWatcher(), hri.getEncodedName())
|
||||
);
|
||||
|
||||
reopenRegion();
|
||||
}
|
||||
|
||||
/**
|
||||
* Test that if we do a close while opening it stops the opening.
|
||||
*/
|
||||
@Test(timeout = 20000)
|
||||
public void testCancelOpeningWithoutZK() throws Exception {
|
||||
// We close
|
||||
closeNoZK();
|
||||
checkRegionIsClosed();
|
||||
|
||||
// Let do the initial steps, without having a handler
|
||||
ZKAssign.createNodeOffline(HTU.getZooKeeperWatcher(), hri, getRS().getServerName());
|
||||
getRS().getRegionsInTransitionInRS().put(hri.getEncodedNameAsBytes(), Boolean.TRUE);
|
||||
|
||||
// That's a close without ZK.
|
||||
AdminProtos.CloseRegionRequest crr =
|
||||
RequestConverter.buildCloseRegionRequest(regionName, false);
|
||||
try {
|
||||
getRS().closeRegion(null, crr);
|
||||
Assert.assertTrue(false);
|
||||
} catch (ServiceException expected) {
|
||||
}
|
||||
|
||||
// The state in RIT should have changed to close
|
||||
Assert.assertEquals(Boolean.FALSE, getRS().getRegionsInTransitionInRS().get(
|
||||
hri.getEncodedNameAsBytes()));
|
||||
|
||||
// Let's start the open handler
|
||||
HTableDescriptor htd = getRS().tableDescriptors.get(hri.getTableName());
|
||||
getRS().service.submit(new OpenRegionHandler(getRS(), getRS(), hri, htd, 0));
|
||||
|
||||
// The open handler should have removed the region from RIT but kept the region closed
|
||||
checkRegionIsClosed();
|
||||
|
||||
// The open handler should have updated the value in ZK.
|
||||
Assert.assertTrue(ZKAssign.deleteNode(
|
||||
getRS().getZooKeeperWatcher(), hri.getEncodedName(),
|
||||
EventHandler.EventType.RS_ZK_REGION_FAILED_OPEN, 1)
|
||||
);
|
||||
|
||||
reopenRegion();
|
||||
}
|
||||
|
||||
/**
|
||||
* Test an open then a close with ZK. This is going to mess-up the ZK states, so
|
||||
* the opening will fail as well because it doesn't find what it expects in ZK.
|
||||
*/
|
||||
@Test(timeout = 20000)
|
||||
public void testCancelOpeningWithZK() throws Exception {
|
||||
// We close
|
||||
closeNoZK();
|
||||
checkRegionIsClosed();
|
||||
|
||||
// Let do the initial steps, without having a handler
|
||||
getRS().getRegionsInTransitionInRS().put(hri.getEncodedNameAsBytes(), Boolean.TRUE);
|
||||
|
||||
// That's a close without ZK.
|
||||
ZKAssign.createNodeClosing(HTU.getZooKeeperWatcher(), hri, getRS().getServerName());
|
||||
AdminProtos.CloseRegionRequest crr =
|
||||
RequestConverter.buildCloseRegionRequest(regionName, false);
|
||||
try {
|
||||
getRS().closeRegion(null, crr);
|
||||
Assert.assertTrue(false);
|
||||
} catch (ServiceException expected) {
|
||||
Assert.assertTrue(expected.getCause() instanceof NotServingRegionException);
|
||||
}
|
||||
|
||||
// The close should have left the ZK state as it is: it's the job the AM to delete it
|
||||
Assert.assertTrue(ZKAssign.deleteNode(
|
||||
getRS().getZooKeeperWatcher(), hri.getEncodedName(),
|
||||
EventHandler.EventType.M_ZK_REGION_CLOSING, 0)
|
||||
);
|
||||
|
||||
// The state in RIT should have changed to close
|
||||
Assert.assertEquals(Boolean.FALSE, getRS().getRegionsInTransitionInRS().get(
|
||||
hri.getEncodedNameAsBytes()));
|
||||
|
||||
// Let's start the open handler
|
||||
// It should not succeed for two reasons:
|
||||
// 1) There is no ZK node
|
||||
// 2) The region in RIT was changed.
|
||||
// The order is more or less implementation dependant.
|
||||
HTableDescriptor htd = getRS().tableDescriptors.get(hri.getTableName());
|
||||
getRS().service.submit(new OpenRegionHandler(getRS(), getRS(), hri, htd, 0));
|
||||
|
||||
// The open handler should have removed the region from RIT but kept the region closed
|
||||
checkRegionIsClosed();
|
||||
|
||||
// We should not find any znode here.
|
||||
Assert.assertEquals(-1, ZKAssign.getVersion(HTU.getZooKeeperWatcher(), hri));
|
||||
|
||||
reopenRegion();
|
||||
}
|
||||
}
|
|
@ -88,7 +88,7 @@ public class TestOpenRegionHandler {
|
|||
@Test public void testYankingRegionFromUnderIt()
|
||||
throws IOException, NodeExistsException, KeeperException {
|
||||
final Server server = new MockServer(HTU);
|
||||
final RegionServerServices rss = new MockRegionServerServices();
|
||||
final RegionServerServices rss = new MockRegionServerServices(HTU.getZooKeeperWatcher());
|
||||
|
||||
HTableDescriptor htd = TEST_HTD;
|
||||
final HRegionInfo hri = TEST_HRI;
|
||||
|
|
|
@ -36,7 +36,6 @@ import org.apache.hadoop.hbase.regionserver.Leases;
|
|||
import org.apache.hadoop.hbase.regionserver.RegionServerAccounting;
|
||||
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
|
||||
import org.apache.hadoop.hbase.regionserver.wal.HLog;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
|
||||
import org.apache.zookeeper.KeeperException;
|
||||
|
||||
|
@ -49,6 +48,15 @@ public class MockRegionServerServices implements RegionServerServices {
|
|||
private final ConcurrentSkipListMap<byte[], Boolean> rit =
|
||||
new ConcurrentSkipListMap<byte[], Boolean>(Bytes.BYTES_COMPARATOR);
|
||||
private HFileSystem hfs = null;
|
||||
private ZooKeeperWatcher zkw = null;
|
||||
|
||||
public MockRegionServerServices(ZooKeeperWatcher zkw){
|
||||
this.zkw = zkw;
|
||||
}
|
||||
|
||||
public MockRegionServerServices(){
|
||||
this(null);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean removeFromOnlineRegions(String encodedRegionName, ServerName destination) {
|
||||
|
@ -112,7 +120,7 @@ public class MockRegionServerServices implements RegionServerServices {
|
|||
|
||||
@Override
|
||||
public ZooKeeperWatcher getZooKeeper() {
|
||||
return null;
|
||||
return zkw;
|
||||
}
|
||||
|
||||
public RegionServerAccounting getRegionServerAccounting() {
|
||||
|
|
Loading…
Reference in New Issue