HBASE-10357 Failover RPC's for scans (Devaraj Das)

git-svn-id: https://svn.apache.org/repos/asf/hbase/branches/hbase-10070@1595388 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Enis Soztutar 2014-05-16 23:37:38 +00:00
parent 25baace0de
commit 5a8f3f7cef
24 changed files with 1076 additions and 191 deletions

View File

@ -18,7 +18,9 @@
package org.apache.hadoop.hbase.client;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.LinkedList;
import java.util.concurrent.ExecutorService;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -34,6 +36,7 @@ import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.UnknownScannerException;
import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory;
import org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
@ -55,24 +58,33 @@ public class ClientScanner extends AbstractClientScanner {
// Current region scanner is against. Gets cleared if current region goes
// wonky: e.g. if it splits on us.
protected HRegionInfo currentRegion = null;
protected ScannerCallable callable = null;
protected ScannerCallableWithReplicas callable = null;
protected final LinkedList<Result> cache = new LinkedList<Result>();
protected final int caching;
protected long lastNext;
// Keep lastResult returned successfully in case we have to reset scanner.
protected Result lastResult = null;
protected final long maxScannerResultSize;
private final HConnection connection;
private final ClusterConnection connection;
private final TableName tableName;
protected final int scannerTimeout;
protected boolean scanMetricsPublished = false;
protected RpcRetryingCaller<Result []> caller;
protected RpcControllerFactory rpcControllerFactory;
protected Configuration conf;
//The timeout on the primary. Applicable if there are multiple replicas for a region
//In that case, we will only wait for this much timeout on the primary before going
//to the replicas and trying the same scan. Note that the retries will still happen
//on each replica and the first successful results will be taken. A timeout of 0 is
//disallowed.
protected final int primaryOperationTimeout;
private int retries;
protected final ExecutorService pool;
/**
* Create a new ClientScanner for the specified table. An HConnection will be
* Create a new ClientScanner for the specified table. A ClusterConnection will be
* retrieved using the passed Configuration.
* Note that the passed {@link Scan}'s start row maybe changed changed.
* Note that the passed {@link Scan}'s start row maybe changed.
*
* @param conf The {@link Configuration} to use.
* @param scan {@link Scan} to use in this scanner
@ -81,7 +93,7 @@ public class ClientScanner extends AbstractClientScanner {
*/
public ClientScanner(final Configuration conf, final Scan scan,
final TableName tableName) throws IOException {
this(conf, scan, tableName, HConnectionManager.getConnection(conf));
this(conf, scan, tableName, ConnectionManager.getConnectionInternal(conf));
}
/**
@ -100,7 +112,7 @@ public class ClientScanner extends AbstractClientScanner {
*/
@Deprecated
public ClientScanner(final Configuration conf, final Scan scan, final TableName tableName,
HConnection connection) throws IOException {
ClusterConnection connection) throws IOException {
this(conf, scan, tableName, connection, new RpcRetryingCallerFactory(conf),
RpcControllerFactory.instantiate(conf));
}
@ -111,20 +123,26 @@ public class ClientScanner extends AbstractClientScanner {
*/
@Deprecated
public ClientScanner(final Configuration conf, final Scan scan, final byte [] tableName,
HConnection connection) throws IOException {
this(conf, scan, TableName.valueOf(tableName), connection, new RpcRetryingCallerFactory(conf),
RpcControllerFactory.instantiate(conf));
ClusterConnection connection) throws IOException {
this(conf, scan, TableName.valueOf(tableName), connection);
}
/**
* @deprecated Use
* {@link #ClientScanner(Configuration, Scan, TableName, HConnection,
* RpcRetryingCallerFactory, RpcControllerFactory)} instead.
* Create a new ClientScanner for the specified table.
* Note that the passed {@link Scan}'s start row maybe changed.
*
* @param conf
* @param scan
* @param tableName
* @param connection
* @param rpcFactory
* @throws IOException
*/
@Deprecated
public ClientScanner(final Configuration conf, final Scan scan, final TableName tableName,
HConnection connection, RpcRetryingCallerFactory rpcFactory) throws IOException {
this(conf, scan, tableName, connection, rpcFactory, RpcControllerFactory.instantiate(conf));
public ClientScanner(final Configuration conf, final Scan scan,
final TableName tableName, ClusterConnection connection,
RpcRetryingCallerFactory rpcFactory, RpcControllerFactory controllerFactory)
throws IOException {
this(conf, scan, tableName, connection, rpcFactory, controllerFactory, null, 0);
}
/**
@ -137,8 +155,8 @@ public class ClientScanner extends AbstractClientScanner {
* @throws IOException
*/
public ClientScanner(final Configuration conf, final Scan scan, final TableName tableName,
HConnection connection, RpcRetryingCallerFactory rpcFactory,
RpcControllerFactory controllerFactory) throws IOException {
ClusterConnection connection, RpcRetryingCallerFactory rpcFactory,
RpcControllerFactory controllerFactory, ExecutorService pool, int primaryOperationTimeout) throws IOException {
if (LOG.isTraceEnabled()) {
LOG.trace("Scan table=" + tableName
+ ", startRow=" + Bytes.toStringBinary(scan.getStartRow()));
@ -147,6 +165,10 @@ public class ClientScanner extends AbstractClientScanner {
this.tableName = tableName;
this.lastNext = System.currentTimeMillis();
this.connection = connection;
this.pool = pool;
this.primaryOperationTimeout = primaryOperationTimeout;
this.retries = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
if (scan.getMaxResultSize() > 0) {
this.maxScannerResultSize = scan.getMaxResultSize();
} else {
@ -174,6 +196,7 @@ public class ClientScanner extends AbstractClientScanner {
this.caller = rpcFactory.<Result[]> newCaller();
this.rpcControllerFactory = controllerFactory;
this.conf = conf;
initializeScannerInConstruction();
}
@ -182,7 +205,7 @@ public class ClientScanner extends AbstractClientScanner {
nextScanner(this.caching, false);
}
protected HConnection getConnection() {
protected ClusterConnection getConnection() {
return this.connection;
}
@ -199,10 +222,34 @@ public class ClientScanner extends AbstractClientScanner {
return this.tableName;
}
protected int getRetries() {
return this.retries;
}
protected int getScannerTimeout() {
return this.scannerTimeout;
}
protected Configuration getConf() {
return this.conf;
}
protected Scan getScan() {
return scan;
}
protected ExecutorService getPool() {
return pool;
}
protected int getPrimaryOperationTimeout() {
return primaryOperationTimeout;
}
protected int getCaching() {
return caching;
}
protected long getTimestamp() {
return lastNext;
}
@ -223,6 +270,15 @@ public class ClientScanner extends AbstractClientScanner {
return false; //unlikely.
}
private boolean possiblyNextScanner(int nbRows, final boolean done) throws IOException {
// If we have just switched replica, don't go to the next scanner yet. Rather, try
// the scanner operations on the new replica, from the right point in the scan
// Note that when we switched to a different replica we left it at a point
// where we just did the "openScanner" with the appropriate startrow
if (callable != null && callable.switchedToADifferentReplica()) return true;
return nextScanner(nbRows, done);
}
/*
* Gets a scanner for the next region. If this.currentRegion != null, then
* we will move to the endrow of this.currentRegion. Else we will get
@ -237,7 +293,7 @@ public class ClientScanner extends AbstractClientScanner {
// Close the previous scanner if it's open
if (this.callable != null) {
this.callable.setClose();
this.caller.callWithRetries(callable, scannerTimeout);
call(scan, callable, caller, scannerTimeout);
this.callable = null;
}
@ -274,7 +330,7 @@ public class ClientScanner extends AbstractClientScanner {
callable = getScannerCallable(localStartKey, nbRows);
// Open a scanner on the region server starting at the
// beginning of the region
this.caller.callWithRetries(callable, scannerTimeout);
call(scan, callable, caller, scannerTimeout);
this.currentRegion = callable.getHRegionInfo();
if (this.scanMetrics != null) {
this.scanMetrics.countOfRegions.incrementAndGet();
@ -286,15 +342,29 @@ public class ClientScanner extends AbstractClientScanner {
return true;
}
static Result[] call(Scan scan, ScannerCallableWithReplicas callable,
RpcRetryingCaller<Result[]> caller, int scannerTimeout)
throws IOException, RuntimeException {
if (Thread.interrupted()) {
throw new InterruptedIOException();
}
// callWithoutRetries is at this layer. Within the ScannerCallableWithReplicas,
// we do a callWithRetries
return caller.callWithoutRetries(callable, scannerTimeout);
}
@InterfaceAudience.Private
protected ScannerCallable getScannerCallable(byte [] localStartKey,
protected ScannerCallableWithReplicas getScannerCallable(byte [] localStartKey,
int nbRows) {
scan.setStartRow(localStartKey);
ScannerCallable s =
new ScannerCallable(getConnection(), getTable(), scan, this.scanMetrics,
this.rpcControllerFactory);
s.setCaching(nbRows);
return s;
ScannerCallableWithReplicas sr = new ScannerCallableWithReplicas(tableName, getConnection(),
s, pool, primaryOperationTimeout, scan,
retries, scannerTimeout, caching, conf, caller);
return sr;
}
/**
@ -340,17 +410,43 @@ public class ClientScanner extends AbstractClientScanner {
// Skip only the first row (which was the last row of the last
// already-processed batch).
callable.setCaching(1);
values = this.caller.callWithRetries(callable, scannerTimeout);
values = call(scan, callable, caller, scannerTimeout);
// When the replica switch happens, we need to do certain operations
// again. The scannercallable will openScanner with the right startkey
// but we need to pick up from there. Bypass the rest of the loop
// and let the catch-up happen in the beginning of the loop as it
// happens for the cases where we see exceptions. Since only openScanner
// would have happened, values would be null
if (values == null && callable.switchedToADifferentReplica()) {
if (this.lastResult != null) { //only skip if there was something read earlier
skipFirst = true;
}
this.currentRegion = callable.getHRegionInfo();
continue;
}
callable.setCaching(this.caching);
skipFirst = false;
}
// Server returns a null values if scanning is to stop. Else,
// returns an empty array if scanning is to go on and we've just
// exhausted current region.
values = this.caller.callWithRetries(callable, scannerTimeout);
values = call(scan, callable, caller, scannerTimeout);
if (skipFirst && values != null && values.length == 1) {
skipFirst = false; // Already skipped, unset it before scanning again
values = this.caller.callWithRetries(callable, scannerTimeout);
values = call(scan, callable, caller, scannerTimeout);
}
// When the replica switch happens, we need to do certain operations
// again. The callable will openScanner with the right startkey
// but we need to pick up from there. Bypass the rest of the loop
// and let the catch-up happen in the beginning of the loop as it
// happens for the cases where we see exceptions. Since only openScanner
// would have happened, values would be null
if (values == null && callable.switchedToADifferentReplica()) {
if (this.lastResult != null) { //only skip if there was something read earlier
skipFirst = true;
}
this.currentRegion = callable.getHRegionInfo();
continue;
}
retryAfterOutOfOrderException = true;
} catch (DoNotRetryIOException e) {
@ -424,7 +520,8 @@ public class ClientScanner extends AbstractClientScanner {
}
}
// Values == null means server-side filter has determined we must STOP
} while (remainingResultSize > 0 && countdown > 0 && nextScanner(countdown, values == null));
} while (remainingResultSize > 0 && countdown > 0 &&
possiblyNextScanner(countdown, values == null));
}
if (cache.size() > 0) {
@ -442,7 +539,7 @@ public class ClientScanner extends AbstractClientScanner {
if (callable != null) {
callable.setClose();
try {
this.caller.callWithRetries(callable, scannerTimeout);
call(scan, callable, caller, scannerTimeout);
} catch (UnknownScannerException e) {
// We used to catch this error, interpret, and rethrow. However, we
// have since decided that it's not nice for a scanner's close to

View File

@ -29,9 +29,11 @@ import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.util.Bytes;
import java.io.IOException;
import java.util.concurrent.ExecutorService;
/**
* Client scanner for small reversed scan. Generally, only one RPC is called to fetch the
@ -44,7 +46,7 @@ import java.io.IOException;
@InterfaceStability.Evolving
public class ClientSmallReversedScanner extends ReversedClientScanner {
private static final Log LOG = LogFactory.getLog(ClientSmallReversedScanner.class);
private RegionServerCallable<Result[]> smallScanCallable = null;
private ScannerCallableWithReplicas smallScanCallable = null;
private byte[] skipRowOfFirstResult = null;
/**
@ -58,10 +60,28 @@ public class ClientSmallReversedScanner extends ReversedClientScanner {
* @throws java.io.IOException
*/
public ClientSmallReversedScanner(Configuration conf, Scan scan, TableName tableName,
HConnection connection) throws IOException {
ClusterConnection connection) throws IOException {
super(conf, scan, tableName, connection);
}
/**
* Create a new ReversibleClientScanner for the specified table Note that the
* passed {@link Scan}'s start row maybe changed changed.
*
* @param conf The {@link Configuration} to use.
* @param scan {@link Scan} to use in this scanner
* @param tableName The table that we wish to rangeGet
* @param connection Connection identifying the cluster
* @param rpcFactory
* @throws IOException
*/
public ClientSmallReversedScanner(final Configuration conf, final Scan scan,
final TableName tableName, ClusterConnection connection,
RpcRetryingCallerFactory rpcFactory, RpcControllerFactory controllerFactory,
ExecutorService pool, int primaryOperationTimeout) throws IOException {
super(conf, scan, tableName, connection, rpcFactory, controllerFactory, pool, primaryOperationTimeout);
}
/**
* Gets a scanner for following scan. Move to next region or continue from the
* last result or start from the start row.
@ -109,7 +129,9 @@ public class ClientSmallReversedScanner extends ReversedClientScanner {
}
smallScanCallable = ClientSmallScanner.getSmallScanCallable(
scan, getConnection(), getTable(), localStartKey, cacheNum, this.rpcControllerFactory);
getConnection(), getTable(), scan, getScanMetrics(), localStartKey, cacheNum,
rpcControllerFactory, getPool(), getPrimaryOperationTimeout(),
getRetries(), getScannerTimeout(), getConf(), caller);
if (this.scanMetrics != null && skipRowOfFirstResult == null) {
this.scanMetrics.countOfRegions.incrementAndGet();
@ -135,7 +157,9 @@ public class ClientSmallReversedScanner extends ReversedClientScanner {
// Server returns a null values if scanning is to stop. Else,
// returns an empty array if scanning is to go on and we've just
// exhausted current region.
values = this.caller.callWithRetries(smallScanCallable, scannerTimeout);
// callWithoutRetries is at this layer. Within the ScannerCallableWithReplicas,
// we do a callWithRetries
values = this.caller.callWithoutRetries(smallScanCallable, scannerTimeout);
this.currentRegion = smallScanCallable.getHRegionInfo();
long currentTime = System.currentTimeMillis();
if (this.scanMetrics != null) {

View File

@ -19,8 +19,8 @@
package org.apache.hadoop.hbase.client;
import java.io.IOException;
import java.util.ArrayList;
import java.util.LinkedList;
import java.io.InterruptedIOException;
import java.util.concurrent.ExecutorService;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -29,9 +29,9 @@ import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
@ -47,14 +47,14 @@ import com.google.protobuf.ServiceException;
* Client scanner for small scan. Generally, only one RPC is called to fetch the
* scan results, unless the results cross multiple regions or the row count of
* results excess the caching.
*
*
* For small scan, it will get better performance than {@link ClientScanner}
*/
@InterfaceAudience.Public
@InterfaceStability.Evolving
public class ClientSmallScanner extends ClientScanner {
private final Log LOG = LogFactory.getLog(this.getClass());
private RegionServerCallable<Result[]> smallScanCallable = null;
private ScannerCallableWithReplicas smallScanCallable = null;
// When fetching results from server, skip the first result if it has the same
// row with this one
private byte[] skipRowOfFirstResult = null;
@ -63,7 +63,7 @@ public class ClientSmallScanner extends ClientScanner {
* Create a new ClientSmallScanner for the specified table. An HConnection
* will be retrieved using the passed Configuration. Note that the passed
* {@link Scan} 's start row maybe changed.
*
*
* @param conf The {@link Configuration} to use.
* @param scan {@link Scan} to use in this scanner
* @param tableName The table that we wish to rangeGet
@ -71,7 +71,7 @@ public class ClientSmallScanner extends ClientScanner {
*/
public ClientSmallScanner(final Configuration conf, final Scan scan,
final TableName tableName) throws IOException {
this(conf, scan, tableName, HConnectionManager.getConnection(conf));
this(conf, scan, tableName, ConnectionManager.getConnectionInternal(conf));
}
/**
@ -85,7 +85,7 @@ public class ClientSmallScanner extends ClientScanner {
* @throws IOException
*/
public ClientSmallScanner(final Configuration conf, final Scan scan,
final TableName tableName, HConnection connection) throws IOException {
final TableName tableName, ClusterConnection connection) throws IOException {
this(conf, scan, tableName, connection, new RpcRetryingCallerFactory(conf),
new RpcControllerFactory(conf));
}
@ -95,15 +95,16 @@ public class ClientSmallScanner extends ClientScanner {
* {@link #ClientSmallScanner(Configuration, Scan, TableName, HConnection,
* RpcRetryingCallerFactory, RpcControllerFactory)} instead
*/
@Deprecated
public ClientSmallScanner(final Configuration conf, final Scan scan, final TableName tableName,
HConnection connection, RpcRetryingCallerFactory rpcFactory) throws IOException {
ClusterConnection connection, RpcRetryingCallerFactory rpcFactory) throws IOException {
this(conf, scan, tableName, connection, rpcFactory, RpcControllerFactory.instantiate(conf));
}
/**
* Create a new ShortClientScanner for the specified table Note that the
* passed {@link Scan}'s start row maybe changed changed.
*
*
* @param conf The {@link Configuration} to use.
* @param scan {@link Scan} to use in this scanner
* @param tableName The table that we wish to rangeGet
@ -111,9 +112,27 @@ public class ClientSmallScanner extends ClientScanner {
* @param rpcFactory
* @throws IOException
*/
public ClientSmallScanner(final Configuration conf, final Scan scan, final TableName tableName,
HConnection connection, RpcRetryingCallerFactory rpcFactory,
RpcControllerFactory controllerFactory) throws IOException {
public ClientSmallScanner(final Configuration conf, final Scan scan,
final TableName tableName, ClusterConnection connection,
RpcRetryingCallerFactory rpcFactory, RpcControllerFactory controllerFactory) throws IOException {
this(conf, scan, tableName, connection, rpcFactory, controllerFactory, null, 0);
}
/**
* Create a new ShortClientScanner for the specified table Note that the
* passed {@link Scan}'s start row maybe changed changed.
*
* @param conf The {@link Configuration} to use.
* @param scan {@link Scan} to use in this scanner
* @param tableName The table that we wish to rangeGet
* @param connection Connection identifying the cluster
* @param rpcFactory
* @throws IOException
*/
public ClientSmallScanner(final Configuration conf, final Scan scan,
final TableName tableName, ClusterConnection connection,
RpcRetryingCallerFactory rpcFactory, RpcControllerFactory controllerFactory,
ExecutorService pool, int primaryOperationTimeout) throws IOException {
super(conf, scan, tableName, connection, rpcFactory, controllerFactory);
}
@ -166,34 +185,67 @@ public class ClientSmallScanner extends ClientScanner {
+ Bytes.toStringBinary(localStartKey) + "'");
}
smallScanCallable = getSmallScanCallable(
scan, getConnection(), getTable(), localStartKey, cacheNum, rpcControllerFactory);
getConnection(), getTable(), scan, getScanMetrics(), localStartKey, cacheNum,
rpcControllerFactory, getPool(), getPrimaryOperationTimeout(),
getRetries(), getScannerTimeout(), getConf(), caller);
if (this.scanMetrics != null && skipRowOfFirstResult == null) {
this.scanMetrics.countOfRegions.incrementAndGet();
}
return true;
}
static RegionServerCallable<Result[]> getSmallScanCallable(
final Scan sc, HConnection connection, TableName table,
byte[] localStartKey, final int cacheNum, final RpcControllerFactory rpcControllerFactory) {
sc.setStartRow(localStartKey);
RegionServerCallable<Result[]> callable = new RegionServerCallable<Result[]>(
connection, table, sc.getStartRow()) {
public Result[] call(int callTimeout) throws IOException {
ScanRequest request = RequestConverter.buildScanRequest(getLocation()
.getRegionInfo().getRegionName(), sc, cacheNum, true);
PayloadCarryingRpcController controller = rpcControllerFactory.newController();
controller.setPriority(getTableName());
controller.setCallTimeout(callTimeout);
try {
ScanResponse response = getStub().scan(controller, request);
return ResponseConverter.getResults(controller.cellScanner(), response);
} catch (ServiceException se) {
throw ProtobufUtil.getRemoteException(se);
}
static ScannerCallableWithReplicas getSmallScanCallable(
ClusterConnection connection, TableName table, Scan scan,
ScanMetrics scanMetrics, byte[] localStartKey, final int cacheNum,
RpcControllerFactory controllerFactory, ExecutorService pool, int primaryOperationTimeout,
int retries, int scannerTimeout, Configuration conf, RpcRetryingCaller<Result []> caller) {
scan.setStartRow(localStartKey);
SmallScannerCallable s = new SmallScannerCallable(
connection, table, scan, scanMetrics, controllerFactory, cacheNum, 0);
ScannerCallableWithReplicas scannerCallableWithReplicas =
new ScannerCallableWithReplicas(table, connection,
s, pool, primaryOperationTimeout, scan, retries,
scannerTimeout, cacheNum, conf, caller);
return scannerCallableWithReplicas;
}
static class SmallScannerCallable extends ScannerCallable {
public SmallScannerCallable(
ClusterConnection connection, TableName table, Scan scan,
ScanMetrics scanMetrics, RpcControllerFactory controllerFactory, int caching, int id) {
super(connection, table, scan, scanMetrics, controllerFactory, id);
this.setCaching(caching);
}
@Override
public Result[] call(int timeout) throws IOException {
if (Thread.interrupted()) {
throw new InterruptedIOException();
}
};
return callable;
ScanRequest request = RequestConverter.buildScanRequest(getLocation()
.getRegionInfo().getRegionName(), getScan(), getCaching(), true);
ScanResponse response = null;
PayloadCarryingRpcController controller = controllerFactory.newController();
try {
controller.setPriority(getTableName());
controller.setCallTimeout(timeout);
response = getStub().scan(controller, request);
return ResponseConverter.getResults(controller.cellScanner(),
response);
} catch (ServiceException se) {
throw ProtobufUtil.getRemoteException(se);
}
}
@Override
public ScannerCallable getScannerCallableForReplica(int id) {
return new SmallScannerCallable((ClusterConnection)connection, tableName, getScan(), scanMetrics,
controllerFactory, getCaching(), id);
}
@Override
public void setClose(){}
}
@Override
@ -214,7 +266,9 @@ public class ClientSmallScanner extends ClientScanner {
// Server returns a null values if scanning is to stop. Else,
// returns an empty array if scanning is to go on and we've just
// exhausted current region.
values = this.caller.callWithRetries(smallScanCallable, scannerTimeout);
// callWithoutRetries is at this layer. Within the ScannerCallableWithReplicas,
// we do a callWithRetries
values = this.caller.callWithoutRetries(smallScanCallable, scannerTimeout);
this.currentRegion = smallScanCallable.getHRegionInfo();
long currentTime = System.currentTimeMillis();
if (this.scanMetrics != null) {

View File

@ -34,8 +34,9 @@ import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.MasterService;
/** Internal methods on HConnection that should not be used by user code. */
@InterfaceAudience.Private
// NOTE: DO NOT make this class public. It was made package-private on purpose.
interface ClusterConnection extends HConnection {
// NOTE: Although this class is public, this class is meant to be used directly from internal
// classes and unit tests only.
public interface ClusterConnection extends HConnection {
/** @return - true if the master server is running */
@Override

View File

@ -1153,7 +1153,6 @@ class ConnectionManager {
s.setSmall(true);
s.setCaching(1);
HConnection connection = ConnectionManager.getConnectionInternal(conf);
int localNumRetries = (retry ? numTries : 1);
for (int tries = 0; true; tries++) {
@ -1178,7 +1177,7 @@ class ConnectionManager {
Result regionInfoRow = null;
ReversedClientScanner rcs = null;
try {
rcs = new ClientSmallReversedScanner(conf, s, TableName.META_TABLE_NAME, connection);
rcs = new ClientSmallReversedScanner(conf, s, TableName.META_TABLE_NAME, this);
regionInfoRow = rcs.next();
} finally {
if (rcs != null) {

View File

@ -143,6 +143,7 @@ public class HTable implements HTableInterface {
private final boolean cleanupConnectionOnClose; // close the connection in close()
private Consistency defaultConsistency = Consistency.STRONG;
private int primaryCallTimeoutMicroSecond;
private int replicaCallTimeoutMicroSecondScan;
/** The Async process for puts with autoflush set to false or multiputs */
@ -281,10 +282,14 @@ public class HTable implements HTableInterface {
this.connection = ConnectionManager.getConnectionInternal(conf);
this.configuration = conf;
this.pool = pool;
if (pool == null) {
this.pool = getDefaultExecutor(conf);
this.cleanupPoolOnClose = true;
} else {
this.cleanupPoolOnClose = false;
}
this.tableName = tableName;
this.cleanupPoolOnClose = false;
this.cleanupConnectionOnClose = true;
this.finishSetup();
}
@ -331,10 +336,16 @@ public class HTable implements HTableInterface {
throw new IllegalArgumentException("Connection is null or closed.");
}
this.tableName = tableName;
this.cleanupPoolOnClose = this.cleanupConnectionOnClose = false;
this.cleanupConnectionOnClose = false;
this.connection = connection;
this.configuration = connection.getConfiguration();
this.pool = pool;
if (pool == null) {
this.pool = getDefaultExecutor(this.configuration);
this.cleanupPoolOnClose = true;
} else {
this.cleanupPoolOnClose = false;
}
this.finishSetup();
}
@ -367,6 +378,8 @@ public class HTable implements HTableInterface {
HConstants.DEFAULT_HBASE_CLIENT_SCANNER_CACHING);
this.primaryCallTimeoutMicroSecond =
this.configuration.getInt("hbase.client.primaryCallTimeout.get", 10000); // 10 ms
this.replicaCallTimeoutMicroSecondScan =
this.configuration.getInt("hbase.client.replicaCallTimeout.scan", 1000000); // 1000 ms
this.retries = configuration.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
@ -771,9 +784,10 @@ public class HTable implements HTableInterface {
return rpcCallerFactory.<Result>newCaller().callWithRetries(callable, this.operationTimeout);
}
/**
* {@inheritDoc}
*/
/**
* The underlying {@link HTable} must not be closed.
* {@link HTableInterface#getScanner(Scan)} has other usage details.
*/
@Override
public ResultScanner getScanner(final Scan scan) throws IOException {
if (scan.getCaching() <= 0) {
@ -783,24 +797,29 @@ public class HTable implements HTableInterface {
if (scan.isReversed()) {
if (scan.isSmall()) {
return new ClientSmallReversedScanner(getConfiguration(), scan, getName(),
this.connection);
this.connection, this.rpcCallerFactory, this.rpcControllerFactory,
pool, replicaCallTimeoutMicroSecondScan);
} else {
return new ReversedClientScanner(getConfiguration(), scan, getName(),
this.connection);
this.connection, this.rpcCallerFactory, this.rpcControllerFactory,
pool, replicaCallTimeoutMicroSecondScan);
}
}
if (scan.isSmall()) {
return new ClientSmallScanner(getConfiguration(), scan, getName(),
this.connection, this.rpcCallerFactory, this.rpcControllerFactory);
this.connection, this.rpcCallerFactory, this.rpcControllerFactory,
pool, replicaCallTimeoutMicroSecondScan);
} else {
return new ClientScanner(getConfiguration(), scan, getName(), this.connection,
this.rpcCallerFactory, this.rpcControllerFactory);
this.rpcCallerFactory, this.rpcControllerFactory,
pool, replicaCallTimeoutMicroSecondScan);
}
}
/**
* {@inheritDoc}
* The underlying {@link HTable} must not be closed.
* {@link HTableInterface#getScanner(byte[])} has other usage details.
*/
@Override
public ResultScanner getScanner(byte [] family) throws IOException {
@ -810,7 +829,8 @@ public class HTable implements HTableInterface {
}
/**
* {@inheritDoc}
* The underlying {@link HTable} must not be closed.
* {@link HTableInterface#getScanner(byte[], byte[])} has other usage details.
*/
@Override
public ResultScanner getScanner(byte [] family, byte [] qualifier)
@ -1445,6 +1465,15 @@ public class HTable implements HTableInterface {
flushCommits();
if (cleanupPoolOnClose) {
this.pool.shutdown();
try {
boolean terminated = false;
do {
// wait until the pool has terminated
terminated = this.pool.awaitTermination(60, TimeUnit.SECONDS);
} while (!terminated);
} catch (InterruptedException e) {
LOG.warn("waitForTermination interrupted");
}
}
if (cleanupConnectionOnClose) {
if (this.connection != null) {

View File

@ -55,7 +55,7 @@ class MultiServerCallable<R> extends RegionServerCallable<MultiResponse> {
private final boolean cellBlock;
private RpcControllerFactory rpcFactory;
MultiServerCallable(final HConnection connection, final TableName tableName,
MultiServerCallable(final ClusterConnection connection, final TableName tableName,
final ServerName location, RpcControllerFactory rpcFactory, final MultiAction<R> multi) {
super(connection, tableName, null);
this.rpcFactory = rpcFactory;

View File

@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.client;
import java.io.IOException;
import java.util.Arrays;
import java.util.concurrent.ExecutorService;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -28,6 +29,7 @@ import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ExceptionUtil;
@ -51,17 +53,37 @@ public class ReversedClientScanner extends ClientScanner {
* @throws IOException
*/
public ReversedClientScanner(Configuration conf, Scan scan,
TableName tableName, HConnection connection) throws IOException {
TableName tableName, ClusterConnection connection) throws IOException {
super(conf, scan, tableName, connection);
}
/**
* Create a new ReversibleClientScanner for the specified table Note that the
* passed {@link Scan}'s start row maybe changed.
* @param conf
* @param scan
* @param tableName
* @param connection
* @param pool
* @param primaryOperationTimeout
* @throws IOException
*/
public ReversedClientScanner(Configuration conf, Scan scan,
TableName tableName, ClusterConnection connection,
RpcRetryingCallerFactory rpcFactory, RpcControllerFactory controllerFactory,
ExecutorService pool, int primaryOperationTimeout) throws IOException {
super(conf, scan, tableName, connection, rpcFactory, controllerFactory, pool, primaryOperationTimeout);
}
@Override
protected boolean nextScanner(int nbRows, final boolean done)
throws IOException {
// Close the previous scanner if it's open
if (this.callable != null) {
this.callable.setClose();
this.caller.callWithRetries(callable, scannerTimeout);
// callWithoutRetries is at this layer. Within the ScannerCallableWithReplicas,
// we do a callWithRetries
this.caller.callWithoutRetries(callable, scannerTimeout);
this.callable = null;
}
@ -109,7 +131,9 @@ public class ReversedClientScanner extends ClientScanner {
callable = getScannerCallable(localStartKey, nbRows, locateStartRow);
// Open a scanner on the region server starting at the
// beginning of the region
this.caller.callWithRetries(callable, scannerTimeout);
// callWithoutRetries is at this layer. Within the ScannerCallableWithReplicas,
// we do a callWithRetries
this.caller.callWithoutRetries(callable, scannerTimeout);
this.currentRegion = callable.getHRegionInfo();
if (this.scanMetrics != null) {
this.scanMetrics.countOfRegions.incrementAndGet();
@ -121,15 +145,18 @@ public class ReversedClientScanner extends ClientScanner {
}
return true;
}
protected ScannerCallable getScannerCallable(byte[] localStartKey,
protected ScannerCallableWithReplicas getScannerCallable(byte[] localStartKey,
int nbRows, byte[] locateStartRow) {
scan.setStartRow(localStartKey);
ScannerCallable s =
new ReversedScannerCallable(getConnection(), getTable(), scan, this.scanMetrics,
locateStartRow, this.rpcControllerFactory);
s.setCaching(nbRows);
return s;
ScannerCallableWithReplicas sr = new ScannerCallableWithReplicas(getTable(), getConnection(),
s, pool, primaryOperationTimeout, scan,
getRetries(), getScannerTimeout(), caching, getConf(), caller);
return sr;
}
@Override
@ -170,5 +197,4 @@ public class ReversedClientScanner extends ClientScanner {
return closestFrontRow;
}
}
}

View File

@ -19,6 +19,7 @@
package org.apache.hadoop.hbase.client;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.List;
@ -27,6 +28,7 @@ import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
@ -55,18 +57,33 @@ public class ReversedScannerCallable extends ScannerCallable {
* @param locateStartRow The start row for locating regions
* @param rpcFactory to create an {@link RpcController} to talk to the regionserver
*/
public ReversedScannerCallable(HConnection connection, TableName tableName, Scan scan,
public ReversedScannerCallable(ClusterConnection connection, TableName tableName, Scan scan,
ScanMetrics scanMetrics, byte[] locateStartRow, RpcControllerFactory rpcFactory) {
super(connection, tableName, scan, scanMetrics, rpcFactory);
this.locateStartRow = locateStartRow;
}
/**
* @param connection
* @param tableName
* @param scan
* @param scanMetrics
* @param locateStartRow The start row for locating regions
* @param rpcFactory to create an {@link RpcController} to talk to the regionserver
* @param replicaId the replica id
*/
public ReversedScannerCallable(ClusterConnection connection, TableName tableName, Scan scan,
ScanMetrics scanMetrics, byte[] locateStartRow, RpcControllerFactory rpcFactory, int replicaId) {
super(connection, tableName, scan, scanMetrics, rpcFactory, replicaId);
this.locateStartRow = locateStartRow;
}
/**
* @deprecated use
* {@link #ReversedScannerCallable(HConnection, TableName, Scan, ScanMetrics, byte[], RpcControllerFactory)}
*/
@Deprecated
public ReversedScannerCallable(HConnection connection, TableName tableName,
public ReversedScannerCallable(ClusterConnection connection, TableName tableName,
Scan scan, ScanMetrics scanMetrics, byte[] locateStartRow) {
this(connection, tableName, scan, scanMetrics, locateStartRow, RpcControllerFactory
.instantiate(connection.getConfiguration()));
@ -78,10 +95,15 @@ public class ReversedScannerCallable extends ScannerCallable {
*/
@Override
public void prepare(boolean reload) throws IOException {
if (Thread.interrupted()) {
throw new InterruptedIOException();
}
if (!instantiated || reload) {
if (locateStartRow == null) {
// Just locate the region with the row
this.location = connection.getRegionLocation(tableName, row, reload);
RegionLocations rl = RpcRetryingCallerWithReadReplicas.getRegionLocations(reload, id,
getConnection(), tableName, row);
this.location = id < rl.size() ? rl.getRegionLocation(id) : null;
if (this.location == null) {
throw new IOException("Failed to find location, tableName="
+ tableName + ", row=" + Bytes.toStringBinary(row) + ", reload="
@ -137,9 +159,10 @@ public class ReversedScannerCallable extends ScannerCallable {
List<HRegionLocation> regionList = new ArrayList<HRegionLocation>();
byte[] currentKey = startKey;
do {
HRegionLocation regionLocation = connection.getRegionLocation(tableName,
currentKey, reload);
if (regionLocation.getRegionInfo().containsRow(currentKey)) {
RegionLocations rl = RpcRetryingCallerWithReadReplicas.getRegionLocations(reload, id,
getConnection(), tableName, currentKey);
HRegionLocation regionLocation = id < rl.size() ? rl.getRegionLocation(id) : null;
if (regionLocation != null && regionLocation.getRegionInfo().containsRow(currentKey)) {
regionList.add(regionLocation);
} else {
throw new DoNotRetryIOException("Does hbase:meta exist hole? Locating row "
@ -152,4 +175,11 @@ public class ReversedScannerCallable extends ScannerCallable {
return regionList;
}
@Override
public ScannerCallable getScannerCallableForReplica(int id) {
ReversedScannerCallable r = new ReversedScannerCallable(this.cConnection, this.tableName,
this.getScan(), this.scanMetrics, this.locateStartRow, controllerFactory, id);
r.setCaching(this.getCaching());
return r;
}
}

View File

@ -114,7 +114,7 @@ public class RpcRetryingCallerWithReadReplicas {
}
if (reload || location == null) {
RegionLocations rl = getRegionLocations(false, id);
RegionLocations rl = getRegionLocations(false, id, cConnection, tableName, get.getRow());
location = id < rl.size() ? rl.getRegionLocation(id) : null;
}
@ -189,7 +189,8 @@ public class RpcRetryingCallerWithReadReplicas {
*/
public synchronized Result call()
throws DoNotRetryIOException, InterruptedIOException, RetriesExhaustedException {
RegionLocations rl = getRegionLocations(true, RegionReplicaUtil.DEFAULT_REPLICA_ID);
RegionLocations rl = getRegionLocations(true, RegionReplicaUtil.DEFAULT_REPLICA_ID,
cConnection, tableName, get.getRow());
BoundedCompletionService<Result> cs = new BoundedCompletionService<Result>(pool, rl.size());
List<ExecutionException> exceptions = null;
@ -241,7 +242,7 @@ public class RpcRetryingCallerWithReadReplicas {
}
if (exceptions != null && !exceptions.isEmpty()) {
throwEnrichedException(exceptions.get(0)); // just rethrow the first exception for now.
throwEnrichedException(exceptions.get(0), retries, toString()); // just rethrow the first exception for now.
}
return null; // unreachable
}
@ -250,7 +251,7 @@ public class RpcRetryingCallerWithReadReplicas {
* Extract the real exception from the ExecutionException, and throws what makes more
* sense.
*/
private void throwEnrichedException(ExecutionException e)
static void throwEnrichedException(ExecutionException e, int retries, String str)
throws RetriesExhaustedException, DoNotRetryIOException {
Throwable t = e.getCause();
assert t != null; // That's what ExecutionException is about: holding an exception
@ -265,7 +266,7 @@ public class RpcRetryingCallerWithReadReplicas {
RetriesExhaustedException.ThrowableWithExtraContext qt =
new RetriesExhaustedException.ThrowableWithExtraContext(t,
EnvironmentEdgeManager.currentTimeMillis(), toString());
EnvironmentEdgeManager.currentTimeMillis(), str);
List<RetriesExhaustedException.ThrowableWithExtraContext> exceptions =
Collections.singletonList(qt);
@ -293,11 +294,12 @@ public class RpcRetryingCallerWithReadReplicas {
return max - min + 1;
}
private RegionLocations getRegionLocations(boolean useCache, int replicaId)
static RegionLocations getRegionLocations(boolean useCache, int replicaId,
ClusterConnection cConnection, TableName tableName, byte[] row)
throws RetriesExhaustedException, DoNotRetryIOException, InterruptedIOException {
RegionLocations rl;
try {
rl = cConnection.locateRegion(tableName, get.getRow(), useCache, true, replicaId);
rl = cConnection.locateRegion(tableName, row, useCache, true, replicaId);
} catch (DoNotRetryIOException e) {
throw e;
} catch (RetriesExhaustedException e) {

View File

@ -136,7 +136,7 @@ public class Scan extends Query {
private Map<byte [], NavigableSet<byte []>> familyMap =
new TreeMap<byte [], NavigableSet<byte []>>(Bytes.BYTES_COMPARATOR);
private Boolean loadColumnFamiliesOnDemand = null;
private Consistency consistency = null;
private Consistency consistency = Consistency.STRONG;
/**
* Set it true for small scan to get better performance

View File

@ -19,6 +19,7 @@
package org.apache.hadoop.hbase.client;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.net.UnknownHostException;
import org.apache.commons.logging.Log;
@ -28,11 +29,14 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HBaseIOException;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.RemoteExceptionHandler;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.UnknownScannerException;
import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
@ -63,15 +67,17 @@ public class ScannerCallable extends RegionServerCallable<Result[]> {
public static final String LOG_SCANNER_ACTIVITY = "hbase.client.log.scanner.activity";
public static final Log LOG = LogFactory.getLog(ScannerCallable.class);
private long scannerId = -1L;
protected long scannerId = -1L;
protected boolean instantiated = false;
private boolean closed = false;
protected boolean closed = false;
private Scan scan;
private int caching = 1;
protected final ClusterConnection cConnection;
protected ScanMetrics scanMetrics;
private boolean logScannerActivity = false;
private int logCutOffLatency = 1000;
private static String myAddress;
protected final int id;
static {
try {
myAddress = DNS.getDefaultHost("default", "default");
@ -83,8 +89,8 @@ public class ScannerCallable extends RegionServerCallable<Result[]> {
// indicate if it is a remote server call
protected boolean isRegionServerRemote = true;
private long nextCallSeq = 0;
private RpcControllerFactory rpcFactory;
protected RpcControllerFactory controllerFactory;
/**
* @param connection which connection
* @param tableName table callable is on
@ -93,26 +99,29 @@ public class ScannerCallable extends RegionServerCallable<Result[]> {
* metrics
* @param rpcControllerFactory factory to use when creating {@link RpcController}
*/
public ScannerCallable (HConnection connection, TableName tableName, Scan scan,
public ScannerCallable (ClusterConnection connection, TableName tableName, Scan scan,
ScanMetrics scanMetrics, RpcControllerFactory rpcControllerFactory) {
this(connection, tableName, scan, scanMetrics, rpcControllerFactory, 0);
}
/**
*
* @param connection
* @param tableName
* @param scan
* @param scanMetrics
* @param id the replicaId
*/
public ScannerCallable (ClusterConnection connection, TableName tableName, Scan scan,
ScanMetrics scanMetrics, RpcControllerFactory rpcControllerFactory, int id) {
super(connection, tableName, scan.getStartRow());
this.id = id;
this.cConnection = connection;
this.scan = scan;
this.scanMetrics = scanMetrics;
Configuration conf = connection.getConfiguration();
logScannerActivity = conf.getBoolean(LOG_SCANNER_ACTIVITY, false);
logCutOffLatency = conf.getInt(LOG_SCANNER_LATENCY_CUTOFF, 1000);
this.rpcFactory = rpcControllerFactory;
}
/**
* @deprecated Use {@link #ScannerCallable(HConnection, TableName, Scan, ScanMetrics,
* RpcControllerFactory)}
*/
@Deprecated
public ScannerCallable (HConnection connection, final byte [] tableName, Scan scan,
ScanMetrics scanMetrics) {
this(connection, TableName.valueOf(tableName), scan, scanMetrics, RpcControllerFactory
.instantiate(connection.getConfiguration()));
this.controllerFactory = rpcControllerFactory;
}
/**
@ -121,8 +130,20 @@ public class ScannerCallable extends RegionServerCallable<Result[]> {
*/
@Override
public void prepare(boolean reload) throws IOException {
if (Thread.interrupted()) {
throw new InterruptedIOException();
}
RegionLocations rl = RpcRetryingCallerWithReadReplicas.getRegionLocations(!reload,
id, getConnection(), getTableName(), getRow());
location = id < rl.size() ? rl.getRegionLocation(id) : null;
if (location == null || location.getServerName() == null) {
// With this exception, there will be a retry. The location can be null for a replica
// when the table is created or after a split.
throw new HBaseIOException("There is no location for replica id #" + id);
}
ServerName dest = location.getServerName();
setStub(super.getConnection().getClient(dest));
if (!instantiated || reload) {
super.prepare(reload);
checkIfRegionServerIsRemote();
instantiated = true;
}
@ -154,6 +175,9 @@ public class ScannerCallable extends RegionServerCallable<Result[]> {
@Override
@SuppressWarnings("deprecation")
public Result [] call(int callTimeout) throws IOException {
if (Thread.interrupted()) {
throw new InterruptedIOException();
}
if (closed) {
if (scannerId != -1) {
close();
@ -168,7 +192,7 @@ public class ScannerCallable extends RegionServerCallable<Result[]> {
incRPCcallsMetrics();
request = RequestConverter.buildScanRequest(scannerId, caching, false, nextCallSeq);
ScanResponse response = null;
PayloadCarryingRpcController controller = rpcFactory.newController();
PayloadCarryingRpcController controller = controllerFactory.newController();
controller.setPriority(getTableName());
controller.setCallTimeout(callTimeout);
try {
@ -332,6 +356,7 @@ public class ScannerCallable extends RegionServerCallable<Result[]> {
/**
* @return the HRegionInfo for the current region
*/
@Override
public HRegionInfo getHRegionInfo() {
if (!instantiated) {
return null;
@ -347,6 +372,11 @@ public class ScannerCallable extends RegionServerCallable<Result[]> {
return caching;
}
@Override
public ClusterConnection getConnection() {
return cConnection;
}
/**
* Set the number of rows that will be fetched on next
* @param caching the number of rows for caching
@ -354,4 +384,11 @@ public class ScannerCallable extends RegionServerCallable<Result[]> {
public void setCaching(int caching) {
this.caching = caching;
}
public ScannerCallable getScannerCallableForReplica(int id) {
ScannerCallable s = new ScannerCallable(this.getConnection(), this.tableName,
this.getScan(), this.scanMetrics, controllerFactory, id);
s.setCaching(this.caching);
return s;
}
}

View File

@ -0,0 +1,316 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.util.BoundedCompletionService;
import org.apache.hadoop.hbase.util.Pair;
/**
* This class has the logic for handling scanners for regions with and without replicas.
* 1. A scan is attempted on the default (primary) region
* 2. The scanner sends all the RPCs to the default region until it is done, or, there
* is a timeout on the default (a timeout of zero is disallowed).
* 3. If there is a timeout in (2) above, scanner(s) is opened on the non-default replica(s)
* 4. The results from the first successful scanner are taken, and it is stored which server
* returned the results.
* 5. The next RPCs are done on the above stored server until it is done or there is a timeout,
* in which case, the other replicas are queried (as in (3) above).
*
*/
@InterfaceAudience.Private
class ScannerCallableWithReplicas implements RetryingCallable<Result[]> {
private final Log LOG = LogFactory.getLog(this.getClass());
volatile ScannerCallable currentScannerCallable;
AtomicBoolean replicaSwitched = new AtomicBoolean(false);
final ClusterConnection cConnection;
protected final ExecutorService pool;
protected final int timeBeforeReplicas;
private final Scan scan;
private final int retries;
private Result lastResult;
private final RpcRetryingCaller<Result[]> caller;
private final TableName tableName;
private Configuration conf;
private int scannerTimeout;
private Set<ScannerCallable> outstandingCallables = new HashSet<ScannerCallable>();
public ScannerCallableWithReplicas (TableName tableName, ClusterConnection cConnection,
ScannerCallable baseCallable, ExecutorService pool, int timeBeforeReplicas, Scan scan,
int retries, int scannerTimeout, int caching, Configuration conf,
RpcRetryingCaller<Result []> caller) {
this.currentScannerCallable = baseCallable;
this.cConnection = cConnection;
this.pool = pool;
if (timeBeforeReplicas < 0) {
throw new IllegalArgumentException("Invalid value of operation timeout on the primary");
}
this.timeBeforeReplicas = timeBeforeReplicas;
this.scan = scan;
this.retries = retries;
this.tableName = tableName;
this.conf = conf;
this.scannerTimeout = scannerTimeout;
this.caller = caller;
}
public void setClose() {
currentScannerCallable.setClose();
}
public void setCaching(int caching) {
currentScannerCallable.setCaching(caching);
}
public int getCaching() {
return currentScannerCallable.getCaching();
}
public HRegionInfo getHRegionInfo() {
return currentScannerCallable.getHRegionInfo();
}
@Override
public Result [] call(int timeout) throws IOException {
// If the active replica callable was closed somewhere, invoke the RPC to
// really close it. In the case of regular scanners, this applies. We make couple
// of RPCs to a RegionServer, and when that region is exhausted, we set
// the closed flag. Then an RPC is required to actually close the scanner.
if (currentScannerCallable != null && currentScannerCallable.closed) {
// For closing we target that exact scanner (and not do replica fallback like in
// the case of normal reads)
if (LOG.isDebugEnabled()) {
LOG.debug("Closing scanner " + currentScannerCallable.scannerId);
}
Result[] r = currentScannerCallable.call(timeout);
currentScannerCallable = null;
return r;
}
// We need to do the following:
//1. When a scan goes out to a certain replica (default or not), we need to
// continue to hit that until there is a failure. So store the last successfully invoked
// replica
//2. We should close the "losing" scanners (scanners other than the ones we hear back
// from first)
//
RegionLocations rl = RpcRetryingCallerWithReadReplicas.getRegionLocations(true,
RegionReplicaUtil.DEFAULT_REPLICA_ID, cConnection, tableName,
currentScannerCallable.getRow());
// allocate a boundedcompletion pool of some multiple of number of replicas.
// We want accommodate the "scan" RPC call and the "close" RPC call (we schedule "close"
// RPCs for unneeded replica scans using the same pool)
BoundedCompletionService<Pair<Result[], ScannerCallable>> cs =
new BoundedCompletionService<Pair<Result[], ScannerCallable>>(pool, rl.size() * 5);
List<ExecutionException> exceptions = null;
int submitted = 0, completed = 0;
AtomicBoolean done = new AtomicBoolean(false);
replicaSwitched.set(false);
// submit call for the primary replica.
submitted += addCallsForCurrentReplica(cs, rl);
try {
// wait for the timeout to see whether the primary responds back
Future<Pair<Result[], ScannerCallable>> f = cs.poll(timeBeforeReplicas,
TimeUnit.MICROSECONDS); // Yes, microseconds
if (f != null) {
Pair<Result[], ScannerCallable> r = f.get();
if (r != null && r.getSecond() != null) {
updateCurrentlyServingReplica(r.getSecond(), r.getFirst(), done, cs);
}
return r == null ? null : r.getFirst(); //great we got a response
}
} catch (ExecutionException e) {
// the primary call failed with RetriesExhaustedException or DoNotRetryIOException
// but the secondaries might still succeed. Continue on the replica RPCs.
exceptions = new ArrayList<ExecutionException>(rl.size());
exceptions.add(e);
completed++;
} catch (CancellationException e) {
throw new InterruptedIOException(e.getMessage());
} catch (InterruptedException e) {
throw new InterruptedIOException(e.getMessage());
}
// submit call for the all of the secondaries at once
// TODO: this may be an overkill for large region replication
submitted += addCallsForOtherReplicas(cs, rl, 0, rl.size() - 1);
try {
while (completed < submitted) {
try {
Future<Pair<Result[], ScannerCallable>> f = cs.take();
Pair<Result[], ScannerCallable> r = f.get();
if (r != null && r.getSecond() != null) {
updateCurrentlyServingReplica(r.getSecond(), r.getFirst(), done, cs);
}
return r == null ? null : r.getFirst(); // great we got an answer
} catch (ExecutionException e) {
// if not cancel or interrupt, wait until all RPC's are done
// one of the tasks failed. Save the exception for later.
if (exceptions == null) exceptions = new ArrayList<ExecutionException>(rl.size());
exceptions.add(e);
completed++;
}
}
} catch (CancellationException e) {
throw new InterruptedIOException(e.getMessage());
} catch (InterruptedException e) {
throw new InterruptedIOException(e.getMessage());
} finally {
// We get there because we were interrupted or because one or more of the
// calls succeeded or failed. In all case, we stop all our tasks.
cs.cancelAll(true);
}
if (exceptions != null && !exceptions.isEmpty()) {
RpcRetryingCallerWithReadReplicas.throwEnrichedException(exceptions.get(0),
retries, toString()); // just rethrow the first exception for now.
}
return null; // unreachable
}
private void updateCurrentlyServingReplica(ScannerCallable scanner, Result[] result,
AtomicBoolean done, BoundedCompletionService<Pair<Result[], ScannerCallable>> cs) {
if (done.compareAndSet(false, true)) {
if (currentScannerCallable != scanner) replicaSwitched.set(true);
currentScannerCallable = scanner;
// store where to start the replica scanner from if we need to.
if (result != null && result.length != 0) this.lastResult = result[result.length - 1];
if (LOG.isDebugEnabled()) {
LOG.debug("Setting current scanner as " + currentScannerCallable.scannerId +
" associated with " + currentScannerCallable.getHRegionInfo().getReplicaId());
}
// close all outstanding replica scanners but the one we heard back from
outstandingCallables.remove(scanner);
for (ScannerCallable s : outstandingCallables) {
if (LOG.isDebugEnabled()) {
LOG.debug("Closing scanner " + s.scannerId +
" because this was slow and another replica succeeded");
}
// Submit the "close" to the pool since this might take time, and we don't
// want to wait for the "close" to happen yet. The "wait" will happen when
// the table is closed (when the awaitTermination of the underlying pool is called)
s.setClose();
RetryingRPC r = new RetryingRPC(s);
cs.submit(r);
}
// now clear outstandingCallables since we scheduled a close for all the contained scanners
outstandingCallables.clear();
}
}
/**
* When a scanner switches in the middle of scanning (the 'next' call fails
* for example), the upper layer {@link ClientScanner} needs to know
* @return
*/
public boolean switchedToADifferentReplica() {
return replicaSwitched.get();
}
private int addCallsForCurrentReplica(
BoundedCompletionService<Pair<Result[], ScannerCallable>> cs, RegionLocations rl) {
RetryingRPC retryingOnReplica = new RetryingRPC(currentScannerCallable);
outstandingCallables.add(currentScannerCallable);
cs.submit(retryingOnReplica);
return 1;
}
private int addCallsForOtherReplicas(
BoundedCompletionService<Pair<Result[], ScannerCallable>> cs, RegionLocations rl, int min,
int max) {
if (scan.getConsistency() == Consistency.STRONG) {
return 0; // not scheduling on other replicas for strong consistency
}
for (int id = min; id <= max; id++) {
if (currentScannerCallable.getHRegionInfo().getReplicaId() == id) {
continue; //this was already scheduled earlier
}
ScannerCallable s = currentScannerCallable.getScannerCallableForReplica(id);
if (this.lastResult != null) {
s.getScan().setStartRow(this.lastResult.getRow());
}
outstandingCallables.add(s);
RetryingRPC retryingOnReplica = new RetryingRPC(s);
cs.submit(retryingOnReplica);
}
return max - min + 1;
}
class RetryingRPC implements Callable<Pair<Result[], ScannerCallable>> {
final ScannerCallable callable;
RetryingRPC(ScannerCallable callable) {
this.callable = callable;
}
@Override
public Pair<Result[], ScannerCallable> call() throws IOException {
// For the Consistency.STRONG (default case), we reuse the caller
// to keep compatibility with what is done in the past
// For the Consistency.TIMELINE case, we can't reuse the caller
// since we could be making parallel RPCs (caller.callWithRetries is synchronized
// and we can't invoke it multiple times at the same time)
RpcRetryingCaller<Result[]> caller = ScannerCallableWithReplicas.this.caller;
if (scan.getConsistency() == Consistency.TIMELINE) {
caller = new RpcRetryingCallerFactory(ScannerCallableWithReplicas.this.conf).
<Result[]>newCaller();
}
Result[] res = caller.callWithRetries(callable, scannerTimeout);
return new Pair<Result[], ScannerCallable>(res, callable);
}
}
@Override
public void prepare(boolean reload) throws IOException {
}
@Override
public void throwable(Throwable t, boolean retrying) {
currentScannerCallable.throwable(t, retrying);
}
@Override
public String getExceptionMessageAdditionalDetail() {
return currentScannerCallable.getExceptionMessageAdditionalDetail();
}
@Override
public long sleep(long pause, int tries) {
return currentScannerCallable.sleep(pause, tries);
}
}

View File

@ -898,6 +898,9 @@ public final class ProtobufUtil {
if (scan.isReversed()) {
scanBuilder.setReversed(scan.isReversed());
}
if (scan.getConsistency() == Consistency.TIMELINE) {
scanBuilder.setConsistency(toConsistency(scan.getConsistency()));
}
return scanBuilder.build();
}
@ -977,6 +980,9 @@ public final class ProtobufUtil {
if (proto.hasReversed()) {
scan.setReversed(proto.getReversed());
}
if (proto.hasConsistency()) {
scan.setConsistency(toConsistency(proto.getConsistency()));
}
return scan;
}

View File

@ -370,7 +370,7 @@ public final class ResponseConverter {
}
cells.add(cellScanner.current());
}
results[i] = Result.create(cells);
results[i] = Result.create(cells, null, response.getStale());
} else {
// Result is pure pb.
results[i] = ProtobufUtil.toResult(response.getResults(i));

View File

@ -17404,6 +17404,16 @@ public final class ClientProtos {
*/
org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ResultOrBuilder getResultsOrBuilder(
int index);
// optional bool stale = 6;
/**
* <code>optional bool stale = 6;</code>
*/
boolean hasStale();
/**
* <code>optional bool stale = 6;</code>
*/
boolean getStale();
}
/**
* Protobuf type {@code ScanResponse}
@ -17506,6 +17516,11 @@ public final class ClientProtos {
results_.add(input.readMessage(org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Result.PARSER, extensionRegistry));
break;
}
case 48: {
bitField0_ |= 0x00000008;
stale_ = input.readBool();
break;
}
}
}
} catch (com.google.protobuf.InvalidProtocolBufferException e) {
@ -17719,12 +17734,29 @@ public final class ClientProtos {
return results_.get(index);
}
// optional bool stale = 6;
public static final int STALE_FIELD_NUMBER = 6;
private boolean stale_;
/**
* <code>optional bool stale = 6;</code>
*/
public boolean hasStale() {
return ((bitField0_ & 0x00000008) == 0x00000008);
}
/**
* <code>optional bool stale = 6;</code>
*/
public boolean getStale() {
return stale_;
}
private void initFields() {
cellsPerResult_ = java.util.Collections.emptyList();
scannerId_ = 0L;
moreResults_ = false;
ttl_ = 0;
results_ = java.util.Collections.emptyList();
stale_ = false;
}
private byte memoizedIsInitialized = -1;
public final boolean isInitialized() {
@ -17753,6 +17785,9 @@ public final class ClientProtos {
for (int i = 0; i < results_.size(); i++) {
output.writeMessage(5, results_.get(i));
}
if (((bitField0_ & 0x00000008) == 0x00000008)) {
output.writeBool(6, stale_);
}
getUnknownFields().writeTo(output);
}
@ -17787,6 +17822,10 @@ public final class ClientProtos {
size += com.google.protobuf.CodedOutputStream
.computeMessageSize(5, results_.get(i));
}
if (((bitField0_ & 0x00000008) == 0x00000008)) {
size += com.google.protobuf.CodedOutputStream
.computeBoolSize(6, stale_);
}
size += getUnknownFields().getSerializedSize();
memoizedSerializedSize = size;
return size;
@ -17829,6 +17868,11 @@ public final class ClientProtos {
}
result = result && getResultsList()
.equals(other.getResultsList());
result = result && (hasStale() == other.hasStale());
if (hasStale()) {
result = result && (getStale()
== other.getStale());
}
result = result &&
getUnknownFields().equals(other.getUnknownFields());
return result;
@ -17862,6 +17906,10 @@ public final class ClientProtos {
hash = (37 * hash) + RESULTS_FIELD_NUMBER;
hash = (53 * hash) + getResultsList().hashCode();
}
if (hasStale()) {
hash = (37 * hash) + STALE_FIELD_NUMBER;
hash = (53 * hash) + hashBoolean(getStale());
}
hash = (29 * hash) + getUnknownFields().hashCode();
memoizedHashCode = hash;
return hash;
@ -17992,6 +18040,8 @@ public final class ClientProtos {
} else {
resultsBuilder_.clear();
}
stale_ = false;
bitField0_ = (bitField0_ & ~0x00000020);
return this;
}
@ -18046,6 +18096,10 @@ public final class ClientProtos {
} else {
result.results_ = resultsBuilder_.build();
}
if (((from_bitField0_ & 0x00000020) == 0x00000020)) {
to_bitField0_ |= 0x00000008;
}
result.stale_ = stale_;
result.bitField0_ = to_bitField0_;
onBuilt();
return result;
@ -18107,6 +18161,9 @@ public final class ClientProtos {
}
}
}
if (other.hasStale()) {
setStale(other.getStale());
}
this.mergeUnknownFields(other.getUnknownFields());
return this;
}
@ -18717,6 +18774,39 @@ public final class ClientProtos {
return resultsBuilder_;
}
// optional bool stale = 6;
private boolean stale_ ;
/**
* <code>optional bool stale = 6;</code>
*/
public boolean hasStale() {
return ((bitField0_ & 0x00000020) == 0x00000020);
}
/**
* <code>optional bool stale = 6;</code>
*/
public boolean getStale() {
return stale_;
}
/**
* <code>optional bool stale = 6;</code>
*/
public Builder setStale(boolean value) {
bitField0_ |= 0x00000020;
stale_ = value;
onChanged();
return this;
}
/**
* <code>optional bool stale = 6;</code>
*/
public Builder clearStale() {
bitField0_ = (bitField0_ & ~0x00000020);
stale_ = false;
onChanged();
return this;
}
// @@protoc_insertion_point(builder_scope:ScanResponse)
}
@ -30568,50 +30658,50 @@ public final class ClientProtos {
"Specifier\022\023\n\004scan\030\002 \001(\0132\005.Scan\022\022\n\nscanne",
"r_id\030\003 \001(\004\022\026\n\016number_of_rows\030\004 \001(\r\022\025\n\rcl" +
"ose_scanner\030\005 \001(\010\022\025\n\rnext_call_seq\030\006 \001(\004" +
"\"y\n\014ScanResponse\022\030\n\020cells_per_result\030\001 \003" +
"(\r\022\022\n\nscanner_id\030\002 \001(\004\022\024\n\014more_results\030\003" +
" \001(\010\022\013\n\003ttl\030\004 \001(\r\022\030\n\007results\030\005 \003(\0132\007.Res" +
"ult\"\263\001\n\024BulkLoadHFileRequest\022 \n\006region\030\001" +
" \002(\0132\020.RegionSpecifier\0225\n\013family_path\030\002 " +
"\003(\0132 .BulkLoadHFileRequest.FamilyPath\022\026\n" +
"\016assign_seq_num\030\003 \001(\010\032*\n\nFamilyPath\022\016\n\006f" +
"amily\030\001 \002(\014\022\014\n\004path\030\002 \002(\t\"\'\n\025BulkLoadHFi",
"leResponse\022\016\n\006loaded\030\001 \002(\010\"a\n\026Coprocesso" +
"rServiceCall\022\013\n\003row\030\001 \002(\014\022\024\n\014service_nam" +
"e\030\002 \002(\t\022\023\n\013method_name\030\003 \002(\t\022\017\n\007request\030" +
"\004 \002(\014\"9\n\030CoprocessorServiceResult\022\035\n\005val" +
"ue\030\001 \001(\0132\016.NameBytesPair\"d\n\031CoprocessorS" +
"erviceRequest\022 \n\006region\030\001 \002(\0132\020.RegionSp" +
"ecifier\022%\n\004call\030\002 \002(\0132\027.CoprocessorServi" +
"ceCall\"]\n\032CoprocessorServiceResponse\022 \n\006" +
"region\030\001 \002(\0132\020.RegionSpecifier\022\035\n\005value\030" +
"\002 \002(\0132\016.NameBytesPair\"{\n\006Action\022\r\n\005index",
"\030\001 \001(\r\022 \n\010mutation\030\002 \001(\0132\016.MutationProto" +
"\022\021\n\003get\030\003 \001(\0132\004.Get\022-\n\014service_call\030\004 \001(" +
"\0132\027.CoprocessorServiceCall\"Y\n\014RegionActi" +
"on\022 \n\006region\030\001 \002(\0132\020.RegionSpecifier\022\016\n\006" +
"atomic\030\002 \001(\010\022\027\n\006action\030\003 \003(\0132\007.Action\"\221\001" +
"\n\021ResultOrException\022\r\n\005index\030\001 \001(\r\022\027\n\006re" +
"sult\030\002 \001(\0132\007.Result\022!\n\texception\030\003 \001(\0132\016" +
".NameBytesPair\0221\n\016service_result\030\004 \001(\0132\031" +
".CoprocessorServiceResult\"f\n\022RegionActio" +
"nResult\022-\n\021resultOrException\030\001 \003(\0132\022.Res",
"ultOrException\022!\n\texception\030\002 \001(\0132\016.Name" +
"BytesPair\"G\n\014MultiRequest\022#\n\014regionActio" +
"n\030\001 \003(\0132\r.RegionAction\022\022\n\nnonceGroup\030\002 \001" +
"(\004\"@\n\rMultiResponse\022/\n\022regionActionResul" +
"t\030\001 \003(\0132\023.RegionActionResult*\'\n\013Consiste" +
"ncy\022\n\n\006STRONG\020\000\022\014\n\010TIMELINE\020\0012\261\002\n\rClient" +
"Service\022 \n\003Get\022\013.GetRequest\032\014.GetRespons" +
"e\022)\n\006Mutate\022\016.MutateRequest\032\017.MutateResp" +
"onse\022#\n\004Scan\022\014.ScanRequest\032\r.ScanRespons" +
"e\022>\n\rBulkLoadHFile\022\025.BulkLoadHFileReques",
"t\032\026.BulkLoadHFileResponse\022F\n\013ExecService" +
"\022\032.CoprocessorServiceRequest\032\033.Coprocess" +
"orServiceResponse\022&\n\005Multi\022\r.MultiReques" +
"t\032\016.MultiResponseBB\n*org.apache.hadoop.h" +
"base.protobuf.generatedB\014ClientProtosH\001\210" +
"\001\001\240\001\001"
"\"\210\001\n\014ScanResponse\022\030\n\020cells_per_result\030\001 " +
"\003(\r\022\022\n\nscanner_id\030\002 \001(\004\022\024\n\014more_results\030" +
"\003 \001(\010\022\013\n\003ttl\030\004 \001(\r\022\030\n\007results\030\005 \003(\0132\007.Re" +
"sult\022\r\n\005stale\030\006 \001(\010\"\263\001\n\024BulkLoadHFileReq" +
"uest\022 \n\006region\030\001 \002(\0132\020.RegionSpecifier\0225" +
"\n\013family_path\030\002 \003(\0132 .BulkLoadHFileReque" +
"st.FamilyPath\022\026\n\016assign_seq_num\030\003 \001(\010\032*\n" +
"\nFamilyPath\022\016\n\006family\030\001 \002(\014\022\014\n\004path\030\002 \002(",
"\t\"\'\n\025BulkLoadHFileResponse\022\016\n\006loaded\030\001 \002" +
"(\010\"a\n\026CoprocessorServiceCall\022\013\n\003row\030\001 \002(" +
"\014\022\024\n\014service_name\030\002 \002(\t\022\023\n\013method_name\030\003" +
" \002(\t\022\017\n\007request\030\004 \002(\014\"9\n\030CoprocessorServ" +
"iceResult\022\035\n\005value\030\001 \001(\0132\016.NameBytesPair" +
"\"d\n\031CoprocessorServiceRequest\022 \n\006region\030" +
"\001 \002(\0132\020.RegionSpecifier\022%\n\004call\030\002 \002(\0132\027." +
"CoprocessorServiceCall\"]\n\032CoprocessorSer" +
"viceResponse\022 \n\006region\030\001 \002(\0132\020.RegionSpe" +
"cifier\022\035\n\005value\030\002 \002(\0132\016.NameBytesPair\"{\n",
"\006Action\022\r\n\005index\030\001 \001(\r\022 \n\010mutation\030\002 \001(\013" +
"2\016.MutationProto\022\021\n\003get\030\003 \001(\0132\004.Get\022-\n\014s" +
"ervice_call\030\004 \001(\0132\027.CoprocessorServiceCa" +
"ll\"Y\n\014RegionAction\022 \n\006region\030\001 \002(\0132\020.Reg" +
"ionSpecifier\022\016\n\006atomic\030\002 \001(\010\022\027\n\006action\030\003" +
" \003(\0132\007.Action\"\221\001\n\021ResultOrException\022\r\n\005i" +
"ndex\030\001 \001(\r\022\027\n\006result\030\002 \001(\0132\007.Result\022!\n\te" +
"xception\030\003 \001(\0132\016.NameBytesPair\0221\n\016servic" +
"e_result\030\004 \001(\0132\031.CoprocessorServiceResul" +
"t\"f\n\022RegionActionResult\022-\n\021resultOrExcep",
"tion\030\001 \003(\0132\022.ResultOrException\022!\n\texcept" +
"ion\030\002 \001(\0132\016.NameBytesPair\"G\n\014MultiReques" +
"t\022#\n\014regionAction\030\001 \003(\0132\r.RegionAction\022\022" +
"\n\nnonceGroup\030\002 \001(\004\"@\n\rMultiResponse\022/\n\022r" +
"egionActionResult\030\001 \003(\0132\023.RegionActionRe" +
"sult*\'\n\013Consistency\022\n\n\006STRONG\020\000\022\014\n\010TIMEL" +
"INE\020\0012\261\002\n\rClientService\022 \n\003Get\022\013.GetRequ" +
"est\032\014.GetResponse\022)\n\006Mutate\022\016.MutateRequ" +
"est\032\017.MutateResponse\022#\n\004Scan\022\014.ScanReque" +
"st\032\r.ScanResponse\022>\n\rBulkLoadHFile\022\025.Bul",
"kLoadHFileRequest\032\026.BulkLoadHFileRespons" +
"e\022F\n\013ExecService\022\032.CoprocessorServiceReq" +
"uest\032\033.CoprocessorServiceResponse\022&\n\005Mul" +
"ti\022\r.MultiRequest\032\016.MultiResponseBB\n*org" +
".apache.hadoop.hbase.protobuf.generatedB" +
"\014ClientProtosH\001\210\001\001\240\001\001"
};
com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@ -30713,7 +30803,7 @@ public final class ClientProtos {
internal_static_ScanResponse_fieldAccessorTable = new
com.google.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_ScanResponse_descriptor,
new java.lang.String[] { "CellsPerResult", "ScannerId", "MoreResults", "Ttl", "Results", });
new java.lang.String[] { "CellsPerResult", "ScannerId", "MoreResults", "Ttl", "Results", "Stale", });
internal_static_BulkLoadHFileRequest_descriptor =
getDescriptor().getMessageTypes().get(14);
internal_static_BulkLoadHFileRequest_fieldAccessorTable = new

View File

@ -289,6 +289,7 @@ message ScanResponse {
// This field is mutually exclusive with cells_per_result (since the Cells will
// be inside the pb'd Result)
repeated Result results = 5;
optional bool stale = 6;
}
/**

View File

@ -60,6 +60,7 @@ import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Increment;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionReplicaUtil;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.RowMutations;
import org.apache.hadoop.hbase.client.Scan;
@ -323,7 +324,8 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
}
private void addResults(final ScanResponse.Builder builder, final List<Result> results,
final RpcController controller) {
final RpcController controller, boolean isDefaultRegion) {
builder.setStale(!isDefaultRegion);
if (results == null || results.isEmpty()) return;
if (isClientCellBlockSupport()) {
for (Result res : results) {
@ -1942,6 +1944,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
try {
int i = 0;
synchronized(scanner) {
boolean stale = (region.getRegionInfo().getReplicaId() != 0);
for (; i < rows
&& currentScanResultSize < maxResultSize; ) {
// Collect values to be returned here
@ -1952,7 +1955,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
currentScanResultSize += KeyValueUtil.ensureKeyValue(kv).heapSize();
}
}
results.add(Result.create(values));
results.add(Result.create(values, null, stale));
i++;
}
if (!moreRows) {
@ -1979,7 +1982,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
moreResults = false;
results = null;
} else {
addResults(builder, results, controller);
addResults(builder, results, controller, RegionReplicaUtil.isDefaultReplica(region.getRegionInfo()));
}
} finally {
// We're done. On way out re-add the above removed lease.

View File

@ -241,7 +241,7 @@ public class TestMultiVersions {
}
assertTrue(cellCount == 1);
}
table.close();
table.flushCommits();
}
// Case 1: scan with LATEST_TIMESTAMP. Should get two rows

View File

@ -33,6 +33,7 @@ import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellScannable;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
@ -41,6 +42,7 @@ import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.MediumTests;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.client.HConnectionTestingUtility;
@ -186,19 +188,20 @@ public class TestMetaReaderEditorNoCluster {
// to shove this in here first so it gets picked up all over; e.g. by
// HTable.
connection = HConnectionTestingUtility.getSpiedConnection(UTIL.getConfiguration());
// Fix the location lookup so it 'works' though no network. First
// make an 'any location' object.
final HRegionLocation anyLocation =
new HRegionLocation(HRegionInfo.FIRST_META_REGIONINFO, sn);
// Return the any location object when locateRegion is called in HTable
// constructor and when its called by ServerCallable (it uses getRegionLocation).
final RegionLocations rl = new RegionLocations(anyLocation);
// Return the RegionLocations object when locateRegion
// The ugly format below comes of 'Important gotcha on spying real objects!' from
// http://mockito.googlecode.com/svn/branches/1.6/javadoc/org/mockito/Mockito.html
Mockito.doReturn(anyLocation).
when(connection).locateRegion((TableName) Mockito.any(), (byte[]) Mockito.any());
Mockito.doReturn(anyLocation).
when(connection).getRegionLocation((TableName) Mockito.any(),
(byte[]) Mockito.any(), Mockito.anyBoolean());
ClusterConnection cConnection =
HConnectionTestingUtility.getSpiedClusterConnection(UTIL.getConfiguration());
Mockito.doReturn(rl).when
(cConnection).locateRegion((TableName)Mockito.any(), (byte[])Mockito.any(),
Mockito.anyBoolean(), Mockito.anyBoolean(), Mockito.anyInt());
// Now shove our HRI implementation into the spied-upon connection.
Mockito.doReturn(implementation).

View File

@ -160,6 +160,20 @@ public class HConnectionTestingUtility {
}
}
public static ClusterConnection getSpiedClusterConnection(final Configuration conf)
throws IOException {
HConnectionKey connectionKey = new HConnectionKey(conf);
synchronized (ConnectionManager.CONNECTION_INSTANCES) {
HConnectionImplementation connection =
ConnectionManager.CONNECTION_INSTANCES.get(connectionKey);
if (connection == null) {
connection = Mockito.spy(new HConnectionImplementation(conf, true));
ConnectionManager.CONNECTION_INSTANCES.put(connectionKey, connection);
}
return connection;
}
}
/**
* @return Count of extant connection instances
*/

View File

@ -37,11 +37,12 @@ import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.protobuf.RequestConverter;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.regionserver.StorefileRefresherChore;
import org.apache.hadoop.hbase.regionserver.TestRegionServerNoMaster;
import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.zookeeper.ZKAssign;
import org.apache.zookeeper.KeeperException;
import org.junit.After;
@ -53,9 +54,14 @@ import org.junit.Test;
import org.junit.experimental.categories.Category;
import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
@ -84,16 +90,44 @@ public class TestReplicasClient {
*/
public static class SlowMeCopro extends BaseRegionObserver {
static final AtomicLong sleepTime = new AtomicLong(0);
static final AtomicBoolean slowDownNext = new AtomicBoolean(false);
static final AtomicInteger countOfNext = new AtomicInteger(0);
static final AtomicReference<CountDownLatch> cdl =
new AtomicReference<CountDownLatch>(new CountDownLatch(0));
Random r = new Random();
public SlowMeCopro() {
}
@Override
public void preGetOp(final ObserverContext<RegionCoprocessorEnvironment> e,
final Get get, final List<Cell> results) throws IOException {
slowdownCode(e);
}
@Override
public RegionScanner preScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> e,
final Scan scan, final RegionScanner s) throws IOException {
slowdownCode(e);
return s;
}
@Override
public boolean preScannerNext(final ObserverContext<RegionCoprocessorEnvironment> e,
final InternalScanner s, final List<Result> results,
final int limit, final boolean hasMore) throws IOException {
//this will slow down a certain next operation if the conditions are met. The slowness
//will allow the call to go to a replica
if (slowDownNext.get()) {
//have some "next" return successfully from the primary; hence countOfNext checked
if (countOfNext.incrementAndGet() == 2) {
sleepTime.set(2000);
slowdownCode(e);
}
}
return true;
}
private void slowdownCode(final ObserverContext<RegionCoprocessorEnvironment> e) {
if (e.getEnvironment().getRegion().getRegionInfo().getReplicaId() == 0) {
CountDownLatch latch = cdl.get();
try {
@ -121,7 +155,7 @@ public class TestReplicasClient {
// enable store file refreshing
HTU.getConfiguration().setInt(
StorefileRefresherChore.REGIONSERVER_STOREFILE_REFRESH_PERIOD, REFRESH_PERIOD);
HTU.getConfiguration().setBoolean("hbase.client.log.scanner.activity", true);
HTU.startMiniCluster(NB_SERVERS);
// Create table then get the single region for our new table.
@ -161,6 +195,14 @@ public class TestReplicasClient {
@Before
public void before() throws IOException {
HTU.getHBaseAdmin().getConnection().clearRegionCache();
try {
openRegion(hriPrimary);
} catch (Exception ignored) {
}
try {
openRegion(hriSecondary);
} catch (Exception ignored) {
}
}
@After
@ -169,6 +211,10 @@ public class TestReplicasClient {
closeRegion(hriSecondary);
} catch (Exception ignored) {
}
try {
closeRegion(hriPrimary);
} catch (Exception ignored) {
}
ZKAssign.deleteNodeFailSilent(HTU.getZooKeeperWatcher(), hriPrimary);
ZKAssign.deleteNodeFailSilent(HTU.getZooKeeperWatcher(), hriSecondary);
@ -180,6 +226,9 @@ public class TestReplicasClient {
}
private void openRegion(HRegionInfo hri) throws Exception {
try {
if (isRegionOpened(hri)) return;
} catch (Exception e){}
ZKAssign.createNodeOffline(HTU.getZooKeeperWatcher(), hri, getRS().getServerName());
// first version is '0'
AdminProtos.OpenRegionRequest orr = RequestConverter.buildOpenRegionRequest(
@ -215,6 +264,10 @@ public class TestReplicasClient {
ZKAssign.deleteOpenedNode(HTU.getZooKeeperWatcher(), hri.getEncodedName(), null));
}
private boolean isRegionOpened(HRegionInfo hri) throws Exception {
return getRS().getRegionByEncodedName(hri.getEncodedName()).isAvailable();
}
private void checkRegionIsClosed(String encodedRegionName) throws Exception {
while (!getRS().getRegionsInTransitionInRS().isEmpty()) {
@ -475,4 +528,106 @@ public class TestReplicasClient {
closeRegion(hriSecondary);
}
}
}
@Test
public void testScanWithReplicas() throws Exception {
//simple scan
runMultipleScansOfOneType(false, false);
}
@Test
public void testSmallScanWithReplicas() throws Exception {
//small scan
runMultipleScansOfOneType(false, true);
}
@Test
public void testReverseScanWithReplicas() throws Exception {
//reverse scan
runMultipleScansOfOneType(true, false);
}
private void runMultipleScansOfOneType(boolean reversed, boolean small) throws Exception {
openRegion(hriSecondary);
int NUMROWS = 100;
try {
for (int i = 0; i < NUMROWS; i++) {
byte[] b1 = Bytes.toBytes("testUseRegionWithReplica" + i);
Put p = new Put(b1);
p.add(f, b1, b1);
table.put(p);
}
LOG.debug("PUT done");
int caching = 20;
byte[] start;
if (reversed) start = Bytes.toBytes("testUseRegionWithReplica" + (NUMROWS - 1));
else start = Bytes.toBytes("testUseRegionWithReplica" + 0);
scanWithReplicas(reversed, small, Consistency.TIMELINE, caching, start, NUMROWS, false, false);
//Even if we were to slow the server down, unless we ask for stale
//we won't get it
SlowMeCopro.sleepTime.set(5000);
scanWithReplicas(reversed, small, Consistency.STRONG, caching, start, NUMROWS, false, false);
SlowMeCopro.sleepTime.set(0);
HTU.getHBaseAdmin().flush(table.getTableName());
LOG.info("flush done");
Thread.sleep(1000 + REFRESH_PERIOD * 2);
//Now set the flag to get a response even if stale
SlowMeCopro.sleepTime.set(5000);
scanWithReplicas(reversed, small, Consistency.TIMELINE, caching, start, NUMROWS, true, false);
SlowMeCopro.sleepTime.set(0);
// now make some 'next' calls slow
SlowMeCopro.slowDownNext.set(true);
SlowMeCopro.countOfNext.set(0);
scanWithReplicas(reversed, small, Consistency.TIMELINE, caching, start, NUMROWS, true, true);
SlowMeCopro.slowDownNext.set(false);
SlowMeCopro.countOfNext.set(0);
} finally {
SlowMeCopro.cdl.get().countDown();
SlowMeCopro.sleepTime.set(0);
SlowMeCopro.slowDownNext.set(false);
SlowMeCopro.countOfNext.set(0);
for (int i = 0; i < NUMROWS; i++) {
byte[] b1 = Bytes.toBytes("testUseRegionWithReplica" + i);
Delete d = new Delete(b1);
table.delete(d);
}
closeRegion(hriSecondary);
}
}
private void scanWithReplicas(boolean reversed, boolean small, Consistency consistency,
int caching, byte[] startRow, int numRows, boolean staleExpected, boolean slowNext)
throws Exception {
Scan scan = new Scan(startRow);
scan.setCaching(caching);
scan.setReversed(reversed);
scan.setSmall(small);
scan.setConsistency(consistency);
ResultScanner scanner = table.getScanner(scan);
Iterator<Result> iter = scanner.iterator();
HashMap<String, Boolean> map = new HashMap<String, Boolean>();
int count = 0;
int countOfStale = 0;
while (iter.hasNext()) {
count++;
Result r = iter.next();
if (map.containsKey(new String(r.getRow()))) {
throw new Exception("Unexpected scan result. Repeated row " + Bytes.toString(r.getRow()));
}
map.put(new String(r.getRow()), true);
if (!slowNext) Assert.assertTrue(r.isStale() == staleExpected);
if (r.isStale()) countOfStale++;
}
LOG.debug("Count of rows " + count + " num rows expected " + numRows);
Assert.assertTrue(count == numRows);
if (slowNext) {
LOG.debug("Count of Stale " + countOfStale);
Assert.assertTrue(countOfStale > 1 && countOfStale < numRows);
}
}
}

View File

@ -189,7 +189,6 @@ public class TestRestoreSnapshotFromClient {
assertEquals(500, TEST_UTIL.countRows(table, TEST_FAMILY2));
Set<String> fsFamilies = getFamiliesFromFS(tableName);
assertEquals(2, fsFamilies.size());
table.close();
// Take a snapshot
admin.disableTable(tableName);
@ -210,7 +209,6 @@ public class TestRestoreSnapshotFromClient {
assertEquals(1, htd.getFamilies().size());
fsFamilies = getFamiliesFromFS(tableName);
assertEquals(1, fsFamilies.size());
table.close();
// Restore back the snapshot (with the cf)
admin.disableTable(tableName);

View File

@ -879,7 +879,7 @@ public class TestVisibilityLabels {
table.put(puts);
} finally {
if (table != null) {
table.close();
table.flushCommits();
}
}
return table;