HBASE-8531 TestZooKeeper fails in trunk/0.95 builds
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1482669 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
47f4477f79
commit
6bf0abbec9
|
@ -193,7 +193,7 @@ public class ClientScanner extends AbstractClientScanner {
|
|||
done) {
|
||||
close();
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Finished with scanning at " + this.currentRegion);
|
||||
LOG.debug("Finished scanning region " + this.currentRegion);
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
@ -301,8 +301,9 @@ public class ClientScanner extends AbstractClientScanner {
|
|||
} else {
|
||||
Throwable cause = e.getCause();
|
||||
if ((cause == null || (!(cause instanceof NotServingRegionException)
|
||||
&& !(cause instanceof RegionServerStoppedException)))
|
||||
&& !(e instanceof OutOfOrderScannerNextException)) {
|
||||
&& !(cause instanceof RegionServerStoppedException))) &&
|
||||
!(e instanceof RegionServerStoppedException) &&
|
||||
!(e instanceof OutOfOrderScannerNextException)) {
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -333,12 +333,10 @@ public interface HConnection extends Abortable, Closeable {
|
|||
public boolean getRegionCachePrefetch(final byte[] tableName);
|
||||
|
||||
/**
|
||||
* Scan zookeeper to get the number of region servers
|
||||
* @return the number of region servers that are currently running
|
||||
* @throws IOException if a remote or network exception occurs
|
||||
* @deprecated This method will be changed from public to package protected.
|
||||
*/
|
||||
@Deprecated
|
||||
public int getCurrentNrHRS() throws IOException;
|
||||
|
||||
/**
|
||||
|
|
|
@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.client;
|
|||
|
||||
import java.io.Closeable;
|
||||
import java.io.IOException;
|
||||
import java.lang.reflect.Constructor;
|
||||
import java.lang.reflect.UndeclaredThrowableException;
|
||||
import java.net.SocketException;
|
||||
import java.util.ArrayList;
|
||||
|
@ -144,9 +145,6 @@ import org.apache.hadoop.hbase.util.Pair;
|
|||
import org.apache.hadoop.hbase.util.SoftValueSortedMap;
|
||||
import org.apache.hadoop.hbase.util.Triple;
|
||||
import org.apache.hadoop.hbase.zookeeper.MasterAddressTracker;
|
||||
import org.apache.hadoop.hbase.zookeeper.MetaRegionTracker;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZKTableReadOnly;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
|
||||
import org.apache.hadoop.ipc.RemoteException;
|
||||
|
@ -251,17 +249,18 @@ public class HConnectionManager {
|
|||
* @return HConnection object for <code>conf</code>
|
||||
* @throws ZooKeeperConnectionException
|
||||
*/
|
||||
@SuppressWarnings("resource")
|
||||
public static HConnection getConnection(final Configuration conf)
|
||||
throws IOException {
|
||||
HConnectionKey connectionKey = new HConnectionKey(conf);
|
||||
synchronized (CONNECTION_INSTANCES) {
|
||||
HConnectionImplementation connection = CONNECTION_INSTANCES.get(connectionKey);
|
||||
if (connection == null) {
|
||||
connection = new HConnectionImplementation(conf, true);
|
||||
connection = (HConnectionImplementation)createConnection(conf, true);
|
||||
CONNECTION_INSTANCES.put(connectionKey, connection);
|
||||
} else if (connection.isClosed()) {
|
||||
HConnectionManager.deleteConnection(connectionKey, true);
|
||||
connection = new HConnectionImplementation(conf, true);
|
||||
connection = (HConnectionImplementation)createConnection(conf, true);
|
||||
CONNECTION_INSTANCES.put(connectionKey, connection);
|
||||
}
|
||||
connection.incCount();
|
||||
|
@ -280,7 +279,28 @@ public class HConnectionManager {
|
|||
*/
|
||||
public static HConnection createConnection(Configuration conf)
|
||||
throws IOException {
|
||||
return new HConnectionImplementation(conf, false);
|
||||
return createConnection(conf, false);
|
||||
}
|
||||
|
||||
static HConnection createConnection(final Configuration conf, final boolean managed)
|
||||
throws IOException {
|
||||
String className = conf.get("hbase.client.connection.impl",
|
||||
HConnectionManager.HConnectionImplementation.class.getName());
|
||||
Class<?> clazz = null;
|
||||
try {
|
||||
clazz = Class.forName(className);
|
||||
} catch (ClassNotFoundException e) {
|
||||
throw new IOException(e);
|
||||
}
|
||||
try {
|
||||
// Default HCM#HCI is not accessible; make it so before invoking.
|
||||
Constructor<?> constructor =
|
||||
clazz.getDeclaredConstructor(Configuration.class, boolean.class);
|
||||
constructor.setAccessible(true);
|
||||
return (HConnection) constructor.newInstance(conf, managed);
|
||||
} catch (Exception e) {
|
||||
throw new IOException(e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -418,7 +438,7 @@ public class HConnectionManager {
|
|||
static final Log LOG = LogFactory.getLog(HConnectionImplementation.class);
|
||||
private final long pause;
|
||||
private final int numTries;
|
||||
private final int rpcTimeout;
|
||||
final int rpcTimeout;
|
||||
private final int prefetchRegionLimit;
|
||||
private final boolean useServerTrackerForRetries;
|
||||
private final long serverTrackerTimeout;
|
||||
|
@ -472,6 +492,11 @@ public class HConnectionManager {
|
|||
// indicates whether this connection's life cycle is managed (by us)
|
||||
private final boolean managed;
|
||||
|
||||
/**
|
||||
* Cluster registry of basic info such as clusterid and meta region location.
|
||||
*/
|
||||
final Registry registry;
|
||||
|
||||
/**
|
||||
* constructor
|
||||
* @param conf Configuration object
|
||||
|
@ -512,6 +537,7 @@ public class HConnectionManager {
|
|||
}
|
||||
}
|
||||
this.serverTrackerTimeout = serverTrackerTimeout;
|
||||
this.registry = setupRegistry();
|
||||
retrieveClusterId();
|
||||
|
||||
this.rpcClient = new RpcClient(this.conf, this.clusterId);
|
||||
|
@ -535,6 +561,23 @@ public class HConnectionManager {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @return The cluster registry implementation to use.
|
||||
* @throws IOException
|
||||
*/
|
||||
private Registry setupRegistry() throws IOException {
|
||||
String registryClass = this.conf.get("hbase.client.registry.impl",
|
||||
ZooKeeperRegistry.class.getName());
|
||||
Registry registry = null;
|
||||
try {
|
||||
registry = (Registry)Class.forName(registryClass).newInstance();
|
||||
} catch (Throwable t) {
|
||||
throw new IOException(t);
|
||||
}
|
||||
registry.init(this);
|
||||
return registry;
|
||||
}
|
||||
|
||||
/**
|
||||
* For tests only.
|
||||
* @param rpcClient Client we should use instead.
|
||||
|
@ -554,31 +597,11 @@ public class HConnectionManager {
|
|||
return "hconnection-0x" + Integer.toHexString(hashCode());
|
||||
}
|
||||
|
||||
private String clusterId = null;
|
||||
protected String clusterId = null;
|
||||
|
||||
public final void retrieveClusterId(){
|
||||
if (clusterId != null) {
|
||||
return;
|
||||
}
|
||||
|
||||
// No synchronized here, worse case we will retrieve it twice, that's
|
||||
// not an issue.
|
||||
ZooKeeperKeepAliveConnection zkw = null;
|
||||
try {
|
||||
zkw = getKeepAliveZooKeeperWatcher();
|
||||
clusterId = ZKClusterId.readClusterIdZNode(zkw);
|
||||
if (clusterId == null) {
|
||||
LOG.info("ClusterId read in ZooKeeper is null");
|
||||
}
|
||||
} catch (KeeperException e) {
|
||||
LOG.warn("Can't retrieve clusterId from Zookeeper", e);
|
||||
} catch (IOException e) {
|
||||
LOG.warn("Can't retrieve clusterId from Zookeeper", e);
|
||||
} finally {
|
||||
if (zkw != null) {
|
||||
zkw.close();
|
||||
}
|
||||
}
|
||||
void retrieveClusterId() {
|
||||
if (clusterId != null) return;
|
||||
this.clusterId = this.registry.getClusterId();
|
||||
if (clusterId == null) {
|
||||
clusterId = HConstants.CLUSTER_ID_DEFAULT;
|
||||
}
|
||||
|
@ -639,12 +662,12 @@ public class HConnectionManager {
|
|||
|
||||
@Override
|
||||
public boolean isTableEnabled(byte[] tableName) throws IOException {
|
||||
return testTableOnlineState(tableName, true);
|
||||
return this.registry.isTableOnlineState(tableName, true);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isTableDisabled(byte[] tableName) throws IOException {
|
||||
return testTableOnlineState(tableName, false);
|
||||
return this.registry.isTableOnlineState(tableName, false);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -716,26 +739,6 @@ public class HConnectionManager {
|
|||
return available.get() && (regionCount.get() == splitKeys.length + 1);
|
||||
}
|
||||
|
||||
/*
|
||||
* @param enabled True if table is enabled
|
||||
*/
|
||||
private boolean testTableOnlineState(byte [] tableName, boolean enabled)
|
||||
throws IOException {
|
||||
String tableNameStr = Bytes.toString(tableName);
|
||||
ZooKeeperKeepAliveConnection zkw = getKeepAliveZooKeeperWatcher();
|
||||
try {
|
||||
if (enabled) {
|
||||
return ZKTableReadOnly.isEnabledTable(zkw, tableNameStr);
|
||||
}
|
||||
return ZKTableReadOnly.isDisabledTable(zkw, tableNameStr);
|
||||
} catch (KeeperException e) {
|
||||
throw new IOException("Enable/Disable failed", e);
|
||||
}finally {
|
||||
zkw.close();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public HRegionLocation locateRegion(final byte[] regionName) throws IOException {
|
||||
return locateRegion(HRegionInfo.getTableName(regionName),
|
||||
|
@ -801,26 +804,7 @@ public class HConnectionManager {
|
|||
}
|
||||
|
||||
if (Bytes.equals(tableName, HConstants.META_TABLE_NAME)) {
|
||||
ZooKeeperKeepAliveConnection zkw = getKeepAliveZooKeeperWatcher();
|
||||
try {
|
||||
if (LOG.isTraceEnabled()) {
|
||||
LOG.trace("Looking up meta region location in ZK," + " connection=" + this);
|
||||
}
|
||||
ServerName servername =
|
||||
MetaRegionTracker.blockUntilAvailable(zkw, this.rpcTimeout);
|
||||
|
||||
if (LOG.isTraceEnabled()) {
|
||||
LOG.debug("Looked up meta region location, connection=" + this +
|
||||
"; serverName=" + ((servername == null) ? "null" : servername));
|
||||
}
|
||||
if (servername == null) return null;
|
||||
return new HRegionLocation(HRegionInfo.FIRST_META_REGIONINFO, servername, 0);
|
||||
} catch (InterruptedException e) {
|
||||
Thread.currentThread().interrupt();
|
||||
return null;
|
||||
} finally {
|
||||
zkw.close();
|
||||
}
|
||||
return this.registry.getMetaRegionLocation();
|
||||
} else {
|
||||
// Region not in the cache - have to go to the meta RS
|
||||
return locateRegionInMeta(HConstants.META_TABLE_NAME, tableName, row,
|
||||
|
@ -1532,7 +1516,7 @@ public class HConnectionManager {
|
|||
stub = (ClientService.BlockingInterface)this.stubs.get(key);
|
||||
if (stub == null) {
|
||||
BlockingRpcChannel channel = this.rpcClient.createBlockingRpcChannel(sn,
|
||||
User.getCurrent(), this.rpcTimeout);
|
||||
User.getCurrent(), this.rpcTimeout);
|
||||
stub = ClientService.newBlockingStub(channel);
|
||||
// In old days, after getting stub/proxy, we'd make a call. We are not doing that here.
|
||||
// Just fail on first actual call rather than in here on setup.
|
||||
|
@ -1557,7 +1541,7 @@ public class HConnectionManager {
|
|||
* Retrieve a shared ZooKeeperWatcher. You must close it it once you've have finished with it.
|
||||
* @return The shared instance. Never returns null.
|
||||
*/
|
||||
public ZooKeeperKeepAliveConnection getKeepAliveZooKeeperWatcher()
|
||||
ZooKeeperKeepAliveConnection getKeepAliveZooKeeperWatcher()
|
||||
throws IOException {
|
||||
synchronized (masterAndZKLock) {
|
||||
if (keepAliveZookeeper == null) {
|
||||
|
@ -1566,8 +1550,7 @@ public class HConnectionManager {
|
|||
}
|
||||
// We don't check that our link to ZooKeeper is still valid
|
||||
// But there is a retry mechanism in the ZooKeeperWatcher itself
|
||||
keepAliveZookeeper = new ZooKeeperKeepAliveConnection(
|
||||
conf, this.toString(), this);
|
||||
keepAliveZookeeper = new ZooKeeperKeepAliveConnection(conf, this.toString(), this);
|
||||
}
|
||||
keepAliveZookeeperUserCount++;
|
||||
keepZooKeeperWatcherAliveUntil = Long.MAX_VALUE;
|
||||
|
@ -1575,20 +1558,18 @@ public class HConnectionManager {
|
|||
}
|
||||
}
|
||||
|
||||
void releaseZooKeeperWatcher(ZooKeeperWatcher zkw) {
|
||||
void releaseZooKeeperWatcher(final ZooKeeperWatcher zkw) {
|
||||
if (zkw == null){
|
||||
return;
|
||||
}
|
||||
synchronized (masterAndZKLock) {
|
||||
--keepAliveZookeeperUserCount;
|
||||
if (keepAliveZookeeperUserCount <=0 ){
|
||||
keepZooKeeperWatcherAliveUntil =
|
||||
System.currentTimeMillis() + keepAlive;
|
||||
if (keepAliveZookeeperUserCount <= 0 ){
|
||||
keepZooKeeperWatcherAliveUntil = System.currentTimeMillis() + keepAlive;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Creates a Chore thread to check the connections to master & zookeeper
|
||||
* and close them when they reach their closing time (
|
||||
|
@ -2581,17 +2562,7 @@ public class HConnectionManager {
|
|||
|
||||
@Override
|
||||
public int getCurrentNrHRS() throws IOException {
|
||||
ZooKeeperKeepAliveConnection zkw = getKeepAliveZooKeeperWatcher();
|
||||
|
||||
try {
|
||||
// We go to zk rather than to master to get count of regions to avoid
|
||||
// HTable having a Master dependency. See HBase-2828
|
||||
return ZKUtil.getNumberOfChildren(zkw, zkw.rsZNode);
|
||||
} catch (KeeperException ke) {
|
||||
throw new IOException("Unexpected ZooKeeper exception", ke);
|
||||
} finally {
|
||||
zkw.close();
|
||||
}
|
||||
return this.registry.getCurrentNrHRS();
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -37,9 +37,8 @@ import java.util.NavigableMap;
|
|||
import java.util.TreeMap;
|
||||
|
||||
/**
|
||||
* Scanner class that contains the <code>.META.</code> table scanning logic
|
||||
* and uses a Retryable scanner. Provided visitors will be called
|
||||
* for each row.
|
||||
* Scanner class that contains the <code>.META.</code> table scanning logic.
|
||||
* Provided visitors will be called for each row.
|
||||
*
|
||||
* Although public visibility, this is not a public-facing API and may evolve in
|
||||
* minor releases.
|
||||
|
@ -123,114 +122,58 @@ public class MetaScanner {
|
|||
public static void metaScan(Configuration configuration,
|
||||
final MetaScannerVisitor visitor, final byte[] tableName,
|
||||
final byte[] row, final int rowLimit, final byte[] metaTableName)
|
||||
throws IOException {
|
||||
try {
|
||||
HConnectionManager.execute(new HConnectable<Void>(configuration) {
|
||||
@Override
|
||||
public Void connect(HConnection connection) throws IOException {
|
||||
metaScan(conf, connection, visitor, tableName, row, rowLimit,
|
||||
metaTableName);
|
||||
return null;
|
||||
}
|
||||
});
|
||||
} finally {
|
||||
visitor.close();
|
||||
}
|
||||
}
|
||||
|
||||
private static void metaScan(Configuration configuration, HConnection connection,
|
||||
MetaScannerVisitor visitor, byte [] tableName, byte[] row,
|
||||
int rowLimit, final byte [] metaTableName)
|
||||
throws IOException {
|
||||
int rowUpperLimit = rowLimit > 0 ? rowLimit: Integer.MAX_VALUE;
|
||||
|
||||
// if row is not null, we want to use the startKey of the row's region as
|
||||
// the startRow for the meta scan.
|
||||
HTable metaTable = new HTable(configuration, HConstants.META_TABLE_NAME);
|
||||
// Calculate startrow for scan.
|
||||
byte[] startRow;
|
||||
if (row != null) {
|
||||
// Scan starting at a particular row in a particular table
|
||||
assert tableName != null;
|
||||
byte[] searchRow =
|
||||
HRegionInfo.createRegionName(tableName, row, HConstants.NINES,
|
||||
false);
|
||||
HTable metaTable = null;
|
||||
try {
|
||||
metaTable = new HTable(configuration, HConstants.META_TABLE_NAME);
|
||||
Result startRowResult = metaTable.getRowOrBefore(searchRow,
|
||||
HConstants.CATALOG_FAMILY);
|
||||
try {
|
||||
if (row != null) {
|
||||
// Scan starting at a particular row in a particular table
|
||||
byte[] searchRow = HRegionInfo.createRegionName(tableName, row, HConstants.NINES, false);
|
||||
Result startRowResult = metaTable.getRowOrBefore(searchRow, HConstants.CATALOG_FAMILY);
|
||||
if (startRowResult == null) {
|
||||
throw new TableNotFoundException("Cannot find row in .META. for table: "
|
||||
+ Bytes.toString(tableName) + ", row=" + Bytes.toStringBinary(searchRow));
|
||||
throw new TableNotFoundException("Cannot find row in .META. for table: " +
|
||||
Bytes.toString(tableName) + ", row=" + Bytes.toStringBinary(searchRow));
|
||||
}
|
||||
HRegionInfo regionInfo = getHRegionInfo(startRowResult);
|
||||
if (regionInfo == null) {
|
||||
throw new IOException("HRegionInfo was null or empty in Meta for " +
|
||||
Bytes.toString(tableName) + ", row=" + Bytes.toStringBinary(searchRow));
|
||||
}
|
||||
|
||||
byte[] rowBefore = regionInfo.getStartKey();
|
||||
startRow = HRegionInfo.createRegionName(tableName, rowBefore,
|
||||
HConstants.ZEROES, false);
|
||||
} finally {
|
||||
if (metaTable != null) {
|
||||
metaTable.close();
|
||||
}
|
||||
startRow = HRegionInfo.createRegionName(tableName, rowBefore, HConstants.ZEROES, false);
|
||||
} else if (tableName == null || tableName.length == 0) {
|
||||
// Full META scan
|
||||
startRow = HConstants.EMPTY_START_ROW;
|
||||
} else {
|
||||
// Scan META for an entire table
|
||||
startRow = HRegionInfo.createRegionName(tableName, HConstants.EMPTY_START_ROW,
|
||||
HConstants.ZEROES, false);
|
||||
}
|
||||
} else if (tableName == null || tableName.length == 0) {
|
||||
// Full META scan
|
||||
startRow = HConstants.EMPTY_START_ROW;
|
||||
} else {
|
||||
// Scan META for an entire table
|
||||
startRow = HRegionInfo.createRegionName(
|
||||
tableName, HConstants.EMPTY_START_ROW, HConstants.ZEROES, false);
|
||||
}
|
||||
|
||||
// Scan over each meta region
|
||||
ScannerCallable callable;
|
||||
int rows = Math.min(rowLimit, configuration.getInt(
|
||||
HConstants.HBASE_META_SCANNER_CACHING,
|
||||
HConstants.DEFAULT_HBASE_META_SCANNER_CACHING));
|
||||
do {
|
||||
final Scan scan = new Scan(startRow).addFamily(HConstants.CATALOG_FAMILY);
|
||||
int rows = Math.min(rowLimit, configuration.getInt(HConstants.HBASE_META_SCANNER_CACHING,
|
||||
HConstants.DEFAULT_HBASE_META_SCANNER_CACHING));
|
||||
scan.setCaching(rows);
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Scanning " + Bytes.toString(metaTableName) +
|
||||
" starting at row=" + Bytes.toStringBinary(startRow) + " for max=" +
|
||||
rowUpperLimit + " rows using " + connection.toString());
|
||||
LOG.debug("Scanning " + Bytes.toString(metaTableName) + " starting at row=" +
|
||||
Bytes.toStringBinary(startRow) + " for max=" + rowUpperLimit + " with caching=" + rows);
|
||||
}
|
||||
callable = new ScannerCallable(connection, metaTableName, scan, null);
|
||||
// Open scanner
|
||||
callable.withRetries();
|
||||
|
||||
// Run the scan
|
||||
ResultScanner scanner = metaTable.getScanner(scan);
|
||||
Result result = null;
|
||||
int processedRows = 0;
|
||||
try {
|
||||
callable.setCaching(rows);
|
||||
done: do {
|
||||
if (processedRows >= rowUpperLimit) {
|
||||
break;
|
||||
}
|
||||
//we have all the rows here
|
||||
Result [] rrs = callable.withRetries();
|
||||
if (rrs == null || rrs.length == 0 || rrs[0].size() == 0) {
|
||||
break; //exit completely
|
||||
}
|
||||
for (Result rr : rrs) {
|
||||
if (processedRows >= rowUpperLimit) {
|
||||
break done;
|
||||
}
|
||||
if (!visitor.processRow(rr))
|
||||
break done; //exit completely
|
||||
processedRows++;
|
||||
}
|
||||
//here, we didn't break anywhere. Check if we have more rows
|
||||
} while(true);
|
||||
// Advance the startRow to the end key of the current region
|
||||
startRow = callable.getHRegionInfo().getEndKey();
|
||||
} finally {
|
||||
// Close scanner
|
||||
callable.setClose();
|
||||
callable.withRetries();
|
||||
while ((result = scanner.next()) != null) {
|
||||
if (visitor != null) {
|
||||
if (!visitor.processRow(result)) break;
|
||||
}
|
||||
processedRows++;
|
||||
if (processedRows >= rowUpperLimit) break;
|
||||
}
|
||||
} while (Bytes.compareTo(startRow, HConstants.LAST_ROW) != 0);
|
||||
} finally {
|
||||
if (visitor != null) visitor.close();
|
||||
if (metaTable != null) metaTable.close();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -0,0 +1,56 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.client;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import org.apache.hadoop.hbase.HRegionLocation;
|
||||
|
||||
/**
|
||||
* Cluster registry.
|
||||
* Implemenations hold cluster information such as this cluster's id, location of .META., etc.
|
||||
*/
|
||||
interface Registry {
|
||||
/**
|
||||
* @param connection
|
||||
*/
|
||||
void init(HConnection connection);
|
||||
|
||||
/**
|
||||
* @return Meta region location
|
||||
* @throws IOException
|
||||
*/
|
||||
HRegionLocation getMetaRegionLocation() throws IOException;
|
||||
|
||||
/**
|
||||
* @return Cluster id.
|
||||
*/
|
||||
String getClusterId();
|
||||
|
||||
/**
|
||||
* @param enabled Return true if table is enabled
|
||||
* @throws IOException
|
||||
*/
|
||||
boolean isTableOnlineState(byte [] tableName, boolean enabled) throws IOException;
|
||||
|
||||
/**
|
||||
* @return Count of 'running' regionservers
|
||||
* @throws IOException
|
||||
*/
|
||||
int getCurrentNrHRS() throws IOException;
|
||||
}
|
|
@ -201,12 +201,7 @@ public class ScannerCallable extends ServerCallable<Result[]> {
|
|||
if (this.scanMetrics != null) {
|
||||
this.scanMetrics.countOfNSRE.incrementAndGet();
|
||||
}
|
||||
throw new DoNotRetryIOException("Reset scanner", ioe);
|
||||
} else if (ioe instanceof RegionServerStoppedException) {
|
||||
// Throw a DNRE so that we break out of cycle of calling RSSE
|
||||
// when what we need is to open scanner against new location.
|
||||
// Attach RSSE to signal client that it needs to resetup scanner.
|
||||
throw new DoNotRetryIOException("Reset scanner", ioe);
|
||||
throw new DoNotRetryIOException("Resetting the scanner -- see exception cause", ioe);
|
||||
} else {
|
||||
// The outer layers will retry
|
||||
throw ioe;
|
||||
|
|
|
@ -169,8 +169,7 @@ public abstract class ServerCallable<T> implements Callable<T> {
|
|||
prepare(tries != 0); // if called with false, check table status on ZK
|
||||
return call();
|
||||
} catch (Throwable t) {
|
||||
LOG.warn("Received exception, tries=" + tries + ", numRetries=" + numRetries + ":" +
|
||||
t.getMessage());
|
||||
LOG.warn("Call exception, tries=" + tries + ", numRetries=" + numRetries + ": " + t);
|
||||
|
||||
t = translateException(t);
|
||||
// translateException throws an exception when we should not retry, i.e. when it's the
|
||||
|
|
|
@ -0,0 +1,108 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.client;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.hbase.HRegionInfo;
|
||||
import org.apache.hadoop.hbase.HRegionLocation;
|
||||
import org.apache.hadoop.hbase.ServerName;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.zookeeper.MetaRegionTracker;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZKTableReadOnly;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
|
||||
import org.apache.zookeeper.KeeperException;
|
||||
|
||||
/**
|
||||
* A cluster registry that stores to zookeeper.
|
||||
*/
|
||||
class ZooKeeperRegistry implements Registry {
|
||||
static final Log LOG = LogFactory.getLog(ZooKeeperRegistry.class);
|
||||
// Needs an instance of hci to function. Set after construct this instance.
|
||||
HConnectionManager.HConnectionImplementation hci;
|
||||
|
||||
@Override
|
||||
public void init(HConnection connection) {
|
||||
if (!(connection instanceof HConnectionManager.HConnectionImplementation)) {
|
||||
throw new RuntimeException("This registry depends on HConnectionImplementation");
|
||||
}
|
||||
this.hci = (HConnectionManager.HConnectionImplementation)connection;
|
||||
}
|
||||
|
||||
@Override
|
||||
public HRegionLocation getMetaRegionLocation() throws IOException {
|
||||
ZooKeeperKeepAliveConnection zkw = hci.getKeepAliveZooKeeperWatcher();
|
||||
|
||||
try {
|
||||
if (LOG.isTraceEnabled()) {
|
||||
LOG.trace("Looking up meta region location in ZK," + " connection=" + this);
|
||||
}
|
||||
ServerName servername = MetaRegionTracker.blockUntilAvailable(zkw, hci.rpcTimeout);
|
||||
if (LOG.isTraceEnabled()) {
|
||||
LOG.trace("Looked up meta region location, connection=" + this +
|
||||
"; serverName=" + ((servername == null) ? "null" : servername));
|
||||
}
|
||||
if (servername == null) return null;
|
||||
return new HRegionLocation(HRegionInfo.FIRST_META_REGIONINFO, servername, 0);
|
||||
} catch (InterruptedException e) {
|
||||
Thread.currentThread().interrupt();
|
||||
return null;
|
||||
} finally {
|
||||
zkw.close();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getClusterId() {
|
||||
// TODO Auto-generated method stub
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isTableOnlineState(byte [] tableName, boolean enabled)
|
||||
throws IOException {
|
||||
String tableNameStr = Bytes.toString(tableName);
|
||||
ZooKeeperKeepAliveConnection zkw = hci.getKeepAliveZooKeeperWatcher();
|
||||
try {
|
||||
if (enabled) {
|
||||
return ZKTableReadOnly.isEnabledTable(zkw, tableNameStr);
|
||||
}
|
||||
return ZKTableReadOnly.isDisabledTable(zkw, tableNameStr);
|
||||
} catch (KeeperException e) {
|
||||
throw new IOException("Enable/Disable failed", e);
|
||||
} finally {
|
||||
zkw.close();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getCurrentNrHRS() throws IOException {
|
||||
ZooKeeperKeepAliveConnection zkw = hci.getKeepAliveZooKeeperWatcher();
|
||||
try {
|
||||
// We go to zk rather than to master to get count of regions to avoid
|
||||
// HTable having a Master dependency. See HBase-2828
|
||||
return ZKUtil.getNumberOfChildren(zkw, zkw.rsZNode);
|
||||
} catch (KeeperException ke) {
|
||||
throw new IOException("Unexpected ZooKeeper exception", ke);
|
||||
} finally {
|
||||
zkw.close();
|
||||
}
|
||||
}
|
||||
}
|
|
@ -20,14 +20,12 @@ package org.apache.hadoop.hbase.exceptions;
|
|||
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
/**
|
||||
* Thrown by the region server when it is in shutting down state.
|
||||
*/
|
||||
@SuppressWarnings("serial")
|
||||
@InterfaceAudience.Private
|
||||
public class RegionServerStoppedException extends IOException {
|
||||
public class RegionServerStoppedException extends DoNotRetryIOException {
|
||||
|
||||
public RegionServerStoppedException(String s) {
|
||||
super(s);
|
||||
|
|
|
@ -0,0 +1,143 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.client;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.HBaseConfiguration;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.HRegionInfo;
|
||||
import org.apache.hadoop.hbase.HRegionLocation;
|
||||
import org.apache.hadoop.hbase.ServerName;
|
||||
import org.apache.hadoop.hbase.exceptions.RegionServerStoppedException;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ClientService;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ClientService.BlockingInterface;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.mockito.Mockito;
|
||||
|
||||
import com.google.protobuf.RpcController;
|
||||
import com.google.protobuf.ServiceException;
|
||||
|
||||
/**
|
||||
* Test client behavior w/o setting up a cluster.
|
||||
* Mock up cluster emissions.
|
||||
*/
|
||||
public class TestClientNoCluster {
|
||||
private static final Log LOG = LogFactory.getLog(TestClientNoCluster.class);
|
||||
private Configuration conf;
|
||||
|
||||
@Before
|
||||
public void setUp() throws Exception {
|
||||
this.conf = HBaseConfiguration.create();
|
||||
// Run my HConnection overrides. Use my little HConnectionImplementation below which
|
||||
// allows me insert mocks and also use my Registry below rather than the default zk based
|
||||
// one so tests run faster and don't have zk dependency.
|
||||
this.conf.set("hbase.client.connection.impl", NoClusterConnection.class.getName());
|
||||
this.conf.set("hbase.client.registry.impl", SimpleRegistry.class.getName());
|
||||
}
|
||||
|
||||
/**
|
||||
* Simple cluster registry inserted in place of our usual zookeeper based one.
|
||||
*/
|
||||
static class SimpleRegistry implements Registry {
|
||||
final ServerName META_HOST = new ServerName("10.10.10.10", 60010, 12345);
|
||||
|
||||
@Override
|
||||
public void init(HConnection connection) {
|
||||
}
|
||||
|
||||
@Override
|
||||
public HRegionLocation getMetaRegionLocation() throws IOException {
|
||||
return new HRegionLocation(HRegionInfo.FIRST_META_REGIONINFO, META_HOST);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getClusterId() {
|
||||
return HConstants.CLUSTER_ID_DEFAULT;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isTableOnlineState(byte[] tableName, boolean enabled)
|
||||
throws IOException {
|
||||
return enabled;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getCurrentNrHRS() throws IOException {
|
||||
return 1;
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testDoNotRetryMetaScanner() throws IOException {
|
||||
MetaScanner.metaScan(this.conf, null);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testDoNotRetryOnScan() throws IOException {
|
||||
// Go against meta else we will try to find first region for the table on construction which
|
||||
// means we'll have to do a bunch more mocking. Tests that go against meta only should be
|
||||
// good for a bit of testing.
|
||||
HTable table = new HTable(this.conf, HConstants.META_TABLE_NAME);
|
||||
ResultScanner scanner = table.getScanner(HConstants.CATALOG_FAMILY);
|
||||
try {
|
||||
Result result = null;
|
||||
while ((result = scanner.next()) != null) {
|
||||
LOG.info(result);
|
||||
}
|
||||
} finally {
|
||||
scanner.close();
|
||||
table.close();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Override to shutdown going to zookeeper for cluster id and meta location.
|
||||
*/
|
||||
static class NoClusterConnection extends HConnectionManager.HConnectionImplementation {
|
||||
final ClientService.BlockingInterface stub;
|
||||
|
||||
NoClusterConnection(Configuration conf, boolean managed) throws IOException {
|
||||
super(conf, managed);
|
||||
// Mock up my stub so open scanner returns a scanner id and then on next, we throw
|
||||
// exceptions for three times and then after that, we return no more to scan.
|
||||
this.stub = Mockito.mock(ClientService.BlockingInterface.class);
|
||||
long sid = 12345L;
|
||||
try {
|
||||
Mockito.when(stub.scan((RpcController)Mockito.any(),
|
||||
(ClientProtos.ScanRequest)Mockito.any())).
|
||||
thenReturn(ClientProtos.ScanResponse.newBuilder().setScannerId(sid).build()).
|
||||
thenThrow(new ServiceException(new RegionServerStoppedException("From Mockito"))).
|
||||
thenReturn(ClientProtos.ScanResponse.newBuilder().setScannerId(sid).
|
||||
setMoreResults(false).build());
|
||||
} catch (ServiceException e) {
|
||||
throw new IOException(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public BlockingInterface getClient(ServerName sn) throws IOException {
|
||||
return this.stub;
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,37 @@
|
|||
<?xml version="1.0"?>
|
||||
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
|
||||
<!--
|
||||
/**
|
||||
*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
-->
|
||||
<configuration>
|
||||
<property>
|
||||
<name>hbase.defaults.for.version.skip</name>
|
||||
<value>true</value>
|
||||
</property>
|
||||
<property>
|
||||
<name>hbase.client.retries.number</name>
|
||||
<value>5</value>
|
||||
<description>Maximum retries. Used as maximum for all retryable
|
||||
operations such as fetching of the root region from root region
|
||||
server, getting a cell's value, starting a row update, etc.
|
||||
Default: 10.
|
||||
</description>
|
||||
</property>
|
||||
</configuration>
|
|
@ -798,7 +798,8 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterfa
|
|||
} // for
|
||||
} catch (Throwable t) {
|
||||
if (!checkOOME(t)) {
|
||||
abort("Unhandled exception: " + t.getMessage(), t);
|
||||
String prefix = t instanceof YouAreDeadException? "": "Unhandled: ";
|
||||
abort(prefix + t.getMessage(), t);
|
||||
}
|
||||
}
|
||||
// Run shutdown.
|
||||
|
|
|
@ -108,12 +108,10 @@ public class TestZooKeeper {
|
|||
}
|
||||
|
||||
|
||||
private ZooKeeperWatcher getZooKeeperWatcher(HConnection c) throws
|
||||
NoSuchMethodException, InvocationTargetException, IllegalAccessException {
|
||||
|
||||
Method getterZK = c.getClass().getMethod("getKeepAliveZooKeeperWatcher");
|
||||
private ZooKeeperWatcher getZooKeeperWatcher(HConnection c)
|
||||
throws NoSuchMethodException, InvocationTargetException, IllegalAccessException {
|
||||
Method getterZK = c.getClass().getDeclaredMethod("getKeepAliveZooKeeperWatcher");
|
||||
getterZK.setAccessible(true);
|
||||
|
||||
return (ZooKeeperWatcher) getterZK.invoke(c);
|
||||
}
|
||||
|
||||
|
@ -196,12 +194,12 @@ public class TestZooKeeper {
|
|||
connection.close();
|
||||
}
|
||||
|
||||
@Test
|
||||
@Test (timeout = 60000)
|
||||
public void testRegionServerSessionExpired() throws Exception {
|
||||
LOG.info("Starting testRegionServerSessionExpired");
|
||||
int metaIndex = TEST_UTIL.getMiniHBaseCluster().getServerWithMeta();
|
||||
TEST_UTIL.expireRegionServerSession(metaIndex);
|
||||
testSanity();
|
||||
testSanity("testRegionServerSessionExpired");
|
||||
}
|
||||
|
||||
// @Test Disabled because seems to make no sense expiring master session
|
||||
|
@ -210,7 +208,7 @@ public class TestZooKeeper {
|
|||
public void testMasterSessionExpired() throws Exception {
|
||||
LOG.info("Starting testMasterSessionExpired");
|
||||
TEST_UTIL.expireMasterSession();
|
||||
testSanity();
|
||||
testSanity("testMasterSessionExpired");
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -220,27 +218,31 @@ public class TestZooKeeper {
|
|||
*/
|
||||
@Test(timeout = 60000)
|
||||
public void testMasterZKSessionRecoveryFailure() throws Exception {
|
||||
LOG.info("Starting testMasterZKSessionRecoveryFailure");
|
||||
MiniHBaseCluster cluster = TEST_UTIL.getHBaseCluster();
|
||||
HMaster m = cluster.getMaster();
|
||||
m.abort("Test recovery from zk session expired",
|
||||
new KeeperException.SessionExpiredException());
|
||||
assertFalse(m.isStopped());
|
||||
testSanity();
|
||||
testSanity("testMasterZKSessionRecoveryFailure");
|
||||
}
|
||||
|
||||
/**
|
||||
* Make sure we can use the cluster
|
||||
* @throws Exception
|
||||
*/
|
||||
private void testSanity() throws Exception{
|
||||
HBaseAdmin admin =
|
||||
new HBaseAdmin(TEST_UTIL.getConfiguration());
|
||||
String tableName = "test"+System.currentTimeMillis();
|
||||
private void testSanity(final String testName) throws Exception{
|
||||
String tableName = testName + "." + System.currentTimeMillis();
|
||||
HTableDescriptor desc = new HTableDescriptor(tableName);
|
||||
HColumnDescriptor family = new HColumnDescriptor("fam");
|
||||
desc.addFamily(family);
|
||||
LOG.info("Creating table " + tableName);
|
||||
admin.createTable(desc);
|
||||
HBaseAdmin admin = new HBaseAdmin(TEST_UTIL.getConfiguration());
|
||||
try {
|
||||
admin.createTable(desc);
|
||||
} finally {
|
||||
admin.close();
|
||||
}
|
||||
|
||||
HTable table =
|
||||
new HTable(new Configuration(TEST_UTIL.getConfiguration()), tableName);
|
||||
|
@ -253,33 +255,29 @@ public class TestZooKeeper {
|
|||
}
|
||||
|
||||
@Test
|
||||
public void testMultipleZK() {
|
||||
try {
|
||||
HTable localMeta =
|
||||
new HTable(new Configuration(TEST_UTIL.getConfiguration()), HConstants.META_TABLE_NAME);
|
||||
Configuration otherConf = new Configuration(TEST_UTIL.getConfiguration());
|
||||
otherConf.set(HConstants.ZOOKEEPER_QUORUM, "127.0.0.1");
|
||||
HTable ipMeta = new HTable(otherConf, HConstants.META_TABLE_NAME);
|
||||
public void testMultipleZK()
|
||||
throws IOException, NoSuchMethodException, InvocationTargetException, IllegalAccessException {
|
||||
HTable localMeta =
|
||||
new HTable(new Configuration(TEST_UTIL.getConfiguration()), HConstants.META_TABLE_NAME);
|
||||
Configuration otherConf = new Configuration(TEST_UTIL.getConfiguration());
|
||||
otherConf.set(HConstants.ZOOKEEPER_QUORUM, "127.0.0.1");
|
||||
HTable ipMeta = new HTable(otherConf, HConstants.META_TABLE_NAME);
|
||||
|
||||
// dummy, just to open the connection
|
||||
final byte [] row = new byte [] {'r'};
|
||||
localMeta.exists(new Get(row));
|
||||
ipMeta.exists(new Get(row));
|
||||
// dummy, just to open the connection
|
||||
final byte [] row = new byte [] {'r'};
|
||||
localMeta.exists(new Get(row));
|
||||
ipMeta.exists(new Get(row));
|
||||
|
||||
// make sure they aren't the same
|
||||
ZooKeeperWatcher z1 =
|
||||
getZooKeeperWatcher(HConnectionManager.getConnection(localMeta.getConfiguration()));
|
||||
ZooKeeperWatcher z2 =
|
||||
getZooKeeperWatcher(HConnectionManager.getConnection(otherConf));
|
||||
assertFalse(z1 == z2);
|
||||
assertFalse(z1.getQuorum().equals(z2.getQuorum()));
|
||||
// make sure they aren't the same
|
||||
ZooKeeperWatcher z1 =
|
||||
getZooKeeperWatcher(HConnectionManager.getConnection(localMeta.getConfiguration()));
|
||||
ZooKeeperWatcher z2 =
|
||||
getZooKeeperWatcher(HConnectionManager.getConnection(otherConf));
|
||||
assertFalse(z1 == z2);
|
||||
assertFalse(z1.getQuorum().equals(z2.getQuorum()));
|
||||
|
||||
localMeta.close();
|
||||
ipMeta.close();
|
||||
} catch (Exception e) {
|
||||
e.printStackTrace();
|
||||
fail();
|
||||
}
|
||||
localMeta.close();
|
||||
ipMeta.close();
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -446,8 +444,11 @@ public class TestZooKeeper {
|
|||
// Restore the ACL
|
||||
ZooKeeper zk3 = new ZooKeeper(quorumServers, sessionTimeout, EmptyWatcher.instance);
|
||||
zk3.addAuthInfo("digest", "hbase:rox".getBytes());
|
||||
zk3.setACL("/", oldACL, -1);
|
||||
zk3.close();
|
||||
try {
|
||||
zk3.setACL("/", oldACL, -1);
|
||||
} finally {
|
||||
zk3.close();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -477,23 +478,25 @@ public class TestZooKeeper {
|
|||
int expectedNumOfListeners = zkw.getNumberOfListeners();
|
||||
// now the cluster is up. So assign some regions.
|
||||
HBaseAdmin admin = new HBaseAdmin(TEST_UTIL.getConfiguration());
|
||||
byte[][] SPLIT_KEYS = new byte[][] { Bytes.toBytes("a"), Bytes.toBytes("b"),
|
||||
try {
|
||||
byte[][] SPLIT_KEYS = new byte[][] { Bytes.toBytes("a"), Bytes.toBytes("b"),
|
||||
Bytes.toBytes("c"), Bytes.toBytes("d"), Bytes.toBytes("e"), Bytes.toBytes("f"),
|
||||
Bytes.toBytes("g"), Bytes.toBytes("h"), Bytes.toBytes("i"), Bytes.toBytes("j") };
|
||||
|
||||
String tableName = "testRegionAssignmentAfterMasterRecoveryDueToZKExpiry";
|
||||
admin.createTable(new HTableDescriptor(tableName), SPLIT_KEYS);
|
||||
ZooKeeperWatcher zooKeeperWatcher = HBaseTestingUtility.getZooKeeperWatcher(TEST_UTIL);
|
||||
ZKAssign.blockUntilNoRIT(zooKeeperWatcher);
|
||||
m.getZooKeeperWatcher().close();
|
||||
MockLoadBalancer.retainAssignCalled = false;
|
||||
m.abort("Test recovery from zk session expired", new KeeperException.SessionExpiredException());
|
||||
assertFalse(m.isStopped());
|
||||
// The recovered master should not call retainAssignment, as it is not a
|
||||
// clean startup.
|
||||
assertFalse("Retain assignment should not be called", MockLoadBalancer.retainAssignCalled);
|
||||
// number of listeners should be same as the value before master aborted
|
||||
assertEquals(expectedNumOfListeners, zkw.getNumberOfListeners());
|
||||
String tableName = "testRegionAssignmentAfterMasterRecoveryDueToZKExpiry";
|
||||
admin.createTable(new HTableDescriptor(tableName), SPLIT_KEYS);
|
||||
ZooKeeperWatcher zooKeeperWatcher = HBaseTestingUtility.getZooKeeperWatcher(TEST_UTIL);
|
||||
ZKAssign.blockUntilNoRIT(zooKeeperWatcher);
|
||||
m.getZooKeeperWatcher().close();
|
||||
MockLoadBalancer.retainAssignCalled = false;
|
||||
m.abort("Test recovery from zk session expired",
|
||||
new KeeperException.SessionExpiredException());
|
||||
assertFalse(m.isStopped());
|
||||
// The recovered master should not call retainAssignment, as it is not a
|
||||
// clean startup.
|
||||
assertFalse("Retain assignment should not be called", MockLoadBalancer.retainAssignCalled);
|
||||
} finally {
|
||||
admin.close();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -508,40 +511,47 @@ public class TestZooKeeper {
|
|||
HMaster m = cluster.getMaster();
|
||||
// now the cluster is up. So assign some regions.
|
||||
HBaseAdmin admin = new HBaseAdmin(TEST_UTIL.getConfiguration());
|
||||
byte[][] SPLIT_KEYS = new byte[][] { Bytes.toBytes("1"), Bytes.toBytes("2"),
|
||||
HTable table = null;
|
||||
try {
|
||||
byte[][] SPLIT_KEYS = new byte[][] { Bytes.toBytes("1"), Bytes.toBytes("2"),
|
||||
Bytes.toBytes("3"), Bytes.toBytes("4"), Bytes.toBytes("5") };
|
||||
|
||||
String tableName = "testLogSplittingAfterMasterRecoveryDueToZKExpiry";
|
||||
HTableDescriptor htd = new HTableDescriptor(tableName);
|
||||
HColumnDescriptor hcd = new HColumnDescriptor("col");
|
||||
htd.addFamily(hcd);
|
||||
admin.createTable(htd, SPLIT_KEYS);
|
||||
ZooKeeperWatcher zooKeeperWatcher = HBaseTestingUtility.getZooKeeperWatcher(TEST_UTIL);
|
||||
ZKAssign.blockUntilNoRIT(zooKeeperWatcher);
|
||||
HTable table = new HTable(TEST_UTIL.getConfiguration(), tableName);
|
||||
|
||||
Put p;
|
||||
int numberOfPuts;
|
||||
for (numberOfPuts = 0; numberOfPuts < 6; numberOfPuts++) {
|
||||
p = new Put(Bytes.toBytes(numberOfPuts));
|
||||
p.add(Bytes.toBytes("col"), Bytes.toBytes("ql"), Bytes.toBytes("value" + numberOfPuts));
|
||||
table.put(p);
|
||||
String tableName = "testLogSplittingAfterMasterRecoveryDueToZKExpiry";
|
||||
HTableDescriptor htd = new HTableDescriptor(tableName);
|
||||
HColumnDescriptor hcd = new HColumnDescriptor("col");
|
||||
htd.addFamily(hcd);
|
||||
admin.createTable(htd, SPLIT_KEYS);
|
||||
ZooKeeperWatcher zooKeeperWatcher = HBaseTestingUtility.getZooKeeperWatcher(TEST_UTIL);
|
||||
ZKAssign.blockUntilNoRIT(zooKeeperWatcher);
|
||||
table = new HTable(TEST_UTIL.getConfiguration(), tableName);
|
||||
Put p;
|
||||
int numberOfPuts;
|
||||
for (numberOfPuts = 0; numberOfPuts < 6; numberOfPuts++) {
|
||||
p = new Put(Bytes.toBytes(numberOfPuts));
|
||||
p.add(Bytes.toBytes("col"), Bytes.toBytes("ql"), Bytes.toBytes("value" + numberOfPuts));
|
||||
table.put(p);
|
||||
}
|
||||
m.getZooKeeperWatcher().close();
|
||||
m.abort("Test recovery from zk session expired",
|
||||
new KeeperException.SessionExpiredException());
|
||||
assertFalse(m.isStopped());
|
||||
cluster.getRegionServer(0).abort("Aborting");
|
||||
// Without patch for HBASE-6046 this test case will always timeout
|
||||
// with patch the test case should pass.
|
||||
Scan scan = new Scan();
|
||||
int numberOfRows = 0;
|
||||
ResultScanner scanner = table.getScanner(scan);
|
||||
Result[] result = scanner.next(1);
|
||||
while (result != null && result.length > 0) {
|
||||
numberOfRows++;
|
||||
result = scanner.next(1);
|
||||
}
|
||||
assertEquals("Number of rows should be equal to number of puts.", numberOfPuts,
|
||||
numberOfRows);
|
||||
} finally {
|
||||
if (table != null) table.close();
|
||||
admin.close();
|
||||
}
|
||||
m.getZooKeeperWatcher().close();
|
||||
m.abort("Test recovery from zk session expired", new KeeperException.SessionExpiredException());
|
||||
assertFalse(m.isStopped());
|
||||
cluster.getRegionServer(0).abort("Aborting");
|
||||
// Without patch for HBASE-6046 this test case will always timeout
|
||||
// with patch the test case should pass.
|
||||
Scan scan = new Scan();
|
||||
int numberOfRows = 0;
|
||||
ResultScanner scanner = table.getScanner(scan);
|
||||
Result[] result = scanner.next(1);
|
||||
while (result != null && result.length > 0) {
|
||||
numberOfRows++;
|
||||
result = scanner.next(1);
|
||||
}
|
||||
assertEquals("Number of rows should be equal to number of puts.", numberOfPuts, numberOfRows);
|
||||
}
|
||||
|
||||
static class MockLoadBalancer extends DefaultLoadBalancer {
|
||||
|
|
Loading…
Reference in New Issue