HBASE-11572 Add support for doing get/scans against a particular replica_id (Jeffrey Zhong)

Enis Soztutar 2014-08-19 18:23:09 -07:00
parent f4f77b5756
commit 05e67eba9d
10 changed files with 115 additions and 55 deletions

View File

@@ -77,7 +77,6 @@ public class Get extends Query
private boolean closestRowBefore = false;
private Map<byte [], NavigableSet<byte []>> familyMap =
new TreeMap<byte [], NavigableSet<byte []>>(Bytes.BYTES_COMPARATOR);
private Consistency consistency = null;
/**
* Create a Get operation for the specified row.
@@ -342,22 +341,6 @@ public class Get extends Query
return this.familyMap;
}
/**
* Returns the consistency level for this operation
* @return the consistency level
*/
public Consistency getConsistency() {
return consistency;
}
/**
* Sets the consistency level for this operation
* @param consistency the consistency level
*/
public void setConsistency(Consistency consistency) {
this.consistency = consistency;
}
/**
* Compile the table and column family (i.e. schema) information
* into a String. Useful for parsing and aggregation by debugging,

View File

@@ -28,8 +28,6 @@ import org.apache.hadoop.hbase.security.access.AccessControlConstants;
import org.apache.hadoop.hbase.security.access.Permission;
import org.apache.hadoop.hbase.security.visibility.Authorizations;
import org.apache.hadoop.hbase.security.visibility.VisibilityConstants;
import org.apache.hadoop.hbase.util.Bytes;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.ListMultimap;
@@ -37,6 +35,8 @@ import com.google.common.collect.ListMultimap;
@InterfaceStability.Evolving
public abstract class Query extends OperationWithAttributes {
protected Filter filter = null;
protected int targetReplicaId = -1;
protected Consistency consistency = Consistency.STRONG;
/**
* @return Filter
@@ -103,4 +103,39 @@ public abstract class Query extends OperationWithAttributes {
setAttribute(AccessControlConstants.OP_ATTRIBUTE_ACL,
ProtobufUtil.toUsersAndPermissions(permMap).toByteArray());
}
/**
* Returns the consistency level for this operation
* @return the consistency level
*/
public Consistency getConsistency() {
return consistency;
}
/**
* Sets the consistency level for this operation
* @param consistency the consistency level
*/
public void setConsistency(Consistency consistency) {
this.consistency = consistency;
}
/**
* Specify the region replica id from which the Query will fetch data. Use this together with
* {@link #setConsistency(Consistency)}, passing {@link Consistency#TIMELINE}, to read data from
* a specific replica.
* <br><b>Expert:</b> This is an advanced API. Use it only if you know what you are doing.
* @param id the region replica id to read from
*/
public void setReplicaId(int id) {
this.targetReplicaId = id;
}
/**
* Returns the region replica id from which the Query will fetch data.
* @return region replica id or -1 if not set.
*/
public int getReplicaId() {
return this.targetReplicaId;
}
}
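As a usage sketch (not part of this commit; the table handle and row data are illustrative), a client pins a read to a specific replica by combining the two setters above. Since both Get and Scan extend Query, the same calls work for scans:

import java.io.IOException;
import org.apache.hadoop.hbase.client.Consistency;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;

public class ReplicaReadSketch {
  // Assumes 'table' is an open handle for a table with region replication > 1.
  static void readFromReplica(HTableInterface table) throws IOException {
    Get get = new Get(Bytes.toBytes("r1"));
    get.setConsistency(Consistency.TIMELINE); // replica reads use TIMELINE consistency
    get.setReplicaId(1);                      // fetch from region replica 1 only
    Result result = table.get(get);           // possibly stale, served by replica 1

    Scan scan = new Scan();
    scan.setConsistency(Consistency.TIMELINE);
    scan.setReplicaId(1);
    ResultScanner scanner = table.getScanner(scan);
    try {
      for (Result r : scanner) {
        // ... rows served by replica 1
      }
    } finally {
      scanner.close();
    }
  }
}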

View File

@@ -185,27 +185,34 @@ public class RpcRetryingCallerWithReadReplicas {
*/
public synchronized Result call()
throws DoNotRetryIOException, InterruptedIOException, RetriesExhaustedException {
RegionLocations rl = getRegionLocations(true, RegionReplicaUtil.DEFAULT_REPLICA_ID,
cConnection, tableName, get.getRow());
boolean isTargetReplicaSpecified = (get.getReplicaId() >= 0);
RegionLocations rl = getRegionLocations(true, (isTargetReplicaSpecified ? get.getReplicaId()
: RegionReplicaUtil.DEFAULT_REPLICA_ID), cConnection, tableName, get.getRow());
ResultBoundedCompletionService cs = new ResultBoundedCompletionService(pool, rl.size());
addCallsForReplica(cs, rl, 0, 0);
try {
// wait for the timeout to see whether the primary responds back
Future<Result> f = cs.poll(timeBeforeReplicas, TimeUnit.MICROSECONDS); // Yes, microseconds
if (f != null) {
return f.get(); //great we got a response
if (isTargetReplicaSpecified) {
addCallsForReplica(cs, rl, get.getReplicaId(), get.getReplicaId());
} else {
addCallsForReplica(cs, rl, 0, 0);
try {
// wait for the timeout to see whether the primary responds back
Future<Result> f = cs.poll(timeBeforeReplicas, TimeUnit.MICROSECONDS); // Yes, microseconds
if (f != null) {
return f.get(); //great we got a response
}
} catch (ExecutionException e) {
throwEnrichedException(e, retries);
} catch (CancellationException e) {
throw new InterruptedIOException();
} catch (InterruptedException e) {
throw new InterruptedIOException();
}
} catch (ExecutionException e) {
throwEnrichedException(e, retries);
} catch (CancellationException e) {
throw new InterruptedIOException();
} catch (InterruptedException e) {
throw new InterruptedIOException();
// submit calls for all of the secondaries at once
addCallsForReplica(cs, rl, 1, rl.size() - 1);
}
// submit calls for all of the secondaries at once
addCallsForReplica(cs, rl, 1, rl.size() - 1);
try {
try {
Future<Result> f = cs.take();
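The branch above is the heart of the change: a pinned read is submitted only to the requested replica, while an unpinned read tries the primary first and fans out to the secondaries only after a short wait. A self-contained sketch of that submission strategy, using plain java.util.concurrent rather than the actual HBase classes (the replica calls and timings are simulated):

import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class ReplicaCallFlowSketch {
  // Simulated replica RPC; pretend the primary (id 0) is slow.
  static String callReplica(int id) throws Exception {
    Thread.sleep(id == 0 ? 100 : 5);
    return "result-from-replica-" + id;
  }

  static String call(ExecutorService pool, int numReplicas, int targetReplicaId)
      throws Exception {
    CompletionService<String> cs = new ExecutorCompletionService<String>(pool);
    if (targetReplicaId >= 0) {
      // Pinned read: submit a call for the requested replica only.
      cs.submit(() -> callReplica(targetReplicaId));
    } else {
      // Unpinned read: try the primary first...
      cs.submit(() -> callReplica(0));
      Future<String> f = cs.poll(10, TimeUnit.MILLISECONDS);
      if (f != null) {
        return f.get(); // the primary answered within the timeout
      }
      // ...then submit calls for all of the secondaries at once.
      for (int id = 1; id < numReplicas; id++) {
        final int replicaId = id;
        cs.submit(() -> callReplica(replicaId));
      }
    }
    return cs.take().get(); // first response wins
  }

  public static void main(String[] args) throws Exception {
    ExecutorService pool = Executors.newFixedThreadPool(4);
    System.out.println(call(pool, 3, -1)); // primary-then-secondaries path
    System.out.println(call(pool, 3, 1));  // pinned to replica 1
    pool.shutdown();
  }
}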

View File

@@ -136,7 +136,6 @@ public class Scan extends Query {
private Map<byte [], NavigableSet<byte []>> familyMap =
new TreeMap<byte [], NavigableSet<byte []>>(Bytes.BYTES_COMPARATOR);
private Boolean loadColumnFamiliesOnDemand = null;
private Consistency consistency = Consistency.STRONG;
/**
* Set it true for small scan to get better performance
@@ -634,22 +633,6 @@ public class Scan extends Query {
&& this.loadColumnFamiliesOnDemand.booleanValue();
}
/**
* Returns the consistency level for this operation
* @return the consistency level
*/
public Consistency getConsistency() {
return consistency;
}
/**
* Sets the consistency level for this operation
* @param consistency the consistency level
*/
public void setConsistency(Consistency consistency) {
this.consistency = consistency;
}
/**
* Compile the table and column family (i.e. schema) information
* into a String. Useful for parsing and aggregation by debugging,

View File

@@ -35,6 +35,7 @@ import org.apache.hadoop.hbase.MediumTests;
import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TestMetaTableAccessor;
import org.apache.hadoop.hbase.client.Consistency;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
@@ -215,6 +216,31 @@ public class TestRegionReplicas {
}
}
@Test(timeout = 60000)
public void testGetOnTargetRegionReplica() throws Exception {
try {
// load some data into the primary
HTU.loadNumericRows(table, f, 0, 1000);
// assert that we can read back from primary
Assert.assertEquals(1000, HTU.countRows(table));
// flush so that region replica can read
getRS().getRegionByEncodedName(hriPrimary.getEncodedName()).flushcache();
openRegion(hriSecondary);
// try a Get directly against the region replica
byte[] row = Bytes.toBytes(String.valueOf(42));
Get get = new Get(row);
get.setConsistency(Consistency.TIMELINE);
get.setReplicaId(1);
Result result = table.get(get);
Assert.assertArrayEquals(row, result.getValue(f, null));
} finally {
HTU.deleteNumericRows(table, HConstants.CATALOG_FAMILY, 0, 1000);
closeRegion(hriSecondary);
}
}
private void assertGet(HRegion region, int value, boolean expect) throws IOException {
byte[] row = Bytes.toBytes(String.valueOf(value));
Get get = new Get(row);

View File

@@ -161,6 +161,10 @@ public class LoadTestTool extends AbstractHBaseTool {
protected static final String OPT_REGION_REPLICATION_USAGE =
"Desired number of replicas per region";
public static final String OPT_REGION_REPLICA_ID = "region_replica_id";
protected static final String OPT_REGION_REPLICA_ID_USAGE =
"Region replica id to do the reads from";
protected static final long DEFAULT_START_KEY = 0;
/** This will be removed as we factor out the dependency on command line */
@@ -202,7 +206,6 @@ public class LoadTestTool extends AbstractHBaseTool {
private int verifyPercent;
private int numTables = 1;
private int regionsPerServer = HBaseTestingUtility.DEFAULT_REGIONS_PER_SERVER;
private String superUser;
@@ -212,6 +215,7 @@ public class LoadTestTool extends AbstractHBaseTool {
private int numRegionsPerServer = DEFAULT_NUM_REGIONS_PER_SERVER;
private int regionReplication = -1; // not set
private int regionReplicaId = -1; // not set
// TODO: refactor LoadTestToolImpl somewhere to make the usage from tests less bad,
// console tool itself should only be used from console.
@@ -334,6 +338,7 @@ public class LoadTestTool extends AbstractHBaseTool {
addOptNoArg(OPT_DEFERRED_LOG_FLUSH, OPT_DEFERRED_LOG_FLUSH_USAGE);
addOptWithArg(OPT_NUM_REGIONS_PER_SERVER, OPT_NUM_REGIONS_PER_SERVER_USAGE);
addOptWithArg(OPT_REGION_REPLICATION, OPT_REGION_REPLICATION_USAGE);
addOptWithArg(OPT_REGION_REPLICA_ID, OPT_REGION_REPLICA_ID_USAGE);
}
@Override
@@ -351,7 +356,7 @@ public class LoadTestTool extends AbstractHBaseTool {
if (!isWrite && !isRead && !isUpdate && !isInitOnly) {
throw new IllegalArgumentException("Either -" + OPT_WRITE + " or " +
"-" + OPT_UPDATE + "-" + OPT_READ + " has to be specified");
"-" + OPT_UPDATE + " or -" + OPT_READ + " has to be specified");
}
if (isInitOnly && (isRead || isWrite || isUpdate)) {
@@ -460,6 +465,11 @@ public class LoadTestTool extends AbstractHBaseTool {
if (cmd.hasOption(OPT_REGION_REPLICATION)) {
regionReplication = Integer.parseInt(cmd.getOptionValue(OPT_REGION_REPLICATION));
}
regionReplicaId = -1;
if (cmd.hasOption(OPT_REGION_REPLICA_ID)) {
regionReplicaId = Integer.parseInt(cmd.getOptionValue(OPT_REGION_REPLICA_ID));
}
}
private void parseColumnFamilyOptions(CommandLine cmd) {
@@ -621,6 +631,7 @@ public class LoadTestTool extends AbstractHBaseTool {
readerThreads.setMaxErrors(maxReadErrors);
readerThreads.setKeyWindow(keyWindow);
readerThreads.setMultiGetBatchSize(multiGetBatchSize);
readerThreads.setRegionReplicaId(regionReplicaId);
}
if (isUpdate && isWrite) {
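A hypothetical invocation of the tool with the new flag (the -tn, -read, and -num_keys options already exist in LoadTestTool; the table name and values here are illustrative):

hbase org.apache.hadoop.hbase.util.LoadTestTool -tn usertable -read 100:20 -num_keys 100000 -region_replica_id 1

With -region_replica_id set to a value greater than 0, each reader thread pins its gets to that replica and switches them to TIMELINE consistency, as wired up in MultiThreadedReader below.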

View File

@@ -30,6 +30,7 @@ import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Consistency;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.test.LoadTestDataGenerator;
@@ -78,6 +79,7 @@ public class MultiThreadedReader extends MultiThreadedAction
private int maxErrors = DEFAULT_MAX_ERRORS;
private int keyWindow = DEFAULT_KEY_WINDOW;
private int batchSize = DEFAULT_BATCH_SIZE;
private int regionReplicaId = -1; // particular region replica id to do reads against if set
public MultiThreadedReader(LoadTestDataGenerator dataGen, Configuration conf,
TableName tableName, double verifyPercent) throws IOException {
@@ -102,6 +104,10 @@ public class MultiThreadedReader extends MultiThreadedAction
this.batchSize = batchSize;
}
public void setRegionReplicaId(int regionReplicaId) {
this.regionReplicaId = regionReplicaId;
}
@Override
public void start(long startKey, long endKey, int numThreads) throws IOException {
super.start(startKey, endKey, numThreads);
@@ -317,6 +323,10 @@ public class MultiThreadedReader extends MultiThreadedAction
}
}
get = dataGenerator.beforeGet(keyToRead, get);
if (regionReplicaId > 0) {
get.setReplicaId(regionReplicaId);
get.setConsistency(Consistency.TIMELINE);
}
if (verbose) {
LOG.info("[" + readerId + "] " + "Querying key " + keyToRead + ", cfs " + cfsString);
}
@@ -334,7 +344,7 @@ public class MultiThreadedReader extends MultiThreadedAction
public void queryKey(Get get, boolean verify, long keyToRead) throws IOException {
// read the data
long start = System.nanoTime();
// Uses simple get
Result result = table.get(get);

View File

@@ -58,6 +58,7 @@ module HBaseConstants
SPLITALGO = 'SPLITALGO'
NUMREGIONS = 'NUMREGIONS'
REGION_REPLICATION = 'REGION_REPLICATION'
REGION_REPLICA_ID = 'REGION_REPLICA_ID'
CONFIGURATION = org.apache.hadoop.hbase.HConstants::CONFIGURATION
ATTRIBUTES="ATTRIBUTES"
VISIBILITY="VISIBILITY"

View File

@@ -288,6 +288,7 @@ EOF
attributes = args[ATTRIBUTES]
authorizations = args[AUTHORIZATIONS]
consistency = args.delete(CONSISTENCY) if args[CONSISTENCY]
replicaId = args.delete(REGION_REPLICA_ID) if args[REGION_REPLICA_ID]
unless args.empty?
columns = args[COLUMN] || args[COLUMNS]
if args[VERSIONS]
@@ -346,6 +347,7 @@
end
get.setConsistency(org.apache.hadoop.hbase.client.Consistency.valueOf(consistency)) if consistency
get.setReplicaId(replicaId) if replicaId
# Call hbase for the results
result = @table.get(get)

View File

@@ -40,6 +40,7 @@ a dictionary of column(s), timestamp, timerange and versions. Examples:
hbase> get 't1', 'r1', {COLUMN => 'c1', ATTRIBUTES => {'mykey'=>'myvalue'}}
hbase> get 't1', 'r1', {COLUMN => 'c1', AUTHORIZATIONS => ['PRIVATE','SECRET']}
hbase> get 't1', 'r1', {CONSISTENCY => 'TIMELINE'}
hbase> get 't1', 'r1', {CONSISTENCY => 'TIMELINE', REGION_REPLICA_ID => 1}
Besides the default 'toStringBinary' format, 'get' also supports custom formatting by
column. A user can define a FORMATTER by adding it to the column name in the get
@@ -71,6 +72,7 @@ would be:
hbase> t.get 'r1', 'c1', 'c2'
hbase> t.get 'r1', ['c1', 'c2']
hbase> t.get 'r1', {CONSISTENCY => 'TIMELINE'}
hbase> t.get 'r1', {CONSISTENCY => 'TIMELINE', REGION_REPLICA_ID => 1}
EOF
end