HBASE-10356 Failover RPC's for multi-get

git-svn-id: https://svn.apache.org/repos/asf/hbase/branches/hbase-10070@1569559 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
sershe 2014-02-18 23:37:17 +00:00 committed by Enis Soztutar
parent d6f603a492
commit 25b6103dad
13 changed files with 854 additions and 155 deletions

View File

@ -143,7 +143,7 @@ public class HRegionInfo implements Comparable<HRegionInfo> {
public static final byte REPLICA_ID_DELIMITER = (byte)'_';
private static final int MAX_REPLICA_ID = 0xFFFF;
private static final int DEFAULT_REPLICA_ID = 0;
static final int DEFAULT_REPLICA_ID = 0;
/**
* Does region name contain its encoded name?
* @param regionName region name

View File

@ -21,6 +21,7 @@ package org.apache.hadoop.hbase;
import java.util.Collection;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.RegionReplicaUtil;
import org.apache.hadoop.hbase.util.Bytes;
/**
@ -261,6 +262,10 @@ public class RegionLocations {
return locations;
}
/**
 * Returns the location stored at the default replica index of this object.
 * @return the element of {@code locations} at {@code HRegionInfo.DEFAULT_REPLICA_ID};
 *         callers in the client code compare it against null, so presumably it may be null
 */
public HRegionLocation getDefaultRegionLocation() {
  final HRegionLocation defaultLocation = locations[HRegionInfo.DEFAULT_REPLICA_ID];
  return defaultLocation;
}
/**
* Returns the first not-null region location in the list
*/

View File

@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.client;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
/**
* A Get, Put, Increment, Append, or Delete associated with its region. Used internally by
@ -27,18 +28,34 @@ import org.apache.hadoop.hbase.HConstants;
* the index from the original request.
*/
@InterfaceAudience.Private
//TODO: R is never used
public class Action<R> implements Comparable<R> {
// TODO: This class should not be visible outside of the client package.
private Row action;
private int originalIndex;
private long nonce = HConstants.NO_NONCE;
private int replicaId = RegionReplicaUtil.DEFAULT_REPLICA_ID;
public Action(Row action, int originalIndex) {
super();
this.action = action;
this.originalIndex = originalIndex;
this.originalIndex = originalIndex;
}
/**
* Creates an action for a particular replica from original action.
* @param action Original action.
* @param replicaId Replica id for the new action.
*/
public Action(Action<R> action, int replicaId) {
super();
this.action = action.action;
this.nonce = action.nonce;
this.originalIndex = action.originalIndex;
this.replicaId = replicaId;
}
public void setNonce(long nonce) {
this.nonce = nonce;
}
@ -55,6 +72,10 @@ public class Action<R> implements Comparable<R> {
return originalIndex;
}
/**
 * @return the replica id stored in this action.
 */
public int getReplicaId() {
  return this.replicaId;
}
@SuppressWarnings("rawtypes")
@Override
public int compareTo(Object o) {

View File

@ -43,6 +43,7 @@ import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.coprocessor.Batch;
@ -50,7 +51,6 @@ import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.htrace.Trace;
import com.google.common.annotations.VisibleForTesting;
/**
@ -89,9 +89,11 @@ import com.google.common.annotations.VisibleForTesting;
* </p>
*/
class AsyncProcess {
private static final Log LOG = LogFactory.getLog(AsyncProcess.class);
protected static final Log LOG = LogFactory.getLog(AsyncProcess.class);
protected static final AtomicLong COUNTER = new AtomicLong();
public static final String PRIMARY_CALL_TIMEOUT_KEY = "hbase.client.primaryCallTimeout";
/**
* The context used to wait for results from one submit call.
* 1) If AsyncProcess is set to track errors globally, and not per call (for HTable puts),
@ -102,7 +104,7 @@ class AsyncProcess {
public boolean hasError();
public RetriesExhaustedWithDetailsException getErrors();
public List<? extends Row> getFailedOperations();
public Object[] getResults();
public Object[] getResults() throws InterruptedIOException;
/** Wait until all tasks are executed, successfully or not. */
public void waitUntilDone() throws InterruptedIOException;
}
@ -122,6 +124,27 @@ class AsyncProcess {
public void waitUntilDone() throws InterruptedIOException {}
};
/**
 * Synchronization point shared by all calls (primary and replicas) issued for one user
 * request (a Get). An instance is stored in the results array at the moment the replica
 * calls are launched; see the documentation of the results array for the whole protocol.
 * Plain data holder: fields are accessed directly, and the instance itself is used as the
 * lock when they are modified.
 */
private static class ReplicaResultState {
  /** Number of calls still outstanding; 0 once any call succeeded (even if others run). */
  int callCount;

  /**
   * Final outcome. The call that succeeds stores its result here and zeroes the count.
   * If every call fails, the last failing call stores its error here instead.
   */
  Object result;

  /**
   * Errors from failed calls that are not the last one. Whether they are reported to the
   * user is undecided until the end: they are dropped if some other call succeeds.
   */
  BatchErrors replicaErrors;

  public ReplicaResultState(int callCount) {
    this.callCount = callCount;
  }
}
// TODO: many of the fields should be made private
protected final long id;
protected final ClusterConnection hConnection;
@ -160,6 +183,7 @@ class AsyncProcess {
protected int numTries;
protected int serverTrackerTimeout;
protected int timeout;
protected long primaryCallTimeout;
// End configuration settings.
protected static class BatchErrors {
@ -192,6 +216,12 @@ class AsyncProcess {
actions.clear();
addresses.clear();
}
/**
 * Appends everything recorded in another error holder to this one.
 * Holds this object's monitor while copying; the other holder is read without locking it.
 * @param other the error holder whose throwables, actions and addresses are appended.
 */
public synchronized void merge(BatchErrors other) {
  final BatchErrors source = other;
  this.throwables.addAll(source.throwables);
  this.actions.addAll(source.actions);
  this.addresses.addAll(source.addresses);
}
}
public AsyncProcess(ClusterConnection hc, Configuration conf, ExecutorService pool,
@ -212,6 +242,7 @@ class AsyncProcess {
HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
this.timeout = conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
this.primaryCallTimeout = conf.getInt(PRIMARY_CALL_TIMEOUT_KEY, 10);
this.maxTotalConcurrentTasks = conf.getInt(HConstants.HBASE_CLIENT_MAX_TOTAL_TASKS,
HConstants.DEFAULT_HBASE_CLIENT_MAX_TOTAL_TASKS);
@ -270,7 +301,8 @@ class AsyncProcess {
/**
* Extract from the rows list what we can submit. The rows we can not submit are kept in the
* list.
* list. Does not send requests to replicas (not currently used for anything other
* than streaming puts anyway).
*
* @param pool ExecutorService to use.
* @param tableName The table for which this request is needed.
@ -311,7 +343,7 @@ class AsyncProcess {
Row r = it.next();
HRegionLocation loc;
try {
loc = findDestLocation(tableName, r);
loc = findDestLocation(tableName, r, true).getDefaultRegionLocation();
} catch (IOException ex) {
locationErrors = new ArrayList<Exception>();
locationErrorRows = new ArrayList<Integer>();
@ -329,7 +361,9 @@ class AsyncProcess {
Action<Row> action = new Action<Row>(r, ++posInList);
setNonce(ng, r, action);
retainedActions.add(action);
addAction(loc, action, actionsByServer, nonceGroup);
// TODO: replica-get is not supported on this path
byte[] regionName = loc.getRegionInfo().getRegionName();
addAction(loc.getServerName(), regionName, action, actionsByServer, nonceGroup);
it.remove();
}
}
@ -347,7 +381,7 @@ class AsyncProcess {
ars.manageError(originalIndex, row, false, locationErrors.get(i), null);
}
}
ars.sendMultiAction(actionsByServer, 1);
ars.sendMultiAction(actionsByServer, 1, null);
return ars;
}
@ -359,13 +393,12 @@ class AsyncProcess {
* @param actionsByServer the multiaction per server
* @param nonceGroup Nonce group.
*/
private void addAction(HRegionLocation loc, Action<Row> action,
private void addAction(ServerName server, byte[] regionName, Action<Row> action,
Map<ServerName, MultiAction<Row>> actionsByServer, long nonceGroup) {
final byte[] regionName = loc.getRegionInfo().getRegionName();
MultiAction<Row> multiAction = actionsByServer.get(loc.getServerName());
MultiAction<Row> multiAction = actionsByServer.get(server);
if (multiAction == null) {
multiAction = new MultiAction<Row>();
actionsByServer.put(loc.getServerName(), multiAction);
actionsByServer.put(server, multiAction);
}
if (action.hasNonce() && !multiAction.hasNonceGroup()) {
multiAction.setNonceGroup(nonceGroup);
@ -380,10 +413,12 @@ class AsyncProcess {
* @param row the row
* @return the destination.
*/
private HRegionLocation findDestLocation(TableName tableName, Row row) throws IOException {
private RegionLocations findDestLocation(
TableName tableName, Row row, boolean checkPrimary) throws IOException {
if (row == null) throw new IllegalArgumentException("#" + id + ", row cannot be null");
HRegionLocation loc = hConnection.locateRegion(tableName, row.getRow());
if (loc == null) {
RegionLocations loc = hConnection.locateRegionAll(tableName, row.getRow());
if (loc == null
|| (checkPrimary && (loc.isEmpty() || loc.getDefaultRegionLocation() == null))) {
throw new IOException("#" + id + ", no location found, aborting submit for" +
" tableName=" + tableName + " rowkey=" + Arrays.toString(row.getRow()));
}
@ -516,6 +551,144 @@ class AsyncProcess {
* scheduling children. This is why lots of code doesn't require any synchronization.
*/
protected class AsyncRequestFutureImpl<CResult> implements AsyncRequestFuture {
/**
 * Runnable (that can be submitted to thread pool) that waits for when it's time
 * to issue replica calls, finds region replicas, groups the requests by replica and
 * issues the calls (on separate threads, via sendMultiAction).
 * This is done on a separate thread because we don't want to wait on user thread for
 * our asynchronous call, and usually we have to wait before making replica calls.
 */
private final class ReplicaCallIssuingRunnable implements Runnable {
  /** Time (ms) at which the primary calls were started; the wait cutoff is relative to it. */
  private final long startTime;
  /** Actions of the initial request, looked up by index when building replica actions. */
  private final List<Action<Row>> initialActions;

  /**
   * @param initialActions actions submitted with the primary calls.
   * @param startTime time in milliseconds at which the primary calls were started.
   */
  public ReplicaCallIssuingRunnable(List<Action<Row>> initialActions, long startTime) {
    this.initialActions = initialActions;
    this.startTime = startTime;
  }

  /**
   * Waits up to primaryCallTimeout (if it is positive) for the request to complete, and if
   * it has not, groups replica actions by server and sends them. Does nothing further if the
   * request completed in time, if the wait was interrupted, or if no replica action could
   * be created.
   */
  @Override
  public void run() {
    boolean done = false;
    if (primaryCallTimeout > 0) {
      try {
        done = waitUntilDone(startTime + primaryCallTimeout);
      } catch (InterruptedException ex) {
        LOG.error("Replica thread was interrupted - no replica calls: " + ex.getMessage());
        // Catching InterruptedException clears the interrupt status; set it again so that
        // whoever owns this thread can still observe the interruption.
        Thread.currentThread().interrupt();
        return;
      }
    }
    if (done) return; // Done within primary timeout
    Map<ServerName, MultiAction<Row>> actionsByServer =
        new HashMap<ServerName, MultiAction<Row>>();
    if (replicaGetIndices == null) {
      // No index list: consider every result slot.
      for (int i = 0; i < results.length; ++i) {
        addReplicaActions(i, actionsByServer);
      }
    } else {
      // Mixed request: only the recorded replica-get indices are considered.
      for (int i = 0; i < replicaGetIndices.length; ++i) {
        addReplicaActions(replicaGetIndices[i], actionsByServer);
      }
    }
    if (actionsByServer.isEmpty()) return; // Nothing to do - done or no replicas found.
    sendMultiAction(actionsByServer, 1, null);
  }

  /**
   * Add replica actions to action map by server.
   * @param index Index of the original action.
   * @param actionsByServer The map by server to add it to.
   */
  private void addReplicaActions(
      int index, Map<ServerName, MultiAction<Row>> actionsByServer) {
    if (results[index] != null) return; // opportunistic. Never goes from non-null to null.
    Action<Row> action = initialActions.get(index);
    RegionLocations loc = null;
    try {
      // For perf, we assume that this location coming from cache, since we just got location
      // from meta for the primary call. If it turns out to not be the case, we'd need local
      // cache since we want to keep as little time as possible before replica call.
      loc = findDestLocation(tableName, action.getAction(), false);
    } catch (IOException ex) {
      manageError(action.getOriginalIndex(), action.getAction(), false, ex, null);
      LOG.error("Cannot get location - no replica calls for some actions", ex);
      return;
    }
    HRegionLocation[] locs = loc.getRegionLocations();
    // Index 0 is skipped: only entries from index 1 on are counted as replicas.
    int replicaCount = 0;
    for (int i = 1; i < locs.length; ++i) {
      replicaCount += (locs[i] != null) ? 1 : 0;
    }
    if (replicaCount == 0) {
      LOG.warn("No replicas found for " + action.getAction());
      return;
    }
    synchronized (replicaResultLock) {
      // Don't run replica calls if the original has finished. We could do it e.g. if
      // original has already failed before first replica call (unlikely given retries),
      // but that would require additional synchronization w.r.t. returning to caller.
      if (results[index] != null) return;
      // We set the number of calls here. After that any path must call setResult/setError.
      // The count is the replica calls plus one (presumably the outstanding primary call).
      results[index] = new ReplicaResultState(replicaCount + 1);
    }
    for (int i = 1; i < locs.length; ++i) {
      if (locs[i] == null) continue;
      addAction(locs[i].getServerName(), locs[i].getRegionInfo().getRegionName(),
          new Action<Row>(action, i), actionsByServer, nonceGroup);
    }
  }
}
/**
* Runnable (that can be submitted to thread pool) that submits MultiAction to a
* single server. The server call is synchronous, therefore we do it on a thread pool.
* Whatever the outcome of the call, run() finishes by calling decTaskCounters for the
* regions of the MultiAction and the target server.
*/
private final class SingleServerRequestRunnable implements Runnable {
// The batch of actions sent to the server in a single call.
private final MultiAction<Row> multiAction;
// Attempt number, passed on to the success/failure handlers.
private final int numAttempt;
// The server the batch is sent to.
private final ServerName server;
/**
* @param multiAction the actions to send.
* @param numAttempt the attempt number of this send.
* @param server the server to send the actions to.
*/
private SingleServerRequestRunnable(
MultiAction<Row> multiAction, int numAttempt, ServerName server) {
this.multiAction = multiAction;
this.numAttempt = numAttempt;
this.server = server;
}
/**
* Performs one call without retries and hands the outcome to receiveMultiAction on
* success or to receiveGlobalFailure on failure.
* @throws RuntimeException wrapping any Throwable raised outside of the server call itself
* (callable creation or response processing).
*/
@Override
public void run() {
MultiResponse res;
try {
MultiServerCallable<Row> callable = createCallable(server, tableName, multiAction);
try {
res = createCaller(callable).callWithoutRetries(callable, timeout);
} catch (IOException e) {
// The call itself failed. It may be an error coming from the communication
// layer, but it may as well be a functional error raised by the server.
receiveGlobalFailure(multiAction, server, numAttempt, e);
return;
} catch (Throwable t) {
// This should not happen. Let's log & retry anyway.
LOG.error("#" + id + ", Caught throwable while calling. This is unexpected." +
" Retrying. Server is " + server + ", tableName=" + tableName, t);
receiveGlobalFailure(multiAction, server, numAttempt, t);
return;
}
// Normal case: we received an answer from the server, and it's not an exception.
receiveMultiAction(multiAction, server, res, numAttempt);
} catch (Throwable t) {
// Something really bad happened. We are on the send thread that will now die.
// This also covers failures thrown by the receive* handlers called above.
LOG.error("Internal AsyncProcess #" + id + " error for "
+ tableName + " processing for " + server, t);
throw new RuntimeException(t);
} finally {
// Runs on every exit path, including the early returns above.
decTaskCounters(multiAction.getRegions(), server);
}
}
}
private final Batch.Callback<CResult> callback;
private final BatchErrors errors;
private final ConnectionManager.ServerErrorTracker errorsByServer;
@ -524,7 +697,21 @@ class AsyncProcess {
private final TableName tableName;
private final AtomicLong actionsInProgress = new AtomicLong(-1);
/** The lock controls access to results. It is only held when populating results where
* there might be several callers (eventual consistency gets). For other requests,
* there's one unique call going on per result index. */
private final Object replicaResultLock = new Object();
/** Result array. Null if results are not needed. Otherwise, each index corresponds to
* the action index in initial actions submitted. For most request types, has null-s for
* requests that are not done, and result/exception for those that are done.
* For eventual-consistency gets, initially the same applies; at some point, replica calls
* might be started, and ReplicaResultState is put at the corresponding indices. The
* returning calls check the type to detect when this is the case. After all calls are done,
* ReplicaResultState-s are replaced with results for the user. */
private final Object[] results;
/** Indices of replica gets in results. If null, all or no actions are replica-gets. */
private final int[] replicaGetIndices;
private final boolean hasAnyReplicaGets;
private final long nonceGroup;
public AsyncRequestFutureImpl(TableName tableName, List<Action<Row>> actions, long nonceGroup,
@ -545,6 +732,51 @@ class AsyncProcess {
} else {
this.results = needResults ? new Object[actions.size()] : null;
}
List<Integer> replicaGetIndices = null;
boolean hasAnyReplicaGets = false;
if (needResults) {
// Check to see if any requests might require replica calls.
// We expect that many requests will consist of all or no multi-replica gets; in such
// cases we would just use a boolean (hasAnyReplicaGets). If there's a mix, we will
// store the list of action indexes for which replica gets are possible, and set
// hasAnyReplicaGets to true.
boolean hasAnyNonReplicaReqs = false;
int posInList = 0;
for (Action<Row> action : actions) {
boolean isReplicaGet = isReplicaGet(action.getAction());
if (isReplicaGet) {
hasAnyReplicaGets = true;
if (hasAnyNonReplicaReqs) { // Mixed case
if (replicaGetIndices == null) {
replicaGetIndices = new ArrayList<Integer>(actions.size() - 1);
}
replicaGetIndices.add(posInList);
}
} else if (!hasAnyNonReplicaReqs) {
// The first non-multi-replica request in the action list.
hasAnyNonReplicaReqs = true;
if (posInList > 0) {
// Add all the previous requests to the index lists. We know they are all
// replica-gets because this is the first non-multi-replica request in the list.
replicaGetIndices = new ArrayList<Integer>(actions.size() - 1);
for (int i = 0; i < posInList; ++i) {
replicaGetIndices.add(i);
}
}
}
++posInList;
}
}
this.hasAnyReplicaGets = hasAnyReplicaGets;
if (replicaGetIndices != null) {
this.replicaGetIndices = new int[replicaGetIndices.size()];
int i = 0;
for (Integer el : replicaGetIndices) {
this.replicaGetIndices[i++] = el;
}
} else {
this.replicaGetIndices = null;
}
this.errorsByServer = createServerErrorTracker();
this.errors = (globalErrors != null) ? globalErrors : new BatchErrors();
}
@ -560,21 +792,40 @@ class AsyncProcess {
final Map<ServerName, MultiAction<Row>> actionsByServer =
new HashMap<ServerName, MultiAction<Row>>();
HRegionLocation loc;
boolean isReplica = false;
for (Action<Row> action : currentActions) {
RegionLocations locs = null;
try {
loc = findDestLocation(tableName, action.getAction());
locs = findDestLocation(tableName, action.getAction(), false);
} catch (IOException ex) {
// There are multiple retries in locateRegion already. No need to add new.
// We can't continue with this row, hence it's the last retry.
manageError(action.getOriginalIndex(), action.getAction(), false, ex, null);
continue;
}
addAction(loc, action, actionsByServer, nonceGroup);
}
boolean isReplicaAction = !RegionReplicaUtil.isDefaultReplica(action.getReplicaId());
if (isReplica && !isReplicaAction) {
// This is the property of the current implementation, not a requirement.
throw new AssertionError("Replica and non-replica actions in the same retry");
}
isReplica = isReplicaAction;
HRegionLocation loc = locs.getRegionLocation(action.getReplicaId());
if (loc == null || loc.getServerName() == null) {
// On retry, we couldn't find location for some replica we saw before.
String str = "Cannot find location for replica " + action.getReplicaId();
LOG.error(str);
manageError(action.getOriginalIndex(), action.getAction(),
false, new IOException(str), null);
continue;
}
byte[] regionName = loc.getRegionInfo().getRegionName();
addAction(loc.getServerName(), regionName, action, actionsByServer, nonceGroup);
}
// If this is a first attempt to group and send, no replicas, we need replica thread.
if (!actionsByServer.isEmpty()) {
sendMultiAction(actionsByServer, numAttempt);
boolean doStartReplica = (numAttempt == 1 && !isReplica && hasAnyReplicaGets);
sendMultiAction(actionsByServer, numAttempt, doStartReplica ? currentActions : null);
}
}
@ -584,51 +835,22 @@ class AsyncProcess {
*
* @param actionsByServer the actions structured by regions
* @param numAttempt the attempt number.
* @param actionsForReplicaThread original actions for replica thread; null on non-first call.
*/
private void sendMultiAction(
Map<ServerName, MultiAction<Row>> actionsByServer, final int numAttempt) {
private void sendMultiAction(Map<ServerName, MultiAction<Row>> actionsByServer,
int numAttempt, List<Action<Row>> actionsForReplicaThread) {
// Run the last item on the same thread if we are already on a send thread.
// We hope most of the time it will be the only item, so we can cut down on threads.
int reuseThreadCountdown = (numAttempt > 1) ? actionsByServer.size() : Integer.MAX_VALUE;
int actionsRemaining = actionsByServer.size();
// This iteration is by server (the HRegionLocation comparator is by server portion only).
for (Map.Entry<ServerName, MultiAction<Row>> e : actionsByServer.entrySet()) {
final ServerName server = e.getKey();
final MultiAction<Row> multiAction = e.getValue();
incTaskCounters(multiAction.getRegions(), server);
Runnable runnable = Trace.wrap("AsyncProcess.sendMultiAction", new Runnable() {
@Override
public void run() {
MultiResponse res;
try {
MultiServerCallable<Row> callable = createCallable(server, tableName, multiAction);
try {
res = createCaller(callable).callWithoutRetries(callable, timeout);
} catch (IOException e) {
// The service itself failed . It may be an error coming from the communication
// layer, but, as well, a functional error raised by the server.
receiveGlobalFailure(multiAction, server, numAttempt, e);
return;
} catch (Throwable t) {
// This should not happen. Let's log & retry anyway.
LOG.error("#" + id + ", Caught throwable while calling. This is unexpected." +
" Retrying. Server is " + server.getServerName() + ", tableName=" + tableName, t);
receiveGlobalFailure(multiAction, server, numAttempt, t);
return;
}
// Normal case: we received an answer from the server, and it's not an exception.
receiveMultiAction(multiAction, server, res, numAttempt);
} catch (Throwable t) {
// Something really bad happened. We are on the send thread that will now die.
LOG.error("Internal AsyncProcess #" + id + " error for "
+ tableName + " processing for " + server, t);
throw new RuntimeException(t);
} finally {
decTaskCounters(multiAction.getRegions(), server);
}
}
});
--reuseThreadCountdown;
if (reuseThreadCountdown == 0) {
Runnable runnable = Trace.wrap("AsyncProcess.sendMultiAction",
new SingleServerRequestRunnable(multiAction, numAttempt, server));
--actionsRemaining;
if ((numAttempt > 1) && actionsRemaining == 0) {
runnable.run();
} else {
try {
@ -645,6 +867,30 @@ class AsyncProcess {
}
}
}
if (actionsForReplicaThread != null) {
startWaitingForReplicaCalls(actionsForReplicaThread);
}
}
/**
 * Schedules the replica calls for the given actions. With a primary call timeout of zero
 * the replica runnable is executed right away on the calling thread; otherwise it is
 * handed to the pool, where it waits before deciding whether replica calls are needed.
 * A rejection by the pool is logged and results in no replica calls.
 * @param actionsForReplicaThread the original actions the replica runnable works from.
 */
private void startWaitingForReplicaCalls(List<Action<Row>> actionsForReplicaThread) {
  final ReplicaCallIssuingRunnable task = new ReplicaCallIssuingRunnable(
      actionsForReplicaThread, EnvironmentEdgeManager.currentTimeMillis());
  if (primaryCallTimeout == 0) {
    // No waiting period configured: issue the replica calls immediately.
    task.run();
    return;
  }
  // Hand over to the pool; the task may kick off replica gets later.
  // TODO: we could do it on the same thread, but it's a user thread, might be a bad idea.
  try {
    pool.submit(task);
  } catch (RejectedExecutionException rejected) {
    LOG.warn("#" + id + ", replica task was rejected by the pool - no replica calls", rejected);
  }
}
/**
@ -665,11 +911,11 @@ class AsyncProcess {
if (!canRetry) {
// Batch.Callback<Res> was not called on failure in 0.94. We keep this.
errors.add(throwable, row, server);
if (results != null) {
setResult(originalIndex, row, throwable);
}
decActionCounter();
setError(originalIndex, row, throwable, server);
} else {
// See if we are dealing with a replica action that was completed from other server.
// Doesn't have to be synchronized, worst case we'd retry and be unable to set result.
canRetry = !isActionComplete(originalIndex, row);
}
return canRetry;
@ -685,15 +931,17 @@ class AsyncProcess {
*/
private void receiveGlobalFailure(
MultiAction<Row> rsActions, ServerName server, int numAttempt, Throwable t) {
// Do not use the exception for updating cache because it might be coming from
// any of the regions in the MultiAction.
byte[] row = rsActions.actions.values().iterator().next().get(0).getAction().getRow();
hConnection.updateCachedLocations(tableName, null, row, null, server);
errorsByServer.reportServerError(server);
boolean canRetry = errorsByServer.canRetryMore(numAttempt);
List<Action<Row>> toReplay = new ArrayList<Action<Row>>();
for (Map.Entry<byte[], List<Action<Row>>> e : rsActions.actions.entrySet()) {
byte[] regionName = e.getKey();
byte[] row = e.getValue().iterator().next().getAction().getRow();
// Do not use the exception for updating cache because it might be coming from
// any of the regions in the MultiAction.
// TODO: depending on type of exception we might not want to update cache at all?
hConnection.updateCachedLocations(tableName, regionName, row, null, server);
for (Action<Row> action : e.getValue()) {
if (manageError(action.getOriginalIndex(), action.getAction(), canRetry, t, server)) {
toReplay.add(action);
@ -791,14 +1039,16 @@ class AsyncProcess {
// Failure: retry if it makes sense, otherwise update the error lists
if (result == null || result instanceof Throwable) {
Row row = sentAction.getAction();
if (!regionFailureRegistered) { // We're doing this once per location.
// Register corresponding failures once per server/once per region.
if (!regionFailureRegistered) {
regionFailureRegistered = true;
// The location here is used as a server name.
hConnection.updateCachedLocations(tableName, regionName, row.getRow(), result, server);
if (failureCount == 0) {
errorsByServer.reportServerError(server);
canRetry = errorsByServer.canRetryMore(numAttempt);
}
hConnection.updateCachedLocations(
tableName, regionName, row.getRow(), result, server);
}
if (failureCount == 0) {
errorsByServer.reportServerError(server);
// We determine canRetry only once for all calls, after reporting server failure.
canRetry = errorsByServer.canRetryMore(numAttempt);
}
++failureCount;
if (manageError(
@ -809,16 +1059,14 @@ class AsyncProcess {
if (callback != null) {
try {
//noinspection unchecked
// TODO: would callback expect a replica region name if it gets one?
this.callback.update(regionName, sentAction.getAction().getRow(), (CResult)result);
} catch (Throwable t) {
LOG.error("User callback threw an exception for "
+ Bytes.toStringBinary(regionName) + ", ignoring", t);
}
}
if (results != null) {
setResult(sentAction.getOriginalIndex(), sentAction.getAction(), result);
}
decActionCounter();
setResult(sentAction, result);
}
}
}
@ -881,38 +1129,185 @@ class AsyncProcess {
return sb.toString();
}
private void setResult(int index, Row row, Object result) {
if (result == null) throw new RuntimeException("Result cannot be set to null");
if (results[index] != null) throw new RuntimeException("Result was already set");
results[index] = result;
/**
 * Records a successful (non-error) result for an action.
 * When results are not collected, or the slot could be filled directly, only the action
 * counter is decremented. When replica calls are in flight for this action, the first
 * successful call wins: it stores its result in the shared replica state, zeroes the call
 * count and discards the pending replica errors; later calls return without any effect.
 * @param action Action (request) that the server responded to.
 * @param result The result.
 */
private void setResult(Action<Row> action, Object result) {
  final boolean fromReplica = !RegionReplicaUtil.isDefaultReplica(action.getReplicaId());
  ReplicaResultState replicaState = null;
  if (results != null) {
    replicaState = trySetResultSimple(
        action.getOriginalIndex(), action.getAction(), result, fromReplica);
  }
  if (replicaState == null) {
    // Simple case, no replica requests.
    decActionCounter();
    return;
  }
  synchronized (replicaState) {
    if (replicaState.callCount == 0) {
      return; // another call has already provided the result
    }
    replicaState.result = result;
    replicaState.callCount = 0;
    replicaState.replicaErrors = null; // errors of the other calls no longer matter
  }
  decActionCounter();
}
/**
 * Records the failure of an action.
 * Without replica calls the error goes straight to the final error list and the action
 * counter is decremented. With replica calls in flight, an error that is not the last
 * outstanding call is parked in the replica state; the last failing call makes the error
 * the action's result, moves the parked errors into the final list and decrements the
 * action counter. If another call already completed the action, nothing happens.
 * @param index Original action index.
 * @param row Original request.
 * @param throwable The resulting error.
 * @param server The source server.
 */
private void setError(int index, Row row, Throwable throwable, ServerName server) {
  ReplicaResultState replicaState = null;
  if (results != null) {
    replicaState = trySetResultSimple(index, row, throwable, false);
  }
  if (replicaState == null) {
    // Simple case, no replica requests.
    errors.add(throwable, row, server);
    decActionCounter();
    return;
  }
  BatchErrors sink; // Either the final errors, or the temporary per-replica errors.
  boolean lastCallFailed;
  synchronized (replicaState) {
    if (replicaState.callCount == 0) {
      return; // another call has already provided the result
    }
    lastCallFailed = (replicaState.callCount == 1);
    if (lastCallFailed) {
      // Every call failed and this is the last one: the error becomes the result.
      replicaState.result = throwable;
      sink = errors;
    } else {
      assert replicaState.callCount > 1;
      if (replicaState.replicaErrors == null) {
        replicaState.replicaErrors = new BatchErrors();
      }
      sink = replicaState.replicaErrors;
    }
    --replicaState.callCount;
  }
  sink.add(throwable, row, server);
  if (lastCallFailed) {
    if (replicaState.replicaErrors != null) { // last call, no need to lock
      errors.merge(replicaState.replicaErrors);
      replicaState.replicaErrors = null;
    }
    decActionCounter();
  }
}
/**
 * Checks if the action is complete; used on error to prevent needless retries.
 * Does not synchronize, assuming element index/field accesses are atomic.
 * This is an opportunistic optimization check, doesn't have to be strict.
 * An action is reported complete only for replica gets whose result slot is filled either
 * with a final value or with a replica state whose call count has dropped to 0.
 * When results are not collected (the results array is null) nothing can be looked up,
 * so the action is reported as not complete instead of failing on the null array.
 * @param index Original action index.
 * @param row Original request.
 * @return true if a result for the action is already known; false otherwise.
 */
private boolean isActionComplete(int index, Row row) {
  // results is null when the caller did not ask for results; guard before indexing.
  if (results == null || !isReplicaGet(row)) return false;
  Object resObj = results[index];
  return (resObj != null) && (!(resObj instanceof ReplicaResultState)
      || ((ReplicaResultState)resObj).callCount == 0);
}
/**
 * Attempts to store a result or error for an action as though no replica calls existed.
 * For requests that are not replica gets the slot is written unconditionally. For replica
 * gets the slot is inspected under the replica result lock and written only if still empty.
 * @param index Original action index.
 * @param row Original request.
 * @param result Result or error to store.
 * @param isFromReplica Whether the value comes from a non-default replica; such a value is
 *        never expected on the direct-write path and triggers an AssertionError there.
 * @return null if the slot did not hold a replica state; otherwise the replica state found
 *         in the slot, which the caller must update itself.
 */
private ReplicaResultState trySetResultSimple(
    int index, Row row, Object result, boolean isFromReplica) {
  if (!isReplicaGet(row)) {
    if (isFromReplica) {
      throw new AssertionError("Unexpected stale result for " + row);
    }
    results[index] = result;
    return null;
  }
  Object existing;
  synchronized (replicaResultLock) {
    existing = results[index];
    if (existing == null) {
      if (isFromReplica) {
        throw new AssertionError("Unexpected stale result for " + row);
      }
      results[index] = result;
    }
  }
  if (existing instanceof ReplicaResultState) {
    return (ReplicaResultState) existing;
  }
  return null;
}
private void decActionCounter() {
actionsInProgress.decrementAndGet();
if (hasAnyReplicaGets && (actionsInProgress.get() == 1)) {
// Convert replica sync structures to results.
int staleCount = 0;
if (replicaGetIndices == null) {
for (int i = 0; i < results.length; ++i) {
staleCount += convertReplicaResult(i) ? 1 : 0;
}
} else {
for (int i = 0; i < replicaGetIndices.length; ++i) {
staleCount += convertReplicaResult(replicaGetIndices[i]) ? 1 : 0;
}
}
if (!actionsInProgress.compareAndSet(1, 0)) {
throw new AssertionError("Cannot set actions in progress to 0");
}
if (staleCount > 0) {
LOG.trace("Returning " + staleCount + " stale results");
}
} else {
actionsInProgress.decrementAndGet();
}
synchronized (actionsInProgress) {
actionsInProgress.notifyAll();
}
}
/**
 * Replaces a replica state stored in the results array with the final value it carries.
 * Slots that do not hold a replica state are left alone.
 * @param index Index in the results array.
 * @return true if the slot held a replica state whose final value is a Result flagged as
 *         stale; false in every other case.
 */
private boolean convertReplicaResult(int index) {
  if (results[index] instanceof ReplicaResultState) {
    ReplicaResultState replicaState = (ReplicaResultState) results[index];
    // Nobody touches a state whose callCount is 0, so no locking is needed here.
    if (replicaState.callCount != 0) {
      throw new AssertionError(
          "Actions are done but callcount is " + replicaState.callCount);
    }
    // TODO: we expect the Result coming from server to already have "isStale" specified.
    Object finalValue = replicaState.result;
    results[index] = finalValue;
    if (finalValue instanceof Result) {
      return ((Result) finalValue).isStale();
    }
  }
  return false;
}
@Override
public void waitUntilDone() throws InterruptedIOException {
long lastLog = EnvironmentEdgeManager.currentTimeMillis();
long currentInProgress;
try {
while (0 != (currentInProgress = actionsInProgress.get())) {
long now = EnvironmentEdgeManager.currentTimeMillis();
waitUntilDone(Long.MAX_VALUE);
} catch (InterruptedException iex) {
throw new InterruptedIOException(iex.getMessage());
}
}
private boolean waitUntilDone(long cutoff) throws InterruptedException {
boolean hasWait = cutoff != Long.MAX_VALUE;
long lastLog = hasWait ? 0 : EnvironmentEdgeManager.currentTimeMillis();
long currentInProgress;
while (0 != (currentInProgress = actionsInProgress.get())) {
long now = 0;
if (hasWait && (now = EnvironmentEdgeManager.currentTimeMillis()) > cutoff) {
return false;
}
if (!hasWait) {
// Only log if wait is infinite.
now = EnvironmentEdgeManager.currentTimeMillis();
if (now > lastLog + 10000) {
lastLog = now;
LOG.info("#" + id + ", waiting for " + currentInProgress + " actions to finish");
}
synchronized (actionsInProgress) {
if (actionsInProgress.get() == 0) break;
actionsInProgress.wait(100);
actionsInProgress.wait(Math.min(100, hasWait ? (cutoff - now) : Long.MAX_VALUE));
}
}
} catch (InterruptedException iex) {
throw new InterruptedIOException(iex.getMessage());
}
return true;
}
@Override
@ -931,7 +1326,8 @@ class AsyncProcess {
}
@Override
public Object[] getResults() {
public Object[] getResults() throws InterruptedIOException {
waitUntilDone();
return results;
}
}
@ -1080,4 +1476,8 @@ class AsyncProcess {
return new ConnectionManager.ServerErrorTracker(
this.serverTrackerTimeout, this.numTries);
}
/**
 * Tells whether a row operation is a {@code Get} requested with TIMELINE consistency.
 *
 * @param row the operation to inspect
 * @return true only for a {@code Get} whose consistency is {@code Consistency.TIMELINE}
 */
private static boolean isReplicaGet(Row row) {
  if (!(row instanceof Get)) {
    return false;
  }
  final Get get = (Get) row;
  return get.getConsistency() == Consistency.TIMELINE;
}
}

View File

@ -24,6 +24,7 @@ import java.util.List;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.MasterNotRunningException;
import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
@ -231,4 +232,9 @@ interface ClusterConnection extends HConnection {
* @return Default AsyncProcess associated with this connection.
*/
AsyncProcess getAsyncProcess();
/**
 * Looks up a region by table name and row and returns every location known for it.
 * @param tableName table the region belongs to
 * @param row row used for the lookup
 * @return All locations for a particular region.
 * @throws IOException if the lookup fails
 */
RegionLocations locateRegionAll(TableName tableName, byte[] row) throws IOException;
}

View File

@ -26,6 +26,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.MasterNotRunningException;
import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
@ -202,6 +203,11 @@ class ConnectionAdapter implements ClusterConnection {
return wrappedConnection.locateRegion(tableName, row);
}
/** Forwards the lookup unchanged to the wrapped connection. */
@Override
public RegionLocations locateRegionAll(TableName tableName, byte[] row) throws IOException {
  final RegionLocations allLocations = wrappedConnection.locateRegionAll(tableName, row);
  return allLocations;
}
@Override
public void clearRegionCache() {
wrappedConnection.clearRegionCache();

View File

@ -946,10 +946,15 @@ class ConnectionManager {
}
@Override
public HRegionLocation locateRegion(final TableName tableName,
final byte [] row)
throws IOException{
RegionLocations locations = locateRegion(tableName, row, true, true);
public RegionLocations locateRegionAll(
final TableName tableName, final byte[] row) throws IOException{
return locateRegion(tableName, row, true, true);
}
/**
 * Returns a single location for the region looked up by table and row, taken from the
 * full set returned by {@code locateRegionAll}; null when no locations are available.
 */
@Override
public HRegionLocation locateRegion(
    final TableName tableName, final byte[] row) throws IOException {
  final RegionLocations allLocations = locateRegionAll(tableName, row);
  if (allLocations == null) {
    return null;
  }
  return allLocations.getRegionLocation();
}

View File

@ -84,17 +84,6 @@ public final class MultiAction<R> {
return actions.keySet();
}
/**
 * Flattens the per-region action lists of this container into one new list.
 * @return All actions from all regions in this container
 */
public List<Action<R>> allActions() {
  final List<Action<R>> flattened = new ArrayList<Action<R>>();
  for (List<Action<R>> regionActions : actions.values()) {
    for (Action<R> action : regionActions) {
      flattened.add(action);
    }
  }
  return flattened;
}
/**
 * @return true when the nonce group differs from {@code HConstants.NO_NONCE}
 */
public boolean hasNonceGroup() {
  final boolean nonceGroupAssigned = nonceGroup != HConstants.NO_NONCE;
  return nonceGroupAssigned;
}

View File

@ -41,6 +41,7 @@ import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionAction;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.ServiceException;
/**
@ -153,4 +154,9 @@ class MultiServerCallable<R> extends RegionServerCallable<MultiResponse> {
// Use the location we were given in the constructor rather than go look it up.
setStub(getConnection().getClient(this.location.getServerName()));
}
/** @return the server name of the location this callable was constructed with */
@VisibleForTesting
ServerName getServerName() {
  final ServerName target = location.getServerName();
  return target;
}
}

View File

@ -30,7 +30,7 @@ public class RegionReplicaUtil {
/**
 * The default replicaId for the region.
 * Package-private (not private) so other classes in this package can reference it.
 */
static final int DEFAULT_REPLICA_ID = 0;
/**
* Returns the HRegionInfo for the given replicaId. HRegionInfo's correspond to
@ -62,4 +62,7 @@ public class RegionReplicaUtil {
return getRegionInfoForReplica(regionInfo, DEFAULT_REPLICA_ID);
}
/**
 * @param replicaId replica id to check
 * @return true if {@code replicaId} equals {@code DEFAULT_REPLICA_ID}
 */
public static boolean isDefaultReplica(int replicaId) {
  return replicaId == DEFAULT_REPLICA_ID;
}
}

View File

@ -21,6 +21,8 @@ package org.apache.hadoop.hbase.client;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
@ -43,8 +45,11 @@ import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
@ -53,6 +58,7 @@ import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
@Category(MediumTests.class)
public class TestAsyncProcess {
@ -64,8 +70,9 @@ public class TestAsyncProcess {
private static final byte[] FAILS = "FAILS".getBytes();
private static final Configuration conf = new Configuration();
// Three distinct fake servers used to host primary and replica regions in these tests.
private static ServerName sn = ServerName.valueOf("s1:1,1");
private static ServerName sn2 = ServerName.valueOf("s2:2,2");
private static ServerName sn3 = ServerName.valueOf("s3:3,3");
private static HRegionInfo hri1 =
new HRegionInfo(DUMMY_TABLE, DUMMY_BYTES_1, DUMMY_BYTES_2, false, 1);
private static HRegionInfo hri2 =
@ -76,6 +83,16 @@ public class TestAsyncProcess {
private static HRegionLocation loc2 = new HRegionLocation(hri2, sn);
private static HRegionLocation loc3 = new HRegionLocation(hri3, sn2);
// Replica fixtures: region infos for replica ids 1 and 2 of hri1, and replica id 1 of hri2.
private static HRegionInfo hri1r1 = RegionReplicaUtil.getRegionInfoForReplica(hri1, 1),
hri1r2 = RegionReplicaUtil.getRegionInfoForReplica(hri1, 2);
private static HRegionInfo hri2r1 = RegionReplicaUtil.getRegionInfoForReplica(hri2, 1);
// hri1: primary on sn, replica 1 on sn2, replica 2 on sn3.
private static RegionLocations hrls1 = new RegionLocations(new HRegionLocation(hri1, sn),
new HRegionLocation(hri1r1, sn2), new HRegionLocation(hri1r2, sn3));
// hri2: primary on sn2, replica 1 on sn3.
private static RegionLocations hrls2 = new RegionLocations(new HRegionLocation(hri2, sn2),
new HRegionLocation(hri2r1, sn3));
// hri3: primary on sn3 only; the second (replica) slot is null.
private static RegionLocations hrls3 = new RegionLocations(new HRegionLocation(hri3, sn3), null);
private static final String success = "success";
private static Exception failure = new Exception("failure");
@ -139,6 +156,7 @@ public class TestAsyncProcess {
ClusterConnection hc, Configuration conf, boolean useGlobalErrors, boolean dummy) {
super(hc, conf, new ThreadPoolExecutor(1, 20, 60, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(), new CountingThreadFactory(new AtomicInteger())) {
@Override
public void execute(Runnable command) {
throw new RejectedExecutionException("test under failure");
}
@ -158,7 +176,17 @@ public class TestAsyncProcess {
protected RpcRetryingCaller<MultiResponse> createCaller(MultiServerCallable<Row> callable) {
callsCt.incrementAndGet();
final MultiResponse mr = createMultiResponse(
callable.getMulti(), nbMultiResponse, nbActions);
callable.getMulti(), nbMultiResponse, nbActions, new ResponseGenerator() {
@Override
public void addResponse(MultiResponse mr, byte[] regionName, Action<Row> a) {
if (Arrays.equals(FAILS, a.getAction().getRow())) {
mr.add(regionName, a.getOriginalIndex(), failure);
} else {
mr.add(regionName, a.getOriginalIndex(), success);
}
}
});
return new RpcRetryingCaller<MultiResponse>(100, 10) {
@Override
public MultiResponse callWithoutRetries(RetryingCallable<MultiResponse> callable,
@ -204,23 +232,106 @@ public class TestAsyncProcess {
}
}
static MultiResponse createMultiResponse(
final MultiAction<Row> multi, AtomicInteger nbMultiResponse, AtomicInteger nbActions) {
class MyAsyncProcessWithReplicas extends MyAsyncProcess {
private Set<byte[]> failures = new TreeSet<byte[]>(new Bytes.ByteArrayComparator());
private long primarySleepMs = 0, replicaSleepMs = 0;
private Map<ServerName, Long> customPrimarySleepMs = new HashMap<ServerName, Long>();
private final AtomicLong replicaCalls = new AtomicLong(0);
public void addFailures(HRegionInfo... hris) {
for (HRegionInfo hri : hris) {
failures.add(hri.getRegionName());
}
}
public long getReplicaCallCount() {
return replicaCalls.get();
}
public void setPrimaryCallDelay(ServerName server, long primaryMs) {
customPrimarySleepMs.put(server, primaryMs);
}
public MyAsyncProcessWithReplicas(ClusterConnection hc, Configuration conf) {
super(hc, conf);
}
public void setCallDelays(long primaryMs, long replicaMs) {
this.primarySleepMs = primaryMs;
this.replicaSleepMs = replicaMs;
}
@Override
protected RpcRetryingCaller<MultiResponse> createCaller(
MultiServerCallable<Row> callable) {
final MultiResponse mr = createMultiResponse(
callable.getMulti(), nbMultiResponse, nbActions, new ResponseGenerator() {
@Override
public void addResponse(MultiResponse mr, byte[] regionName, Action<Row> a) {
if (failures.contains(regionName)) {
mr.add(regionName, a.getOriginalIndex(), failure);
} else {
boolean isStale = !RegionReplicaUtil.isDefaultReplica(a.getReplicaId());
mr.add(regionName, a.getOriginalIndex(),
Result.create(new Cell[0], null, isStale));
}
}
});
// Currently AsyncProcess either sends all-replica, or all-primary request.
final boolean isDefault = RegionReplicaUtil.isDefaultReplica(
callable.getMulti().actions.values().iterator().next().iterator().next().getReplicaId());
final ServerName server = ((MultiServerCallable<?>)callable).getServerName();
String debugMsg = "Call to " + server + ", primary=" + isDefault + " with "
+ callable.getMulti().actions.size() + " entries: ";
for (byte[] region : callable.getMulti().actions.keySet()) {
debugMsg += "[" + Bytes.toStringBinary(region) + "], ";
}
LOG.debug(debugMsg);
if (!isDefault) {
replicaCalls.incrementAndGet();
}
return new RpcRetryingCaller<MultiResponse>(100, 10) {
@Override
public MultiResponse callWithoutRetries(RetryingCallable<MultiResponse> callable, int callTimeout)
throws IOException, RuntimeException {
long sleep = -1;
if (isDefault) {
Long customSleep = customPrimarySleepMs.get(server);
sleep = (customSleep == null ? primarySleepMs : customSleep.longValue());
} else {
sleep = replicaSleepMs;
}
if (sleep != 0) {
try {
Thread.sleep(sleep);
} catch (InterruptedException e) {
}
}
return mr;
}
};
}
}
/**
 * Builds a fake MultiResponse holding one entry per action of {@code multi}.
 *
 * @param multi the request whose actions are answered
 * @param nbMultiResponse counter incremented once per call
 * @param nbActions counter incremented once per action
 * @param gen decides which response is recorded for each action
 * @return the populated response
 */
static MultiResponse createMultiResponse(final MultiAction<Row> multi,
    AtomicInteger nbMultiResponse, AtomicInteger nbActions, ResponseGenerator gen) {
  final MultiResponse mr = new MultiResponse();
  nbMultiResponse.incrementAndGet();
  for (Map.Entry<byte[], List<Action<Row>>> entry : multi.actions.entrySet()) {
    byte[] regionName = entry.getKey();
    for (Action<Row> a : entry.getValue()) {
      nbActions.incrementAndGet();
      // The generator is the only source of responses; nothing is hard-coded here.
      gen.addResponse(mr, regionName, a);
    }
  }
  return mr;
}
private static interface ResponseGenerator {
void addResponse(final MultiResponse mr, byte[] regionName, Action<Row> a);
}
/**
* Returns our async process.
*/
@ -233,9 +344,8 @@ public class TestAsyncProcess {
}
@Override
public HRegionLocation locateRegion(final TableName tableName,
final byte[] row) {
return loc1;
public RegionLocations locateRegionAll(TableName tableName, byte[] row) throws IOException {
return new RegionLocations(loc1);
}
}
@ -253,18 +363,18 @@ public class TestAsyncProcess {
}
@Override
public HRegionLocation locateRegion(final TableName tableName,
final byte[] row) {
public RegionLocations locateRegionAll(TableName tableName, byte[] row) throws IOException {
int i = 0;
for (HRegionLocation hr:hrl){
if (Arrays.equals(row, hr.getRegionInfo().getStartKey())){
usedRegions[i] = true;
return hr;
for (HRegionLocation hr : hrl){
if (Arrays.equals(row, hr.getRegionInfo().getStartKey())) {
usedRegions[i] = true;
return new RegionLocations(hr);
}
i++;
}
return null;
}
}
@Test
@ -284,6 +394,7 @@ public class TestAsyncProcess {
ClusterConnection hc = createHConnection();
final AtomicInteger updateCalled = new AtomicInteger(0);
Batch.Callback<Object> cb = new Batch.Callback<Object>() {
@Override
public void update(byte[] region, byte[] row, Object result) {
updateCalled.incrementAndGet();
}
@ -458,6 +569,7 @@ public class TestAsyncProcess {
final Thread myThread = Thread.currentThread();
Thread t = new Thread() {
@Override
public void run() {
Threads.sleep(2000);
myThread.interrupt();
@ -478,6 +590,7 @@ public class TestAsyncProcess {
final long sleepTime = 2000;
Thread t2 = new Thread() {
@Override
public void run() {
Threads.sleep(sleepTime);
while (ap.tasksInProgress.get() > 0) {
@ -496,32 +609,33 @@ public class TestAsyncProcess {
}
/**
 * Creates the common mocked connection and stubs locateRegionAll for the three dummy
 * rows and the FAILS row with single-location results.
 */
private static ClusterConnection createHConnection() throws IOException {
  final ClusterConnection connection = createHConnectionCommon();
  final byte[][] rows = { DUMMY_BYTES_1, DUMMY_BYTES_2, DUMMY_BYTES_3, FAILS };
  final HRegionLocation[] targets = { loc1, loc2, loc3, loc2 };
  for (int i = 0; i < rows.length; ++i) {
    setMockLocation(connection, rows[i], new RegionLocations(targets[i]));
  }
  return connection;
}
/**
 * Creates the common mocked connection and stubs locateRegionAll for the three dummy
 * rows with the replica-aware location sets hrls1, hrls2 and hrls3.
 */
private static ClusterConnection createHConnectionWithReplicas() throws IOException {
  final ClusterConnection connection = createHConnectionCommon();
  final byte[][] rows = { DUMMY_BYTES_1, DUMMY_BYTES_2, DUMMY_BYTES_3 };
  final RegionLocations[] targets = { hrls1, hrls2, hrls3 };
  for (int i = 0; i < rows.length; ++i) {
    setMockLocation(connection, rows[i], targets[i]);
  }
  return connection;
}
/**
 * Stubs {@code hc.locateRegionAll(DUMMY_TABLE, row)} to return {@code result}.
 *
 * @param hc mocked connection to stub
 * @param row row the stub matches on
 * @param result locations to return for that row
 */
private static void setMockLocation(ClusterConnection hc, byte[] row,
    RegionLocations result) throws IOException {
  final TableName tableMatcher = Mockito.eq(DUMMY_TABLE);
  final byte[] rowMatcher = Mockito.eq(row);
  Mockito.when(hc.locateRegionAll(tableMatcher, rowMatcher)).thenReturn(result);
}
/**
 * Creates a Mockito mock of ClusterConnection with the stubbing shared by the test
 * connections: single-location lookups for the dummy rows and the FAILS row, and a
 * nonce generator whose nonce group is HConstants.NO_NONCE.
 */
private static ClusterConnection createHConnectionCommon() {
ClusterConnection hc = Mockito.mock(ClusterConnection.class);
// Row DUMMY_BYTES_1 resolves to loc1 through both lookup methods.
Mockito.when(hc.getRegionLocation(Mockito.eq(DUMMY_TABLE),
Mockito.eq(DUMMY_BYTES_1), Mockito.anyBoolean())).thenReturn(loc1);
Mockito.when(hc.locateRegion(Mockito.eq(DUMMY_TABLE),
Mockito.eq(DUMMY_BYTES_1))).thenReturn(loc1);
// Row DUMMY_BYTES_2 resolves to loc2 through both lookup methods.
Mockito.when(hc.getRegionLocation(Mockito.eq(DUMMY_TABLE),
Mockito.eq(DUMMY_BYTES_2), Mockito.anyBoolean())).thenReturn(loc2);
Mockito.when(hc.locateRegion(Mockito.eq(DUMMY_TABLE),
Mockito.eq(DUMMY_BYTES_2))).thenReturn(loc2);
// NOTE(review): for DUMMY_BYTES_3, getRegionLocation returns loc2 while locateRegion
// returns loc3 - confirm whether this difference is intended.
Mockito.when(hc.getRegionLocation(Mockito.eq(DUMMY_TABLE),
Mockito.eq(DUMMY_BYTES_3), Mockito.anyBoolean())).thenReturn(loc2);
Mockito.when(hc.locateRegion(Mockito.eq(DUMMY_TABLE),
Mockito.eq(DUMMY_BYTES_3))).thenReturn(loc3);
// The FAILS row resolves to loc2 through both lookup methods.
Mockito.when(hc.getRegionLocation(Mockito.eq(DUMMY_TABLE),
Mockito.eq(FAILS), Mockito.anyBoolean())).thenReturn(loc2);
Mockito.when(hc.locateRegion(Mockito.eq(DUMMY_TABLE),
Mockito.eq(FAILS))).thenReturn(loc2);
// Nonce generator mock that reports NO_NONCE as its nonce group.
NonceGenerator ng = Mockito.mock(NonceGenerator.class);
Mockito.when(ng.getNonceGroup()).thenReturn(HConstants.NO_NONCE);
Mockito.when(hc.getNonceGenerator()).thenReturn(ng);
return hc;
}
@ -756,7 +870,124 @@ public class TestAsyncProcess {
Assert.assertEquals("nbReg=" + nbReg, nbReg, NB_REGS);
}
private void verifyResult(AsyncRequestFuture ars, boolean... expected) {
@Test
public void testReplicaReplicaSuccess() throws Exception {
// Main call takes too long so replicas succeed, except for one region w/o replicas.
// One region has no replica, so the main call succeeds for it.
MyAsyncProcessWithReplicas ap = createReplicaAp(10, 1000, 0);
List<Get> rows = makeTimelineGets(DUMMY_BYTES_1, DUMMY_BYTES_2, DUMMY_BYTES_3);
AsyncRequestFuture ars = ap.submitAll(DUMMY_TABLE, rows, null, new Object[3]);
verifyReplicaResult(ars, RR.TRUE, RR.TRUE, RR.FALSE);
Assert.assertEquals(2, ap.getReplicaCallCount());
}
/**
 * The primary answers (10 ms) well before the replica timeout (1000 ms), so every result
 * is non-stale and no replica call is made.
 */
@Test
public void testReplicaPrimarySuccessWoReplicaCalls() throws Exception {
  final MyAsyncProcessWithReplicas asyncProcess = createReplicaAp(1000, 10, 0);
  final List<Get> gets = makeTimelineGets(DUMMY_BYTES_1, DUMMY_BYTES_2, DUMMY_BYTES_3);
  final Object[] resultHolder = new Object[3];
  final AsyncRequestFuture future =
      asyncProcess.submitAll(DUMMY_TABLE, gets, null, resultHolder);
  verifyReplicaResult(future, RR.FALSE, RR.FALSE, RR.FALSE);
  Assert.assertEquals(0, asyncProcess.getReplicaCallCount());
}
/**
 * With no delays anywhere, either the primary or a replica may win for each region; only
 * success and a replica call count between 0 and 2 are checked.
 */
@Test
public void testReplicaParallelCallsSucceed() throws Exception {
  final MyAsyncProcessWithReplicas asyncProcess = createReplicaAp(0, 0, 0);
  final List<Get> gets = makeTimelineGets(DUMMY_BYTES_1, DUMMY_BYTES_2);
  final Object[] resultHolder = new Object[2];
  final AsyncRequestFuture future =
      asyncProcess.submitAll(DUMMY_TABLE, gets, null, resultHolder);
  verifyReplicaResult(future, RR.DONT_CARE, RR.DONT_CARE);
  final long observedReplicaCalls = asyncProcess.getReplicaCallCount();
  Assert.assertTrue(observedReplicaCalls >= 0);
  Assert.assertTrue(observedReplicaCalls <= 2);
}
/**
 * Only server sn2 is slow (2000 ms primary delay against a 1000 ms replica timeout), so
 * its region is answered by a replica while the other region is answered by its primary
 * before any replica call is made for it. Exactly one replica call is expected.
 */
@Test
public void testReplicaPartialReplicaCall() throws Exception {
  final MyAsyncProcessWithReplicas asyncProcess = createReplicaAp(1000, 0, 0);
  asyncProcess.setPrimaryCallDelay(sn2, 2000);
  final List<Get> gets = makeTimelineGets(DUMMY_BYTES_1, DUMMY_BYTES_2);
  final Object[] resultHolder = new Object[2];
  final AsyncRequestFuture future =
      asyncProcess.submitAll(DUMMY_TABLE, gets, null, resultHolder);
  verifyReplicaResult(future, RR.FALSE, RR.TRUE);
  Assert.assertEquals(1, asyncProcess.getReplicaCallCount());
}
/**
 * Both primaries fail before the replica timeout (1000 ms) elapses. This case is
 * currently not handled: it would probably never happen if we can get a location (due
 * to retries), and handling it would need additional synchronization. Both results are
 * therefore failures and no replica call is made.
 */
@Test
public void testReplicaMainFailsBeforeReplicaCalls() throws Exception {
  final MyAsyncProcessWithReplicas asyncProcess = createReplicaAp(1000, 0, 0, 1);
  asyncProcess.addFailures(hri1, hri2);
  final List<Get> gets = makeTimelineGets(DUMMY_BYTES_1, DUMMY_BYTES_2);
  final Object[] resultHolder = new Object[2];
  final AsyncRequestFuture future =
      asyncProcess.submitAll(DUMMY_TABLE, gets, null, resultHolder);
  verifyReplicaResult(future, RR.FAILED, RR.FAILED);
  Assert.assertEquals(0, asyncProcess.getReplicaCallCount());
}
/**
 * Primaries fail after replica calls have started; for the region with two replicas one
 * replica call fails as well. Both regions must still end up with a replica result.
 */
@Test
public void testReplicaReplicaSuccessWithParallelFailures() throws Exception {
  final MyAsyncProcessWithReplicas asyncProcess = createReplicaAp(0, 1000, 1000, 1);
  asyncProcess.addFailures(hri1, hri1r2, hri2);
  final List<Get> gets = makeTimelineGets(DUMMY_BYTES_1, DUMMY_BYTES_2);
  final Object[] resultHolder = new Object[2];
  final AsyncRequestFuture future =
      asyncProcess.submitAll(DUMMY_TABLE, gets, null, resultHolder);
  verifyReplicaResult(future, RR.TRUE, RR.TRUE);
  Assert.assertEquals(2, asyncProcess.getReplicaCallCount());
}
/**
 * For the first region the primary and both replica calls fail. For the second region
 * only the replica call fails, and its exception must stay invisible because the
 * primary succeeded.
 */
@Test
public void testReplicaAllCallsFailForOneRegion() throws Exception {
  final MyAsyncProcessWithReplicas asyncProcess = createReplicaAp(500, 1000, 0, 1);
  asyncProcess.addFailures(hri1, hri1r1, hri1r2, hri2r1);
  final List<Get> gets = makeTimelineGets(DUMMY_BYTES_1, DUMMY_BYTES_2);
  final Object[] resultHolder = new Object[2];
  final AsyncRequestFuture future =
      asyncProcess.submitAll(DUMMY_TABLE, gets, null, resultHolder);
  verifyReplicaResult(future, RR.FAILED, RR.FALSE);
  // We should get 3 exceptions, for main + 2 replicas for DUMMY_BYTES_1
  Assert.assertEquals(3, future.getErrors().getNumExceptions());
  int errorIdx = 0;
  while (errorIdx < future.getErrors().getNumExceptions()) {
    Assert.assertArrayEquals(DUMMY_BYTES_1, future.getErrors().getRow(errorIdx).getRow());
    ++errorIdx;
  }
}
/**
 * Convenience overload that passes -1 as the retry count, which leaves the client retry
 * setting untouched.
 */
private MyAsyncProcessWithReplicas createReplicaAp(
    int replicaAfterMs, int primaryMs, int replicaMs) throws Exception {
  final int noRetryOverride = -1;
  return createReplicaAp(replicaAfterMs, primaryMs, replicaMs, noRetryOverride);
}
/**
 * Builds a replica-aware test AsyncProcess on a mocked connection with replicas.
 *
 * @param replicaAfterMs value stored under AsyncProcess.PRIMARY_CALL_TIMEOUT_KEY
 * @param primaryMs simulated delay of primary calls, in ms
 * @param replicaMs simulated delay of replica calls, in ms
 * @param retries client retry count to configure; ignored unless greater than 0
 */
private MyAsyncProcessWithReplicas createReplicaAp(
    int replicaAfterMs, int primaryMs, int replicaMs, int retries) throws Exception {
  // TODO: this is kind of timing dependent... perhaps it should detect from createCaller
  // that the replica call has happened and that way control the ordering.
  final Configuration replicaConf = new Configuration();
  final ClusterConnection connection = createHConnectionWithReplicas();
  replicaConf.setInt(AsyncProcess.PRIMARY_CALL_TIMEOUT_KEY, replicaAfterMs);
  final boolean overrideRetries = retries > 0;
  if (overrideRetries) {
    replicaConf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, retries);
  }
  final MyAsyncProcessWithReplicas asyncProcess =
      new MyAsyncProcessWithReplicas(connection, replicaConf);
  asyncProcess.setCallDelays(primaryMs, replicaMs);
  return asyncProcess;
}
/**
 * Creates one Get with TIMELINE consistency per row, in the order given.
 *
 * @param rows row keys to read
 * @return a new mutable list of the Gets
 */
private static List<Get> makeTimelineGets(byte[]... rows) {
  final List<Get> gets = new ArrayList<Get>();
  for (int i = 0; i < rows.length; ++i) {
    final Get timelineGet = new Get(rows[i]);
    timelineGet.setConsistency(Consistency.TIMELINE);
    gets.add(timelineGet);
  }
  return gets;
}
private void verifyResult(AsyncRequestFuture ars, boolean... expected) throws Exception {
Object[] actual = ars.getResults();
Assert.assertEquals(expected.length, actual.length);
for (int i = 0; i < expected.length; ++i) {
@ -764,6 +995,27 @@ public class TestAsyncProcess {
}
}
/**
 * Expected outcome of one replica-aware get, as checked by verifyReplicaResult.
 * (After reading TheDailyWtf, I always wanted to create a MyBoolean enum like this!)
 */
private enum RR {
// The result must not be a Throwable and must be stale.
TRUE,
// The result must not be a Throwable and must not be stale.
FALSE,
// The result must not be a Throwable; staleness is not checked.
DONT_CARE,
// The result must be a Throwable.
FAILED
}
/**
 * Checks each result of the future against the expected outcome at the same position:
 * whether it is a failure and, unless the expectation is FAILED or DONT_CARE, whether
 * the Result is stale.
 *
 * @param ars future whose results are verified
 * @param expecteds expected outcome per result, in order
 */
private void verifyReplicaResult(AsyncRequestFuture ars, RR... expecteds) throws Exception {
  final Object[] actuals = ars.getResults();
  Assert.assertEquals(expecteds.length, actuals.length);
  int position = 0;
  for (RR expected : expecteds) {
    final Object actual = actuals[position++];
    final boolean failureExpected = (expected == RR.FAILED);
    Assert.assertEquals(failureExpected, actual instanceof Throwable);
    if (failureExpected || expected == RR.DONT_CARE) {
      continue;
    }
    final boolean staleExpected = (expected == RR.TRUE);
    Assert.assertEquals(staleExpected, ((Result) actual).isStale());
  }
}
/**
* @param regCnt the region: 1 to 3.
* @param success if true, the put will succeed.

View File

@ -28,12 +28,10 @@ import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.MasterNotRunningException;
import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Row;
import org.apache.hadoop.hbase.client.coprocessor.Batch.Callback;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
@ -49,7 +47,7 @@ import org.apache.hadoop.hbase.regionserver.RegionServerServices;
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class CoprocessorHConnection implements ClusterConnection {
class CoprocessorHConnection implements ClusterConnection {
private static final NonceGenerator ng = new ConnectionManager.NoNonceGenerator();
/**
@ -60,7 +58,7 @@ public class CoprocessorHConnection implements ClusterConnection {
* @return an unmanaged {@link HConnection}.
* @throws IOException if we cannot create the basic connection
*/
public static ClusterConnection getConnectionForEnvironment(CoprocessorEnvironment env)
static ClusterConnection getConnectionForEnvironment(CoprocessorEnvironment env)
throws IOException {
ClusterConnection connection =
ConnectionManager.createConnectionInternal(env.getConfiguration());
@ -427,4 +425,9 @@ public class CoprocessorHConnection implements ClusterConnection {
public AsyncProcess getAsyncProcess() {
return delegate.getAsyncProcess();
}
/** Forwards the lookup unchanged to the delegate connection. */
@Override
public RegionLocations locateRegionAll(TableName tableName, byte[] row) throws IOException {
  final RegionLocations allLocations = delegate.locateRegionAll(tableName, row);
  return allLocations;
}
}

View File

@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.client;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HRegionLocation;
@ -110,6 +111,8 @@ public class HConnectionTestingUtility {
thenReturn(loc);
Mockito.when(c.locateRegion((TableName) Mockito.any(), (byte[]) Mockito.any())).
thenReturn(loc);
Mockito.when(c.locateRegionAll((TableName) Mockito.any(), (byte[]) Mockito.any())).
thenReturn(new RegionLocations(loc));
if (admin != null) {
// If a call to getAdmin, return this implementation.
Mockito.when(c.getAdmin(Mockito.any(ServerName.class))).