HBASE-10427 clean up HRegionLocation/ServerName usage

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1562338 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
sershe 2014-01-29 01:07:51 +00:00
parent f4dba9d98e
commit a29dbf4162
9 changed files with 132 additions and 114 deletions

View File

@ -55,8 +55,8 @@ public class HRegionLocation implements Comparable<HRegionLocation> {
*/
@Override
public String toString() {
return "region=" + this.regionInfo.getRegionNameAsString() +
", hostname=" + this.serverName + ", seqNum=" + seqNum;
return "region=" + (this.regionInfo == null ? "null" : this.regionInfo.getRegionNameAsString())
+ ", hostname=" + this.serverName + ", seqNum=" + seqNum;
}
/**

View File

@ -157,7 +157,7 @@ class AsyncProcess<CResult> {
* the current process to be stopped without proceeding with the other operations in
* the queue.
*/
boolean failure(int originalIndex, byte[] region, Row row, Throwable t);
boolean failure(int originalIndex, Row row, Throwable t);
/**
* Called on a failure we plan to retry. This allows the user to stop retrying. Will be
@ -165,7 +165,7 @@ class AsyncProcess<CResult> {
*
* @return false if we should retry, true otherwise.
*/
boolean retriableFailure(int originalIndex, Row row, byte[] region, Throwable exception);
boolean retriableFailure(int originalIndex, Row row, Throwable exception);
}
private static class BatchErrors {
@ -173,14 +173,14 @@ class AsyncProcess<CResult> {
private final List<Row> actions = new ArrayList<Row>();
private final List<String> addresses = new ArrayList<String>();
public synchronized void add(Throwable ex, Row row, HRegionLocation location) {
public synchronized void add(Throwable ex, Row row, ServerName serverName) {
if (row == null){
throw new IllegalArgumentException("row cannot be null. location=" + location);
throw new IllegalArgumentException("row cannot be null. location=" + serverName);
}
throwables.add(ex);
actions.add(row);
addresses.add(location != null ? location.getServerName().toString() : "null location");
addresses.add(serverName != null ? serverName.toString() : "null");
}
private synchronized RetriesExhaustedWithDetailsException makeException() {
@ -267,10 +267,8 @@ class AsyncProcess<CResult> {
return;
}
// This looks like we are keying by region but HRegionLocation has a comparator that compares
// on the server portion only (hostname + port) so this Map collects regions by server.
Map<HRegionLocation, MultiAction<Row>> actionsByServer =
new HashMap<HRegionLocation, MultiAction<Row>>();
Map<ServerName, MultiAction<Row>> actionsByServer =
new HashMap<ServerName, MultiAction<Row>>();
List<Action<Row>> retainedActions = new ArrayList<Action<Row>>(rows.size());
long currentTaskCnt = tasksDone.get();
@ -324,13 +322,13 @@ class AsyncProcess<CResult> {
* @param actionsByServer the multiaction per server
* @param ng Nonce generator, or null if no nonces are needed.
*/
private void addAction(HRegionLocation loc, Action<Row> action, Map<HRegionLocation,
MultiAction<Row>> actionsByServer, NonceGenerator ng) {
private void addAction(HRegionLocation loc, Action<Row> action,
Map<ServerName, MultiAction<Row>> actionsByServer, NonceGenerator ng) {
final byte[] regionName = loc.getRegionInfo().getRegionName();
MultiAction<Row> multiAction = actionsByServer.get(loc);
MultiAction<Row> multiAction = actionsByServer.get(loc.getServerName());
if (multiAction == null) {
multiAction = new MultiAction<Row>();
actionsByServer.put(loc, multiAction);
actionsByServer.put(loc.getServerName(), multiAction);
}
if (action.hasNonce() && !multiAction.hasNonceGroup()) {
// TODO: this code executes for every (re)try, and calls getNonceGroup again
@ -484,8 +482,8 @@ class AsyncProcess<CResult> {
}
// group per location => region server
final Map<HRegionLocation, MultiAction<Row>> actionsByServer =
new HashMap<HRegionLocation, MultiAction<Row>>();
final Map<ServerName, MultiAction<Row>> actionsByServer =
new HashMap<ServerName, MultiAction<Row>>();
NonceGenerator ng = this.hConnection.getNonceGenerator();
for (Action<Row> action : currentActions) {
@ -509,43 +507,43 @@ class AsyncProcess<CResult> {
* @param numAttempt the attempt number.
*/
public void sendMultiAction(final List<Action<Row>> initialActions,
Map<HRegionLocation, MultiAction<Row>> actionsByServer,
Map<ServerName, MultiAction<Row>> actionsByServer,
final int numAttempt,
final HConnectionManager.ServerErrorTracker errorsByServer) {
// Send the queries and add them to the inProgress list
// This iteration is by server: the map is keyed by ServerName.
for (Map.Entry<HRegionLocation, MultiAction<Row>> e : actionsByServer.entrySet()) {
final HRegionLocation loc = e.getKey();
for (Map.Entry<ServerName, MultiAction<Row>> e : actionsByServer.entrySet()) {
final ServerName server = e.getKey();
final MultiAction<Row> multiAction = e.getValue();
incTaskCounters(multiAction.getRegions(), loc.getServerName());
incTaskCounters(multiAction.getRegions(), server);
Runnable runnable = Trace.wrap("AsyncProcess.sendMultiAction", new Runnable() {
@Override
public void run() {
MultiResponse res;
try {
MultiServerCallable<Row> callable = createCallable(loc, multiAction);
MultiServerCallable<Row> callable = createCallable(server, multiAction);
try {
res = createCaller(callable).callWithoutRetries(callable);
} catch (IOException e) {
// The service itself failed. It may be an error coming from the communication
// layer, but, as well, a functional error raised by the server.
receiveGlobalFailure(initialActions, multiAction, loc, numAttempt, e,
receiveGlobalFailure(initialActions, multiAction, server, numAttempt, e,
errorsByServer);
return;
} catch (Throwable t) {
// This should not happen. Let's log & retry anyway.
LOG.error("#" + id + ", Caught throwable while calling. This is unexpected." +
" Retrying. Server is " + loc.getServerName() + ", tableName=" + tableName, t);
receiveGlobalFailure(initialActions, multiAction, loc, numAttempt, t,
" Retrying. Server is " + server.getServerName() + ", tableName=" + tableName, t);
receiveGlobalFailure(initialActions, multiAction, server, numAttempt, t,
errorsByServer);
return;
}
// Nominal case: we received an answer from the server, and it's not an exception.
receiveMultiAction(initialActions, multiAction, loc, res, numAttempt, errorsByServer);
receiveMultiAction(initialActions, multiAction, server, res, numAttempt, errorsByServer);
} finally {
decTaskCounters(multiAction.getRegions(), loc.getServerName());
decTaskCounters(multiAction.getRegions(), server);
}
}
});
@ -555,12 +553,12 @@ class AsyncProcess<CResult> {
} catch (RejectedExecutionException ree) {
// This should never happen. But as the pool is provided by the end user, let's secure
// this a little.
decTaskCounters(multiAction.getRegions(), loc.getServerName());
decTaskCounters(multiAction.getRegions(), server);
LOG.warn("#" + id + ", the task was rejected by the pool. This is unexpected." +
" Server is " + loc.getServerName(), ree);
" Server is " + server.getServerName(), ree);
// We're likely to fail again, but this will increment the attempt counter, so it will
// finish.
receiveGlobalFailure(initialActions, multiAction, loc, numAttempt, ree, errorsByServer);
receiveGlobalFailure(initialActions, multiAction, server, numAttempt, ree, errorsByServer);
}
}
}
@ -568,9 +566,9 @@ class AsyncProcess<CResult> {
/**
* Create a callable. Isolated to be easily overridden in the tests.
*/
protected MultiServerCallable<Row> createCallable(final HRegionLocation location,
final MultiAction<Row> multi) {
return new MultiServerCallable<Row>(hConnection, tableName, location, multi);
protected MultiServerCallable<Row> createCallable(
final ServerName server, final MultiAction<Row> multi) {
return new MultiServerCallable<Row>(hConnection, tableName, server, multi);
}
/**
@ -589,29 +587,24 @@ class AsyncProcess<CResult> {
* @param row the row
* @param canRetry if false, we won't retry whatever the settings.
* @param throwable the throwable, if any (can be null)
* @param location the location, if any (can be null)
* @param server the server, if any (can be null)
* @return true if the action can be retried, false otherwise.
*/
private boolean manageError(int originalIndex, Row row, boolean canRetry,
Throwable throwable, HRegionLocation location) {
private boolean manageError(int originalIndex, Row row,
boolean canRetry, Throwable throwable, ServerName server) {
if (canRetry && throwable != null && throwable instanceof DoNotRetryIOException) {
canRetry = false;
}
byte[] region = null;
if (canRetry && callback != null) {
region = location == null ? null : location.getRegionInfo().getEncodedNameAsBytes();
canRetry = callback.retriableFailure(originalIndex, row, region, throwable);
canRetry = callback.retriableFailure(originalIndex, row, throwable);
}
if (!canRetry) {
if (callback != null) {
if (region == null && location != null) {
region = location.getRegionInfo().getEncodedNameAsBytes();
}
callback.failure(originalIndex, region, row, throwable);
callback.failure(originalIndex, row, throwable);
}
errors.add(throwable, row, location);
errors.add(throwable, row, server);
this.hasError.set(true);
}
@ -623,29 +616,29 @@ class AsyncProcess<CResult> {
*
* @param initialActions the full initial action list
* @param rsActions the actions still to do from the initial list
* @param location the destination
* @param server the destination
* @param numAttempt the number of attempts so far
* @param t the throwable (if any) that caused the resubmit
*/
private void receiveGlobalFailure(List<Action<Row>> initialActions, MultiAction<Row> rsActions,
HRegionLocation location, int numAttempt, Throwable t,
ServerName server, int numAttempt, Throwable t,
HConnectionManager.ServerErrorTracker errorsByServer) {
// Do not use the exception for updating cache because it might be coming from
// any of the regions in the MultiAction.
hConnection.updateCachedLocations(tableName,
rsActions.actions.values().iterator().next().get(0).getAction().getRow(), null, location);
errorsByServer.reportServerError(location);
rsActions.actions.values().iterator().next().get(0).getAction().getRow(), null, server);
errorsByServer.reportServerError(server);
List<Action<Row>> toReplay = new ArrayList<Action<Row>>(initialActions.size());
for (Map.Entry<byte[], List<Action<Row>>> e : rsActions.actions.entrySet()) {
for (Action<Row> action : e.getValue()) {
if (manageError(action.getOriginalIndex(), action.getAction(), true, t, location)) {
if (manageError(action.getOriginalIndex(), action.getAction(), true, t, server)) {
toReplay.add(action);
}
}
}
logAndResubmit(initialActions, location, toReplay, numAttempt, rsActions.size(),
logAndResubmit(initialActions, server, toReplay, numAttempt, rsActions.size(),
t, errorsByServer);
}
@ -653,7 +646,7 @@ class AsyncProcess<CResult> {
* Log as much info as possible, and, if there is something to replay, submit it again after
* a back off sleep.
*/
private void logAndResubmit(List<Action<Row>> initialActions, HRegionLocation oldLocation,
private void logAndResubmit(List<Action<Row>> initialActions, ServerName oldLocation,
List<Action<Row>> toReplay, int numAttempt, int failureCount,
Throwable throwable,
HConnectionManager.ServerErrorTracker errorsByServer) {
@ -662,13 +655,11 @@ class AsyncProcess<CResult> {
if (failureCount != 0) {
// We have a failure but nothing to retry. We're done, it's a final failure.
LOG.warn(createLog(numAttempt, failureCount, toReplay.size(),
oldLocation.getServerName(), throwable, -1, false,
errorsByServer.getStartTrackingTime()));
oldLocation, throwable, -1, false, errorsByServer.getStartTrackingTime()));
} else if (numAttempt > startLogErrorsCnt + 1) {
// The operation was successful, but needed several attempts. Let's log this.
LOG.info(createLog(numAttempt, failureCount, 0,
oldLocation.getServerName(), throwable, -1, false,
errorsByServer.getStartTrackingTime()));
oldLocation, throwable, -1, false, errorsByServer.getStartTrackingTime()));
}
return;
}
@ -686,8 +677,7 @@ class AsyncProcess<CResult> {
// We use this value to have some logs when we have multiple failures, but not too many
// logs, as errors are to be expected when a region moves, splits and so on
LOG.info(createLog(numAttempt, failureCount, toReplay.size(),
oldLocation.getServerName(), throwable, backOffTime, true,
errorsByServer.getStartTrackingTime()));
oldLocation, throwable, backOffTime, true, errorsByServer.getStartTrackingTime()));
}
try {
@ -706,12 +696,12 @@ class AsyncProcess<CResult> {
*
* @param initialActions - the whole action list
* @param multiAction - the multiAction we sent
* @param location - the location. It's used as a server name.
* @param server - the server the multiAction was sent to.
* @param responses - the response, if any
* @param numAttempt - the attempt
*/
private void receiveMultiAction(List<Action<Row>> initialActions, MultiAction<Row> multiAction,
HRegionLocation location,
ServerName server,
MultiResponse responses, int numAttempt,
HConnectionManager.ServerErrorTracker errorsByServer) {
assert responses != null;
@ -743,15 +733,15 @@ class AsyncProcess<CResult> {
if (!regionFailureRegistered) { // We're doing this once per location.
regionFailureRegistered= true;
// The location here is used as a server name.
hConnection.updateCachedLocations(this.tableName, row.getRow(), result, location);
hConnection.updateCachedLocations(this.tableName, row.getRow(), result, server);
if (failureCount == 1) {
errorsByServer.reportServerError(location);
errorsByServer.reportServerError(server);
canRetry = errorsByServer.canRetryMore(numAttempt);
}
}
if (manageError(correspondingAction.getOriginalIndex(), row, canRetry,
throwable, location)) {
throwable, server)) {
toReplay.add(correspondingAction);
}
} else { // success
@ -779,22 +769,22 @@ class AsyncProcess<CResult> {
}
if (failureCount == 0) {
errorsByServer.reportServerError(location);
errorsByServer.reportServerError(server);
canRetry = errorsByServer.canRetryMore(numAttempt);
}
hConnection.updateCachedLocations(this.tableName, actions.get(0).getAction().getRow(),
throwable, location);
throwable, server);
failureCount += actions.size();
for (Action<Row> action : actions) {
Row row = action.getAction();
if (manageError(action.getOriginalIndex(), row, canRetry, throwable, location)) {
if (manageError(action.getOriginalIndex(), row, canRetry, throwable, server)) {
toReplay.add(action);
}
}
}
logAndResubmit(initialActions, location, toReplay, numAttempt, failureCount,
logAndResubmit(initialActions, server, toReplay, numAttempt, failureCount,
throwable, errorsByServer);
}

View File

@ -296,6 +296,10 @@ public interface HConnection extends Abortable, Closeable {
HRegionLocation relocateRegion(final byte[] tableName,
final byte [] row) throws IOException;
@Deprecated
void updateCachedLocations(TableName tableName, byte[] rowkey,
Object exception, HRegionLocation source);
/**
* Update the location cache. This is used internally by HBase, in most cases it should not be
* used by the client application.
@ -305,7 +309,7 @@ public interface HConnection extends Abortable, Closeable {
* @param source the previous location
*/
void updateCachedLocations(TableName tableName, byte[] rowkey,
Object exception, HRegionLocation source);
Object exception, ServerName source);
@Deprecated
void updateCachedLocations(byte[] tableName, byte[] rowkey,

View File

@ -1444,7 +1444,7 @@ public class HConnectionManager {
* @param source the source of the new location, if it's not coming from meta
* @param location the new location
*/
private void cacheLocation(final TableName tableName, final HRegionLocation source,
private void cacheLocation(final TableName tableName, final ServerName source,
final HRegionLocation location) {
boolean isFromMeta = (source == null);
byte [] startKey = location.getRegionInfo().getStartKey();
@ -1457,7 +1457,7 @@ public class HConnectionManager {
}
boolean updateCache;
// If the server in cache sends us a redirect, assume it's always valid.
if (oldLocation.equals(source)) {
if (oldLocation.getServerName().equals(source)) {
updateCache = true;
} else {
long newLocationSeqNum = location.getSeqNum();
@ -2193,7 +2193,7 @@ public class HConnectionManager {
}
}
void updateCachedLocation(HRegionInfo hri, HRegionLocation source,
void updateCachedLocation(HRegionInfo hri, ServerName source,
ServerName serverName, long seqNum) {
HRegionLocation newHrl = new HRegionLocation(hri, serverName, seqNum);
cacheLocation(hri.getTable(), source, newHrl);
@ -2204,14 +2204,13 @@ public class HConnectionManager {
* @param hri The region in question.
* @param source The source of the error that prompts us to invalidate cache.
*/
void deleteCachedLocation(HRegionInfo hri, HRegionLocation source) {
ConcurrentMap<byte[], HRegionLocation> tableLocations = getTableLocations(hri.getTable());
tableLocations.remove(hri.getStartKey(), source);
void deleteCachedLocation(HRegionInfo hri, ServerName source) {
getTableLocations(hri.getTable()).remove(hri.getStartKey());
}
@Override
public void deleteCachedRegionLocation(final HRegionLocation location) {
if (location == null) {
if (location == null || location.getRegionInfo() == null) {
return;
}
@ -2227,6 +2226,12 @@ public class HConnectionManager {
}
}
/**
 * Legacy overload that identifies the source by region location. Only the server part of
 * {@code source} is used; the call is forwarded to the {@link ServerName}-based overload.
 *
 * @param tableName the table name
 * @param rowkey the row
 * @param exception the exception, if any
 * @param source the previous location; may be null, in which case a null server name is
 *          forwarded rather than dereferencing the location
 */
@Override
public void updateCachedLocations(final TableName tableName, byte[] rowkey,
    final Object exception, final HRegionLocation source) {
  // Do not dereference a null location here: the ServerName overload already returns early
  // when it is handed a null source, so let it make that decision.
  final ServerName sourceServer = (source == null) ? null : source.getServerName();
  updateCachedLocations(tableName, rowkey, exception, sourceServer);
}
/**
* Update the location with the new value (if the exception is a RegionMovedException)
* or delete it from the cache. Does nothing if we can be sure from the exception that
@ -2237,21 +2242,21 @@ public class HConnectionManager {
*/
@Override
public void updateCachedLocations(final TableName tableName, byte[] rowkey,
final Object exception, final HRegionLocation source) {
final Object exception, final ServerName source) {
if (rowkey == null || tableName == null) {
LOG.warn("Coding error, see method javadoc. row=" + (rowkey == null ? "null" : rowkey) +
", tableName=" + (tableName == null ? "null" : tableName));
return;
}
if (source == null || source.getServerName() == null){
if (source == null) {
// This should not happen, but let's secure ourselves.
return;
}
// Is it something we have already updated?
final HRegionLocation oldLocation = getCachedLocation(tableName, rowkey);
if (oldLocation == null || !source.getServerName().equals(oldLocation.getServerName())) {
if (oldLocation == null || !source.equals(oldLocation.getServerName())) {
// There is no such location in the cache (it's been removed already) or
// the cache has already been refreshed with a different location. => nothing to do
return;
@ -2270,7 +2275,7 @@ public class HConnectionManager {
if (LOG.isTraceEnabled()) {
LOG.trace("Region " + regionInfo.getRegionNameAsString() + " moved to " +
rme.getHostname() + ":" + rme.getPort() +
" according to " + source.getHostnamePort());
" according to " + source.getHostAndPort());
}
// We know that the region is not anymore on this region server, but we know
// the new location.
@ -2391,7 +2396,7 @@ public class HConnectionManager {
}
@Override
public boolean failure(int pos, byte[] region, Row row, Throwable t) {
public boolean failure(int pos, Row row, Throwable t) {
assert pos < results.length;
results[pos] = t;
//Batch.Callback<Res> was not called on failure in 0.94. We keep this.
@ -2399,8 +2404,7 @@ public class HConnectionManager {
}
@Override
public boolean retriableFailure(int originalIndex, Row row, byte[] region,
Throwable exception) {
public boolean retriableFailure(int originalIndex, Row row, Throwable exception) {
return true; // we retry
}
}
@ -2681,8 +2685,8 @@ public class HConnectionManager {
*/
static class ServerErrorTracker {
// We need a concurrent map here, as we could have multiple threads updating it in parallel.
private final ConcurrentMap<HRegionLocation, ServerErrors> errorsByServer =
new ConcurrentHashMap<HRegionLocation, ServerErrors>();
private final ConcurrentMap<ServerName, ServerErrors> errorsByServer =
new ConcurrentHashMap<ServerName, ServerErrors>();
private final long canRetryUntil;
private final int maxRetries;
private final String startTrackingTime;
@ -2709,7 +2713,7 @@ public class HConnectionManager {
* @param basePause The default hci pause.
* @return The time to wait before sending next request.
*/
long calculateBackoffTime(HRegionLocation server, long basePause) {
long calculateBackoffTime(ServerName server, long basePause) {
long result;
ServerErrors errorStats = errorsByServer.get(server);
if (errorStats != null) {
@ -2725,7 +2729,7 @@ public class HConnectionManager {
*
* @param server The server in question.
*/
void reportServerError(HRegionLocation server) {
void reportServerError(ServerName server) {
ServerErrors errors = errorsByServer.get(server);
if (errors != null) {
errors.addError();

View File

@ -27,7 +27,9 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CellScannable;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
@ -52,13 +54,26 @@ class MultiServerCallable<R> extends RegionServerCallable<MultiResponse> {
private final boolean cellBlock;
MultiServerCallable(final HConnection connection, final TableName tableName,
final HRegionLocation location, final MultiAction<R> multi) {
final ServerName location, final MultiAction<R> multi) {
super(connection, tableName, null);
this.multiAction = multi;
setLocation(location);
// RegionServerCallable has HRegionLocation field, but this is a multi-region request.
// Using region info from parent HRegionLocation would be a mistake for this class; so
// we will store the server here, and throw if someone tries to obtain location/regioninfo.
this.location = new HRegionLocation(null, location);
this.cellBlock = isCellBlock();
}
/**
 * Always throws. This callable is a multi-region request addressed to a server, so there is
 * no single region location to return.
 *
 * @throws UnsupportedOperationException always
 */
@Override
protected HRegionLocation getLocation() {
  throw new UnsupportedOperationException(
      "Cannot get region location for multi-region request");
}
/**
 * Always throws. This callable is a multi-region request addressed to a server, so there is
 * no single region info to return.
 *
 * @throws UnsupportedOperationException always
 */
@Override
public HRegionInfo getHRegionInfo() {
  throw new UnsupportedOperationException(
      "Cannot get region info for multi-region request");
}
MultiAction<R> getMulti() {
return this.multiAction;
}
@ -133,6 +148,6 @@ class MultiServerCallable<R> extends RegionServerCallable<MultiResponse> {
@Override
public void prepare(boolean reload) throws IOException {
// Use the location we were given in the constructor rather than go look it up.
setStub(getConnection().getClient(getLocation().getServerName()));
setStub(getConnection().getClient(this.location.getServerName()));
}
}

View File

@ -37,6 +37,11 @@ import org.apache.hadoop.hbase.util.Bytes;
/**
* Implementations call a RegionServer and implement {@link #call()}.
* Passed to a {@link RpcRetryingCaller} so we retry on fail.
* TODO: this class is actually tied to one region, because most of the paths make use of
* the regioninfo part of location when building requests. The only reason it works for
* multi-region requests (e.g. batch) is that they happen to not use the region parts.
* This could be done cleaner (e.g. having a generic parameter and 2 derived classes,
* RegionCallable and actual RegionServerCallable with ServerName).
* @param <T> the class that the ServerCallable handles
*/
@InterfaceAudience.Private
@ -74,7 +79,7 @@ public abstract class RegionServerCallable<T> implements RetryingCallable<T> {
throw new IOException("Failed to find location, tableName=" + tableName +
", row=" + Bytes.toString(row) + ", reload=" + reload);
}
setStub(getConnection().getClient(getLocation().getServerName()));
setStub(getConnection().getClient(this.location.getServerName()));
}
/**
@ -119,7 +124,7 @@ public abstract class RegionServerCallable<T> implements RetryingCallable<T> {
// hbase:meta again to find the new location
if (this.location != null) getConnection().clearCaches(location.getServerName());
} else if (t instanceof RegionMovedException) {
getConnection().updateCachedLocations(tableName, row, t, location);
getConnection().updateCachedLocations(tableName, row, t, location.getServerName());
} else if (t instanceof NotServingRegionException && !retrying) {
// Purge cache entries for this specific region from hbase:meta cache
// since we don't call connect(true) when number of retries is 1.

View File

@ -104,8 +104,8 @@ public class TestAsyncProcess {
@Override
protected RpcRetryingCaller<MultiResponse> createCaller(MultiServerCallable<Row> callable) {
final MultiResponse mr = createMultiResponse(callable.getLocation(), callable.getMulti(),
nbMultiResponse, nbActions);
final MultiResponse mr = createMultiResponse(
callable.getMulti(), nbMultiResponse, nbActions);
return new RpcRetryingCaller<MultiResponse>(conf) {
@Override
public MultiResponse callWithoutRetries( RetryingCallable<MultiResponse> callable)
@ -123,17 +123,18 @@ public class TestAsyncProcess {
}
}
static MultiResponse createMultiResponse(final HRegionLocation loc,
static MultiResponse createMultiResponse(
final MultiAction<Row> multi, AtomicInteger nbMultiResponse, AtomicInteger nbActions) {
final MultiResponse mr = new MultiResponse();
nbMultiResponse.incrementAndGet();
for (Map.Entry<byte[], List<Action<Row>>> entry : multi.actions.entrySet()) {
for (Action a : entry.getValue()) {
byte[] regionName = entry.getKey();
for (Action<Row> a : entry.getValue()) {
nbActions.incrementAndGet();
if (Arrays.equals(FAILS, a.getAction().getRow())) {
mr.add(loc.getRegionInfo().getRegionName(), a.getOriginalIndex(), failure);
mr.add(regionName, a.getOriginalIndex(), failure);
} else {
mr.add(loc.getRegionInfo().getRegionName(), a.getOriginalIndex(), success);
mr.add(regionName, a.getOriginalIndex(), success);
}
}
}
@ -387,9 +388,11 @@ public class TestAsyncProcess {
ap.submit(puts, false);
Assert.assertTrue(puts.isEmpty());
while (!ap.hasError()) {
long cutoff = System.currentTimeMillis() + 60000;
while (!ap.hasError() && System.currentTimeMillis() < cutoff) {
Thread.sleep(1);
}
Assert.assertTrue(ap.hasError());
ap.waitUntilDone();
Assert.assertEquals(mcb.successCalled.get(), 2);
@ -496,14 +499,13 @@ public class TestAsyncProcess {
}
@Override
public boolean failure(int originalIndex, byte[] region, Row row, Throwable t) {
public boolean failure(int originalIndex, Row row, Throwable t) {
failureCalled.incrementAndGet();
return true;
}
@Override
public boolean retriableFailure(int originalIndex, Row row, byte[] region,
Throwable exception) {
public boolean retriableFailure(int originalIndex, Row row, Throwable exception) {
// We retry once only.
return (retriableFailure.incrementAndGet() < 2);
}

View File

@ -248,6 +248,11 @@ public class CoprocessorHConnection implements HConnection {
return delegate.relocateRegion(tableName, row);
}
/**
 * Forwards the location-cache update to {@code delegate}, passing the source through as a
 * server name.
 */
public void updateCachedLocations(TableName tableName, byte[] rowkey, Object exception,
ServerName source) {
delegate.updateCachedLocations(tableName, rowkey, exception, source);
}
public void updateCachedLocations(TableName tableName, byte[] rowkey, Object exception,
HRegionLocation source) {
delegate.updateCachedLocations(tableName, rowkey, exception, source);

View File

@ -361,7 +361,8 @@ public class TestHCM {
final int nextPort = conn.getCachedLocation(TABLE_NAME, ROW).getPort() + 1;
HRegionLocation loc = conn.getCachedLocation(TABLE_NAME, ROW);
conn.updateCachedLocation(loc.getRegionInfo(), loc, ServerName.valueOf("127.0.0.1", nextPort,
conn.updateCachedLocation(loc.getRegionInfo(), loc.getServerName(),
ServerName.valueOf("127.0.0.1", nextPort,
HConstants.LATEST_TIMESTAMP), HConstants.LATEST_TIMESTAMP);
Assert.assertEquals(conn.getCachedLocation(TABLE_NAME, ROW).getPort(), nextPort);
@ -553,19 +554,18 @@ public class TestHCM {
HRegionLocation location = conn.getCachedLocation(TABLE_NAME2, ROW);
assertNotNull(location);
HRegionLocation anySource = new HRegionLocation(location.getRegionInfo(), ServerName.valueOf(
location.getHostname(), location.getPort() - 1, 0L));
ServerName anySource = ServerName.valueOf(location.getHostname(), location.getPort() - 1, 0L);
// Same server as already in cache reporting - overwrites any value despite seqNum.
int nextPort = location.getPort() + 1;
conn.updateCachedLocation(location.getRegionInfo(), location,
conn.updateCachedLocation(location.getRegionInfo(), location.getServerName(),
ServerName.valueOf("127.0.0.1", nextPort, 0), location.getSeqNum() - 1);
location = conn.getCachedLocation(TABLE_NAME2, ROW);
Assert.assertEquals(nextPort, location.getPort());
// No source specified - same.
nextPort = location.getPort() + 1;
conn.updateCachedLocation(location.getRegionInfo(), location,
conn.updateCachedLocation(location.getRegionInfo(), location.getServerName(),
ServerName.valueOf("127.0.0.1", nextPort, 0), location.getSeqNum() - 1);
location = conn.getCachedLocation(TABLE_NAME2, ROW);
Assert.assertEquals(nextPort, location.getPort());
@ -867,9 +867,8 @@ public class TestHCM {
public void testErrorBackoffTimeCalculation() throws Exception {
// TODO: This test would seem to presume hardcoded RETRY_BACKOFF which it should not.
final long ANY_PAUSE = 100;
HRegionInfo ri = new HRegionInfo(TABLE_NAME);
HRegionLocation location = new HRegionLocation(ri, ServerName.valueOf("127.0.0.1", 1, 0));
HRegionLocation diffLocation = new HRegionLocation(ri, ServerName.valueOf("127.0.0.1", 2, 0));
ServerName location = ServerName.valueOf("127.0.0.1", 1, 0);
ServerName diffLocation = ServerName.valueOf("127.0.0.1", 2, 0);
ManualEnvironmentEdge timeMachine = new ManualEnvironmentEdge();
EnvironmentEdgeManager.injectEdge(timeMachine);
@ -891,16 +890,10 @@ public class TestHCM {
assertEqualsWithJitter(ANY_PAUSE * 5, tracker.calculateBackoffTime(location, ANY_PAUSE));
// All of this shouldn't affect backoff for different location.
assertEquals(0, tracker.calculateBackoffTime(diffLocation, ANY_PAUSE));
tracker.reportServerError(diffLocation);
assertEqualsWithJitter(ANY_PAUSE, tracker.calculateBackoffTime(diffLocation, ANY_PAUSE));
// But should still work for a different region in the same location.
HRegionInfo ri2 = new HRegionInfo(TABLE_NAME2);
HRegionLocation diffRegion = new HRegionLocation(ri2, location.getServerName());
assertEqualsWithJitter(ANY_PAUSE * 5, tracker.calculateBackoffTime(diffRegion, ANY_PAUSE));
// Check with different base.
assertEqualsWithJitter(ANY_PAUSE * 10,
tracker.calculateBackoffTime(location, ANY_PAUSE * 2));