HBASE-11572 Add support for doing get/scans against a particular replica_id (Jeffrey Zhong)
This commit is contained in:
parent
ac2e1c33fd
commit
aeecd20373
|
@ -77,7 +77,6 @@ public class Get extends Query
|
|||
private boolean closestRowBefore = false;
|
||||
private Map<byte [], NavigableSet<byte []>> familyMap =
|
||||
new TreeMap<byte [], NavigableSet<byte []>>(Bytes.BYTES_COMPARATOR);
|
||||
private Consistency consistency = null;
|
||||
|
||||
/**
|
||||
* Create a Get operation for the specified row.
|
||||
|
@ -342,22 +341,6 @@ public class Get extends Query
|
|||
return this.familyMap;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the consistency level for this operation
|
||||
* @return the consistency level
|
||||
*/
|
||||
public Consistency getConsistency() {
|
||||
return consistency;
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets the consistency level for this operation
|
||||
* @param consistency the consistency level
|
||||
*/
|
||||
public void setConsistency(Consistency consistency) {
|
||||
this.consistency = consistency;
|
||||
}
|
||||
|
||||
/**
|
||||
* Compile the table and column family (i.e. schema) information
|
||||
* into a String. Useful for parsing and aggregation by debugging,
|
||||
|
|
|
@ -28,8 +28,6 @@ import org.apache.hadoop.hbase.security.access.AccessControlConstants;
|
|||
import org.apache.hadoop.hbase.security.access.Permission;
|
||||
import org.apache.hadoop.hbase.security.visibility.Authorizations;
|
||||
import org.apache.hadoop.hbase.security.visibility.VisibilityConstants;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
|
||||
import com.google.common.collect.ArrayListMultimap;
|
||||
import com.google.common.collect.ListMultimap;
|
||||
|
||||
|
@ -37,6 +35,8 @@ import com.google.common.collect.ListMultimap;
|
|||
@InterfaceStability.Evolving
|
||||
public abstract class Query extends OperationWithAttributes {
|
||||
protected Filter filter = null;
|
||||
protected int targetReplicaId = -1;
|
||||
protected Consistency consistency = Consistency.STRONG;
|
||||
|
||||
/**
|
||||
* @return Filter
|
||||
|
@ -103,4 +103,39 @@ public abstract class Query extends OperationWithAttributes {
|
|||
setAttribute(AccessControlConstants.OP_ATTRIBUTE_ACL,
|
||||
ProtobufUtil.toUsersAndPermissions(permMap).toByteArray());
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the consistency level for this operation
|
||||
* @return the consistency level
|
||||
*/
|
||||
public Consistency getConsistency() {
|
||||
return consistency;
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets the consistency level for this operation
|
||||
* @param consistency the consistency level
|
||||
*/
|
||||
public void setConsistency(Consistency consistency) {
|
||||
this.consistency = consistency;
|
||||
}
|
||||
|
||||
/**
|
||||
* Specify region replica id where Query will fetch data from. Use this together with
|
||||
* {@link #setConsistency(Consistency)} passing {@link Consistency#TIMELINE} to read data from
|
||||
* a specific replicaId.
|
||||
* <br><b> Expert: </b>This is an advanced API exposed. Only use it if you know what you are doing
|
||||
* @param Id
|
||||
*/
|
||||
public void setReplicaId(int Id) {
|
||||
this.targetReplicaId = Id;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns region replica id where Query will fetch data from.
|
||||
* @return region replica id or -1 if not set.
|
||||
*/
|
||||
public int getReplicaId() {
|
||||
return this.targetReplicaId;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -185,27 +185,34 @@ public class RpcRetryingCallerWithReadReplicas {
|
|||
*/
|
||||
public synchronized Result call()
|
||||
throws DoNotRetryIOException, InterruptedIOException, RetriesExhaustedException {
|
||||
RegionLocations rl = getRegionLocations(true, RegionReplicaUtil.DEFAULT_REPLICA_ID,
|
||||
cConnection, tableName, get.getRow());
|
||||
boolean isTargetReplicaSpecified = (get.getReplicaId() >= 0);
|
||||
|
||||
RegionLocations rl = getRegionLocations(true, (isTargetReplicaSpecified ? get.getReplicaId()
|
||||
: RegionReplicaUtil.DEFAULT_REPLICA_ID), cConnection, tableName, get.getRow());
|
||||
ResultBoundedCompletionService cs = new ResultBoundedCompletionService(pool, rl.size());
|
||||
|
||||
addCallsForReplica(cs, rl, 0, 0);
|
||||
try {
|
||||
// wait for the timeout to see whether the primary responds back
|
||||
Future<Result> f = cs.poll(timeBeforeReplicas, TimeUnit.MICROSECONDS); // Yes, microseconds
|
||||
if (f != null) {
|
||||
return f.get(); //great we got a response
|
||||
if(isTargetReplicaSpecified) {
|
||||
addCallsForReplica(cs, rl, get.getReplicaId(), get.getReplicaId());
|
||||
} else {
|
||||
addCallsForReplica(cs, rl, 0, 0);
|
||||
try {
|
||||
// wait for the timeout to see whether the primary responds back
|
||||
Future<Result> f = cs.poll(timeBeforeReplicas, TimeUnit.MICROSECONDS); // Yes, microseconds
|
||||
if (f != null) {
|
||||
return f.get(); //great we got a response
|
||||
}
|
||||
} catch (ExecutionException e) {
|
||||
throwEnrichedException(e, retries);
|
||||
} catch (CancellationException e) {
|
||||
throw new InterruptedIOException();
|
||||
} catch (InterruptedException e) {
|
||||
throw new InterruptedIOException();
|
||||
}
|
||||
} catch (ExecutionException e) {
|
||||
throwEnrichedException(e, retries);
|
||||
} catch (CancellationException e) {
|
||||
throw new InterruptedIOException();
|
||||
} catch (InterruptedException e) {
|
||||
throw new InterruptedIOException();
|
||||
|
||||
// submit call for the all of the secondaries at once
|
||||
addCallsForReplica(cs, rl, 1, rl.size() - 1);
|
||||
}
|
||||
|
||||
// submit call for the all of the secondaries at once
|
||||
addCallsForReplica(cs, rl, 1, rl.size() - 1);
|
||||
try {
|
||||
try {
|
||||
Future<Result> f = cs.take();
|
||||
|
|
|
@ -136,7 +136,6 @@ public class Scan extends Query {
|
|||
private Map<byte [], NavigableSet<byte []>> familyMap =
|
||||
new TreeMap<byte [], NavigableSet<byte []>>(Bytes.BYTES_COMPARATOR);
|
||||
private Boolean loadColumnFamiliesOnDemand = null;
|
||||
private Consistency consistency = Consistency.STRONG;
|
||||
|
||||
/**
|
||||
* Set it true for small scan to get better performance
|
||||
|
@ -634,22 +633,6 @@ public class Scan extends Query {
|
|||
&& this.loadColumnFamiliesOnDemand.booleanValue();
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the consistency level for this operation
|
||||
* @return the consistency level
|
||||
*/
|
||||
public Consistency getConsistency() {
|
||||
return consistency;
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets the consistency level for this operation
|
||||
* @param consistency the consistency level
|
||||
*/
|
||||
public void setConsistency(Consistency consistency) {
|
||||
this.consistency = consistency;
|
||||
}
|
||||
|
||||
/**
|
||||
* Compile the table and column family (i.e. schema) information
|
||||
* into a String. Useful for parsing and aggregation by debugging,
|
||||
|
|
|
@ -35,6 +35,7 @@ import org.apache.hadoop.hbase.MediumTests;
|
|||
import org.apache.hadoop.hbase.NotServingRegionException;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.TestMetaTableAccessor;
|
||||
import org.apache.hadoop.hbase.client.Consistency;
|
||||
import org.apache.hadoop.hbase.client.Get;
|
||||
import org.apache.hadoop.hbase.client.HTable;
|
||||
import org.apache.hadoop.hbase.client.Put;
|
||||
|
@ -198,6 +199,31 @@ public class TestRegionReplicas {
|
|||
}
|
||||
}
|
||||
|
||||
@Test(timeout = 60000)
|
||||
public void testGetOnTargetRegionReplica() throws Exception {
|
||||
try {
|
||||
//load some data to primary
|
||||
HTU.loadNumericRows(table, f, 0, 1000);
|
||||
// assert that we can read back from primary
|
||||
Assert.assertEquals(1000, HTU.countRows(table));
|
||||
// flush so that region replica can read
|
||||
getRS().getRegionByEncodedName(hriPrimary.getEncodedName()).flushcache();
|
||||
|
||||
openRegion(hriSecondary);
|
||||
|
||||
// try directly Get against region replica
|
||||
byte[] row = Bytes.toBytes(String.valueOf(42));
|
||||
Get get = new Get(row);
|
||||
get.setConsistency(Consistency.TIMELINE);
|
||||
get.setReplicaId(1);
|
||||
Result result = table.get(get);
|
||||
Assert.assertArrayEquals(row, result.getValue(f, null));
|
||||
} finally {
|
||||
HTU.deleteNumericRows(table, HConstants.CATALOG_FAMILY, 0, 1000);
|
||||
closeRegion(hriSecondary);
|
||||
}
|
||||
}
|
||||
|
||||
private void assertGet(HRegion region, int value, boolean expect) throws IOException {
|
||||
byte[] row = Bytes.toBytes(String.valueOf(value));
|
||||
Get get = new Get(row);
|
||||
|
|
|
@ -161,6 +161,10 @@ public class LoadTestTool extends AbstractHBaseTool {
|
|||
protected static final String OPT_REGION_REPLICATION_USAGE =
|
||||
"Desired number of replicas per region";
|
||||
|
||||
public static final String OPT_REGION_REPLICA_ID = "region_replica_id";
|
||||
protected static final String OPT_REGION_REPLICA_ID_USAGE =
|
||||
"Region replica id to do the reads from";
|
||||
|
||||
protected static final long DEFAULT_START_KEY = 0;
|
||||
|
||||
/** This will be removed as we factor out the dependency on command line */
|
||||
|
@ -202,7 +206,6 @@ public class LoadTestTool extends AbstractHBaseTool {
|
|||
private int verifyPercent;
|
||||
|
||||
private int numTables = 1;
|
||||
private int regionsPerServer = HBaseTestingUtility.DEFAULT_REGIONS_PER_SERVER;
|
||||
|
||||
private String superUser;
|
||||
|
||||
|
@ -212,6 +215,7 @@ public class LoadTestTool extends AbstractHBaseTool {
|
|||
|
||||
private int numRegionsPerServer = DEFAULT_NUM_REGIONS_PER_SERVER;
|
||||
private int regionReplication = -1; // not set
|
||||
private int regionReplicaId = -1; // not set
|
||||
|
||||
// TODO: refactor LoadTestToolImpl somewhere to make the usage from tests less bad,
|
||||
// console tool itself should only be used from console.
|
||||
|
@ -334,6 +338,7 @@ public class LoadTestTool extends AbstractHBaseTool {
|
|||
addOptNoArg(OPT_DEFERRED_LOG_FLUSH, OPT_DEFERRED_LOG_FLUSH_USAGE);
|
||||
addOptWithArg(OPT_NUM_REGIONS_PER_SERVER, OPT_NUM_REGIONS_PER_SERVER_USAGE);
|
||||
addOptWithArg(OPT_REGION_REPLICATION, OPT_REGION_REPLICATION_USAGE);
|
||||
addOptWithArg(OPT_REGION_REPLICA_ID, OPT_REGION_REPLICA_ID_USAGE);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -351,7 +356,7 @@ public class LoadTestTool extends AbstractHBaseTool {
|
|||
|
||||
if (!isWrite && !isRead && !isUpdate && !isInitOnly) {
|
||||
throw new IllegalArgumentException("Either -" + OPT_WRITE + " or " +
|
||||
"-" + OPT_UPDATE + "-" + OPT_READ + " has to be specified");
|
||||
"-" + OPT_UPDATE + " or -" + OPT_READ + " has to be specified");
|
||||
}
|
||||
|
||||
if (isInitOnly && (isRead || isWrite || isUpdate)) {
|
||||
|
@ -460,6 +465,11 @@ public class LoadTestTool extends AbstractHBaseTool {
|
|||
if (cmd.hasOption(OPT_REGION_REPLICATION)) {
|
||||
regionReplication = Integer.parseInt(cmd.getOptionValue(OPT_REGION_REPLICATION));
|
||||
}
|
||||
|
||||
regionReplicaId = -1;
|
||||
if (cmd.hasOption(OPT_REGION_REPLICA_ID)) {
|
||||
regionReplicaId = Integer.parseInt(cmd.getOptionValue(OPT_REGION_REPLICA_ID));
|
||||
}
|
||||
}
|
||||
|
||||
private void parseColumnFamilyOptions(CommandLine cmd) {
|
||||
|
@ -621,6 +631,7 @@ public class LoadTestTool extends AbstractHBaseTool {
|
|||
readerThreads.setMaxErrors(maxReadErrors);
|
||||
readerThreads.setKeyWindow(keyWindow);
|
||||
readerThreads.setMultiGetBatchSize(multiGetBatchSize);
|
||||
readerThreads.setRegionReplicaId(regionReplicaId);
|
||||
}
|
||||
|
||||
if (isUpdate && isWrite) {
|
||||
|
|
|
@ -30,6 +30,7 @@ import org.apache.hadoop.hbase.HRegionLocation;
|
|||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.client.Get;
|
||||
|
||||
import org.apache.hadoop.hbase.client.Consistency;
|
||||
import org.apache.hadoop.hbase.client.HTableInterface;
|
||||
import org.apache.hadoop.hbase.client.Result;
|
||||
import org.apache.hadoop.hbase.util.test.LoadTestDataGenerator;
|
||||
|
@ -78,6 +79,7 @@ public class MultiThreadedReader extends MultiThreadedAction
|
|||
private int maxErrors = DEFAULT_MAX_ERRORS;
|
||||
private int keyWindow = DEFAULT_KEY_WINDOW;
|
||||
private int batchSize = DEFAULT_BATCH_SIZE;
|
||||
private int regionReplicaId = -1; // particular region replica id to do reads against if set
|
||||
|
||||
public MultiThreadedReader(LoadTestDataGenerator dataGen, Configuration conf,
|
||||
TableName tableName, double verifyPercent) throws IOException {
|
||||
|
@ -102,6 +104,10 @@ public class MultiThreadedReader extends MultiThreadedAction
|
|||
this.batchSize = batchSize;
|
||||
}
|
||||
|
||||
public void setRegionReplicaId(int regionReplicaId) {
|
||||
this.regionReplicaId = regionReplicaId;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void start(long startKey, long endKey, int numThreads) throws IOException {
|
||||
super.start(startKey, endKey, numThreads);
|
||||
|
@ -317,6 +323,10 @@ public class MultiThreadedReader extends MultiThreadedAction
|
|||
}
|
||||
}
|
||||
get = dataGenerator.beforeGet(keyToRead, get);
|
||||
if (regionReplicaId > 0) {
|
||||
get.setReplicaId(regionReplicaId);
|
||||
get.setConsistency(Consistency.TIMELINE);
|
||||
}
|
||||
if (verbose) {
|
||||
LOG.info("[" + readerId + "] " + "Querying key " + keyToRead + ", cfs " + cfsString);
|
||||
}
|
||||
|
@ -334,7 +344,7 @@ public class MultiThreadedReader extends MultiThreadedAction
|
|||
|
||||
public void queryKey(Get get, boolean verify, long keyToRead) throws IOException {
|
||||
// read the data
|
||||
|
||||
|
||||
long start = System.nanoTime();
|
||||
// Uses simple get
|
||||
Result result = table.get(get);
|
||||
|
|
|
@ -58,6 +58,7 @@ module HBaseConstants
|
|||
SPLITALGO = 'SPLITALGO'
|
||||
NUMREGIONS = 'NUMREGIONS'
|
||||
REGION_REPLICATION = 'REGION_REPLICATION'
|
||||
REGION_REPLICA_ID = 'REGION_REPLICA_ID'
|
||||
CONFIGURATION = org.apache.hadoop.hbase.HConstants::CONFIGURATION
|
||||
ATTRIBUTES="ATTRIBUTES"
|
||||
VISIBILITY="VISIBILITY"
|
||||
|
|
|
@ -288,6 +288,7 @@ EOF
|
|||
attributes = args[ATTRIBUTES]
|
||||
authorizations = args[AUTHORIZATIONS]
|
||||
consistency = args.delete(CONSISTENCY) if args[CONSISTENCY]
|
||||
replicaId = args.delete(REGION_REPLICA_ID) if args[REGION_REPLICA_ID]
|
||||
unless args.empty?
|
||||
columns = args[COLUMN] || args[COLUMNS]
|
||||
if args[VERSIONS]
|
||||
|
@ -346,6 +347,7 @@ EOF
|
|||
end
|
||||
|
||||
get.setConsistency(org.apache.hadoop.hbase.client.Consistency.valueOf(consistency)) if consistency
|
||||
get.setReplicaId(replicaId) if replicaId
|
||||
|
||||
# Call hbase for the results
|
||||
result = @table.get(get)
|
||||
|
|
|
@ -40,6 +40,7 @@ a dictionary of column(s), timestamp, timerange and versions. Examples:
|
|||
hbase> get 't1', 'r1', {COLUMN => 'c1', ATTRIBUTES => {'mykey'=>'myvalue'}}
|
||||
hbase> get 't1', 'r1', {COLUMN => 'c1', AUTHORIZATIONS => ['PRIVATE','SECRET']}
|
||||
hbase> get 't1', 'r1', {CONSISTENCY => 'TIMELINE'}
|
||||
hbase> get 't1', 'r1', {CONSISTENCY => 'TIMELINE', REGION_REPLICA_ID => 1}
|
||||
|
||||
Besides the default 'toStringBinary' format, 'get' also supports custom formatting by
|
||||
column. A user can define a FORMATTER by adding it to the column name in the get
|
||||
|
@ -71,6 +72,7 @@ would be:
|
|||
hbase> t.get 'r1', 'c1', 'c2'
|
||||
hbase> t.get 'r1', ['c1', 'c2']
|
||||
hbase> t.get 'r1', {CONSISTENCY => 'TIMELINE'}
|
||||
hbase> t.get 'r1', {CONSISTENCY => 'TIMELINE', REGION_REPLICA_ID => 1}
|
||||
EOF
|
||||
end
|
||||
|
||||
|
|
Loading…
Reference in New Issue