From 1daf8acfc9f48551a4d427bf71fb32f56371391b Mon Sep 17 00:00:00 2001 From: Devaraj Das Date: Thu, 22 May 2014 10:01:50 -0700 Subject: [PATCH] HBASE-11214. Fixes for scans on a replicated table --- .../hadoop/hbase/client/ClientScanner.java | 68 +----------------- .../client/ClientSmallReversedScanner.java | 18 +---- .../hbase/client/ClientSmallScanner.java | 72 ++----------------- .../hbase/client/ConnectionManager.java | 9 ++- .../apache/hadoop/hbase/client/HTable.java | 2 +- .../hadoop/hbase/client/MetaScanner.java | 21 ++++-- .../hbase/client/ReversedClientScanner.java | 17 +---- .../client/ScannerCallableWithReplicas.java | 12 ++-- .../hbase/client/TestReplicasClient.java | 2 +- 9 files changed, 39 insertions(+), 182 deletions(-) diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java index ef1b29fa92d..d13f8811e91 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java @@ -25,7 +25,6 @@ import java.util.concurrent.ExecutorService; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.DoNotRetryIOException; @@ -49,8 +48,7 @@ import org.apache.hadoop.hbase.util.Bytes; * If there are multiple regions in a table, this scanner will iterate * through them all. */ -@InterfaceAudience.Public -@InterfaceStability.Stable +@InterfaceAudience.Private public class ClientScanner extends AbstractClientScanner { private final Log LOG = LogFactory.getLog(this.getClass()); protected Scan scan; @@ -81,70 +79,6 @@ public class ClientScanner extends AbstractClientScanner { private int retries; protected final ExecutorService pool; - /** - * Create a new ClientScanner for the specified table. A ClusterConnection will be - * retrieved using the passed Configuration. - * Note that the passed {@link Scan}'s start row maybe changed. - * - * @param conf The {@link Configuration} to use. 
- * @param scan {@link Scan} to use in this scanner - * @param tableName The table that we wish to scan - * @throws IOException - */ - public ClientScanner(final Configuration conf, final Scan scan, - final TableName tableName) throws IOException { - this(conf, scan, tableName, ConnectionManager.getConnectionInternal(conf)); - } - - /** - * @deprecated Use {@link #ClientScanner(Configuration, Scan, TableName)} - */ - @Deprecated - public ClientScanner(final Configuration conf, final Scan scan, - final byte [] tableName) throws IOException { - this(conf, scan, TableName.valueOf(tableName)); - } - - - /** - * @deprecated use - * {@link #ClientScanner(Configuration, Scan, TableName, HConnection, RpcRetryingCallerFactory, RpcControllerFactory)} - */ - @Deprecated - public ClientScanner(final Configuration conf, final Scan scan, final TableName tableName, - ClusterConnection connection) throws IOException { - this(conf, scan, tableName, connection, new RpcRetryingCallerFactory(conf), - RpcControllerFactory.instantiate(conf)); - } - - /** - * @deprecated Use - * {@link #ClientScanner(Configuration, Scan, TableName, HConnection, RpcRetryingCallerFactory, RpcControllerFactory)} - */ - @Deprecated - public ClientScanner(final Configuration conf, final Scan scan, final byte [] tableName, - ClusterConnection connection) throws IOException { - this(conf, scan, TableName.valueOf(tableName), connection); - } - - /** - * Create a new ClientScanner for the specified table. - * Note that the passed {@link Scan}'s start row maybe changed. - * - * @param conf - * @param scan - * @param tableName - * @param connection - * @param rpcFactory - * @throws IOException - */ - public ClientScanner(final Configuration conf, final Scan scan, - final TableName tableName, ClusterConnection connection, - RpcRetryingCallerFactory rpcFactory, RpcControllerFactory controllerFactory) - throws IOException { - this(conf, scan, tableName, connection, rpcFactory, controllerFactory, null, 0); - } - /** * Create a new ClientScanner for the specified table Note that the passed {@link Scan}'s start * row maybe changed changed. diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallReversedScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallReversedScanner.java index 055d91b17b6..9c67107696f 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallReversedScanner.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallReversedScanner.java @@ -42,28 +42,12 @@ import java.util.concurrent.ExecutorService; *
<p/>
* For small scan, it will get better performance than {@link ReversedClientScanner} */ -@InterfaceAudience.Public -@InterfaceStability.Evolving +@InterfaceAudience.Private public class ClientSmallReversedScanner extends ReversedClientScanner { private static final Log LOG = LogFactory.getLog(ClientSmallReversedScanner.class); private ScannerCallableWithReplicas smallScanCallable = null; private byte[] skipRowOfFirstResult = null; - /** - * Create a new ReversibleClientScanner for the specified table Note that the - * passed {@link org.apache.hadoop.hbase.client.Scan}'s start row maybe changed. - * - * @param conf The {@link org.apache.hadoop.conf.Configuration} to use. - * @param scan {@link org.apache.hadoop.hbase.client.Scan} to use in this scanner - * @param tableName The table that we wish to scan - * @param connection Connection identifying the cluster - * @throws java.io.IOException - */ - public ClientSmallReversedScanner(Configuration conf, Scan scan, TableName tableName, - ClusterConnection connection) throws IOException { - super(conf, scan, tableName, connection); - } - /** * Create a new ReversibleClientScanner for the specified table Note that the * passed {@link Scan}'s start row maybe changed changed. diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallScanner.java index 91cf641cc18..47a0485aad3 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallScanner.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallScanner.java @@ -25,7 +25,6 @@ import java.util.concurrent.ExecutorService; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.HConstants; @@ -50,8 +49,7 @@ import com.google.protobuf.ServiceException; * * For small scan, it will get better performance than {@link ClientScanner} */ -@InterfaceAudience.Public -@InterfaceStability.Evolving +@InterfaceAudience.Private public class ClientSmallScanner extends ClientScanner { private final Log LOG = LogFactory.getLog(this.getClass()); private ScannerCallableWithReplicas smallScanCallable = null; @@ -59,65 +57,6 @@ public class ClientSmallScanner extends ClientScanner { // row with this one private byte[] skipRowOfFirstResult = null; - /** - * Create a new ClientSmallScanner for the specified table. An HConnection - * will be retrieved using the passed Configuration. Note that the passed - * {@link Scan} 's start row maybe changed. - * - * @param conf The {@link Configuration} to use. - * @param scan {@link Scan} to use in this scanner - * @param tableName The table that we wish to rangeGet - * @throws IOException - */ - public ClientSmallScanner(final Configuration conf, final Scan scan, - final TableName tableName) throws IOException { - this(conf, scan, tableName, ConnectionManager.getConnectionInternal(conf)); - } - - /** - * Create a new ClientSmallScanner for the specified table. An HConnection - * will be retrieved using the passed Configuration. Note that the passed - * {@link Scan} 's start row maybe changed. 
- * @param conf - * @param scan - * @param tableName - * @param connection - * @throws IOException - */ - public ClientSmallScanner(final Configuration conf, final Scan scan, - final TableName tableName, ClusterConnection connection) throws IOException { - this(conf, scan, tableName, connection, new RpcRetryingCallerFactory(conf), - new RpcControllerFactory(conf)); - } - - /** - * @deprecated use - * {@link #ClientSmallScanner(Configuration, Scan, TableName, HConnection, - * RpcRetryingCallerFactory, RpcControllerFactory)} instead - */ - @Deprecated - public ClientSmallScanner(final Configuration conf, final Scan scan, final TableName tableName, - ClusterConnection connection, RpcRetryingCallerFactory rpcFactory) throws IOException { - this(conf, scan, tableName, connection, rpcFactory, RpcControllerFactory.instantiate(conf)); - } - - /** - * Create a new ShortClientScanner for the specified table Note that the - * passed {@link Scan}'s start row maybe changed changed. - * - * @param conf The {@link Configuration} to use. - * @param scan {@link Scan} to use in this scanner - * @param tableName The table that we wish to rangeGet - * @param connection Connection identifying the cluster - * @param rpcFactory - * @throws IOException - */ - public ClientSmallScanner(final Configuration conf, final Scan scan, - final TableName tableName, ClusterConnection connection, - RpcRetryingCallerFactory rpcFactory, RpcControllerFactory controllerFactory) throws IOException { - this(conf, scan, tableName, connection, rpcFactory, controllerFactory, null, 0); - } - /** * Create a new ShortClientScanner for the specified table Note that the * passed {@link Scan}'s start row maybe changed changed. @@ -127,13 +66,16 @@ public class ClientSmallScanner extends ClientScanner { * @param tableName The table that we wish to rangeGet * @param connection Connection identifying the cluster * @param rpcFactory + * @param pool + * @param primaryOperationTimeout * @throws IOException */ public ClientSmallScanner(final Configuration conf, final Scan scan, final TableName tableName, ClusterConnection connection, RpcRetryingCallerFactory rpcFactory, RpcControllerFactory controllerFactory, ExecutorService pool, int primaryOperationTimeout) throws IOException { - super(conf, scan, tableName, connection, rpcFactory, controllerFactory); + super(conf, scan, tableName, connection, rpcFactory, controllerFactory, pool, + primaryOperationTimeout); } @Override @@ -220,6 +162,7 @@ public class ClientSmallScanner extends ClientScanner { @Override public Result[] call(int timeout) throws IOException { + if (this.closed) return null; if (Thread.interrupted()) { throw new InterruptedIOException(); } @@ -243,9 +186,6 @@ public class ClientSmallScanner extends ClientScanner { return new SmallScannerCallable((ClusterConnection)connection, tableName, getScan(), scanMetrics, controllerFactory, getCaching(), id); } - - @Override - public void setClose(){} } @Override diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java index a5a28c2079d..92b99b49c3f 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java @@ -593,6 +593,10 @@ class ConnectionManager { private User user; + private RpcRetryingCallerFactory rpcCallerFactory; + + private RpcControllerFactory rpcControllerFactory; + /** * Cluster registry of 
basic info such as clusterid and meta region location. */ @@ -623,6 +627,8 @@ class ConnectionManager { retrieveClusterId(); this.rpcClient = new RpcClient(this.conf, this.clusterId); + this.rpcCallerFactory = RpcRetryingCallerFactory.instantiate(conf); + this.rpcControllerFactory = RpcControllerFactory.instantiate(conf); // Do we publish the status? boolean shouldListen = conf.getBoolean(HConstants.STATUS_PUBLISHED, @@ -1177,7 +1183,8 @@ class ConnectionManager { Result regionInfoRow = null; ReversedClientScanner rcs = null; try { - rcs = new ClientSmallReversedScanner(conf, s, TableName.META_TABLE_NAME, this); + rcs = new ClientSmallReversedScanner(conf, s, TableName.META_TABLE_NAME, this, + rpcCallerFactory, rpcControllerFactory, getBatchPool(), 0); regionInfoRow = rcs.next(); } finally { if (rcs != null) { diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java index ef75b831726..1265a5fb2a2 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java @@ -135,7 +135,7 @@ public class HTable implements HTableInterface { protected long currentWriteBufferSize; protected int scannerCaching; private int maxKeyValueSize; - private ExecutorService pool; // For Multi + private ExecutorService pool; // For Multi & Scan private boolean closed; private int operationTimeout; private int retries; diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetaScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetaScanner.java index 1442cbf8e1f..08f4ee2e047 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetaScanner.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetaScanner.java @@ -140,15 +140,15 @@ public class MetaScanner { // Calculate startrow for scan. byte[] startRow; ResultScanner scanner = null; - + HTable metaTable = null; try { + metaTable = new HTable(TableName.META_TABLE_NAME, connection, null); if (row != null) { // Scan starting at a particular row in a particular table byte[] searchRow = HRegionInfo.createRegionName(tableName, row, HConstants.NINES, false); - HTable metaTable; - metaTable = new HTable(TableName.META_TABLE_NAME, connection, null); + Result startRowResult = metaTable.getRowOrBefore(searchRow, HConstants.CATALOG_FAMILY); - metaTable.close(); + if (startRowResult == null) { throw new TableNotFoundException("Cannot find row in "+ TableName .META_TABLE_NAME.getNameAsString()+" for table: " @@ -182,9 +182,7 @@ public class MetaScanner { Bytes.toStringBinary(startRow) + " for max=" + rowUpperLimit + " with caching=" + rows); } // Run the scan - scanner = (scan.isSmall() ? 
- new ClientSmallScanner(configuration, scan, TableName.META_TABLE_NAME, connection) : - new ClientScanner(configuration, scan, TableName.META_TABLE_NAME, connection)); + scanner = metaTable.getScanner(scan); Result result; int processedRows = 0; while ((result = scanner.next()) != null) { @@ -211,6 +209,15 @@ public class MetaScanner { LOG.debug("Got exception in closing the meta scanner visitor", t); } } + if (metaTable != null) { + try { + metaTable.close(); + } catch (Throwable t) { + ExceptionUtil.rethrowIfInterrupt(t); + LOG.debug("Got exception in closing meta table", t); + } + } + } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedClientScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedClientScanner.java index 9b518b454e3..8a101ad59d5 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedClientScanner.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedClientScanner.java @@ -25,7 +25,6 @@ import java.util.concurrent.ExecutorService; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.TableName; @@ -36,26 +35,12 @@ import org.apache.hadoop.hbase.util.ExceptionUtil; /** * A reversed client scanner which support backward scanning */ -@InterfaceAudience.Public -@InterfaceStability.Evolving +@InterfaceAudience.Private public class ReversedClientScanner extends ClientScanner { private static final Log LOG = LogFactory.getLog(ReversedClientScanner.class); // A byte array in which all elements are the max byte, and it is used to // construct closest front row static byte[] MAX_BYTE_ARRAY = Bytes.createMaxByteArray(9); - /** - * Create a new ReversibleClientScanner for the specified table Note that the - * passed {@link Scan}'s start row maybe changed. - * @param conf The {@link Configuration} to use. - * @param scan {@link Scan} to use in this scanner - * @param tableName The table that we wish to scan - * @param connection Connection identifying the cluster - * @throws IOException - */ - public ReversedClientScanner(Configuration conf, Scan scan, - TableName tableName, ClusterConnection connection) throws IOException { - super(conf, scan, tableName, connection); - } /** * Create a new ReversibleClientScanner for the specified table Note that the diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java index 31e11b1becf..4c99e01e2e4 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java @@ -132,9 +132,9 @@ class ScannerCallableWithReplicas implements RetryingCallable { RegionLocations rl = RpcRetryingCallerWithReadReplicas.getRegionLocations(true, RegionReplicaUtil.DEFAULT_REPLICA_ID, cConnection, tableName, currentScannerCallable.getRow()); + // allocate a boundedcompletion pool of some multiple of number of replicas. 
- // We want accommodate the "scan" RPC call and the "close" RPC call (we schedule "close"
- // RPCs for unneeded replica scans using the same pool)
+ // We want to accommodate some RPCs for redundant replica scans that are still in progress
 BoundedCompletionService<Pair<Result[], ScannerCallable>> cs =
     new BoundedCompletionService<Pair<Result[], ScannerCallable>>(pool, rl.size() * 5);
@@ -151,7 +151,7 @@ class ScannerCallableWithReplicas implements RetryingCallable<Result[]> {
       if (f != null) {
         Pair<Result[], ScannerCallable> r = f.get();
         if (r != null && r.getSecond() != null) {
-          updateCurrentlyServingReplica(r.getSecond(), r.getFirst(), done, cs);
+          updateCurrentlyServingReplica(r.getSecond(), r.getFirst(), done, pool);
         }
         return r == null ? null : r.getFirst(); //great we got a response
       }
@@ -175,7 +175,7 @@ class ScannerCallableWithReplicas implements RetryingCallable<Result[]> {
         Future<Pair<Result[], ScannerCallable>> f = cs.take();
         Pair<Result[], ScannerCallable> r = f.get();
         if (r != null && r.getSecond() != null) {
-          updateCurrentlyServingReplica(r.getSecond(), r.getFirst(), done, cs);
+          updateCurrentlyServingReplica(r.getSecond(), r.getFirst(), done, pool);
         }
         return r == null ? null : r.getFirst(); // great we got an answer
       } catch (ExecutionException e) {
@@ -204,7 +204,7 @@ class ScannerCallableWithReplicas implements RetryingCallable<Result[]> {
   }
   private void updateCurrentlyServingReplica(ScannerCallable scanner, Result[] result,
-      AtomicBoolean done, BoundedCompletionService<Pair<Result[], ScannerCallable>> cs) {
+      AtomicBoolean done, ExecutorService pool) {
     if (done.compareAndSet(false, true)) {
       if (currentScannerCallable != scanner) replicaSwitched.set(true);
       currentScannerCallable = scanner;
@@ -226,7 +226,7 @@ class ScannerCallableWithReplicas implements RetryingCallable<Result[]> {
       // the table is closed (when the awaitTermination of the underlying pool is called)
       s.setClose();
       RetryingRPC r = new RetryingRPC(s);
-      cs.submit(r);
+      pool.submit(r);
     }
     // now clear outstandingCallables since we scheduled a close for all the contained scanners
     outstandingCallables.clear();
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicasClient.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicasClient.java
index 1e69afafafc..1f1f3d4cccc 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicasClient.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicasClient.java
@@ -571,7 +571,7 @@ public class TestReplicasClient {
     scanWithReplicas(reversed, small, Consistency.STRONG, caching, start, NUMROWS, false, false);
     SlowMeCopro.sleepTime.set(0);
-    HTU.getHBaseAdmin().flush(table.getTableName());
+    flushRegion(hriPrimary);
     LOG.info("flush done");
     Thread.sleep(1000 + REFRESH_PERIOD * 2);
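
Illustrative usage (not part of the patch): with ClientScanner, ClientSmallScanner, ClientSmallReversedScanner and ReversedClientScanner now marked @InterfaceAudience.Private and their public constructors removed, callers are expected to obtain scanners through HConnection/HTable, the same way MetaScanner above switches to metaTable.getScanner(scan). The sketch below is a minimal example of that entry point; the table name "t1", the caching value and the TIMELINE consistency setting are assumptions for illustration, not anything this patch adds.

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Consistency;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;

public class ReplicaScanSketch {
  public static void main(String[] args) throws IOException {
    Configuration conf = HBaseConfiguration.create();
    // One connection; its thread pool is the ExecutorService that HTable now hands
    // down to the scanner classes ("For Multi & Scan").
    HConnection connection = HConnectionManager.createConnection(conf);
    HTableInterface table = null;
    ResultScanner scanner = null;
    try {
      table = connection.getTable(TableName.valueOf("t1")); // hypothetical table with region replicas
      Scan scan = new Scan();
      scan.setSmall(true);                       // routes through ClientSmallScanner
      scan.setCaching(10);
      scan.setConsistency(Consistency.TIMELINE); // permit reads from secondary replicas
      scanner = table.getScanner(scan);          // supported entry point; scanner classes are private API
      for (Result result : scanner) {
        System.out.println(Bytes.toStringBinary(result.getRow()));
      }
    } finally {
      if (scanner != null) scanner.close();
      if (table != null) table.close();
      connection.close();
    }
  }
}

Internally such a scan runs through ScannerCallableWithReplicas; with this patch the "close" RPCs for the replicas that lose the race are submitted to the connection's shared ExecutorService (pool.submit(r)) rather than to the bounded completion service, keeping the completion service's slots free for in-flight scan RPCs.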