From 05e67eba9da1488b5b20a7844504e35a8a5e92c5 Mon Sep 17 00:00:00 2001 From: Enis Soztutar Date: Tue, 19 Aug 2014 18:23:09 -0700 Subject: [PATCH] HBASE-11572 Add support for doing get/scans against a particular replica_id (Jeffrey Zhong) --- .../org/apache/hadoop/hbase/client/Get.java | 17 -------- .../org/apache/hadoop/hbase/client/Query.java | 39 ++++++++++++++++++- .../RpcRetryingCallerWithReadReplicas.java | 39 +++++++++++-------- .../org/apache/hadoop/hbase/client/Scan.java | 17 -------- .../regionserver/TestRegionReplicas.java | 26 +++++++++++++ .../hadoop/hbase/util/LoadTestTool.java | 15 ++++++- .../hbase/util/MultiThreadedReader.java | 12 +++++- hbase-shell/src/main/ruby/hbase.rb | 1 + hbase-shell/src/main/ruby/hbase/table.rb | 2 + .../src/main/ruby/shell/commands/get.rb | 2 + 10 files changed, 115 insertions(+), 55 deletions(-) diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Get.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Get.java index faccf5e6462..2d1348f9944 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Get.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Get.java @@ -77,7 +77,6 @@ public class Get extends Query private boolean closestRowBefore = false; private Map> familyMap = new TreeMap>(Bytes.BYTES_COMPARATOR); - private Consistency consistency = null; /** * Create a Get operation for the specified row. @@ -342,22 +341,6 @@ public class Get extends Query return this.familyMap; } - /** - * Returns the consistency level for this operation - * @return the consistency level - */ - public Consistency getConsistency() { - return consistency; - } - - /** - * Sets the consistency level for this operation - * @param consistency the consistency level - */ - public void setConsistency(Consistency consistency) { - this.consistency = consistency; - } - /** * Compile the table and column family (i.e. schema) information * into a String. Useful for parsing and aggregation by debugging, diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Query.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Query.java index fe141c91fff..bbfa94046d4 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Query.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Query.java @@ -28,8 +28,6 @@ import org.apache.hadoop.hbase.security.access.AccessControlConstants; import org.apache.hadoop.hbase.security.access.Permission; import org.apache.hadoop.hbase.security.visibility.Authorizations; import org.apache.hadoop.hbase.security.visibility.VisibilityConstants; -import org.apache.hadoop.hbase.util.Bytes; - import com.google.common.collect.ArrayListMultimap; import com.google.common.collect.ListMultimap; @@ -37,6 +35,8 @@ import com.google.common.collect.ListMultimap; @InterfaceStability.Evolving public abstract class Query extends OperationWithAttributes { protected Filter filter = null; + protected int targetReplicaId = -1; + protected Consistency consistency = Consistency.STRONG; /** * @return Filter @@ -103,4 +103,39 @@ public abstract class Query extends OperationWithAttributes { setAttribute(AccessControlConstants.OP_ATTRIBUTE_ACL, ProtobufUtil.toUsersAndPermissions(permMap).toByteArray()); } + + /** + * Returns the consistency level for this operation + * @return the consistency level + */ + public Consistency getConsistency() { + return consistency; + } + + /** + * Sets the consistency level for this operation + * @param consistency the consistency level + */ + public void setConsistency(Consistency consistency) { + this.consistency = consistency; + } + + /** + * Specify region replica id where Query will fetch data from. Use this together with + * {@link #setConsistency(Consistency)} passing {@link Consistency#TIMELINE} to read data from + * a specific replicaId. + *
Expert: This is an advanced API exposed. Only use it if you know what you are doing + * @param Id + */ + public void setReplicaId(int Id) { + this.targetReplicaId = Id; + } + + /** + * Returns region replica id where Query will fetch data from. + * @return region replica id or -1 if not set. + */ + public int getReplicaId() { + return this.targetReplicaId; + } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java index 1c733b62fc6..6cd422f8211 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java @@ -185,27 +185,34 @@ public class RpcRetryingCallerWithReadReplicas { */ public synchronized Result call() throws DoNotRetryIOException, InterruptedIOException, RetriesExhaustedException { - RegionLocations rl = getRegionLocations(true, RegionReplicaUtil.DEFAULT_REPLICA_ID, - cConnection, tableName, get.getRow()); + boolean isTargetReplicaSpecified = (get.getReplicaId() >= 0); + + RegionLocations rl = getRegionLocations(true, (isTargetReplicaSpecified ? get.getReplicaId() + : RegionReplicaUtil.DEFAULT_REPLICA_ID), cConnection, tableName, get.getRow()); ResultBoundedCompletionService cs = new ResultBoundedCompletionService(pool, rl.size()); - addCallsForReplica(cs, rl, 0, 0); - try { - // wait for the timeout to see whether the primary responds back - Future f = cs.poll(timeBeforeReplicas, TimeUnit.MICROSECONDS); // Yes, microseconds - if (f != null) { - return f.get(); //great we got a response + if(isTargetReplicaSpecified) { + addCallsForReplica(cs, rl, get.getReplicaId(), get.getReplicaId()); + } else { + addCallsForReplica(cs, rl, 0, 0); + try { + // wait for the timeout to see whether the primary responds back + Future f = cs.poll(timeBeforeReplicas, TimeUnit.MICROSECONDS); // Yes, microseconds + if (f != null) { + return f.get(); //great we got a response + } + } catch (ExecutionException e) { + throwEnrichedException(e, retries); + } catch (CancellationException e) { + throw new InterruptedIOException(); + } catch (InterruptedException e) { + throw new InterruptedIOException(); } - } catch (ExecutionException e) { - throwEnrichedException(e, retries); - } catch (CancellationException e) { - throw new InterruptedIOException(); - } catch (InterruptedException e) { - throw new InterruptedIOException(); + + // submit call for the all of the secondaries at once + addCallsForReplica(cs, rl, 1, rl.size() - 1); } - // submit call for the all of the secondaries at once - addCallsForReplica(cs, rl, 1, rl.size() - 1); try { try { Future f = cs.take(); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java index 85681bf4b50..74bf37f8f76 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java @@ -136,7 +136,6 @@ public class Scan extends Query { private Map> familyMap = new TreeMap>(Bytes.BYTES_COMPARATOR); private Boolean loadColumnFamiliesOnDemand = null; - private Consistency consistency = Consistency.STRONG; /** * Set it true for small scan to get better performance @@ -634,22 +633,6 @@ public class Scan extends Query { && this.loadColumnFamiliesOnDemand.booleanValue(); } - /** - * Returns the consistency level for this operation - * @return the consistency level - */ - public Consistency getConsistency() { - return consistency; - } - - /** - * Sets the consistency level for this operation - * @param consistency the consistency level - */ - public void setConsistency(Consistency consistency) { - this.consistency = consistency; - } - /** * Compile the table and column family (i.e. schema) information * into a String. Useful for parsing and aggregation by debugging, diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionReplicas.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionReplicas.java index 9f64a7c9cd6..50542be7032 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionReplicas.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionReplicas.java @@ -35,6 +35,7 @@ import org.apache.hadoop.hbase.MediumTests; import org.apache.hadoop.hbase.NotServingRegionException; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TestMetaTableAccessor; +import org.apache.hadoop.hbase.client.Consistency; import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Put; @@ -215,6 +216,31 @@ public class TestRegionReplicas { } } + @Test(timeout = 60000) + public void testGetOnTargetRegionReplica() throws Exception { + try { + //load some data to primary + HTU.loadNumericRows(table, f, 0, 1000); + // assert that we can read back from primary + Assert.assertEquals(1000, HTU.countRows(table)); + // flush so that region replica can read + getRS().getRegionByEncodedName(hriPrimary.getEncodedName()).flushcache(); + + openRegion(hriSecondary); + + // try directly Get against region replica + byte[] row = Bytes.toBytes(String.valueOf(42)); + Get get = new Get(row); + get.setConsistency(Consistency.TIMELINE); + get.setReplicaId(1); + Result result = table.get(get); + Assert.assertArrayEquals(row, result.getValue(f, null)); + } finally { + HTU.deleteNumericRows(table, HConstants.CATALOG_FAMILY, 0, 1000); + closeRegion(hriSecondary); + } + } + private void assertGet(HRegion region, int value, boolean expect) throws IOException { byte[] row = Bytes.toBytes(String.valueOf(value)); Get get = new Get(row); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/LoadTestTool.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/LoadTestTool.java index ce3da340129..9e7186a597b 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/LoadTestTool.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/LoadTestTool.java @@ -161,6 +161,10 @@ public class LoadTestTool extends AbstractHBaseTool { protected static final String OPT_REGION_REPLICATION_USAGE = "Desired number of replicas per region"; + public static final String OPT_REGION_REPLICA_ID = "region_replica_id"; + protected static final String OPT_REGION_REPLICA_ID_USAGE = + "Region replica id to do the reads from"; + protected static final long DEFAULT_START_KEY = 0; /** This will be removed as we factor out the dependency on command line */ @@ -202,7 +206,6 @@ public class LoadTestTool extends AbstractHBaseTool { private int verifyPercent; private int numTables = 1; - private int regionsPerServer = HBaseTestingUtility.DEFAULT_REGIONS_PER_SERVER; private String superUser; @@ -212,6 +215,7 @@ public class LoadTestTool extends AbstractHBaseTool { private int numRegionsPerServer = DEFAULT_NUM_REGIONS_PER_SERVER; private int regionReplication = -1; // not set + private int regionReplicaId = -1; // not set // TODO: refactor LoadTestToolImpl somewhere to make the usage from tests less bad, // console tool itself should only be used from console. @@ -334,6 +338,7 @@ public class LoadTestTool extends AbstractHBaseTool { addOptNoArg(OPT_DEFERRED_LOG_FLUSH, OPT_DEFERRED_LOG_FLUSH_USAGE); addOptWithArg(OPT_NUM_REGIONS_PER_SERVER, OPT_NUM_REGIONS_PER_SERVER_USAGE); addOptWithArg(OPT_REGION_REPLICATION, OPT_REGION_REPLICATION_USAGE); + addOptWithArg(OPT_REGION_REPLICA_ID, OPT_REGION_REPLICA_ID_USAGE); } @Override @@ -351,7 +356,7 @@ public class LoadTestTool extends AbstractHBaseTool { if (!isWrite && !isRead && !isUpdate && !isInitOnly) { throw new IllegalArgumentException("Either -" + OPT_WRITE + " or " + - "-" + OPT_UPDATE + "-" + OPT_READ + " has to be specified"); + "-" + OPT_UPDATE + " or -" + OPT_READ + " has to be specified"); } if (isInitOnly && (isRead || isWrite || isUpdate)) { @@ -460,6 +465,11 @@ public class LoadTestTool extends AbstractHBaseTool { if (cmd.hasOption(OPT_REGION_REPLICATION)) { regionReplication = Integer.parseInt(cmd.getOptionValue(OPT_REGION_REPLICATION)); } + + regionReplicaId = -1; + if (cmd.hasOption(OPT_REGION_REPLICA_ID)) { + regionReplicaId = Integer.parseInt(cmd.getOptionValue(OPT_REGION_REPLICA_ID)); + } } private void parseColumnFamilyOptions(CommandLine cmd) { @@ -621,6 +631,7 @@ public class LoadTestTool extends AbstractHBaseTool { readerThreads.setMaxErrors(maxReadErrors); readerThreads.setKeyWindow(keyWindow); readerThreads.setMultiGetBatchSize(multiGetBatchSize); + readerThreads.setRegionReplicaId(regionReplicaId); } if (isUpdate && isWrite) { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedReader.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedReader.java index b749e62b112..cce71307e6f 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedReader.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedReader.java @@ -30,6 +30,7 @@ import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.Consistency; import org.apache.hadoop.hbase.client.HTableInterface; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.util.test.LoadTestDataGenerator; @@ -78,6 +79,7 @@ public class MultiThreadedReader extends MultiThreadedAction private int maxErrors = DEFAULT_MAX_ERRORS; private int keyWindow = DEFAULT_KEY_WINDOW; private int batchSize = DEFAULT_BATCH_SIZE; + private int regionReplicaId = -1; // particular region replica id to do reads against if set public MultiThreadedReader(LoadTestDataGenerator dataGen, Configuration conf, TableName tableName, double verifyPercent) throws IOException { @@ -102,6 +104,10 @@ public class MultiThreadedReader extends MultiThreadedAction this.batchSize = batchSize; } + public void setRegionReplicaId(int regionReplicaId) { + this.regionReplicaId = regionReplicaId; + } + @Override public void start(long startKey, long endKey, int numThreads) throws IOException { super.start(startKey, endKey, numThreads); @@ -317,6 +323,10 @@ public class MultiThreadedReader extends MultiThreadedAction } } get = dataGenerator.beforeGet(keyToRead, get); + if (regionReplicaId > 0) { + get.setReplicaId(regionReplicaId); + get.setConsistency(Consistency.TIMELINE); + } if (verbose) { LOG.info("[" + readerId + "] " + "Querying key " + keyToRead + ", cfs " + cfsString); } @@ -334,7 +344,7 @@ public class MultiThreadedReader extends MultiThreadedAction public void queryKey(Get get, boolean verify, long keyToRead) throws IOException { // read the data - + long start = System.nanoTime(); // Uses simple get Result result = table.get(get); diff --git a/hbase-shell/src/main/ruby/hbase.rb b/hbase-shell/src/main/ruby/hbase.rb index bbff1346edd..ca26137e474 100644 --- a/hbase-shell/src/main/ruby/hbase.rb +++ b/hbase-shell/src/main/ruby/hbase.rb @@ -58,6 +58,7 @@ module HBaseConstants SPLITALGO = 'SPLITALGO' NUMREGIONS = 'NUMREGIONS' REGION_REPLICATION = 'REGION_REPLICATION' + REGION_REPLICA_ID = 'REGION_REPLICA_ID' CONFIGURATION = org.apache.hadoop.hbase.HConstants::CONFIGURATION ATTRIBUTES="ATTRIBUTES" VISIBILITY="VISIBILITY" diff --git a/hbase-shell/src/main/ruby/hbase/table.rb b/hbase-shell/src/main/ruby/hbase/table.rb index 14fe1e4e41b..2a350c138c0 100644 --- a/hbase-shell/src/main/ruby/hbase/table.rb +++ b/hbase-shell/src/main/ruby/hbase/table.rb @@ -288,6 +288,7 @@ EOF attributes = args[ATTRIBUTES] authorizations = args[AUTHORIZATIONS] consistency = args.delete(CONSISTENCY) if args[CONSISTENCY] + replicaId = args.delete(REGION_REPLICA_ID) if args[REGION_REPLICA_ID] unless args.empty? columns = args[COLUMN] || args[COLUMNS] if args[VERSIONS] @@ -346,6 +347,7 @@ EOF end get.setConsistency(org.apache.hadoop.hbase.client.Consistency.valueOf(consistency)) if consistency + get.setReplicaId(replicaId) if replicaId # Call hbase for the results result = @table.get(get) diff --git a/hbase-shell/src/main/ruby/shell/commands/get.rb b/hbase-shell/src/main/ruby/shell/commands/get.rb index 0035310ea01..1ab13cb9cac 100644 --- a/hbase-shell/src/main/ruby/shell/commands/get.rb +++ b/hbase-shell/src/main/ruby/shell/commands/get.rb @@ -40,6 +40,7 @@ a dictionary of column(s), timestamp, timerange and versions. Examples: hbase> get 't1', 'r1', {COLUMN => 'c1', ATTRIBUTES => {'mykey'=>'myvalue'}} hbase> get 't1', 'r1', {COLUMN => 'c1', AUTHORIZATIONS => ['PRIVATE','SECRET']} hbase> get 't1', 'r1', {CONSISTENCY => 'TIMELINE'} + hbase> get 't1', 'r1', {CONSISTENCY => 'TIMELINE', REGION_REPLICA_ID => 1} Besides the default 'toStringBinary' format, 'get' also supports custom formatting by column. A user can define a FORMATTER by adding it to the column name in the get @@ -71,6 +72,7 @@ would be: hbase> t.get 'r1', 'c1', 'c2' hbase> t.get 'r1', ['c1', 'c2'] hbase> t.get 'r1', {CONSISTENCY => 'TIMELINE'} + hbase> t.get 'r1', {CONSISTENCY => 'TIMELINE', REGION_REPLICA_ID => 1} EOF end