HBASE-14703 HTable.mutateRow does not collect stats (Heng Chen)
This commit is contained in:
parent
d083e4f29f
commit
ef712df944
|
@ -55,6 +55,7 @@ import org.apache.hadoop.hbase.client.backoff.ServerStatistics;
|
|||
import org.apache.hadoop.hbase.client.coprocessor.Batch;
|
||||
import org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil;
|
||||
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
||||
import org.apache.htrace.Trace;
|
||||
|
@ -428,7 +429,7 @@ class AsyncProcess {
|
|||
if (retainedActions.isEmpty()) return NO_REQS_RESULT;
|
||||
|
||||
return submitMultiActions(tableName, retainedActions, nonceGroup, callback, null, needResults,
|
||||
locationErrors, locationErrorRows, actionsByServer, pool);
|
||||
locationErrors, locationErrorRows, actionsByServer, pool);
|
||||
}
|
||||
|
||||
<CResult> AsyncRequestFuture submitMultiActions(TableName tableName,
|
||||
|
@ -444,7 +445,7 @@ class AsyncProcess {
|
|||
int originalIndex = locationErrorRows.get(i);
|
||||
Row row = retainedActions.get(originalIndex).getAction();
|
||||
ars.manageError(originalIndex, row,
|
||||
Retry.NO_LOCATION_PROBLEM, locationErrors.get(i), null);
|
||||
Retry.NO_LOCATION_PROBLEM, locationErrors.get(i), null);
|
||||
}
|
||||
}
|
||||
ars.sendMultiAction(actionsByServer, 1, null, false);
|
||||
|
@ -546,9 +547,13 @@ class AsyncProcess {
|
|||
*/
|
||||
public <CResult> AsyncRequestFuture submitAll(TableName tableName,
|
||||
List<? extends Row> rows, Batch.Callback<CResult> callback, Object[] results) {
|
||||
return submitAll(null, tableName, rows, callback, results);
|
||||
return submitAll(null, tableName, rows, callback, results, null, timeout);
|
||||
}
|
||||
|
||||
public <CResult> AsyncRequestFuture submitAll(ExecutorService pool, TableName tableName,
|
||||
List<? extends Row> rows, Batch.Callback<CResult> callback, Object[] results) {
|
||||
return submitAll(pool, tableName, rows, callback, results, null, timeout);
|
||||
}
|
||||
/**
|
||||
* Submit immediately the list of rows, whatever the server status. Kept for backward
|
||||
* compatibility: it allows to be used with the batch interface that return an array of objects.
|
||||
|
@ -560,7 +565,8 @@ class AsyncProcess {
|
|||
* @param results Optional array to return the results thru; backward compat.
|
||||
*/
|
||||
public <CResult> AsyncRequestFuture submitAll(ExecutorService pool, TableName tableName,
|
||||
List<? extends Row> rows, Batch.Callback<CResult> callback, Object[] results) {
|
||||
List<? extends Row> rows, Batch.Callback<CResult> callback, Object[] results,
|
||||
PayloadCarryingServerCallable callable, int curTimeout) {
|
||||
List<Action<Row>> actions = new ArrayList<Action<Row>>(rows.size());
|
||||
|
||||
// The position will be used by the processBatch to match the object array returned.
|
||||
|
@ -579,7 +585,8 @@ class AsyncProcess {
|
|||
actions.add(action);
|
||||
}
|
||||
AsyncRequestFutureImpl<CResult> ars = createAsyncRequestFuture(
|
||||
tableName, actions, ng.getNonceGroup(), getPool(pool), callback, results, results != null);
|
||||
tableName, actions, ng.getNonceGroup(), getPool(pool), callback, results, results != null,
|
||||
callable, curTimeout);
|
||||
ars.groupAndSendMultiAction(actions, 1);
|
||||
return ars;
|
||||
}
|
||||
|
@ -711,11 +718,11 @@ class AsyncProcess {
|
|||
private final MultiAction<Row> multiAction;
|
||||
private final int numAttempt;
|
||||
private final ServerName server;
|
||||
private final Set<MultiServerCallable<Row>> callsInProgress;
|
||||
private final Set<PayloadCarryingServerCallable> callsInProgress;
|
||||
|
||||
private SingleServerRequestRunnable(
|
||||
MultiAction<Row> multiAction, int numAttempt, ServerName server,
|
||||
Set<MultiServerCallable<Row>> callsInProgress) {
|
||||
Set<PayloadCarryingServerCallable> callsInProgress) {
|
||||
this.multiAction = multiAction;
|
||||
this.numAttempt = numAttempt;
|
||||
this.server = server;
|
||||
|
@ -725,19 +732,22 @@ class AsyncProcess {
|
|||
@Override
|
||||
public void run() {
|
||||
MultiResponse res;
|
||||
MultiServerCallable<Row> callable = null;
|
||||
PayloadCarryingServerCallable callable = currentCallable;
|
||||
try {
|
||||
callable = createCallable(server, tableName, multiAction);
|
||||
// setup the callable based on the actions, if we don't have one already from the request
|
||||
if (callable == null) {
|
||||
callable = createCallable(server, tableName, multiAction);
|
||||
}
|
||||
RpcRetryingCaller<MultiResponse> caller = createCaller(callable);
|
||||
try {
|
||||
RpcRetryingCaller<MultiResponse> caller = createCaller(callable);
|
||||
if (callsInProgress != null) callsInProgress.add(callable);
|
||||
res = caller.callWithoutRetries(callable, timeout);
|
||||
|
||||
if (callsInProgress != null) {
|
||||
callsInProgress.add(callable);
|
||||
}
|
||||
res = caller.callWithoutRetries(callable, currentCallTotalTimeout);
|
||||
if (res == null) {
|
||||
// Cancelled
|
||||
return;
|
||||
}
|
||||
|
||||
} catch (IOException e) {
|
||||
// The service itself failed . It may be an error coming from the communication
|
||||
// layer, but, as well, a functional error raised by the server.
|
||||
|
@ -771,7 +781,7 @@ class AsyncProcess {
|
|||
private final BatchErrors errors;
|
||||
private final ConnectionImplementation.ServerErrorTracker errorsByServer;
|
||||
private final ExecutorService pool;
|
||||
private final Set<MultiServerCallable<Row>> callsInProgress;
|
||||
private final Set<PayloadCarryingServerCallable> callsInProgress;
|
||||
|
||||
|
||||
private final TableName tableName;
|
||||
|
@ -798,10 +808,12 @@ class AsyncProcess {
|
|||
private final int[] replicaGetIndices;
|
||||
private final boolean hasAnyReplicaGets;
|
||||
private final long nonceGroup;
|
||||
private PayloadCarryingServerCallable currentCallable;
|
||||
private int currentCallTotalTimeout;
|
||||
|
||||
public AsyncRequestFutureImpl(TableName tableName, List<Action<Row>> actions, long nonceGroup,
|
||||
ExecutorService pool, boolean needResults, Object[] results,
|
||||
Batch.Callback<CResult> callback) {
|
||||
Batch.Callback<CResult> callback, PayloadCarryingServerCallable callable, int timeout) {
|
||||
this.pool = pool;
|
||||
this.callback = callback;
|
||||
this.nonceGroup = nonceGroup;
|
||||
|
@ -865,13 +877,16 @@ class AsyncProcess {
|
|||
this.replicaGetIndices = null;
|
||||
}
|
||||
this.callsInProgress = !hasAnyReplicaGets ? null :
|
||||
Collections.newSetFromMap(new ConcurrentHashMap<MultiServerCallable<Row>, Boolean>());
|
||||
Collections.newSetFromMap(
|
||||
new ConcurrentHashMap<PayloadCarryingServerCallable, Boolean>());
|
||||
|
||||
this.errorsByServer = createServerErrorTracker();
|
||||
this.errors = (globalErrors != null) ? globalErrors : new BatchErrors();
|
||||
this.currentCallable = callable;
|
||||
this.currentCallTotalTimeout = timeout;
|
||||
}
|
||||
|
||||
public Set<MultiServerCallable<Row>> getCallsInProgress() {
|
||||
public Set<PayloadCarryingServerCallable> getCallsInProgress() {
|
||||
return callsInProgress;
|
||||
}
|
||||
|
||||
|
@ -1275,11 +1290,15 @@ class AsyncProcess {
|
|||
int failureCount = 0;
|
||||
boolean canRetry = true;
|
||||
|
||||
// Go by original action.
|
||||
Map<byte[], MultiResponse.RegionResult> results = responses.getResults();
|
||||
updateStats(server, results);
|
||||
|
||||
int failed = 0, stopped = 0;
|
||||
// Go by original action.
|
||||
for (Map.Entry<byte[], List<Action<Row>>> regionEntry : multiAction.actions.entrySet()) {
|
||||
byte[] regionName = regionEntry.getKey();
|
||||
Map<Integer, Object> regionResults = responses.getResults().get(regionName);
|
||||
Map<Integer, Object> regionResults = results.get(regionName) == null
|
||||
? null : results.get(regionName).result;
|
||||
if (regionResults == null) {
|
||||
if (!responses.getExceptions().containsKey(regionName)) {
|
||||
LOG.error("Server sent us neither results nor exceptions for "
|
||||
|
@ -1308,7 +1327,7 @@ class AsyncProcess {
|
|||
}
|
||||
++failureCount;
|
||||
Retry retry = manageError(sentAction.getOriginalIndex(), row,
|
||||
canRetry ? Retry.YES : Retry.NO_RETRIES_EXHAUSTED, (Throwable)result, server);
|
||||
canRetry ? Retry.YES : Retry.NO_RETRIES_EXHAUSTED, (Throwable) result, server);
|
||||
if (retry == Retry.YES) {
|
||||
toReplay.add(sentAction);
|
||||
} else if (retry == Retry.NO_OTHER_SUCCEEDED) {
|
||||
|
@ -1317,24 +1336,11 @@ class AsyncProcess {
|
|||
++failed;
|
||||
}
|
||||
} else {
|
||||
|
||||
if (AsyncProcess.this.connection.getConnectionMetrics() != null) {
|
||||
AsyncProcess.this.connection.getConnectionMetrics().
|
||||
updateServerStats(server, regionName, result);
|
||||
}
|
||||
|
||||
// update the stats about the region, if its a user table. We don't want to slow down
|
||||
// updates to meta tables, especially from internal updates (master, etc).
|
||||
if (AsyncProcess.this.connection.getStatisticsTracker() != null) {
|
||||
result = ResultStatsUtil.updateStats(result,
|
||||
AsyncProcess.this.connection.getStatisticsTracker(), server, regionName);
|
||||
}
|
||||
|
||||
if (callback != null) {
|
||||
try {
|
||||
//noinspection unchecked
|
||||
// TODO: would callback expect a replica region name if it gets one?
|
||||
this.callback.update(regionName, sentAction.getAction().getRow(), (CResult)result);
|
||||
this.callback.update(regionName, sentAction.getAction().getRow(), (CResult) result);
|
||||
} catch (Throwable t) {
|
||||
LOG.error("User callback threw an exception for "
|
||||
+ Bytes.toStringBinary(regionName) + ", ignoring", t);
|
||||
|
@ -1384,7 +1390,6 @@ class AsyncProcess {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (toReplay.isEmpty()) {
|
||||
logNoResubmit(server, numAttempt, failureCount, throwable, failed, stopped);
|
||||
} else {
|
||||
|
@ -1438,8 +1443,8 @@ class AsyncProcess {
|
|||
boolean isStale = !RegionReplicaUtil.isDefaultReplica(action.getReplicaId());
|
||||
int index = action.getOriginalIndex();
|
||||
if (results == null) {
|
||||
decActionCounter(index);
|
||||
return; // Simple case, no replica requests.
|
||||
decActionCounter(index);
|
||||
return; // Simple case, no replica requests.
|
||||
}
|
||||
state = trySetResultSimple(index, action.getAction(), false, result, null, isStale);
|
||||
if (state == null) {
|
||||
|
@ -1618,7 +1623,7 @@ class AsyncProcess {
|
|||
throw new InterruptedIOException(iex.getMessage());
|
||||
} finally {
|
||||
if (callsInProgress != null) {
|
||||
for (MultiServerCallable<Row> clb : callsInProgress) {
|
||||
for (PayloadCarryingServerCallable clb : callsInProgress) {
|
||||
clb.cancel();
|
||||
}
|
||||
}
|
||||
|
@ -1675,13 +1680,38 @@ class AsyncProcess {
|
|||
}
|
||||
}
|
||||
|
||||
private void updateStats(ServerName server, Map<byte[], MultiResponse.RegionResult> results) {
|
||||
boolean metrics = AsyncProcess.this.connection.getConnectionMetrics() != null;
|
||||
boolean stats = AsyncProcess.this.connection.getStatisticsTracker() != null;
|
||||
if (!stats && !metrics) {
|
||||
return;
|
||||
}
|
||||
for (Map.Entry<byte[], MultiResponse.RegionResult> regionStats : results.entrySet()) {
|
||||
byte[] regionName = regionStats.getKey();
|
||||
ClientProtos.RegionLoadStats stat = regionStats.getValue().getStat();
|
||||
ResultStatsUtil.updateStats(AsyncProcess.this.connection.getStatisticsTracker(), server,
|
||||
regionName, stat);
|
||||
ResultStatsUtil.updateStats(AsyncProcess.this.connection.getConnectionMetrics(),
|
||||
server, regionName, stat);
|
||||
}
|
||||
}
|
||||
|
||||
protected <CResult> AsyncRequestFutureImpl<CResult> createAsyncRequestFuture(
|
||||
TableName tableName, List<Action<Row>> actions, long nonceGroup, ExecutorService pool,
|
||||
Batch.Callback<CResult> callback, Object[] results, boolean needResults,
|
||||
PayloadCarryingServerCallable callable, int curTimeout) {
|
||||
return new AsyncRequestFutureImpl<CResult>(
|
||||
tableName, actions, nonceGroup, getPool(pool), needResults,
|
||||
results, callback, callable, curTimeout);
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
/** Create AsyncRequestFuture. Isolated to be easily overridden in the tests. */
|
||||
protected <CResult> AsyncRequestFutureImpl<CResult> createAsyncRequestFuture(
|
||||
TableName tableName, List<Action<Row>> actions, long nonceGroup, ExecutorService pool,
|
||||
Batch.Callback<CResult> callback, Object[] results, boolean needResults) {
|
||||
return new AsyncRequestFutureImpl<CResult>(
|
||||
tableName, actions, nonceGroup, getPool(pool), needResults, results, callback);
|
||||
return createAsyncRequestFuture(
|
||||
tableName, actions, nonceGroup, pool, callback, results, needResults, null, timeout);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -1697,7 +1727,7 @@ class AsyncProcess {
|
|||
* Create a caller. Isolated to be easily overridden in the tests.
|
||||
*/
|
||||
@VisibleForTesting
|
||||
protected RpcRetryingCaller<MultiResponse> createCaller(MultiServerCallable<Row> callable) {
|
||||
protected RpcRetryingCaller<MultiResponse> createCaller(PayloadCarryingServerCallable callable) {
|
||||
return rpcCallerFactory.<MultiResponse> newCaller();
|
||||
}
|
||||
|
||||
|
|
|
@ -37,6 +37,7 @@ import org.apache.commons.logging.Log;
|
|||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.Cell;
|
||||
import org.apache.hadoop.hbase.DoNotRetryIOException;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.HRegionLocation;
|
||||
import org.apache.hadoop.hbase.HTableDescriptor;
|
||||
|
@ -55,6 +56,7 @@ import org.apache.hadoop.hbase.ipc.RegionCoprocessorRpcChannel;
|
|||
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
|
||||
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
|
||||
import org.apache.hadoop.hbase.protobuf.RequestConverter;
|
||||
import org.apache.hadoop.hbase.protobuf.ResponseConverter;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiRequest;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateRequest;
|
||||
|
@ -263,7 +265,8 @@ public class HTable implements HTableInterface {
|
|||
*/
|
||||
@Override
|
||||
public HTableDescriptor getTableDescriptor() throws IOException {
|
||||
HTableDescriptor htd = HBaseAdmin.getTableDescriptor(tableName, connection, rpcCallerFactory, operationTimeout);
|
||||
HTableDescriptor htd = HBaseAdmin.getTableDescriptor(tableName, connection,
|
||||
rpcCallerFactory, operationTimeout);
|
||||
if (htd != null) {
|
||||
return new UnmodifyableHTableDescriptor(htd);
|
||||
}
|
||||
|
@ -450,10 +453,10 @@ public class HTable implements HTableInterface {
|
|||
|
||||
// Call that takes into account the replica
|
||||
RpcRetryingCallerWithReadReplicas callable = new RpcRetryingCallerWithReadReplicas(
|
||||
rpcControllerFactory, tableName, this.connection, get, pool,
|
||||
tableConfiguration.getRetriesNumber(),
|
||||
operationTimeout,
|
||||
tableConfiguration.getPrimaryCallTimeoutMicroSecond());
|
||||
rpcControllerFactory, tableName, this.connection, get, pool,
|
||||
tableConfiguration.getRetriesNumber(),
|
||||
operationTimeout,
|
||||
tableConfiguration.getPrimaryCallTimeoutMicroSecond());
|
||||
return callable.call();
|
||||
}
|
||||
|
||||
|
@ -587,35 +590,47 @@ public class HTable implements HTableInterface {
|
|||
*/
|
||||
@Override
|
||||
public void mutateRow(final RowMutations rm) throws IOException {
|
||||
RegionServerCallable<Void> callable =
|
||||
new RegionServerCallable<Void>(connection, getName(), rm.getRow()) {
|
||||
@Override
|
||||
public Void call(int callTimeout) throws IOException {
|
||||
PayloadCarryingRpcController controller = rpcControllerFactory.newController();
|
||||
controller.setPriority(tableName);
|
||||
controller.setCallTimeout(callTimeout);
|
||||
try {
|
||||
RegionAction.Builder regionMutationBuilder = RequestConverter.buildRegionAction(
|
||||
getLocation().getRegionInfo().getRegionName(), rm);
|
||||
regionMutationBuilder.setAtomic(true);
|
||||
MultiRequest request =
|
||||
MultiRequest.newBuilder().addRegionAction(regionMutationBuilder.build()).build();
|
||||
ClientProtos.MultiResponse response = getStub().multi(controller, request);
|
||||
ClientProtos.RegionActionResult res = response.getRegionActionResultList().get(0);
|
||||
if (res.hasException()) {
|
||||
Throwable ex = ProtobufUtil.toException(res.getException());
|
||||
if(ex instanceof IOException) {
|
||||
throw (IOException)ex;
|
||||
}
|
||||
throw new IOException("Failed to mutate row: "+Bytes.toStringBinary(rm.getRow()), ex);
|
||||
final RetryingTimeTracker tracker = new RetryingTimeTracker();
|
||||
PayloadCarryingServerCallable<MultiResponse> callable =
|
||||
new PayloadCarryingServerCallable<MultiResponse>(connection, getName(), rm.getRow(),
|
||||
rpcControllerFactory) {
|
||||
@Override
|
||||
public MultiResponse call(int callTimeout) throws IOException {
|
||||
tracker.start();
|
||||
controller.setPriority(tableName);
|
||||
int remainingTime = tracker.getRemainingTime(callTimeout);
|
||||
if (remainingTime == 0) {
|
||||
throw new DoNotRetryIOException("Timeout for mutate row");
|
||||
}
|
||||
controller.setCallTimeout(remainingTime);
|
||||
try {
|
||||
RegionAction.Builder regionMutationBuilder = RequestConverter.buildRegionAction(
|
||||
getLocation().getRegionInfo().getRegionName(), rm);
|
||||
regionMutationBuilder.setAtomic(true);
|
||||
MultiRequest request =
|
||||
MultiRequest.newBuilder().addRegionAction(regionMutationBuilder.build()).build();
|
||||
ClientProtos.MultiResponse response = getStub().multi(controller, request);
|
||||
ClientProtos.RegionActionResult res = response.getRegionActionResultList().get(0);
|
||||
if (res.hasException()) {
|
||||
Throwable ex = ProtobufUtil.toException(res.getException());
|
||||
if (ex instanceof IOException) {
|
||||
throw (IOException) ex;
|
||||
}
|
||||
throw new IOException("Failed to mutate row: " +
|
||||
Bytes.toStringBinary(rm.getRow()), ex);
|
||||
}
|
||||
return ResponseConverter.getResults(request, response, controller.cellScanner());
|
||||
} catch (ServiceException se) {
|
||||
throw ProtobufUtil.getRemoteException(se);
|
||||
}
|
||||
} catch (ServiceException se) {
|
||||
throw ProtobufUtil.getRemoteException(se);
|
||||
}
|
||||
return null;
|
||||
}
|
||||
};
|
||||
rpcCallerFactory.<Void> newCaller().callWithRetries(callable, this.operationTimeout);
|
||||
};
|
||||
AsyncRequestFuture ars = multiAp.submitAll(pool, tableName, rm.getMutations(),
|
||||
null, null, callable, operationTimeout);
|
||||
ars.waitUntilDone();
|
||||
if (ars.hasError()) {
|
||||
throw ars.getErrors();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -860,37 +875,55 @@ public class HTable implements HTableInterface {
|
|||
*/
|
||||
@Override
|
||||
public boolean checkAndMutate(final byte [] row, final byte [] family, final byte [] qualifier,
|
||||
final CompareOp compareOp, final byte [] value, final RowMutations rm)
|
||||
throws IOException {
|
||||
RegionServerCallable<Boolean> callable =
|
||||
new RegionServerCallable<Boolean>(connection, getName(), row) {
|
||||
@Override
|
||||
public Boolean call(int callTimeout) throws IOException {
|
||||
PayloadCarryingRpcController controller = rpcControllerFactory.newController();
|
||||
controller.setPriority(tableName);
|
||||
controller.setCallTimeout(callTimeout);
|
||||
try {
|
||||
CompareType compareType = CompareType.valueOf(compareOp.name());
|
||||
MultiRequest request = RequestConverter.buildMutateRequest(
|
||||
getLocation().getRegionInfo().getRegionName(), row, family, qualifier,
|
||||
new BinaryComparator(value), compareType, rm);
|
||||
ClientProtos.MultiResponse response = getStub().multi(controller, request);
|
||||
ClientProtos.RegionActionResult res = response.getRegionActionResultList().get(0);
|
||||
if (res.hasException()) {
|
||||
Throwable ex = ProtobufUtil.toException(res.getException());
|
||||
if(ex instanceof IOException) {
|
||||
throw (IOException)ex;
|
||||
}
|
||||
throw new IOException("Failed to checkAndMutate row: "+
|
||||
Bytes.toStringBinary(rm.getRow()), ex);
|
||||
}
|
||||
return Boolean.valueOf(response.getProcessed());
|
||||
} catch (ServiceException se) {
|
||||
throw ProtobufUtil.getRemoteException(se);
|
||||
}
|
||||
final CompareOp compareOp, final byte [] value, final RowMutations rm)
|
||||
throws IOException {
|
||||
final RetryingTimeTracker tracker = new RetryingTimeTracker();
|
||||
PayloadCarryingServerCallable<MultiResponse> callable =
|
||||
new PayloadCarryingServerCallable<MultiResponse>(connection, getName(), rm.getRow(),
|
||||
rpcControllerFactory) {
|
||||
@Override
|
||||
public MultiResponse call(int callTimeout) throws IOException {
|
||||
tracker.start();
|
||||
controller.setPriority(tableName);
|
||||
int remainingTime = tracker.getRemainingTime(callTimeout);
|
||||
if (remainingTime == 0) {
|
||||
throw new DoNotRetryIOException("Timeout for mutate row");
|
||||
}
|
||||
};
|
||||
return rpcCallerFactory.<Boolean> newCaller().callWithRetries(callable, this.operationTimeout);
|
||||
controller.setCallTimeout(remainingTime);
|
||||
try {
|
||||
CompareType compareType = CompareType.valueOf(compareOp.name());
|
||||
MultiRequest request = RequestConverter.buildMutateRequest(
|
||||
getLocation().getRegionInfo().getRegionName(), row, family, qualifier,
|
||||
new BinaryComparator(value), compareType, rm);
|
||||
ClientProtos.MultiResponse response = getStub().multi(controller, request);
|
||||
ClientProtos.RegionActionResult res = response.getRegionActionResultList().get(0);
|
||||
if (res.hasException()) {
|
||||
Throwable ex = ProtobufUtil.toException(res.getException());
|
||||
if(ex instanceof IOException) {
|
||||
throw (IOException)ex;
|
||||
}
|
||||
throw new IOException("Failed to checkAndMutate row: "+
|
||||
Bytes.toStringBinary(rm.getRow()), ex);
|
||||
}
|
||||
return ResponseConverter.getResults(request, response, controller.cellScanner());
|
||||
} catch (ServiceException se) {
|
||||
throw ProtobufUtil.getRemoteException(se);
|
||||
}
|
||||
}
|
||||
};
|
||||
/**
|
||||
* Currently, we use one array to store 'processed' flag which is returned by server.
|
||||
* It is excessive to send such a large array, but that is required by the framework right now
|
||||
* */
|
||||
Object[] results = new Object[rm.getMutations().size()];
|
||||
AsyncRequestFuture ars = multiAp.submitAll(pool, tableName, rm.getMutations(),
|
||||
null, results, callable, operationTimeout);
|
||||
ars.waitUntilDone();
|
||||
if (ars.hasError()) {
|
||||
throw ars.getErrors();
|
||||
}
|
||||
|
||||
return ((Result)results[0]).getExists();
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -52,7 +52,7 @@ import static com.codahale.metrics.MetricRegistry.name;
|
|||
* {@link #shutdown()} to terminate the thread pools they allocate.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public class MetricsConnection {
|
||||
public class MetricsConnection implements StatisticTrackable {
|
||||
|
||||
/** Set this key to {@code true} to enable metrics collection of client requests. */
|
||||
public static final String CLIENT_SIDE_METRICS_ENABLED_KEY = "hbase.client.metrics.enable";
|
||||
|
@ -199,9 +199,15 @@ public class MetricsConnection {
|
|||
}
|
||||
Result result = (Result) r;
|
||||
ClientProtos.RegionLoadStats stats = result.getStats();
|
||||
if(stats == null){
|
||||
if (stats == null) {
|
||||
return;
|
||||
}
|
||||
updateRegionStats(serverName, regionName, stats);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void updateRegionStats(ServerName serverName, byte[] regionName,
|
||||
ClientProtos.RegionLoadStats stats) {
|
||||
String name = serverName.getServerName() + "," + Bytes.toStringBinary(regionName);
|
||||
ConcurrentMap<byte[], RegionStats> rsStats = null;
|
||||
if (serverStats.containsKey(serverName)) {
|
||||
|
|
|
@ -24,6 +24,7 @@ import java.util.Map;
|
|||
import java.util.TreeMap;
|
||||
|
||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
|
||||
/**
|
||||
|
@ -33,8 +34,7 @@ import org.apache.hadoop.hbase.util.Bytes;
|
|||
public class MultiResponse {
|
||||
|
||||
// map of regionName to map of Results by the original index for that Result
|
||||
private Map<byte[], Map<Integer, Object>> results =
|
||||
new TreeMap<byte[], Map<Integer, Object>>(Bytes.BYTES_COMPARATOR);
|
||||
private Map<byte[], RegionResult> results = new TreeMap<>(Bytes.BYTES_COMPARATOR);
|
||||
|
||||
/**
|
||||
* The server can send us a failure for the region itself, instead of individual failure.
|
||||
|
@ -52,8 +52,8 @@ public class MultiResponse {
|
|||
*/
|
||||
public int size() {
|
||||
int size = 0;
|
||||
for (Map<?,?> c : results.values()) {
|
||||
size += c.size();
|
||||
for (RegionResult result: results.values()) {
|
||||
size += result.size();
|
||||
}
|
||||
return size;
|
||||
}
|
||||
|
@ -66,16 +66,7 @@ public class MultiResponse {
|
|||
* @param resOrEx the result or error; will be empty for successful Put and Delete actions.
|
||||
*/
|
||||
public void add(byte[] regionName, int originalIndex, Object resOrEx) {
|
||||
Map<Integer, Object> rs = results.get(regionName);
|
||||
if (rs == null) {
|
||||
rs = new HashMap<Integer, Object>();
|
||||
results.put(regionName, rs);
|
||||
}
|
||||
rs.put(originalIndex, resOrEx);
|
||||
}
|
||||
|
||||
public Map<byte[], Map<Integer, Object>> getResults() {
|
||||
return results;
|
||||
getResult(regionName).addResult(originalIndex, resOrEx);
|
||||
}
|
||||
|
||||
public void addException(byte []regionName, Throwable ie){
|
||||
|
@ -92,4 +83,42 @@ public class MultiResponse {
|
|||
public Map<byte[], Throwable> getExceptions() {
|
||||
return exceptions;
|
||||
}
|
||||
|
||||
public void addStatistic(byte[] regionName, ClientProtos.RegionLoadStats stat) {
|
||||
getResult(regionName).setStat(stat);
|
||||
}
|
||||
|
||||
private RegionResult getResult(byte[] region){
|
||||
RegionResult rs = results.get(region);
|
||||
if (rs == null) {
|
||||
rs = new RegionResult();
|
||||
results.put(region, rs);
|
||||
}
|
||||
return rs;
|
||||
}
|
||||
|
||||
public Map<byte[], RegionResult> getResults(){
|
||||
return this.results;
|
||||
}
|
||||
|
||||
static class RegionResult{
|
||||
Map<Integer, Object> result = new HashMap<>();
|
||||
ClientProtos.RegionLoadStats stat;
|
||||
|
||||
public void addResult(int index, Object result){
|
||||
this.result.put(index, result);
|
||||
}
|
||||
|
||||
public void setStat(ClientProtos.RegionLoadStats stat){
|
||||
this.stat = stat;
|
||||
}
|
||||
|
||||
public int size() {
|
||||
return this.result.size();
|
||||
}
|
||||
|
||||
public ClientProtos.RegionLoadStats getStat() {
|
||||
return this.stat;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -30,7 +30,6 @@ import org.apache.hadoop.hbase.HRegionInfo;
|
|||
import org.apache.hadoop.hbase.HRegionLocation;
|
||||
import org.apache.hadoop.hbase.ServerName;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
|
||||
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
|
||||
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
|
||||
import org.apache.hadoop.hbase.protobuf.RequestConverter;
|
||||
|
@ -50,21 +49,19 @@ import com.google.protobuf.ServiceException;
|
|||
* {@link RegionServerCallable} that goes against multiple regions.
|
||||
* @param <R>
|
||||
*/
|
||||
class MultiServerCallable<R> extends RegionServerCallable<MultiResponse> implements Cancellable {
|
||||
class MultiServerCallable<R> extends PayloadCarryingServerCallable<MultiResponse> {
|
||||
private final MultiAction<R> multiAction;
|
||||
private final boolean cellBlock;
|
||||
private final PayloadCarryingRpcController controller;
|
||||
|
||||
MultiServerCallable(final ClusterConnection connection, final TableName tableName,
|
||||
final ServerName location, RpcControllerFactory rpcFactory, final MultiAction<R> multi) {
|
||||
super(connection, tableName, null);
|
||||
super(connection, tableName, null, rpcFactory);
|
||||
this.multiAction = multi;
|
||||
// RegionServerCallable has HRegionLocation field, but this is a multi-region request.
|
||||
// Using region info from parent HRegionLocation would be a mistake for this class; so
|
||||
// we will store the server here, and throw if someone tries to obtain location/regioninfo.
|
||||
this.location = new HRegionLocation(null, location);
|
||||
this.cellBlock = isCellBlock();
|
||||
controller = rpcFactory.newController();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -133,16 +130,6 @@ class MultiServerCallable<R> extends RegionServerCallable<MultiResponse> impleme
|
|||
return ResponseConverter.getResults(requestProto, responseProto, controller.cellScanner());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void cancel() {
|
||||
controller.startCancel();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isCancelled() {
|
||||
return controller.isCanceled();
|
||||
}
|
||||
|
||||
/**
|
||||
* @return True if we should send data in cellblocks. This is an expensive call. Cache the
|
||||
* result if you can rather than call each time.
|
||||
|
|
|
@ -0,0 +1,48 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to you under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.client;
|
||||
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
|
||||
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
|
||||
|
||||
/**
|
||||
* This class is used to unify HTable calls with AsyncProcess Framework.
|
||||
* HTable can use AsyncProcess directly though this class.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public abstract class PayloadCarryingServerCallable<T>
|
||||
extends RegionServerCallable<T> implements Cancellable {
|
||||
protected PayloadCarryingRpcController controller;
|
||||
|
||||
public PayloadCarryingServerCallable(Connection connection, TableName tableName, byte[] row,
|
||||
RpcControllerFactory rpcControllerFactory) {
|
||||
super(connection, tableName, row);
|
||||
this.controller = rpcControllerFactory.newController();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void cancel() {
|
||||
controller.startCancel();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isCancelled() {
|
||||
return controller.isCanceled();
|
||||
}
|
||||
}
|
|
@ -55,13 +55,17 @@ public final class ResultStatsUtil {
|
|||
return r;
|
||||
}
|
||||
|
||||
if (regionName != null) {
|
||||
serverStats.updateRegionStats(server, regionName, stats);
|
||||
}
|
||||
|
||||
updateStats(serverStats, server, regionName, stats);
|
||||
return r;
|
||||
}
|
||||
|
||||
public static void updateStats(StatisticTrackable tracker, ServerName server, byte[] regionName,
|
||||
ClientProtos.RegionLoadStats stats) {
|
||||
if (regionName != null && stats != null && tracker != null) {
|
||||
tracker.updateRegionStats(server, regionName, stats);
|
||||
}
|
||||
}
|
||||
|
||||
public static <T> T updateStats(T r, ServerStatisticTracker stats,
|
||||
HRegionLocation regionLocation) {
|
||||
byte[] regionName = null;
|
||||
|
|
|
@ -0,0 +1,57 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to you under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.client;
|
||||
|
||||
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
||||
|
||||
/**
|
||||
* Tracks the amount of time remaining for an operation.
|
||||
*/
|
||||
class RetryingTimeTracker {
|
||||
|
||||
private long globalStartTime = -1;
|
||||
|
||||
public void start() {
|
||||
if (this.globalStartTime < 0) {
|
||||
this.globalStartTime = EnvironmentEdgeManager.currentTime();
|
||||
}
|
||||
}
|
||||
|
||||
public int getRemainingTime(int callTimeout) {
|
||||
if (callTimeout <= 0) {
|
||||
return 0;
|
||||
} else {
|
||||
if (callTimeout == Integer.MAX_VALUE) {
|
||||
return Integer.MAX_VALUE;
|
||||
}
|
||||
int remainingTime = (int) (
|
||||
callTimeout -
|
||||
(EnvironmentEdgeManager.currentTime() - this.globalStartTime));
|
||||
if (remainingTime < 1) {
|
||||
// If there is no time left, we're trying anyway. It's too late.
|
||||
// 0 means no timeout, and it's not the intent here. So we secure both cases by
|
||||
// resetting to the minimum.
|
||||
remainingTime = 1;
|
||||
}
|
||||
return remainingTime;
|
||||
}
|
||||
}
|
||||
|
||||
public long getStartTime() {
|
||||
return this.globalStartTime;
|
||||
}
|
||||
}
|
|
@ -67,12 +67,6 @@ public class RpcRetryingCallerFactory {
|
|||
// is cheap as it does not require parsing a complex structure.
|
||||
RpcRetryingCaller<T> caller = new RpcRetryingCallerImpl<T>(pause, retries, interceptor,
|
||||
startLogErrorsCnt);
|
||||
|
||||
// wrap it with stats, if we are tracking them
|
||||
if (enableBackPressure && this.stats != null) {
|
||||
caller = new StatsTrackingRpcRetryingCaller<T>(caller, this.stats);
|
||||
}
|
||||
|
||||
return caller;
|
||||
}
|
||||
|
||||
|
|
|
@ -51,10 +51,6 @@ import com.google.protobuf.ServiceException;
|
|||
public class RpcRetryingCallerImpl<T> implements RpcRetryingCaller<T> {
|
||||
// LOG is being used in TestMultiRowRangeFilter, hence leaving it public
|
||||
public static final Log LOG = LogFactory.getLog(RpcRetryingCallerImpl.class);
|
||||
/**
|
||||
* When we started making calls.
|
||||
*/
|
||||
private long globalStartTime;
|
||||
|
||||
/** How many retries are allowed before we start to log */
|
||||
private final int startLogErrorsCnt;
|
||||
|
@ -64,6 +60,7 @@ public class RpcRetryingCallerImpl<T> implements RpcRetryingCaller<T> {
|
|||
private final AtomicBoolean cancelled = new AtomicBoolean(false);
|
||||
private final RetryingCallerInterceptor interceptor;
|
||||
private final RetryingCallerInterceptorContext context;
|
||||
private final RetryingTimeTracker tracker;
|
||||
|
||||
public RpcRetryingCallerImpl(long pause, int retries, int startLogErrorsCnt) {
|
||||
this(pause, retries, RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR, startLogErrorsCnt);
|
||||
|
@ -76,23 +73,7 @@ public class RpcRetryingCallerImpl<T> implements RpcRetryingCaller<T> {
|
|||
this.interceptor = interceptor;
|
||||
context = interceptor.createEmptyContext();
|
||||
this.startLogErrorsCnt = startLogErrorsCnt;
|
||||
}
|
||||
|
||||
private int getRemainingTime(int callTimeout) {
|
||||
if (callTimeout <= 0) {
|
||||
return 0;
|
||||
} else {
|
||||
if (callTimeout == Integer.MAX_VALUE) return Integer.MAX_VALUE;
|
||||
int remainingTime = (int) (callTimeout -
|
||||
(EnvironmentEdgeManager.currentTime() - this.globalStartTime));
|
||||
if (remainingTime < 1) {
|
||||
// If there is no time left, we're trying anyway. It's too late.
|
||||
// 0 means no timeout, and it's not the intent here. So we secure both cases by
|
||||
// resetting to the minimum.
|
||||
remainingTime = 1;
|
||||
}
|
||||
return remainingTime;
|
||||
}
|
||||
this.tracker = new RetryingTimeTracker();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -108,21 +89,21 @@ public class RpcRetryingCallerImpl<T> implements RpcRetryingCaller<T> {
|
|||
throws IOException, RuntimeException {
|
||||
List<RetriesExhaustedException.ThrowableWithExtraContext> exceptions =
|
||||
new ArrayList<RetriesExhaustedException.ThrowableWithExtraContext>();
|
||||
this.globalStartTime = EnvironmentEdgeManager.currentTime();
|
||||
tracker.start();
|
||||
context.clear();
|
||||
for (int tries = 0;; tries++) {
|
||||
long expectedSleep;
|
||||
try {
|
||||
callable.prepare(tries != 0); // if called with false, check table status on ZK
|
||||
interceptor.intercept(context.prepare(callable, tries));
|
||||
return callable.call(getRemainingTime(callTimeout));
|
||||
return callable.call(tracker.getRemainingTime(callTimeout));
|
||||
} catch (PreemptiveFastFailException e) {
|
||||
throw e;
|
||||
} catch (Throwable t) {
|
||||
ExceptionUtil.rethrowIfInterrupt(t);
|
||||
if (tries > startLogErrorsCnt) {
|
||||
LOG.info("Call exception, tries=" + tries + ", maxAttempts=" + maxAttempts + ", started="
|
||||
+ (EnvironmentEdgeManager.currentTime() - this.globalStartTime) + " ms ago, "
|
||||
+ (EnvironmentEdgeManager.currentTime() - tracker.getStartTime()) + " ms ago, "
|
||||
+ "cancelled=" + cancelled.get() + ", msg="
|
||||
+ callable.getExceptionMessageAdditionalDetail());
|
||||
}
|
||||
|
@ -172,14 +153,13 @@ public class RpcRetryingCallerImpl<T> implements RpcRetryingCaller<T> {
|
|||
* @return Calculate how long a single call took
|
||||
*/
|
||||
private long singleCallDuration(final long expectedSleep) {
|
||||
return (EnvironmentEdgeManager.currentTime() - this.globalStartTime) + expectedSleep;
|
||||
return (EnvironmentEdgeManager.currentTime() - tracker.getStartTime()) + expectedSleep;
|
||||
}
|
||||
|
||||
@Override
|
||||
public T callWithoutRetries(RetryingCallable<T> callable, int callTimeout)
|
||||
throws IOException, RuntimeException {
|
||||
// The code of this method should be shared with withRetries.
|
||||
this.globalStartTime = EnvironmentEdgeManager.currentTime();
|
||||
try {
|
||||
callable.prepare(false);
|
||||
return callable.call(callTimeout);
|
||||
|
@ -231,7 +211,7 @@ public class RpcRetryingCallerImpl<T> implements RpcRetryingCaller<T> {
|
|||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "RpcRetryingCaller{" + "globalStartTime=" + globalStartTime +
|
||||
return "RpcRetryingCaller{" + "globalStartTime=" + tracker.getStartTime() +
|
||||
", pause=" + pause + ", maxAttempts=" + maxAttempts + '}';
|
||||
}
|
||||
}
|
||||
|
|
|
@ -31,11 +31,12 @@ import java.util.concurrent.ConcurrentHashMap;
|
|||
* Tracks the statistics for multiple regions
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public class ServerStatisticTracker {
|
||||
public class ServerStatisticTracker implements StatisticTrackable {
|
||||
|
||||
private final ConcurrentHashMap<ServerName, ServerStatistics> stats =
|
||||
new ConcurrentHashMap<ServerName, ServerStatistics>();
|
||||
|
||||
@Override
|
||||
public void updateRegionStats(ServerName server, byte[] region, ClientProtos.RegionLoadStats
|
||||
currentStats) {
|
||||
ServerStatistics stat = stats.get(server);
|
||||
|
|
|
@ -0,0 +1,33 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to you under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.client;
|
||||
|
||||
import org.apache.hadoop.hbase.ServerName;
|
||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
|
||||
|
||||
/**
|
||||
* Parent interface for an object to get updates about per-region statistics.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public interface StatisticTrackable {
|
||||
/**
|
||||
* Update stats per region.
|
||||
* */
|
||||
void updateRegionStats(ServerName server, byte[] region, ClientProtos.RegionLoadStats
|
||||
stats);
|
||||
}
|
|
@ -1,77 +0,0 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.client;
|
||||
|
||||
import org.apache.hadoop.hbase.HRegionLocation;
|
||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
/**
|
||||
* An {@link RpcRetryingCaller} that will update the per-region stats for the call on return,
|
||||
* if stats are available
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public class StatsTrackingRpcRetryingCaller<T> implements RpcRetryingCaller<T> {
|
||||
private final ServerStatisticTracker stats;
|
||||
private final RpcRetryingCaller<T> delegate;
|
||||
|
||||
public StatsTrackingRpcRetryingCaller(RpcRetryingCaller<T> delegate,
|
||||
ServerStatisticTracker stats) {
|
||||
this.delegate = delegate;
|
||||
this.stats = stats;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void cancel() {
|
||||
delegate.cancel();
|
||||
}
|
||||
|
||||
@Override
|
||||
public T callWithRetries(RetryingCallable<T> callable, int callTimeout)
|
||||
throws IOException, RuntimeException {
|
||||
T result = delegate.callWithRetries(callable, callTimeout);
|
||||
return updateStatsAndUnwrap(result, callable);
|
||||
}
|
||||
|
||||
@Override
|
||||
public T callWithoutRetries(RetryingCallable<T> callable, int callTimeout)
|
||||
throws IOException, RuntimeException {
|
||||
T result = delegate.callWithRetries(callable, callTimeout);
|
||||
return updateStatsAndUnwrap(result, callable);
|
||||
}
|
||||
|
||||
private T updateStatsAndUnwrap(T result, RetryingCallable<T> callable) {
|
||||
// don't track stats about requests that aren't to regionservers
|
||||
if (!(callable instanceof RegionServerCallable)) {
|
||||
return result;
|
||||
}
|
||||
|
||||
// mutli-server callables span multiple regions, so they don't have a location,
|
||||
// but they are region server callables, so we have to handle them when we process the
|
||||
// result in AsyncProcess#receiveMultiAction, not in here
|
||||
if (callable instanceof MultiServerCallable) {
|
||||
return result;
|
||||
}
|
||||
|
||||
// update the stats for the single server callable
|
||||
RegionServerCallable<T> regionCallable = (RegionServerCallable) callable;
|
||||
HRegionLocation location = regionCallable.getLocation();
|
||||
return ResultStatsUtil.updateStats(result, stats, location);
|
||||
}
|
||||
}
|
|
@ -193,8 +193,8 @@ public final class ProtobufUtil {
|
|||
*/
|
||||
private final static Cell[] EMPTY_CELL_ARRAY = new Cell[]{};
|
||||
private final static Result EMPTY_RESULT = Result.create(EMPTY_CELL_ARRAY);
|
||||
private final static Result EMPTY_RESULT_EXISTS_TRUE = Result.create(null, true);
|
||||
private final static Result EMPTY_RESULT_EXISTS_FALSE = Result.create(null, false);
|
||||
final static Result EMPTY_RESULT_EXISTS_TRUE = Result.create(null, true);
|
||||
final static Result EMPTY_RESULT_EXISTS_FALSE = Result.create(null, false);
|
||||
private final static Result EMPTY_RESULT_STALE = Result.create(EMPTY_CELL_ARRAY, null, true);
|
||||
private final static Result EMPTY_RESULT_EXISTS_TRUE_STALE
|
||||
= Result.create((Cell[])null, true, true);
|
||||
|
|
|
@ -89,7 +89,7 @@ public final class ResponseConverter {
|
|||
int requestRegionActionCount = request.getRegionActionCount();
|
||||
int responseRegionActionResultCount = response.getRegionActionResultCount();
|
||||
if (requestRegionActionCount != responseRegionActionResultCount) {
|
||||
throw new IllegalStateException("Request mutation count=" + responseRegionActionResultCount +
|
||||
throw new IllegalStateException("Request mutation count=" + requestRegionActionCount +
|
||||
" does not match response mutation result count=" + responseRegionActionResultCount);
|
||||
}
|
||||
|
||||
|
@ -125,21 +125,27 @@ public final class ResponseConverter {
|
|||
responseValue = ProtobufUtil.toException(roe.getException());
|
||||
} else if (roe.hasResult()) {
|
||||
responseValue = ProtobufUtil.toResult(roe.getResult(), cells);
|
||||
// add the load stats, if we got any
|
||||
if (roe.hasLoadStats()) {
|
||||
((Result) responseValue).addResults(roe.getLoadStats());
|
||||
}
|
||||
} else if (roe.hasServiceResult()) {
|
||||
responseValue = roe.getServiceResult();
|
||||
} else {
|
||||
// no result & no exception. Unexpected.
|
||||
throw new IllegalStateException("No result & no exception roe=" + roe +
|
||||
" for region " + actions.getRegion());
|
||||
} else{
|
||||
// Sometimes, the response is just "it was processed". Generally, this occurs for things
|
||||
// like mutateRows where either we get back 'processed' (or not) and optionally some
|
||||
// statistics about the regions we touched.
|
||||
responseValue = response.getProcessed() ?
|
||||
ProtobufUtil.EMPTY_RESULT_EXISTS_TRUE :
|
||||
ProtobufUtil.EMPTY_RESULT_EXISTS_FALSE;
|
||||
}
|
||||
results.add(regionName, roe.getIndex(), responseValue);
|
||||
}
|
||||
}
|
||||
|
||||
if (response.hasRegionStatistics()) {
|
||||
ClientProtos.MultiRegionLoadStats stats = response.getRegionStatistics();
|
||||
for (int i = 0; i < stats.getRegionCount(); i++) {
|
||||
results.addStatistic(stats.getRegion(i).getValue().toByteArray(), stats.getStat(i));
|
||||
}
|
||||
}
|
||||
|
||||
return results;
|
||||
}
|
||||
|
||||
|
@ -161,11 +167,9 @@ public final class ResponseConverter {
|
|||
* @param r
|
||||
* @return an action result builder
|
||||
*/
|
||||
public static ResultOrException.Builder buildActionResult(final ClientProtos.Result r,
|
||||
ClientProtos.RegionLoadStats stats) {
|
||||
public static ResultOrException.Builder buildActionResult(final ClientProtos.Result r) {
|
||||
ResultOrException.Builder builder = ResultOrException.newBuilder();
|
||||
if (r != null) builder.setResult(r);
|
||||
if(stats != null) builder.setLoadStats(stats);
|
||||
return builder;
|
||||
}
|
||||
|
||||
|
|
|
@ -185,10 +185,12 @@ public class TestAsyncProcess {
|
|||
}
|
||||
|
||||
@Override
|
||||
protected RpcRetryingCaller<MultiResponse> createCaller(MultiServerCallable<Row> callable) {
|
||||
protected RpcRetryingCaller<MultiResponse> createCaller(
|
||||
PayloadCarryingServerCallable callable) {
|
||||
callsCt.incrementAndGet();
|
||||
MultiServerCallable callable1 = (MultiServerCallable) callable;
|
||||
final MultiResponse mr = createMultiResponse(
|
||||
callable.getMulti(), nbMultiResponse, nbActions, new ResponseGenerator() {
|
||||
callable1.getMulti(), nbMultiResponse, nbActions, new ResponseGenerator() {
|
||||
@Override
|
||||
public void addResponse(MultiResponse mr, byte[] regionName, Action<Row> a) {
|
||||
if (Arrays.equals(FAILS, a.getAction().getRow())) {
|
||||
|
@ -227,7 +229,8 @@ public class TestAsyncProcess {
|
|||
}
|
||||
|
||||
@Override
|
||||
public MultiResponse callWithoutRetries(RetryingCallable<MultiResponse> callable, int callTimeout)
|
||||
public MultiResponse callWithoutRetries(RetryingCallable<MultiResponse> callable,
|
||||
int callTimeout)
|
||||
throws IOException, RuntimeException {
|
||||
throw e;
|
||||
}
|
||||
|
@ -245,7 +248,8 @@ public class TestAsyncProcess {
|
|||
}
|
||||
|
||||
@Override
|
||||
protected RpcRetryingCaller<MultiResponse> createCaller(MultiServerCallable<Row> callable) {
|
||||
protected RpcRetryingCaller<MultiResponse> createCaller(
|
||||
PayloadCarryingServerCallable callable) {
|
||||
callsCt.incrementAndGet();
|
||||
return new CallerWithFailure(ioe);
|
||||
}
|
||||
|
@ -282,7 +286,8 @@ public class TestAsyncProcess {
|
|||
|
||||
@Override
|
||||
protected RpcRetryingCaller<MultiResponse> createCaller(
|
||||
MultiServerCallable<Row> callable) {
|
||||
PayloadCarryingServerCallable payloadCallable) {
|
||||
MultiServerCallable<Row> callable = (MultiServerCallable) payloadCallable;
|
||||
final MultiResponse mr = createMultiResponse(
|
||||
callable.getMulti(), nbMultiResponse, nbActions, new ResponseGenerator() {
|
||||
@Override
|
||||
|
@ -312,7 +317,8 @@ public class TestAsyncProcess {
|
|||
|
||||
return new RpcRetryingCallerImpl<MultiResponse>(100, 10, 9) {
|
||||
@Override
|
||||
public MultiResponse callWithoutRetries(RetryingCallable<MultiResponse> callable, int callTimeout)
|
||||
public MultiResponse callWithoutRetries(RetryingCallable<MultiResponse> callable,
|
||||
int callTimeout)
|
||||
throws IOException, RuntimeException {
|
||||
long sleep = -1;
|
||||
if (isDefault) {
|
||||
|
|
File diff suppressed because it is too large
Load Diff
|
@ -402,6 +402,11 @@ message RegionLoadStats {
|
|||
optional int32 compactionPressure = 3 [default = 0];
|
||||
}
|
||||
|
||||
message MultiRegionLoadStats{
|
||||
repeated RegionSpecifier region = 1;
|
||||
repeated RegionLoadStats stat = 2;
|
||||
}
|
||||
|
||||
/**
|
||||
* Either a Result or an Exception NameBytesPair (keyed by
|
||||
* exception name whose value is the exception stringified)
|
||||
|
@ -416,7 +421,7 @@ message ResultOrException {
|
|||
// result if this was a coprocessor service call
|
||||
optional CoprocessorServiceResult service_result = 4;
|
||||
// current load on the region
|
||||
optional RegionLoadStats loadStats = 5;
|
||||
optional RegionLoadStats loadStats = 5 [deprecated=true];
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -445,6 +450,7 @@ message MultiResponse {
|
|||
repeated RegionActionResult regionActionResult = 1;
|
||||
// used for mutate to indicate processed only
|
||||
optional bool processed = 2;
|
||||
optional MultiRegionLoadStats regionStatistics = 3;
|
||||
}
|
||||
|
||||
|
||||
|
|
|
@ -6768,9 +6768,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
|||
}
|
||||
|
||||
/**
|
||||
* @return the current load statistics for the the region
|
||||
* @return statistics about the current load of the region
|
||||
*/
|
||||
public ClientProtos.RegionLoadStats getRegionStats() {
|
||||
public ClientProtos.RegionLoadStats getLoadStatistics() {
|
||||
if (!regionStatsEnabled) {
|
||||
return null;
|
||||
}
|
||||
|
|
|
@ -139,6 +139,7 @@ import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServic
|
|||
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceResponse;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.GetRequest;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.GetResponse;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiRegionLoadStats;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiRequest;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiResponse;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateRequest;
|
||||
|
@ -397,9 +398,9 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
|
|||
}
|
||||
}
|
||||
|
||||
private static ResultOrException getResultOrException(
|
||||
final ClientProtos.Result r, final int index, final ClientProtos.RegionLoadStats stats) {
|
||||
return getResultOrException(ResponseConverter.buildActionResult(r, stats), index);
|
||||
private static ResultOrException getResultOrException(final ClientProtos.Result r,
|
||||
final int index){
|
||||
return getResultOrException(ResponseConverter.buildActionResult(r), index);
|
||||
}
|
||||
|
||||
private static ResultOrException getResultOrException(final Exception e, final int index) {
|
||||
|
@ -501,13 +502,16 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
|
|||
* @param cellScanner if non-null, the mutation data -- the Cell content.
|
||||
* @throws IOException
|
||||
*/
|
||||
private ClientProtos.RegionLoadStats mutateRows(final Region region,
|
||||
private void mutateRows(final Region region,
|
||||
final List<ClientProtos.Action> actions,
|
||||
final CellScanner cellScanner) throws IOException {
|
||||
final CellScanner cellScanner, RegionActionResult.Builder builder) throws IOException {
|
||||
if (!region.getRegionInfo().isMetaTable()) {
|
||||
regionServer.cacheFlusher.reclaimMemStoreMemory();
|
||||
}
|
||||
RowMutations rm = null;
|
||||
int i = 0;
|
||||
ClientProtos.ResultOrException.Builder resultOrExceptionOrBuilder =
|
||||
ClientProtos.ResultOrException.newBuilder();
|
||||
for (ClientProtos.Action action: actions) {
|
||||
if (action.hasGet()) {
|
||||
throw new DoNotRetryIOException("Atomic put and/or delete only, not a Get=" +
|
||||
|
@ -527,9 +531,14 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
|
|||
default:
|
||||
throw new DoNotRetryIOException("Atomic put and/or delete only, not " + type.name());
|
||||
}
|
||||
// To unify the response format with doNonAtomicRegionMutation and read through client's
|
||||
// AsyncProcess we have to add an empty result instance per operation
|
||||
resultOrExceptionOrBuilder.clear();
|
||||
resultOrExceptionOrBuilder.setIndex(i++);
|
||||
builder.addResultOrException(
|
||||
resultOrExceptionOrBuilder.build());
|
||||
}
|
||||
region.mutateRow(rm);
|
||||
return ((HRegion)region).getRegionStats();
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -546,11 +555,15 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
|
|||
*/
|
||||
private boolean checkAndRowMutate(final Region region, final List<ClientProtos.Action> actions,
|
||||
final CellScanner cellScanner, byte[] row, byte[] family, byte[] qualifier,
|
||||
CompareOp compareOp, ByteArrayComparable comparator) throws IOException {
|
||||
CompareOp compareOp, ByteArrayComparable comparator,
|
||||
RegionActionResult.Builder builder) throws IOException {
|
||||
if (!region.getRegionInfo().isMetaTable()) {
|
||||
regionServer.cacheFlusher.reclaimMemStoreMemory();
|
||||
}
|
||||
RowMutations rm = null;
|
||||
int i = 0;
|
||||
ClientProtos.ResultOrException.Builder resultOrExceptionOrBuilder =
|
||||
ClientProtos.ResultOrException.newBuilder();
|
||||
for (ClientProtos.Action action: actions) {
|
||||
if (action.hasGet()) {
|
||||
throw new DoNotRetryIOException("Atomic put and/or delete only, not a Get=" +
|
||||
|
@ -570,8 +583,15 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
|
|||
default:
|
||||
throw new DoNotRetryIOException("Atomic put and/or delete only, not " + type.name());
|
||||
}
|
||||
// To unify the response format with doNonAtomicRegionMutation and read through client's
|
||||
// AsyncProcess we have to add an empty result instance per operation
|
||||
resultOrExceptionOrBuilder.clear();
|
||||
resultOrExceptionOrBuilder.setIndex(i++);
|
||||
builder.addResultOrException(
|
||||
resultOrExceptionOrBuilder.build());
|
||||
}
|
||||
return region.checkAndRowMutate(row, family, qualifier, compareOp, comparator, rm, Boolean.TRUE);
|
||||
return region.checkAndRowMutate(row, family, qualifier, compareOp,
|
||||
comparator, rm, Boolean.TRUE);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -868,8 +888,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
|
|||
|
||||
case SUCCESS:
|
||||
builder.addResultOrException(getResultOrException(
|
||||
ClientProtos.Result.getDefaultInstance(), index,
|
||||
((HRegion) region).getRegionStats()));
|
||||
ClientProtos.Result.getDefaultInstance(), index));
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
@ -2246,13 +2265,16 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
|
|||
RegionScannersCloseCallBack closeCallBack = null;
|
||||
RpcCallContext context = RpcServer.getCurrentCall();
|
||||
this.rpcMultiRequestCount.increment();
|
||||
Map<RegionSpecifier, ClientProtos.RegionLoadStats> regionStats = new HashMap<>(request
|
||||
.getRegionActionCount());
|
||||
for (RegionAction regionAction : request.getRegionActionList()) {
|
||||
this.requestCount.add(regionAction.getActionCount());
|
||||
OperationQuota quota;
|
||||
Region region;
|
||||
regionActionResultBuilder.clear();
|
||||
RegionSpecifier regionSpecifier = regionAction.getRegion();
|
||||
try {
|
||||
region = getRegion(regionAction.getRegion());
|
||||
region = getRegion(regionSpecifier);
|
||||
quota = getQuotaManager().checkQuota(region, regionAction.getActionList());
|
||||
} catch (IOException e) {
|
||||
rpcServer.getMetrics().exception(e);
|
||||
|
@ -2280,15 +2302,11 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
|
|||
ByteArrayComparable comparator =
|
||||
ProtobufUtil.toComparator(condition.getComparator());
|
||||
processed = checkAndRowMutate(region, regionAction.getActionList(),
|
||||
cellScanner, row, family, qualifier, compareOp, comparator);
|
||||
cellScanner, row, family, qualifier, compareOp,
|
||||
comparator, regionActionResultBuilder);
|
||||
} else {
|
||||
ClientProtos.RegionLoadStats stats = mutateRows(region, regionAction.getActionList(),
|
||||
cellScanner);
|
||||
// add the stats to the request
|
||||
if(stats != null) {
|
||||
responseBuilder.addRegionActionResult(RegionActionResult.newBuilder()
|
||||
.addResultOrException(ResultOrException.newBuilder().setLoadStats(stats)));
|
||||
}
|
||||
mutateRows(region, regionAction.getActionList(), cellScanner,
|
||||
regionActionResultBuilder);
|
||||
processed = Boolean.TRUE;
|
||||
}
|
||||
} catch (IOException e) {
|
||||
|
@ -2310,14 +2328,26 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
|
|||
}
|
||||
responseBuilder.addRegionActionResult(regionActionResultBuilder.build());
|
||||
quota.close();
|
||||
ClientProtos.RegionLoadStats regionLoadStats = ((HRegion)region).getLoadStatistics();
|
||||
if(regionLoadStats != null) {
|
||||
regionStats.put(regionSpecifier, regionLoadStats);
|
||||
}
|
||||
}
|
||||
// Load the controller with the Cells to return.
|
||||
if (cellsToReturn != null && !cellsToReturn.isEmpty() && controller != null) {
|
||||
controller.setCellScanner(CellUtil.createCellScanner(cellsToReturn));
|
||||
}
|
||||
|
||||
if (processed != null) {
|
||||
responseBuilder.setProcessed(processed);
|
||||
}
|
||||
|
||||
MultiRegionLoadStats.Builder builder = MultiRegionLoadStats.newBuilder();
|
||||
for(Entry<RegionSpecifier, ClientProtos.RegionLoadStats> stat: regionStats.entrySet()){
|
||||
builder.addRegion(stat.getKey());
|
||||
builder.addStat(stat.getValue());
|
||||
}
|
||||
responseBuilder.setRegionStatistics(builder);
|
||||
return responseBuilder.build();
|
||||
}
|
||||
|
||||
|
|
|
@ -53,7 +53,7 @@ public class TestCheckAndMutate {
|
|||
}
|
||||
|
||||
@Test
|
||||
public void testCheckAndMutate() throws Exception {
|
||||
public void testCheckAndMutate() throws Throwable {
|
||||
final TableName tableName = TableName.valueOf("TestPutWithDelete");
|
||||
final byte[] rowKey = Bytes.toBytes("12345");
|
||||
final byte[] family = Bytes.toBytes("cf");
|
||||
|
@ -109,7 +109,12 @@ public class TestCheckAndMutate {
|
|||
table.checkAndMutate(rowKey, family, Bytes.toBytes("A"), CompareFilter.CompareOp.EQUAL,
|
||||
Bytes.toBytes("a"), rm);
|
||||
fail("Expected NoSuchColumnFamilyException");
|
||||
} catch(NoSuchColumnFamilyException e) {
|
||||
} catch (RetriesExhaustedWithDetailsException e) {
|
||||
try {
|
||||
throw e.getCause(0);
|
||||
} catch (NoSuchColumnFamilyException e1) {
|
||||
// expected
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
table.close();
|
||||
|
|
|
@ -17,6 +17,7 @@
|
|||
*/
|
||||
package org.apache.hadoop.hbase.client;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
|
@ -166,4 +167,32 @@ public class TestClientPushback {
|
|||
assertNotEquals("AsyncProcess did not submit the work time", endTime.get(), 0);
|
||||
assertTrue("AsyncProcess did not delay long enough", endTime.get() - startTime >= backoffTime);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMutateRowStats() throws IOException {
|
||||
Configuration conf = UTIL.getConfiguration();
|
||||
ClusterConnection conn = (ClusterConnection) ConnectionFactory.createConnection(conf);
|
||||
HTable table = (HTable) conn.getTable(tableName);
|
||||
HRegionServer rs = UTIL.getHBaseCluster().getRegionServer(0);
|
||||
Region region = rs.getOnlineRegions(tableName).get(0);
|
||||
|
||||
RowMutations mutations = new RowMutations(Bytes.toBytes("row"));
|
||||
Put p = new Put(Bytes.toBytes("row"));
|
||||
p.addColumn(family, qualifier, Bytes.toBytes("value2"));
|
||||
mutations.add(p);
|
||||
table.mutateRow(mutations);
|
||||
|
||||
ServerStatisticTracker stats = conn.getStatisticsTracker();
|
||||
assertNotNull( "No stats configured for the client!", stats);
|
||||
// get the names so we can query the stats
|
||||
ServerName server = rs.getServerName();
|
||||
byte[] regionName = region.getRegionInfo().getRegionName();
|
||||
|
||||
// check to see we found some load on the memstore
|
||||
ServerStatistics serverStats = stats.getServerStatsForTesting(server);
|
||||
ServerStatistics.RegionStatistics regionStats = serverStats.getStatsForRegion(regionName);
|
||||
|
||||
assertNotNull(regionStats);
|
||||
assertTrue(regionStats.getMemstoreLoadPercent() > 0);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -4365,7 +4365,13 @@ public class TestFromClientSide {
|
|||
arm.add(p);
|
||||
t.mutateRow(arm);
|
||||
fail("Expected NoSuchColumnFamilyException");
|
||||
} catch(NoSuchColumnFamilyException e) {
|
||||
} catch(RetriesExhaustedWithDetailsException e) {
|
||||
for(Throwable rootCause: e.getCauses()){
|
||||
if(rootCause instanceof NoSuchColumnFamilyException){
|
||||
return;
|
||||
}
|
||||
}
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -573,10 +573,11 @@ public class TestReplicasClient {
|
|||
Assert.assertTrue(((Result)r).isStale());
|
||||
Assert.assertTrue(((Result)r).getExists());
|
||||
}
|
||||
Set<MultiServerCallable<Row>> set = ((AsyncRequestFutureImpl<?>)reqs).getCallsInProgress();
|
||||
Set<PayloadCarryingServerCallable> set =
|
||||
((AsyncRequestFutureImpl<?>)reqs).getCallsInProgress();
|
||||
// verify we did cancel unneeded calls
|
||||
Assert.assertTrue(!set.isEmpty());
|
||||
for (MultiServerCallable<Row> m : set) {
|
||||
for (PayloadCarryingServerCallable m : set) {
|
||||
Assert.assertTrue(m.isCancelled());
|
||||
}
|
||||
} finally {
|
||||
|
|
Loading…
Reference in New Issue