HBASE-14946 Don't allow multis to overrun the max result size.

Summary:
* Add VersionInfoUtil to determine if a client has a specified version or better
* Add an exception type to say that the response should be chunked
* Add client-side knowledge of retry-immediately exceptions
* Add metrics for how often this happens
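
A rough sketch of how these pieces fit together (a simplified illustration with stand-in types, not the actual HBase classes; the real server-side logic is in the RSRpcServices hunk below). The server accumulates the cell size of each multi action's result; once the response for a 1.2+ client crosses the size limit, the remaining actions get a MultiActionResultTooLarge result and the client replays them immediately without consuming a retry attempt:

import java.io.IOException;
import java.util.List;

class MultiChunkingSketch {
  // Stand-in for the quota-derived limit computed in RSRpcServices.
  static final long MAX_QUOTA_RESULT_SIZE = 100L * 1024 * 1024;

  /** Stand-in for a single action in a multi request. */
  interface Action {
    long execute() throws IOException; // runs the action, returns its result's cell size
  }

  static void runMulti(List<Action> actions, boolean retryImmediatelySupported)
      throws IOException {
    long responseCellSize = 0;
    IOException sizeIOE = null; // created once and attached to every skipped action
    for (Action action : actions) {
      if (retryImmediatelySupported && responseCellSize > MAX_QUOTA_RESULT_SIZE) {
        if (sizeIOE == null) {
          sizeIOE = new IOException("Max response size exceeded: " + responseCellSize);
        }
        // Mark this action's result slot with sizeIOE instead of executing it;
        // an older client would instead get the whole, oversized response.
        continue;
      }
      responseCellSize += action.execute();
    }
  }
}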

Test Plan: Added a unit test

Differential Revision: https://reviews.facebook.net/D51771
Elliott Clark 2015-12-07 18:33:35 -08:00
parent c15e0af84a
commit 48e217a7db
14 changed files with 462 additions and 104 deletions

View File: org/apache/hadoop/hbase/MultiActionResultTooLarge.java

@@ -0,0 +1,31 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase;
/**
* Exception thrown when the result needs to be chunked on the server side.
* It signals that the retry should happen right away and not count against the number of
* retries, since part of the multi succeeded.
*/
public class MultiActionResultTooLarge extends RetryImmediatelyException {
public MultiActionResultTooLarge(String s) {
super(s);
}
}

View File: org/apache/hadoop/hbase/RetryImmediatelyException.java

@@ -0,0 +1,27 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase;
import java.io.IOException;
public class RetryImmediatelyException extends IOException {
public RetryImmediatelyException(String s) {
super(s);
}
}

View File: org/apache/hadoop/hbase/client/AsyncProcess.java

@@ -41,6 +41,7 @@ import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.RetryImmediatelyException;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.DoNotRetryIOException;
@@ -126,19 +127,35 @@ class AsyncProcess {
public void waitUntilDone() throws InterruptedIOException;
}
/** Return value from a submit that didn't contain any requests. */
/**
* Return value from a submit that didn't contain any requests.
*/
private static final AsyncRequestFuture NO_REQS_RESULT = new AsyncRequestFuture() {
final Object[] result = new Object[0];
@Override
public boolean hasError() { return false; }
public boolean hasError() {
return false;
}
@Override
public RetriesExhaustedWithDetailsException getErrors() { return null; }
public RetriesExhaustedWithDetailsException getErrors() {
return null;
}
@Override
public List<? extends Row> getFailedOperations() { return null; }
public List<? extends Row> getFailedOperations() {
return null;
}
@Override
public Object[] getResults() { return result; }
public Object[] getResults() {
return result;
}
@Override
public void waitUntilDone() throws InterruptedIOException {}
public void waitUntilDone() throws InterruptedIOException {
}
};
/** Sync point for calls to multiple replicas for the same user request (Get).
@@ -308,8 +325,12 @@ class AsyncProcess {
* RuntimeException
*/
private ExecutorService getPool(ExecutorService pool) {
if (pool != null) return pool;
if (this.pool != null) return this.pool;
if (pool != null) {
return pool;
}
if (this.pool != null) {
return this.pool;
}
throw new RuntimeException("Neither AsyncProcess nor request have ExecutorService");
}
@@ -367,7 +388,9 @@ class AsyncProcess {
Row r = it.next();
HRegionLocation loc;
try {
if (r == null) throw new IllegalArgumentException("#" + id + ", row cannot be null");
if (r == null) {
throw new IllegalArgumentException("#" + id + ", row cannot be null");
}
// Make sure we get 0-s replica.
RegionLocations locs = connection.locateRegion(
tableName, r.getRow(), true, true, RegionReplicaUtil.DEFAULT_REPLICA_ID);
@@ -730,10 +753,10 @@ class AsyncProcess {
// Normal case: we received an answer from the server, and it's not an exception.
receiveMultiAction(multiAction, server, res, numAttempt);
} catch (Throwable t) {
// Something really bad happened. We are on the send thread that will now die.
LOG.error("Internal AsyncProcess #" + id + " error for "
+ tableName + " processing for " + server, t);
throw new RuntimeException(t);
// Something really bad happened. We are on the send thread that will now die.
LOG.error("Internal AsyncProcess #" + id + " error for "
+ tableName + " processing for " + server, t);
throw new RuntimeException(t);
} finally {
decTaskCounters(multiAction.getRegions(), server);
if (callsInProgress != null && callable != null) {
@@ -752,19 +775,25 @@ class AsyncProcess {
private final TableName tableName;
private final AtomicLong actionsInProgress = new AtomicLong(-1);
/** The lock controls access to results. It is only held when populating results where
/**
* The lock controls access to results. It is only held when populating results where
* there might be several callers (eventual consistency gets). For other requests,
* there's one unique call going on per result index. */
* there's one unique call going on per result index.
*/
private final Object replicaResultLock = new Object();
/** Result array. Null if results are not needed. Otherwise, each index corresponds to
/**
* Result array. Null if results are not needed. Otherwise, each index corresponds to
* the action index in initial actions submitted. For most request types, has null-s for
* requests that are not done, and result/exception for those that are done.
* For eventual-consistency gets, initially the same applies; at some point, replica calls
* might be started, and ReplicaResultState is put at the corresponding indices. The
* returning calls check the type to detect when this is the case. After all calls are done,
* ReplicaResultState-s are replaced with results for the user. */
* ReplicaResultState-s are replaced with results for the user.
*/
private final Object[] results;
/** Indices of replica gets in results. If null, all or no actions are replica-gets. */
/**
* Indices of replica gets in results. If null, all or no actions are replica-gets.
*/
private final int[] replicaGetIndices;
private final boolean hasAnyReplicaGets;
private final long nonceGroup;
@@ -779,7 +808,9 @@ class AsyncProcess {
this.actionsInProgress.set(actions.size());
if (results != null) {
assert needResults;
if (results.length != actions.size()) throw new AssertionError("results.length");
if (results.length != actions.size()) {
throw new AssertionError("results.length");
}
this.results = results;
for (int i = 0; i != this.results.length; ++i) {
results[i] = null;
@@ -1178,9 +1209,13 @@ class AsyncProcess {
// We have two contradicting needs here:
// 1) We want to get the new location after having slept, as it may change.
// 2) We want to take into account the location when calculating the sleep time.
// 3) If all this is just because the response needed to be chunked try again FAST.
// It should be possible to have some heuristics to take the right decision. Short term,
// we go for one.
long backOffTime = errorsByServer.calculateBackoffTime(oldServer, pause);
boolean retryImmediately = throwable instanceof RetryImmediatelyException;
int nextAttemptNumber = retryImmediately ? numAttempt : numAttempt + 1;
long backOffTime = retryImmediately ? 0 :
errorsByServer.calculateBackoffTime(oldServer, pause);
if (numAttempt > startLogErrorsCnt) {
// We use this value to have some logs when we have multiple failures, but not too many
// logs, as errors are to be expected when a region moves, splits and so on
@@ -1189,14 +1224,16 @@ class AsyncProcess {
}
try {
Thread.sleep(backOffTime);
if (backOffTime > 0) {
Thread.sleep(backOffTime);
}
} catch (InterruptedException e) {
LOG.warn("#" + id + ", not sent: " + toReplay.size() + " operations, " + oldServer, e);
Thread.currentThread().interrupt();
return;
}
groupAndSendMultiAction(toReplay, numAttempt + 1);
groupAndSendMultiAction(toReplay, nextAttemptNumber);
}
private void logNoResubmit(ServerName oldServer, int numAttempt,
@@ -1256,6 +1293,7 @@ class AsyncProcess {
// Failure: retry if it makes sense, else update the error lists
if (result == null || result instanceof Throwable) {
Row row = sentAction.getAction();
throwable = ConnectionImplementation.findException(result);
// Register corresponding failures once per server/once per region.
if (!regionFailureRegistered) {
regionFailureRegistered = true;
@@ -1405,7 +1443,9 @@ class AsyncProcess {
// will either see state with callCount 0 after locking it; or will not see state at all
// we will replace it with the result.
synchronized (state) {
if (state.callCount == 0) return; // someone already set the result
if (state.callCount == 0) {
return; // someone already set the result
}
state.callCount = 0;
}
synchronized (replicaResultLock) {

View File: org/apache/hadoop/hbase/client/ConnectionImplementation.java

@@ -35,8 +35,10 @@ import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.MasterNotRunningException;
import org.apache.hadoop.hbase.MetaTableAccessor;
import org.apache.hadoop.hbase.MultiActionResultTooLarge;
import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.RegionTooBusyException;
import org.apache.hadoop.hbase.RetryImmediatelyException;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotEnabledException;
@@ -298,7 +300,8 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
Throwable cur = (Throwable) exception;
while (cur != null) {
if (cur instanceof RegionMovedException || cur instanceof RegionOpeningException
|| cur instanceof RegionTooBusyException || cur instanceof ThrottlingException) {
|| cur instanceof RegionTooBusyException || cur instanceof ThrottlingException
|| cur instanceof RetryImmediatelyException) {
return cur;
}
if (cur instanceof RemoteException) {
@@ -1929,7 +1932,7 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
Throwable cause = findException(exception);
if (cause != null) {
if (cause instanceof RegionTooBusyException || cause instanceof RegionOpeningException
|| cause instanceof ThrottlingException) {
|| cause instanceof ThrottlingException || cause instanceof MultiActionResultTooLarge) {
// We know that the region is still on this region server
return;
}

View File: org/apache/hadoop/hbase/client/Result.java

@@ -839,6 +839,9 @@ public class Result implements CellScannable, CellScanner {
*/
public static long getTotalSizeOfCells(Result result) {
long size = 0;
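// rawCells() can return null for an empty Result, which would NPE in the for-each below.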
if (result.isEmpty()) {
return size;
}
for (Cell c : result.rawCells()) {
size += CellUtil.estimatedHeapSizeOf(c);
}

View File: org/apache/hadoop/hbase/ipc/MetricsHBaseServerSource.java

@@ -74,6 +74,9 @@ public interface MetricsHBaseServerSource extends BaseSource {
String EXCEPTIONS_SANITY_NAME="exceptions.FailedSanityCheckException";
String EXCEPTIONS_MOVED_NAME="exceptions.RegionMovedException";
String EXCEPTIONS_NSRE_NAME="exceptions.NotServingRegionException";
String EXCEPTIONS_MULTI_TOO_LARGE_NAME = "exceptions.multiResponseTooLarge";
String EXCEPTIONS_MULTI_TOO_LARGE_DESC = "A response to a multi request was too large and the " +
"rest of the requests will have to be retried.";
void authorizationSuccess();
@@ -96,6 +99,7 @@
void notServingRegionException();
void unknownScannerException();
void tooBusyException();
void multiActionTooLargeException();
void sentBytes(long count);
@@ -110,4 +114,6 @@
void processedCall(int processingTime);
void queuedAndProcessedCall(int totalTime);
}
}

View File: org/apache/hadoop/hbase/ipc/MetricsHBaseServerSourceImpl.java

@@ -31,6 +31,7 @@ import org.apache.hadoop.metrics2.lib.MutableHistogram;
public class MetricsHBaseServerSourceImpl extends BaseSourceImpl
implements MetricsHBaseServerSource {
private final MetricsHBaseServerWrapper wrapper;
private final MutableCounterLong authorizationSuccesses;
private final MutableCounterLong authorizationFailures;
@@ -47,6 +48,7 @@ public class MetricsHBaseServerSourceImpl extends BaseSourceImpl
private final MutableCounterLong exceptionsSanity;
private final MutableCounterLong exceptionsNSRE;
private final MutableCounterLong exceptionsMoved;
private final MutableCounterLong exceptionsMultiTooLarge;
private MutableHistogram queueCallTime;
@@ -81,6 +83,8 @@ public class MetricsHBaseServerSourceImpl extends BaseSourceImpl
.newCounter(EXCEPTIONS_MOVED_NAME, EXCEPTIONS_TYPE_DESC, 0L);
this.exceptionsNSRE = this.getMetricsRegistry()
.newCounter(EXCEPTIONS_NSRE_NAME, EXCEPTIONS_TYPE_DESC, 0L);
this.exceptionsMultiTooLarge = this.getMetricsRegistry()
.newCounter(EXCEPTIONS_MULTI_TOO_LARGE_NAME, EXCEPTIONS_MULTI_TOO_LARGE_DESC, 0L);
this.authenticationSuccesses = this.getMetricsRegistry().newCounter(
AUTHENTICATION_SUCCESSES_NAME, AUTHENTICATION_SUCCESSES_DESC, 0L);
@@ -159,6 +163,11 @@ public class MetricsHBaseServerSourceImpl extends BaseSourceImpl
exceptionsBusy.incr();
}
@Override
public void multiActionTooLargeException() {
exceptionsMultiTooLarge.incr();
}
@Override
public void authenticationSuccess() {
authenticationSuccesses.incr();

View File: org/apache/hadoop/hbase/client/VersionInfoUtil.java

@@ -0,0 +1,63 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.ipc.RpcCallContext;
import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
/**
* Class to help with parsing the version info.
*/
@InterfaceAudience.Private
public final class VersionInfoUtil {
private VersionInfoUtil() {
/* UTIL CLASS ONLY */
}
public static boolean currentClientHasMinimumVersion(int major, int minor) {
RpcCallContext call = RpcServer.getCurrentCall();
HBaseProtos.VersionInfo versionInfo = call != null ? call.getClientVersionInfo() : null;
return hasMinimumVersion(versionInfo, major, minor);
}
public static boolean hasMinimumVersion(HBaseProtos.VersionInfo versionInfo,
int major,
int minor) {
if (versionInfo != null) {
try {
String[] components = versionInfo.getVersion().split("\\.");
int clientMajor = components.length > 0 ? Integer.parseInt(components[0]) : 0;
if (clientMajor != major) {
return clientMajor > major;
}
int clientMinor = components.length > 1 ? Integer.parseInt(components[1]) : 0;
return clientMinor >= minor;
} catch (NumberFormatException e) {
return false;
}
}
return false;
}
}
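
For illustration, a hedged usage sketch of the version comparison above (buildPartial() is used only because the proto2 VersionInfo message declares other required fields that this example doesn't need):

import org.apache.hadoop.hbase.client.VersionInfoUtil;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;

public class VersionCheckExample {
  public static void main(String[] args) {
    HBaseProtos.VersionInfo v = HBaseProtos.VersionInfo.newBuilder()
        .setVersion("1.2.0")
        .buildPartial();
    System.out.println(VersionInfoUtil.hasMinimumVersion(v, 1, 2)); // true: 1.2 meets (1, 2)
    System.out.println(VersionInfoUtil.hasMinimumVersion(v, 1, 3)); // false: minor too low
    System.out.println(VersionInfoUtil.hasMinimumVersion(v, 2, 0)); // false: major too low
    // A larger major always wins (a 2.x client passes any (1, x) check),
    // and an unparsable version string yields false.
  }
}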

View File: org/apache/hadoop/hbase/ipc/MetricsHBaseServer.java

@@ -19,6 +19,7 @@
package org.apache.hadoop.hbase.ipc;
import org.apache.hadoop.hbase.MultiActionResultTooLarge;
import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.RegionTooBusyException;
import org.apache.hadoop.hbase.UnknownScannerException;
@@ -105,6 +106,8 @@ public class MetricsHBaseServer {
source.notServingRegionException();
} else if (throwable instanceof FailedSanityCheckException) {
source.failedSanityException();
} else if (throwable instanceof MultiActionResultTooLarge) {
source.multiActionTooLargeException();
}
}
}

View File: org/apache/hadoop/hbase/ipc/RpcCallContext.java

@@ -19,10 +19,11 @@ package org.apache.hadoop.hbase.ipc;
import java.net.InetAddress;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.VersionInfo;
import org.apache.hadoop.hbase.security.User;
@InterfaceAudience.Private
public interface RpcCallContext extends Delayable {
/**
* Check if the caller who made this IPC call has disconnected.
@@ -40,7 +41,7 @@ public interface RpcCallContext extends Delayable {
* support cellblocks while fielding requests from clients that do not.
* @return True if the client supports cellblocks, else return all content in pb
*/
boolean isClientCellBlockSupport();
boolean isClientCellBlockSupported();
/**
* Returns the user credentials associated with the current RPC request or
@@ -71,4 +72,22 @@
* @param callback
*/
void setCallBack(RpcCallback callback);
boolean isRetryImmediatelySupported();
/**
* The size of response cells that have been accumulated so far.
* This, along with the corresponding increment call, is used to ensure that multis and
* scans don't grow excessively large.
*/
long getResponseCellSize();
/**
* Add the given amount to the retained cell size.
*
* This is not thread safe and not synchronized at all. If this is used by more than one thread
* then everything will break. Since this is called for every row, synchronization would be too
* onerous.
*/
void incrementResponseCellSize(long cellSize);
}
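
A minimal sketch of the call pattern these two methods enable (assumed shape with a stand-in limit; the real use is in the RSRpcServices changes below): the handler adds each result's estimated cell size to the call context and consults the running total before starting the next action.

import org.apache.hadoop.hbase.ipc.RpcCallContext;

class ResponseSizeAccounting {
  // Stand-in limit; RSRpcServices derives it from the max scanner result size and the quota.
  static final long LIMIT = 100L * 1024 * 1024;

  /** True if the next action may run; false if the rest of the multi should be chunked. */
  static boolean hasRoom(RpcCallContext context) {
    // context is null for in-process calls that never cross the RPC layer, and old
    // clients can't handle a chunked response, so let those run to completion.
    return context == null
        || !context.isRetryImmediatelySupported()
        || context.getResponseCellSize() <= LIMIT;
  }

  static void account(RpcCallContext context, long resultCellSize) {
    if (context != null) {
      context.incrementResponseCellSize(resultCellSize);
    }
  }
}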

View File: org/apache/hadoop/hbase/ipc/RpcServer.java

@@ -79,6 +79,7 @@ import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Operation;
import org.apache.hadoop.hbase.client.VersionInfoUtil;
import org.apache.hadoop.hbase.codec.Codec;
import org.apache.hadoop.hbase.conf.ConfigurationObserver;
import org.apache.hadoop.hbase.exceptions.RegionMovedException;
@@ -317,6 +318,9 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
private InetAddress remoteAddress;
private RpcCallback callback;
private long responseCellSize = 0;
private boolean retryImmediatelySupported;
Call(int id, final BlockingService service, final MethodDescriptor md, RequestHeader header,
Message param, CellScanner cellScanner, Connection connection, Responder responder,
long size, TraceInfo tinfo, final InetAddress remoteAddress) {
@@ -336,6 +340,7 @@
this.tinfo = tinfo;
this.user = connection.user;
this.remoteAddress = remoteAddress;
this.retryImmediatelySupported = connection.retryImmediatelySupported;
}
/**
@@ -521,7 +526,7 @@
}
@Override
public boolean isClientCellBlockSupport() {
public boolean isClientCellBlockSupported() {
return this.connection != null && this.connection.codec != null;
}
@@ -538,6 +543,14 @@
return this.size;
}
public long getResponseCellSize() {
return responseCellSize;
}
public void incrementResponseCellSize(long cellSize) {
responseCellSize += cellSize;
}
/**
* If we have a response, and delay is not set, then respond
* immediately. Otherwise, do not respond to client. This is
@@ -578,6 +591,11 @@
public void setCallBack(RpcCallback callback) {
this.callback = callback;
}
@Override
public boolean isRetryImmediatelySupported() {
return retryImmediatelySupported;
}
}
/** Listens on the socket. Creates jobs for the handler threads*/
@@ -1264,6 +1282,8 @@
// was authentication allowed with a fallback to simple auth
private boolean authenticatedWithFallback;
private boolean retryImmediatelySupported = false;
public UserGroupInformation attemptingUser = null; // user name before auth
protected User user = null;
protected UserGroupInformation ugi = null;
@@ -1720,6 +1740,9 @@
}
}
if (connectionHeader.hasVersionInfo()) {
// see if this connection will support RetryImmediatelyException
retryImmediatelySupported = VersionInfoUtil.hasMinimumVersion(getVersionInfo(), 1, 2);
AUDITLOG.info("Connection from " + this.hostAddress + " port: " + this.remotePort
+ " with version info: "
+ TextFormat.shortDebugString(connectionHeader.getVersionInfo()));
@@ -1727,6 +1750,8 @@
AUDITLOG.info("Connection from " + this.hostAddress + " port: " + this.remotePort
+ " with unknown version info");
}
}
/**

View File: org/apache/hadoop/hbase/master/procedure/ProcedurePrepareLatch.java

@@ -24,10 +24,8 @@ import java.util.concurrent.CountDownLatch;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.hadoop.hbase.ipc.RpcCallContext;
import org.apache.hadoop.hbase.client.VersionInfoUtil;
import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.VersionInfo;
/**
* Latch used by the Master to have the prepare() sync behaviour for old
@@ -44,24 +42,7 @@ public abstract class ProcedurePrepareLatch {
}
public static boolean hasProcedureSupport() {
return currentClientHasMinimumVersion(1, 1);
}
private static boolean currentClientHasMinimumVersion(int major, int minor) {
RpcCallContext call = RpcServer.getCurrentCall();
VersionInfo versionInfo = call != null ? call.getClientVersionInfo() : null;
if (versionInfo != null) {
String[] components = versionInfo.getVersion().split("\\.");
int clientMajor = components.length > 0 ? Integer.parseInt(components[0]) : 0;
if (clientMajor != major) {
return clientMajor > major;
}
int clientMinor = components.length > 1 ? Integer.parseInt(components[1]) : 0;
return clientMinor >= minor;
}
return false;
return VersionInfoUtil.currentClientHasMinimumVersion(1, 1);
}
protected abstract void countDown(final Procedure proc);

View File: org/apache/hadoop/hbase/regionserver/RSRpcServices.java

@@ -49,6 +49,7 @@ import org.apache.hadoop.hbase.HBaseIOException;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.MultiActionResultTooLarge;
import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
@@ -437,11 +438,11 @@
*/
private boolean isClientCellBlockSupport() {
RpcCallContext context = RpcServer.getCurrentCall();
return context != null && context.isClientCellBlockSupport();
return context != null && context.isClientCellBlockSupported();
}
private boolean isClientCellBlockSupport(RpcCallContext context) {
return context != null && context.isClientCellBlockSupport();
return context != null && context.isClientCellBlockSupported();
}
private void addResult(final MutateResponse.Builder builder, final Result result,
@@ -500,13 +501,13 @@
rm = new RowMutations(action.getMutation().getRow().toByteArray());
}
switch (type) {
case PUT:
rm.add(ProtobufUtil.toPut(action.getMutation(), cellScanner));
break;
case DELETE:
rm.add(ProtobufUtil.toDelete(action.getMutation(), cellScanner));
break;
default:
case PUT:
rm.add(ProtobufUtil.toPut(action.getMutation(), cellScanner));
break;
case DELETE:
rm.add(ProtobufUtil.toDelete(action.getMutation(), cellScanner));
break;
default:
throw new DoNotRetryIOException("Atomic put and/or delete only, not " + type.name());
}
}
@@ -543,14 +544,14 @@
rm = new RowMutations(action.getMutation().getRow().toByteArray());
}
switch (type) {
case PUT:
rm.add(ProtobufUtil.toPut(action.getMutation(), cellScanner));
break;
case DELETE:
rm.add(ProtobufUtil.toDelete(action.getMutation(), cellScanner));
break;
default:
throw new DoNotRetryIOException("Atomic put and/or delete only, not " + type.name());
case PUT:
rm.add(ProtobufUtil.toPut(action.getMutation(), cellScanner));
break;
case DELETE:
rm.add(ProtobufUtil.toDelete(action.getMutation(), cellScanner));
break;
default:
throw new DoNotRetryIOException("Atomic put and/or delete only, not " + type.name());
}
}
return region.checkAndRowMutate(row, family, qualifier, compareOp, comparator, rm, Boolean.TRUE);
@@ -655,10 +656,42 @@
// ResultOrException instance that matches each Put or Delete is then added down in the
// doBatchOp call. We should be staying aligned though the Put and Delete are deferred/batched
List<ClientProtos.Action> mutations = null;
long maxQuotaResultSize = Math.min(maxScannerResultSize, quota.getReadAvailable());
IOException sizeIOE = null;
for (ClientProtos.Action action : actions.getActionList()) {
ClientProtos.ResultOrException.Builder resultOrExceptionBuilder = null;
try {
Result r = null;
if (context != null
&& context.isRetryImmediatelySupported()
&& context.getResponseCellSize() > maxQuotaResultSize) {
// We're storing the exception since the exception and reason string won't
// change after the response size limit is reached.
if (sizeIOE == null) {
// We don't need the stack un-winding so don't throw the exception.
// Throwing will kill the JVM's JIT.
//
// Instead just create the exception and then store it.
sizeIOE = new MultiActionResultTooLarge("Max response size exceeded: "
+ context.getResponseCellSize());
// Only report the exception once since there's only one request that
// caused the exception. Otherwise this number will dominate the exceptions count.
rpcServer.getMetrics().exception(sizeIOE);
}
// Now that the exception is known to exist,
// use it for the response.
//
// This will create a copy in the builder.
resultOrExceptionBuilder = ResultOrException.newBuilder().
setException(ResponseConverter.buildException(sizeIOE));
resultOrExceptionBuilder.setIndex(action.getIndex());
builder.addResultOrException(resultOrExceptionBuilder.build());
continue;
}
if (action.hasGet()) {
Get get = ProtobufUtil.toGet(action.getGet());
if (context != null) {
@@ -690,22 +723,22 @@
mutations.clear();
}
switch (type) {
case APPEND:
r = append(region, quota, action.getMutation(), cellScanner, nonceGroup);
break;
case INCREMENT:
r = increment(region, quota, action.getMutation(), cellScanner, nonceGroup);
break;
case PUT:
case DELETE:
// Collect the individual mutations and apply in a batch
if (mutations == null) {
mutations = new ArrayList<ClientProtos.Action>(actions.getActionCount());
}
mutations.add(action);
break;
default:
throw new DoNotRetryIOException("Unsupported mutate type: " + type.name());
case APPEND:
r = append(region, quota, action.getMutation(), cellScanner, nonceGroup);
break;
case INCREMENT:
r = increment(region, quota, action.getMutation(), cellScanner, nonceGroup);
break;
case PUT:
case DELETE:
// Collect the individual mutations and apply in a batch
if (mutations == null) {
mutations = new ArrayList<ClientProtos.Action>(actions.getActionCount());
}
mutations.add(action);
break;
default:
throw new DoNotRetryIOException("Unsupported mutate type: " + type.name());
}
} else {
throw new HBaseIOException("Unexpected Action type");
@@ -715,11 +748,16 @@
if (isClientCellBlockSupport(context)) {
pbResult = ProtobufUtil.toResultNoData(r);
// Hard to guess the size here. Just make a rough guess.
if (cellsToReturn == null) cellsToReturn = new ArrayList<CellScannable>();
if (cellsToReturn == null) {
cellsToReturn = new ArrayList<CellScannable>();
}
cellsToReturn.add(r);
} else {
pbResult = ProtobufUtil.toResult(r);
}
if (context != null) {
context.incrementResponseCellSize(Result.getTotalSizeOfCells(r));
}
resultOrExceptionBuilder =
ClientProtos.ResultOrException.newBuilder().setResult(pbResult);
}
@@ -801,8 +839,8 @@
case SUCCESS:
builder.addResultOrException(getResultOrException(
ClientProtos.Result.getDefaultInstance(), index,
((HRegion)region).getRegionStats()));
ClientProtos.Result.getDefaultInstance(), index,
((HRegion) region).getRegionStats()));
break;
}
}
@@ -951,13 +989,13 @@
ConnectionUtils.setServerSideHConnectionRetriesConfig(rs.conf, name, LOG);
try {
rpcServer = new RpcServer(rs, name, getServices(),
bindAddress, // use final bindAddress for this server.
rs.conf,
rpcSchedulerFactory.create(rs.conf, this, rs));
} catch(BindException be) {
bindAddress, // use final bindAddress for this server.
rs.conf,
rpcSchedulerFactory.create(rs.conf, this, rs));
} catch (BindException be) {
String configName = (this instanceof MasterRpcServices) ? HConstants.MASTER_PORT :
HConstants.REGIONSERVER_PORT;
throw new IOException(be.getMessage() + ". To switch ports use the '" + configName +
HConstants.REGIONSERVER_PORT;
throw new IOException(be.getMessage() + ". To switch ports use the '" + configName +
"' configuration property.", be.getCause() != null ? be.getCause() : be);
}
@@ -2106,7 +2144,9 @@
// It is also the conduit via which we pass back data.
PayloadCarryingRpcController controller = (PayloadCarryingRpcController)rpcc;
CellScanner cellScanner = controller != null ? controller.cellScanner(): null;
if (controller != null) controller.setCellScanner(null);
if (controller != null) {
controller.setCellScanner(null);
}
long nonceGroup = request.hasNonceGroup() ? request.getNonceGroup() : HConstants.NO_NONCE;
@@ -2180,7 +2220,9 @@
if (cellsToReturn != null && !cellsToReturn.isEmpty() && controller != null) {
controller.setCellScanner(CellUtil.createCellScanner(cellsToReturn));
}
if (processed != null) responseBuilder.setProcessed(processed);
if (processed != null) {
responseBuilder.setProcessed(processed);
}
return responseBuilder.build();
}
@@ -2197,10 +2239,12 @@
// rpc controller is how we bring in data via the back door; it is unprotobuf'ed data.
// It is also the conduit via which we pass back data.
PayloadCarryingRpcController controller = (PayloadCarryingRpcController)rpcc;
CellScanner cellScanner = controller != null? controller.cellScanner(): null;
CellScanner cellScanner = controller != null ? controller.cellScanner() : null;
OperationQuota quota = null;
// Clear scanner so we are not holding on to reference across call.
if (controller != null) controller.setCellScanner(null);
if (controller != null) {
controller.setCellScanner(null);
}
try {
checkOpen();
requestCount.increment();
@@ -2448,8 +2492,6 @@
// where processing of request takes > lease expiration time.
lease = regionServer.leases.removeLease(scannerName);
List<Result> results = new ArrayList<Result>();
long totalCellSize = 0;
long currentScanResultSize = 0;
boolean done = false;
// Call coprocessor. Get region info from scanner.
@@ -2459,8 +2501,9 @@
if (!results.isEmpty()) {
for (Result r : results) {
for (Cell cell : r.rawCells()) {
totalCellSize += CellUtil.estimatedSerializedSizeOf(cell);
currentScanResultSize += CellUtil.estimatedHeapSizeOf(cell);
if (context != null) {
context.incrementResponseCellSize(CellUtil.estimatedSerializedSizeOf(cell));
}
}
}
}
@@ -2493,7 +2536,7 @@
// If the coprocessor host is adding to the result list, we cannot guarantee the
// correct ordering of partial results and so we prevent partial results from being
// formed.
boolean serverGuaranteesOrderOfPartials = currentScanResultSize == 0;
boolean serverGuaranteesOrderOfPartials = results.isEmpty();
boolean allowPartialResults =
clientHandlesPartials && serverGuaranteesOrderOfPartials && !isSmallScan;
boolean moreRows = false;
@@ -2559,7 +2602,9 @@
if (!values.isEmpty()) {
for (Cell cell : values) {
totalCellSize += CellUtil.estimatedSerializedSizeOf(cell);
if (context != null) {
context.incrementResponseCellSize(CellUtil.estimatedSerializedSizeOf(cell));
}
}
final boolean partial = scannerContext.partialResultFormed();
results.add(Result.create(values, null, stale, partial));
@@ -2614,9 +2659,10 @@
}
}
region.updateReadRequestsCount(i);
region.getMetrics().updateScanNext(totalCellSize);
long responseCellSize = context != null ? context.getResponseCellSize() : 0;
region.getMetrics().updateScanNext(responseCellSize);
if (regionServer.metricsRegionServer != null) {
regionServer.metricsRegionServer.updateScannerNext(totalCellSize);
regionServer.metricsRegionServer.updateScannerNext(responseCellSize);
}
} finally {
region.closeRegionOperation();

View File: org/apache/hadoop/hbase/client/TestMultiRespectsLimits.java

@@ -0,0 +1,102 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;
import org.apache.hadoop.hbase.CompatibilityFactory;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.ipc.RpcServerInterface;
import org.apache.hadoop.hbase.metrics.BaseSource;
import org.apache.hadoop.hbase.test.MetricsAssertHelper;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import java.util.ArrayList;
import java.util.List;
import static junit.framework.TestCase.assertEquals;
/**
* This test sets the multi size WAAAAAY low and then checks to make sure that gets will still make
* progress.
*/
@Category({MediumTests.class, ClientTests.class})
public class TestMultiRespectsLimits {
private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
private static final MetricsAssertHelper METRICS_ASSERT =
CompatibilityFactory.getInstance(MetricsAssertHelper.class);
private final static byte[] FAMILY = Bytes.toBytes("D");
public static final int MAX_SIZE = 500;
@BeforeClass
public static void setUpBeforeClass() throws Exception {
TEST_UTIL.getConfiguration().setLong(
HConstants.HBASE_SERVER_SCANNER_MAX_RESULT_SIZE_KEY,
MAX_SIZE);
// Only start one regionserver so that all regions are on the same server.
TEST_UTIL.startMiniCluster(1);
}
@AfterClass
public static void tearDownAfterClass() throws Exception {
TEST_UTIL.shutdownMiniCluster();
}
@Test
public void testMultiLimits() throws Exception {
final TableName name = TableName.valueOf("testMultiLimits");
Table t = TEST_UTIL.createTable(name, FAMILY);
TEST_UTIL.loadTable(t, FAMILY, false);
// Split the table to make sure that the chunking happens across regions.
try (final Admin admin = TEST_UTIL.getHBaseAdmin()) {
admin.split(name);
TEST_UTIL.waitFor(60000, new Waiter.Predicate<Exception>() {
@Override
public boolean evaluate() throws Exception {
return admin.getTableRegions(name).size() > 1;
}
});
}
List<Get> gets = new ArrayList<>(MAX_SIZE);
for (int i = 0; i < MAX_SIZE; i++) {
gets.add(new Get(HBaseTestingUtility.ROWS[i]));
}
Result[] results = t.get(gets);
assertEquals(MAX_SIZE, results.length);
RpcServerInterface rpcServer = TEST_UTIL.getHBaseCluster().getRegionServer(0).getRpcServer();
BaseSource s = rpcServer.getMetrics().getMetricsSource();
// Cells from TEST_UTIL.loadTable have a length of 27.
// Using a smaller per-cell size of 25 gives an easy lower bound on the total response size,
// so dividing by the max result size lower-bounds how many responses had to be chunked.
// In reality each kv is reported as much larger than that.
METRICS_ASSERT.assertCounterGt("exceptions", (MAX_SIZE * 25) / MAX_SIZE, s);
METRICS_ASSERT.assertCounterGt("exceptions.multiResponseTooLarge",
(MAX_SIZE * 25) / MAX_SIZE, s);
}
}