HBASE-18004 getRegionLocations needs to be called once in

ScannerCallableWithReplicas#call() (Huaxiang Sun)
This commit is contained in:
Michael Stack 2017-06-15 13:41:01 -07:00
parent d023508b5b
commit 4184ae7563
2 changed files with 29 additions and 24 deletions

View File

@ -72,6 +72,7 @@ class ScannerCallableWithReplicas implements RetryingCallable<Result[]> {
private int scannerTimeout; private int scannerTimeout;
private Set<ScannerCallable> outstandingCallables = new HashSet<>(); private Set<ScannerCallable> outstandingCallables = new HashSet<>();
private boolean someRPCcancelled = false; //required for testing purposes only private boolean someRPCcancelled = false; //required for testing purposes only
private int regionReplication = 0;
public ScannerCallableWithReplicas(TableName tableName, ClusterConnection cConnection, public ScannerCallableWithReplicas(TableName tableName, ClusterConnection cConnection,
ScannerCallable baseCallable, ExecutorService pool, int timeBeforeReplicas, Scan scan, ScannerCallable baseCallable, ExecutorService pool, int timeBeforeReplicas, Scan scan,
@ -143,36 +144,42 @@ class ScannerCallableWithReplicas implements RetryingCallable<Result[]> {
//2. We should close the "losing" scanners (scanners other than the ones we hear back //2. We should close the "losing" scanners (scanners other than the ones we hear back
// from first) // from first)
// //
RegionLocations rl = null; // Since RegionReplication is a table attribute, it wont change as long as table is enabled,
try { // it just needs to be set once.
rl = RpcRetryingCallerWithReadReplicas.getRegionLocations(true,
RegionReplicaUtil.DEFAULT_REPLICA_ID, cConnection, tableName, if (regionReplication <= 0) {
currentScannerCallable.getRow()); RegionLocations rl = null;
} catch (RetriesExhaustedException | DoNotRetryIOException e) { try {
// We cannot get the primary replica region location, it is possible that the region server rl = RpcRetryingCallerWithReadReplicas.getRegionLocations(true,
// hosting meta table is down, it needs to proceed to try cached replicas directly. RegionReplicaUtil.DEFAULT_REPLICA_ID, cConnection, tableName,
if (cConnection instanceof ConnectionImplementation) { currentScannerCallable.getRow());
rl = ((ConnectionImplementation) cConnection) } catch (RetriesExhaustedException | DoNotRetryIOException e) {
.getCachedLocation(tableName, currentScannerCallable.getRow()); // We cannot get the primary replica region location, it is possible that the region server
if (rl == null) { // hosting meta table is down, it needs to proceed to try cached replicas directly.
if (cConnection instanceof ConnectionImplementation) {
rl = ((ConnectionImplementation) cConnection)
.getCachedLocation(tableName, currentScannerCallable.getRow());
if (rl == null) {
throw e;
}
} else {
// For completeness
throw e; throw e;
} }
} else {
// For completeness
throw e;
} }
regionReplication = rl.size();
} }
// allocate a boundedcompletion pool of some multiple of number of replicas. // allocate a boundedcompletion pool of some multiple of number of replicas.
// We want to accomodate some RPCs for redundant replica scans (but are still in progress) // We want to accomodate some RPCs for redundant replica scans (but are still in progress)
ResultBoundedCompletionService<Pair<Result[], ScannerCallable>> cs = ResultBoundedCompletionService<Pair<Result[], ScannerCallable>> cs =
new ResultBoundedCompletionService<>( new ResultBoundedCompletionService<>(
RpcRetryingCallerFactory.instantiate(ScannerCallableWithReplicas.this.conf), pool, RpcRetryingCallerFactory.instantiate(ScannerCallableWithReplicas.this.conf), pool,
rl.size() * 5); regionReplication * 5);
AtomicBoolean done = new AtomicBoolean(false); AtomicBoolean done = new AtomicBoolean(false);
replicaSwitched.set(false); replicaSwitched.set(false);
// submit call for the primary replica. // submit call for the primary replica.
addCallsForCurrentReplica(cs, rl); addCallsForCurrentReplica(cs);
int startIndex = 0; int startIndex = 0;
try { try {
@ -195,7 +202,7 @@ class ScannerCallableWithReplicas implements RetryingCallable<Result[]> {
// If rl's size is 1 or scan's consitency is strong, it needs to throw // If rl's size is 1 or scan's consitency is strong, it needs to throw
// out the exception from the primary replica // out the exception from the primary replica
if ((rl.size() == 1) || (scan.getConsistency() == Consistency.STRONG)) { if ((regionReplication == 1) || (scan.getConsistency() == Consistency.STRONG)) {
// Rethrow the first exception // Rethrow the first exception
RpcRetryingCallerWithReadReplicas.throwEnrichedException(e, retries); RpcRetryingCallerWithReadReplicas.throwEnrichedException(e, retries);
} }
@ -208,13 +215,13 @@ class ScannerCallableWithReplicas implements RetryingCallable<Result[]> {
} }
// submit call for the all of the secondaries at once // submit call for the all of the secondaries at once
int endIndex = rl.size(); int endIndex = regionReplication;
if (scan.getConsistency() == Consistency.STRONG) { if (scan.getConsistency() == Consistency.STRONG) {
// When scan's consistency is strong, do not send to the secondaries // When scan's consistency is strong, do not send to the secondaries
endIndex = 1; endIndex = 1;
} else { } else {
// TODO: this may be an overkill for large region replication // TODO: this may be an overkill for large region replication
addCallsForOtherReplicas(cs, rl, 0, rl.size() - 1); addCallsForOtherReplicas(cs, 0, regionReplication - 1);
} }
try { try {
@ -307,15 +314,14 @@ class ScannerCallableWithReplicas implements RetryingCallable<Result[]> {
} }
private void addCallsForCurrentReplica( private void addCallsForCurrentReplica(
ResultBoundedCompletionService<Pair<Result[], ScannerCallable>> cs, RegionLocations rl) { ResultBoundedCompletionService<Pair<Result[], ScannerCallable>> cs) {
RetryingRPC retryingOnReplica = new RetryingRPC(currentScannerCallable); RetryingRPC retryingOnReplica = new RetryingRPC(currentScannerCallable);
outstandingCallables.add(currentScannerCallable); outstandingCallables.add(currentScannerCallable);
cs.submit(retryingOnReplica, scannerTimeout, currentScannerCallable.id); cs.submit(retryingOnReplica, scannerTimeout, currentScannerCallable.id);
} }
private void addCallsForOtherReplicas( private void addCallsForOtherReplicas(
ResultBoundedCompletionService<Pair<Result[], ScannerCallable>> cs, RegionLocations rl, ResultBoundedCompletionService<Pair<Result[], ScannerCallable>> cs, int min, int max) {
int min, int max) {
for (int id = min; id <= max; id++) { for (int id = min; id <= max; id++) {
if (currentScannerCallable.id == id) { if (currentScannerCallable.id == id) {

View File

@ -27,7 +27,6 @@ import java.lang.reflect.Constructor;
import java.net.BindException; import java.net.BindException;
import java.net.InetAddress; import java.net.InetAddress;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.Collections; import java.util.Collections;