HBASE-11214. Fixes for scans on a replicated table

This commit is contained in:
Devaraj Das 2014-05-22 10:01:50 -07:00 committed by Enis Soztutar
parent 5a8f3f7cef
commit 1daf8acfc9
9 changed files with 39 additions and 182 deletions

View File

@ -25,7 +25,6 @@ import java.util.concurrent.ExecutorService;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.DoNotRetryIOException;
@ -49,8 +48,7 @@ import org.apache.hadoop.hbase.util.Bytes;
* If there are multiple regions in a table, this scanner will iterate
* through them all.
*/
@InterfaceAudience.Public
@InterfaceStability.Stable
@InterfaceAudience.Private
public class ClientScanner extends AbstractClientScanner {
private final Log LOG = LogFactory.getLog(this.getClass());
protected Scan scan;
@ -81,70 +79,6 @@ public class ClientScanner extends AbstractClientScanner {
private int retries;
protected final ExecutorService pool;
/**
* Create a new ClientScanner for the specified table. A ClusterConnection will be
* retrieved using the passed Configuration.
* Note that the passed {@link Scan}'s start row may be changed.
*
* @param conf The {@link Configuration} to use.
* @param scan {@link Scan} to use in this scanner
* @param tableName The table that we wish to scan
* @throws IOException
*/
public ClientScanner(final Configuration conf, final Scan scan,
final TableName tableName) throws IOException {
// Convenience form: obtains a ClusterConnection internally from the given
// Configuration, then delegates to the connection-taking constructor.
this(conf, scan, tableName, ConnectionManager.getConnectionInternal(conf));
}
/**
* @deprecated Use {@link #ClientScanner(Configuration, Scan, TableName)}
*/
@Deprecated
public ClientScanner(final Configuration conf, final Scan scan,
final byte [] tableName) throws IOException {
// Deprecated byte[]-name variant: converts to TableName and delegates.
this(conf, scan, TableName.valueOf(tableName));
}
/**
* @deprecated use
* {@link #ClientScanner(Configuration, Scan, TableName, HConnection, RpcRetryingCallerFactory, RpcControllerFactory)}
*/
@Deprecated
public ClientScanner(final Configuration conf, final Scan scan, final TableName tableName,
ClusterConnection connection) throws IOException {
// Deprecated: builds default RPC factories from conf and delegates.
// NOTE(review): uses new RpcRetryingCallerFactory(conf) directly while
// ConnectionManager uses RpcRetryingCallerFactory.instantiate(conf) —
// confirm whether a configured factory subclass should be honored here.
this(conf, scan, tableName, connection, new RpcRetryingCallerFactory(conf),
RpcControllerFactory.instantiate(conf));
}
/**
* @deprecated Use
* {@link #ClientScanner(Configuration, Scan, TableName, HConnection, RpcRetryingCallerFactory, RpcControllerFactory)}
*/
@Deprecated
public ClientScanner(final Configuration conf, final Scan scan, final byte [] tableName,
ClusterConnection connection) throws IOException {
// Deprecated byte[]-name variant: converts to TableName and delegates.
this(conf, scan, TableName.valueOf(tableName), connection);
}
/**
* Create a new ClientScanner for the specified table.
* Note that the passed {@link Scan}'s start row may be changed.
*
* @param conf
* @param scan
* @param tableName
* @param connection
* @param rpcFactory
* @throws IOException
*/
public ClientScanner(final Configuration conf, final Scan scan,
final TableName tableName, ClusterConnection connection,
RpcRetryingCallerFactory rpcFactory, RpcControllerFactory controllerFactory)
throws IOException {
// Delegates to the primary constructor with no executor pool and a
// zero primary-operation timeout.
this(conf, scan, tableName, connection, rpcFactory, controllerFactory, null, 0);
}
/**
* Create a new ClientScanner for the specified table Note that the passed {@link Scan}'s start
* row may be changed.

View File

@ -42,28 +42,12 @@ import java.util.concurrent.ExecutorService;
* <p/>
* For small scan, it will get better performance than {@link ReversedClientScanner}
*/
@InterfaceAudience.Public
@InterfaceStability.Evolving
@InterfaceAudience.Private
public class ClientSmallReversedScanner extends ReversedClientScanner {
private static final Log LOG = LogFactory.getLog(ClientSmallReversedScanner.class);
private ScannerCallableWithReplicas smallScanCallable = null;
private byte[] skipRowOfFirstResult = null;
/**
* Create a new ReversibleClientScanner for the specified table Note that the
* passed {@link org.apache.hadoop.hbase.client.Scan}'s start row may be changed.
*
* @param conf The {@link org.apache.hadoop.conf.Configuration} to use.
* @param scan {@link org.apache.hadoop.hbase.client.Scan} to use in this scanner
* @param tableName The table that we wish to scan
* @param connection Connection identifying the cluster
* @throws java.io.IOException
*/
public ClientSmallReversedScanner(Configuration conf, Scan scan, TableName tableName,
ClusterConnection connection) throws IOException {
// Delegates to the same-shaped ReversedClientScanner constructor.
super(conf, scan, tableName, connection);
}
/**
* Create a new ReversibleClientScanner for the specified table Note that the
* passed {@link Scan}'s start row may be changed.

View File

@ -25,7 +25,6 @@ import java.util.concurrent.ExecutorService;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HConstants;
@ -50,8 +49,7 @@ import com.google.protobuf.ServiceException;
*
* For small scan, it will get better performance than {@link ClientScanner}
*/
@InterfaceAudience.Public
@InterfaceStability.Evolving
@InterfaceAudience.Private
public class ClientSmallScanner extends ClientScanner {
private final Log LOG = LogFactory.getLog(this.getClass());
private ScannerCallableWithReplicas smallScanCallable = null;
@ -59,65 +57,6 @@ public class ClientSmallScanner extends ClientScanner {
// row with this one
private byte[] skipRowOfFirstResult = null;
/**
* Create a new ClientSmallScanner for the specified table. An HConnection
* will be retrieved using the passed Configuration. Note that the passed
* {@link Scan}'s start row may be changed.
*
* @param conf The {@link Configuration} to use.
* @param scan {@link Scan} to use in this scanner
* @param tableName The table that we wish to rangeGet
* @throws IOException
*/
public ClientSmallScanner(final Configuration conf, final Scan scan,
final TableName tableName) throws IOException {
// Convenience form: obtains a ClusterConnection internally from the given
// Configuration, then delegates to the connection-taking constructor.
this(conf, scan, tableName, ConnectionManager.getConnectionInternal(conf));
}
/**
* Create a new ClientSmallScanner for the specified table. An HConnection
* will be retrieved using the passed Configuration. Note that the passed
* {@link Scan}'s start row may be changed.
* @param conf
* @param scan
* @param tableName
* @param connection
* @throws IOException
*/
public ClientSmallScanner(final Configuration conf, final Scan scan,
final TableName tableName, ClusterConnection connection) throws IOException {
// Use RpcControllerFactory.instantiate(conf) — as the sibling constructors
// of ClientScanner and the deprecated ClientSmallScanner overload do — so a
// controller factory subclass configured in conf is honored instead of
// always constructing the default implementation directly.
this(conf, scan, tableName, connection, new RpcRetryingCallerFactory(conf),
RpcControllerFactory.instantiate(conf));
}
/**
* @deprecated use
* {@link #ClientSmallScanner(Configuration, Scan, TableName, HConnection,
* RpcRetryingCallerFactory, RpcControllerFactory)} instead
*/
@Deprecated
@Deprecated
public ClientSmallScanner(final Configuration conf, final Scan scan, final TableName tableName,
ClusterConnection connection, RpcRetryingCallerFactory rpcFactory) throws IOException {
// Deprecated: instantiates the controller factory from conf and delegates.
this(conf, scan, tableName, connection, rpcFactory, RpcControllerFactory.instantiate(conf));
}
/**
* Create a new ShortClientScanner for the specified table Note that the
* passed {@link Scan}'s start row may be changed.
*
* @param conf The {@link Configuration} to use.
* @param scan {@link Scan} to use in this scanner
* @param tableName The table that we wish to rangeGet
* @param connection Connection identifying the cluster
* @param rpcFactory
* @throws IOException
*/
public ClientSmallScanner(final Configuration conf, final Scan scan,
final TableName tableName, ClusterConnection connection,
RpcRetryingCallerFactory rpcFactory, RpcControllerFactory controllerFactory) throws IOException {
// Delegates to the primary constructor with no executor pool and a
// zero primary-operation timeout.
this(conf, scan, tableName, connection, rpcFactory, controllerFactory, null, 0);
}
/**
* Create a new ShortClientScanner for the specified table Note that the
* passed {@link Scan}'s start row may be changed.
@ -127,13 +66,16 @@ public class ClientSmallScanner extends ClientScanner {
* @param tableName The table that we wish to rangeGet
* @param connection Connection identifying the cluster
* @param rpcFactory
* @param pool
* @param primaryOperationTimeout
* @throws IOException
*/
public ClientSmallScanner(final Configuration conf, final Scan scan,
final TableName tableName, ClusterConnection connection,
RpcRetryingCallerFactory rpcFactory, RpcControllerFactory controllerFactory,
ExecutorService pool, int primaryOperationTimeout) throws IOException {
super(conf, scan, tableName, connection, rpcFactory, controllerFactory);
super(conf, scan, tableName, connection, rpcFactory, controllerFactory, pool,
primaryOperationTimeout);
}
@Override
@ -220,6 +162,7 @@ public class ClientSmallScanner extends ClientScanner {
@Override
public Result[] call(int timeout) throws IOException {
if (this.closed) return null;
if (Thread.interrupted()) {
throw new InterruptedIOException();
}
@ -243,9 +186,6 @@ public class ClientSmallScanner extends ClientScanner {
return new SmallScannerCallable((ClusterConnection)connection, tableName, getScan(), scanMetrics,
controllerFactory, getCaching(), id);
}
// No-op override: presumably a small scanner opens and closes its
// server-side scanner within each RPC, so there is nothing to mark for
// close here — TODO confirm against ScannerCallableWithReplicas, which
// schedules setClose() on outstanding replica callables.
@Override
public void setClose(){}
}
@Override

View File

@ -593,6 +593,10 @@ class ConnectionManager {
private User user;
private RpcRetryingCallerFactory rpcCallerFactory;
private RpcControllerFactory rpcControllerFactory;
/**
* Cluster registry of basic info such as clusterid and meta region location.
*/
@ -623,6 +627,8 @@ class ConnectionManager {
retrieveClusterId();
this.rpcClient = new RpcClient(this.conf, this.clusterId);
this.rpcCallerFactory = RpcRetryingCallerFactory.instantiate(conf);
this.rpcControllerFactory = RpcControllerFactory.instantiate(conf);
// Do we publish the status?
boolean shouldListen = conf.getBoolean(HConstants.STATUS_PUBLISHED,
@ -1177,7 +1183,8 @@ class ConnectionManager {
Result regionInfoRow = null;
ReversedClientScanner rcs = null;
try {
rcs = new ClientSmallReversedScanner(conf, s, TableName.META_TABLE_NAME, this);
rcs = new ClientSmallReversedScanner(conf, s, TableName.META_TABLE_NAME, this,
rpcCallerFactory, rpcControllerFactory, getBatchPool(), 0);
regionInfoRow = rcs.next();
} finally {
if (rcs != null) {

View File

@ -135,7 +135,7 @@ public class HTable implements HTableInterface {
protected long currentWriteBufferSize;
protected int scannerCaching;
private int maxKeyValueSize;
private ExecutorService pool; // For Multi
private ExecutorService pool; // For Multi & Scan
private boolean closed;
private int operationTimeout;
private int retries;

View File

@ -140,15 +140,15 @@ public class MetaScanner {
// Calculate startrow for scan.
byte[] startRow;
ResultScanner scanner = null;
HTable metaTable = null;
try {
metaTable = new HTable(TableName.META_TABLE_NAME, connection, null);
if (row != null) {
// Scan starting at a particular row in a particular table
byte[] searchRow = HRegionInfo.createRegionName(tableName, row, HConstants.NINES, false);
HTable metaTable;
metaTable = new HTable(TableName.META_TABLE_NAME, connection, null);
Result startRowResult = metaTable.getRowOrBefore(searchRow, HConstants.CATALOG_FAMILY);
metaTable.close();
if (startRowResult == null) {
throw new TableNotFoundException("Cannot find row in "+ TableName
.META_TABLE_NAME.getNameAsString()+" for table: "
@ -182,9 +182,7 @@ public class MetaScanner {
Bytes.toStringBinary(startRow) + " for max=" + rowUpperLimit + " with caching=" + rows);
}
// Run the scan
scanner = (scan.isSmall() ?
new ClientSmallScanner(configuration, scan, TableName.META_TABLE_NAME, connection) :
new ClientScanner(configuration, scan, TableName.META_TABLE_NAME, connection));
scanner = metaTable.getScanner(scan);
Result result;
int processedRows = 0;
while ((result = scanner.next()) != null) {
@ -211,6 +209,15 @@ public class MetaScanner {
LOG.debug("Got exception in closing the meta scanner visitor", t);
}
}
if (metaTable != null) {
try {
metaTable.close();
} catch (Throwable t) {
ExceptionUtil.rethrowIfInterrupt(t);
LOG.debug("Got exception in closing meta table", t);
}
}
}
}

View File

@ -25,7 +25,6 @@ import java.util.concurrent.ExecutorService;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
@ -36,26 +35,12 @@ import org.apache.hadoop.hbase.util.ExceptionUtil;
/**
* A reversed client scanner which support backward scanning
*/
@InterfaceAudience.Public
@InterfaceStability.Evolving
@InterfaceAudience.Private
public class ReversedClientScanner extends ClientScanner {
private static final Log LOG = LogFactory.getLog(ReversedClientScanner.class);
// A byte array in which all elements are the max byte, and it is used to
// construct closest front row
static byte[] MAX_BYTE_ARRAY = Bytes.createMaxByteArray(9);
/**
* Create a new ReversibleClientScanner for the specified table Note that the
* passed {@link Scan}'s start row may be changed.
* @param conf The {@link Configuration} to use.
* @param scan {@link Scan} to use in this scanner
* @param tableName The table that we wish to scan
* @param connection Connection identifying the cluster
* @throws IOException
*/
public ReversedClientScanner(Configuration conf, Scan scan,
TableName tableName, ClusterConnection connection) throws IOException {
super(conf, scan, tableName, connection);
}
/**
* Create a new ReversibleClientScanner for the specified table Note that the

View File

@ -132,9 +132,9 @@ class ScannerCallableWithReplicas implements RetryingCallable<Result[]> {
RegionLocations rl = RpcRetryingCallerWithReadReplicas.getRegionLocations(true,
RegionReplicaUtil.DEFAULT_REPLICA_ID, cConnection, tableName,
currentScannerCallable.getRow());
// allocate a boundedcompletion pool of some multiple of number of replicas.
// We want accommodate the "scan" RPC call and the "close" RPC call (we schedule "close"
// RPCs for unneeded replica scans using the same pool)
// We want to accommodate some RPCs for redundant replica scans that are still in progress
BoundedCompletionService<Pair<Result[], ScannerCallable>> cs =
new BoundedCompletionService<Pair<Result[], ScannerCallable>>(pool, rl.size() * 5);
@ -151,7 +151,7 @@ class ScannerCallableWithReplicas implements RetryingCallable<Result[]> {
if (f != null) {
Pair<Result[], ScannerCallable> r = f.get();
if (r != null && r.getSecond() != null) {
updateCurrentlyServingReplica(r.getSecond(), r.getFirst(), done, cs);
updateCurrentlyServingReplica(r.getSecond(), r.getFirst(), done, pool);
}
return r == null ? null : r.getFirst(); //great we got a response
}
@ -175,7 +175,7 @@ class ScannerCallableWithReplicas implements RetryingCallable<Result[]> {
Future<Pair<Result[], ScannerCallable>> f = cs.take();
Pair<Result[], ScannerCallable> r = f.get();
if (r != null && r.getSecond() != null) {
updateCurrentlyServingReplica(r.getSecond(), r.getFirst(), done, cs);
updateCurrentlyServingReplica(r.getSecond(), r.getFirst(), done, pool);
}
return r == null ? null : r.getFirst(); // great we got an answer
} catch (ExecutionException e) {
@ -204,7 +204,7 @@ class ScannerCallableWithReplicas implements RetryingCallable<Result[]> {
}
private void updateCurrentlyServingReplica(ScannerCallable scanner, Result[] result,
AtomicBoolean done, BoundedCompletionService<Pair<Result[], ScannerCallable>> cs) {
AtomicBoolean done, ExecutorService pool) {
if (done.compareAndSet(false, true)) {
if (currentScannerCallable != scanner) replicaSwitched.set(true);
currentScannerCallable = scanner;
@ -226,7 +226,7 @@ class ScannerCallableWithReplicas implements RetryingCallable<Result[]> {
// the table is closed (when the awaitTermination of the underlying pool is called)
s.setClose();
RetryingRPC r = new RetryingRPC(s);
cs.submit(r);
pool.submit(r);
}
// now clear outstandingCallables since we scheduled a close for all the contained scanners
outstandingCallables.clear();

View File

@ -571,7 +571,7 @@ public class TestReplicasClient {
scanWithReplicas(reversed, small, Consistency.STRONG, caching, start, NUMROWS, false, false);
SlowMeCopro.sleepTime.set(0);
HTU.getHBaseAdmin().flush(table.getTableName());
flushRegion(hriPrimary);
LOG.info("flush done");
Thread.sleep(1000 + REFRESH_PERIOD * 2);