HBASE-6728 prevent OOM possibility due to per connection responseQueue being unbounded
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1401008 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
d115205c13
commit
1048d15c6a
|
@ -26,6 +26,7 @@ import java.io.ByteArrayOutputStream;
|
|||
import java.io.DataInputStream;
|
||||
import java.io.DataOutputStream;
|
||||
import java.io.IOException;
|
||||
import java.io.InterruptedIOException;
|
||||
import java.net.BindException;
|
||||
import java.net.InetAddress;
|
||||
import java.net.InetSocketAddress;
|
||||
|
@ -52,12 +53,12 @@ import java.util.LinkedList;
|
|||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Random;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.concurrent.BlockingQueue;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.LinkedBlockingQueue;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import javax.security.sasl.Sasl;
|
||||
import javax.security.sasl.SaslException;
|
||||
|
@ -68,28 +69,29 @@ import org.apache.commons.logging.LogFactory;
|
|||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
|
||||
import org.apache.hadoop.hbase.monitoring.TaskMonitor;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.ConnectionHeader;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcException;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcRequestBody;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcRequestHeader;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcResponseHeader;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.UserInformation;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcResponseHeader.Status;
|
||||
import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
|
||||
import org.apache.hadoop.hbase.monitoring.TaskMonitor;
|
||||
import org.apache.hadoop.hbase.security.User;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.UserInformation;
|
||||
import org.apache.hadoop.hbase.security.HBaseSaslRpcServer;
|
||||
import org.apache.hadoop.hbase.security.HBaseSaslRpcServer.AuthMethod;
|
||||
import org.apache.hadoop.hbase.security.HBaseSaslRpcServer.SaslDigestCallbackHandler;
|
||||
import org.apache.hadoop.hbase.security.HBaseSaslRpcServer.SaslGssCallbackHandler;
|
||||
import org.apache.hadoop.hbase.security.HBaseSaslRpcServer.SaslStatus;
|
||||
import org.apache.hadoop.hbase.security.User;
|
||||
import org.apache.hadoop.hbase.util.ByteBufferOutputStream;
|
||||
import org.apache.hadoop.hbase.util.SizeBasedThrottler;
|
||||
import org.apache.hadoop.io.BytesWritable;
|
||||
import org.apache.hadoop.io.IntWritable;
|
||||
import org.apache.hadoop.io.Writable;
|
||||
import org.apache.hadoop.io.WritableUtils;
|
||||
import org.apache.hadoop.ipc.Server;
|
||||
import org.apache.hadoop.ipc.RPC.VersionMismatch;
|
||||
import org.apache.hadoop.ipc.Server;
|
||||
import org.apache.hadoop.security.AccessControlException;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
|
||||
|
@ -97,21 +99,20 @@ import org.apache.hadoop.security.authorize.AuthorizationException;
|
|||
import org.apache.hadoop.security.authorize.ProxyUsers;
|
||||
import org.apache.hadoop.security.authorize.ServiceAuthorizationManager;
|
||||
import org.apache.hadoop.security.token.SecretManager;
|
||||
import org.apache.hadoop.security.token.TokenIdentifier;
|
||||
import org.apache.hadoop.security.token.SecretManager.InvalidToken;
|
||||
import org.apache.hadoop.security.token.TokenIdentifier;
|
||||
import org.apache.hadoop.util.StringUtils;
|
||||
import org.cliffc.high_scale_lib.Counter;
|
||||
import org.cloudera.htrace.Sampler;
|
||||
import org.cloudera.htrace.Span;
|
||||
import org.cloudera.htrace.Trace;
|
||||
import org.cloudera.htrace.TraceInfo;
|
||||
import org.cloudera.htrace.impl.NullSpan;
|
||||
|
||||
import com.google.common.base.Function;
|
||||
import com.google.common.util.concurrent.ThreadFactoryBuilder;
|
||||
import com.google.protobuf.Message;
|
||||
|
||||
import org.cliffc.high_scale_lib.Counter;
|
||||
import org.cloudera.htrace.Sampler;
|
||||
import org.cloudera.htrace.Span;
|
||||
import org.cloudera.htrace.TraceInfo;
|
||||
import org.cloudera.htrace.impl.NullSpan;
|
||||
import org.cloudera.htrace.Trace;
|
||||
|
||||
/** A client for an IPC service. IPC calls take a single Protobuf message as a
|
||||
* parameter, and return a single Protobuf message as their value. A service runs on
|
||||
* a port and is defined by a parameter class and a value class.
|
||||
|
@ -256,6 +257,14 @@ public abstract class HBaseServer implements RpcServer {
|
|||
protected final boolean tcpKeepAlive; // if T then use keepalives
|
||||
protected final long purgeTimeout; // in milliseconds
|
||||
|
||||
// responseQueuesSizeThrottler is shared among all responseQueues,
|
||||
// it bounds memory occupied by responses in all responseQueues
|
||||
final SizeBasedThrottler responseQueuesSizeThrottler;
|
||||
|
||||
// RESPONSE_QUEUE_MAX_SIZE limits total size of responses in every response queue
|
||||
private static final long DEFAULT_RESPONSE_QUEUES_MAX_SIZE = 1024 * 1024 * 1024; // 1G
|
||||
private static final String RESPONSE_QUEUES_MAX_SIZE = "ipc.server.response.queue.maxsize";
|
||||
|
||||
volatile protected boolean running = true; // true while server runs
|
||||
protected BlockingQueue<Call> callQueue; // queued calls
|
||||
protected final Counter callQueueSize = new Counter();
|
||||
|
@ -987,7 +996,7 @@ public abstract class HBaseServer implements RpcServer {
|
|||
//
|
||||
// Extract the first call
|
||||
//
|
||||
call = responseQueue.removeFirst();
|
||||
call = responseQueue.peek();
|
||||
SocketChannel channel = call.connection.channel;
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug(getName() + ": responding to #" + call.id + " from " +
|
||||
|
@ -998,9 +1007,13 @@ public abstract class HBaseServer implements RpcServer {
|
|||
//
|
||||
int numBytes = channelWrite(channel, call.response);
|
||||
if (numBytes < 0) {
|
||||
// Error flag is set, so returning here closes connection and
|
||||
// clears responseQueue.
|
||||
return true;
|
||||
}
|
||||
if (!call.response.hasRemaining()) {
|
||||
responseQueue.poll();
|
||||
responseQueuesSizeThrottler.decrease(call.response.limit());
|
||||
responseQueueLen--;
|
||||
call.connection.decRpcCount();
|
||||
//noinspection RedundantIfStatement
|
||||
|
@ -1014,12 +1027,6 @@ public abstract class HBaseServer implements RpcServer {
|
|||
call.connection + " Wrote " + numBytes + " bytes.");
|
||||
}
|
||||
} else {
|
||||
//
|
||||
// If we were unable to write the entire response out, then
|
||||
// insert in Selector queue.
|
||||
//
|
||||
call.connection.responseQueue.addFirst(call);
|
||||
|
||||
if (inHandler) {
|
||||
// set the serve time when the response has to be sent later
|
||||
call.timestamp = System.currentTimeMillis();
|
||||
|
@ -1074,15 +1081,31 @@ public abstract class HBaseServer implements RpcServer {
|
|||
responseQueueLen++;
|
||||
|
||||
boolean doRegister = false;
|
||||
boolean closed;
|
||||
try {
|
||||
responseQueuesSizeThrottler.increase(call.response.remaining());
|
||||
} catch (InterruptedException ie) {
|
||||
throw new InterruptedIOException(ie.getMessage());
|
||||
}
|
||||
synchronized (call.connection.responseQueue) {
|
||||
call.connection.responseQueue.addLast(call);
|
||||
if (call.connection.responseQueue.size() == 1) {
|
||||
doRegister = !processResponse(call.connection.responseQueue, false);
|
||||
closed = call.connection.closed;
|
||||
if (!closed) {
|
||||
call.connection.responseQueue.addLast(call);
|
||||
|
||||
if (call.connection.responseQueue.size() == 1) {
|
||||
doRegister = !processResponse(call.connection.responseQueue, false);
|
||||
}
|
||||
}
|
||||
}
|
||||
if (doRegister) {
|
||||
enqueueInSelector(call);
|
||||
}
|
||||
if (closed) {
|
||||
// Connection was closed when we tried to submit response, but we
|
||||
// increased responseQueues size already. It shoud be
|
||||
// decreased here.
|
||||
responseQueuesSizeThrottler.decrease(call.response.remaining());
|
||||
}
|
||||
}
|
||||
|
||||
private synchronized void incPending() { // call waiting to be enqueued.
|
||||
|
@ -1107,6 +1130,8 @@ public abstract class HBaseServer implements RpcServer {
|
|||
//version are read
|
||||
private boolean headerRead = false; //if the connection header that
|
||||
//follows version is read.
|
||||
|
||||
protected volatile boolean closed = false; // indicates if connection was closed
|
||||
protected SocketChannel channel;
|
||||
private ByteBuffer data;
|
||||
private ByteBuffer dataLengthBuffer;
|
||||
|
@ -1691,6 +1716,7 @@ public abstract class HBaseServer implements RpcServer {
|
|||
}
|
||||
|
||||
protected synchronized void close() {
|
||||
closed = true;
|
||||
disposeSasl();
|
||||
data = null;
|
||||
dataLengthBuffer = null;
|
||||
|
@ -1946,6 +1972,9 @@ public abstract class HBaseServer implements RpcServer {
|
|||
this.delayedCalls = new AtomicInteger(0);
|
||||
|
||||
|
||||
this.responseQueuesSizeThrottler = new SizeBasedThrottler(
|
||||
conf.getLong(RESPONSE_QUEUES_MAX_SIZE, DEFAULT_RESPONSE_QUEUES_MAX_SIZE));
|
||||
|
||||
// Create the responder here
|
||||
responder = new Responder();
|
||||
this.authorize =
|
||||
|
@ -1990,6 +2019,14 @@ public abstract class HBaseServer implements RpcServer {
|
|||
}
|
||||
}
|
||||
connection.close();
|
||||
long bytes = 0;
|
||||
synchronized (connection.responseQueue) {
|
||||
for (Call c : connection.responseQueue) {
|
||||
bytes += c.response.limit();
|
||||
}
|
||||
connection.responseQueue.clear();
|
||||
}
|
||||
responseQueuesSizeThrottler.decrease(bytes);
|
||||
rpcMetrics.numOpenConnections.set(numConnections);
|
||||
}
|
||||
|
||||
|
@ -2244,4 +2281,8 @@ public abstract class HBaseServer implements RpcServer {
|
|||
public static RpcCallContext getCurrentCall() {
|
||||
return CurCall.get();
|
||||
}
|
||||
|
||||
public long getResponseQueueSize(){
|
||||
return responseQueuesSizeThrottler.getCurrentValue();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -53,7 +53,6 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
|
|||
|
||||
import javax.management.ObjectName;
|
||||
|
||||
import com.google.protobuf.Message;
|
||||
import org.apache.commons.lang.mutable.MutableDouble;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
|
@ -103,8 +102,8 @@ import org.apache.hadoop.hbase.client.coprocessor.ExecResult;
|
|||
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
|
||||
import org.apache.hadoop.hbase.executor.ExecutorService;
|
||||
import org.apache.hadoop.hbase.executor.ExecutorService.ExecutorType;
|
||||
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
|
||||
import org.apache.hadoop.hbase.filter.ByteArrayComparable;
|
||||
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
|
||||
import org.apache.hadoop.hbase.fs.HFileSystem;
|
||||
import org.apache.hadoop.hbase.io.hfile.BlockCache;
|
||||
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
|
||||
|
@ -113,6 +112,7 @@ import org.apache.hadoop.hbase.ipc.CoprocessorProtocol;
|
|||
import org.apache.hadoop.hbase.ipc.HBaseRPC;
|
||||
import org.apache.hadoop.hbase.ipc.HBaseRPCErrorHandler;
|
||||
import org.apache.hadoop.hbase.ipc.HBaseRpcMetrics;
|
||||
import org.apache.hadoop.hbase.ipc.HBaseServer;
|
||||
import org.apache.hadoop.hbase.ipc.ProtocolSignature;
|
||||
import org.apache.hadoop.hbase.ipc.RpcServer;
|
||||
import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
|
||||
|
@ -152,6 +152,8 @@ import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileRequ
|
|||
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileRequest.FamilyPath;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileResponse;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Condition;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceRequest;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceResponse;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ExecCoprocessorRequest;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ExecCoprocessorResponse;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.GetRequest;
|
||||
|
@ -196,8 +198,8 @@ import org.apache.hadoop.hbase.regionserver.metrics.RegionServerMetrics;
|
|||
import org.apache.hadoop.hbase.regionserver.metrics.SchemaMetrics;
|
||||
import org.apache.hadoop.hbase.regionserver.metrics.SchemaMetrics.StoreMetricType;
|
||||
import org.apache.hadoop.hbase.regionserver.wal.HLog;
|
||||
import org.apache.hadoop.hbase.regionserver.wal.HLogUtil;
|
||||
import org.apache.hadoop.hbase.regionserver.wal.HLogFactory;
|
||||
import org.apache.hadoop.hbase.regionserver.wal.HLogUtil;
|
||||
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
|
||||
import org.apache.hadoop.hbase.security.User;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
|
@ -228,12 +230,10 @@ import org.codehaus.jackson.map.ObjectMapper;
|
|||
|
||||
import com.google.common.base.Function;
|
||||
import com.google.protobuf.ByteString;
|
||||
import com.google.protobuf.Message;
|
||||
import com.google.protobuf.RpcController;
|
||||
import com.google.protobuf.ServiceException;
|
||||
|
||||
import static org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceRequest;
|
||||
import static org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceResponse;
|
||||
|
||||
/**
|
||||
* HRegionServer makes a set of HRegions available to clients. It checks in with
|
||||
* the HMaster. There are many HRegionServers in a single HBase deployment.
|
||||
|
@ -262,6 +262,9 @@ public class HRegionServer implements ClientProtocol,
|
|||
|
||||
protected long maxScannerResultSize;
|
||||
|
||||
// Server to handle client requests.
|
||||
private HBaseServer server;
|
||||
|
||||
// Cache flushing
|
||||
protected MemStoreFlusher cacheFlusher;
|
||||
|
||||
|
@ -518,6 +521,7 @@ public class HRegionServer implements ClientProtocol,
|
|||
conf.getInt("hbase.regionserver.metahandler.count", 10),
|
||||
conf.getBoolean("hbase.rpc.verbose", false),
|
||||
conf, HConstants.QOS_THRESHOLD);
|
||||
if (rpcServer instanceof HBaseServer) server = (HBaseServer) rpcServer;
|
||||
// Set our address.
|
||||
this.isa = this.rpcServer.getListenerAddress();
|
||||
|
||||
|
@ -1197,7 +1201,7 @@ public class HRegionServer implements ClientProtocol,
|
|||
this.hlog = setupWALAndReplication();
|
||||
// Init in here rather than in constructor after thread name has been set
|
||||
this.metrics = new RegionServerMetrics();
|
||||
this.dynamicMetrics = RegionServerDynamicMetrics.newInstance();
|
||||
this.dynamicMetrics = RegionServerDynamicMetrics.newInstance(this);
|
||||
startServiceThreads();
|
||||
LOG.info("Serving as " + this.serverNameFromMasterPOV +
|
||||
", RPC listening on " + this.isa +
|
||||
|
@ -4131,4 +4135,11 @@ public class HRegionServer implements ClientProtocol,
|
|||
private String getMyEphemeralNodePath() {
|
||||
return ZKUtil.joinZNode(this.zooKeeper.rsZNode, getServerName().toString());
|
||||
}
|
||||
|
||||
public long getResponseQueueSize(){
|
||||
if (server != null) {
|
||||
return server.getResponseQueueSize();
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -30,6 +30,7 @@ import org.apache.commons.logging.Log;
|
|||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.hbase.regionserver.HRegion;
|
||||
import org.apache.hadoop.hbase.regionserver.HRegionServer;
|
||||
import org.apache.hadoop.hbase.util.Pair;
|
||||
import org.apache.hadoop.metrics.MetricsContext;
|
||||
import org.apache.hadoop.metrics.MetricsRecord;
|
||||
|
@ -59,6 +60,7 @@ public class RegionServerDynamicMetrics implements Updater {
|
|||
private MetricsContext context;
|
||||
private final RegionServerDynamicStatistics rsDynamicStatistics;
|
||||
private Method updateMbeanInfoIfMetricsListChanged = null;
|
||||
private HRegionServer regionServer;
|
||||
private static final Log LOG =
|
||||
LogFactory.getLog(RegionServerDynamicStatistics.class);
|
||||
|
||||
|
@ -74,13 +76,14 @@ public class RegionServerDynamicMetrics implements Updater {
|
|||
*/
|
||||
public final MetricsRegistry registry = new MetricsRegistry();
|
||||
|
||||
private RegionServerDynamicMetrics() {
|
||||
private RegionServerDynamicMetrics(HRegionServer regionServer) {
|
||||
this.context = MetricsUtil.getContext("hbase-dynamic");
|
||||
this.metricsRecord = MetricsUtil.createRecord(
|
||||
this.context,
|
||||
"RegionServerDynamicStatistics");
|
||||
context.registerUpdater(this);
|
||||
this.rsDynamicStatistics = new RegionServerDynamicStatistics(this.registry);
|
||||
this.regionServer = regionServer;
|
||||
try {
|
||||
updateMbeanInfoIfMetricsListChanged =
|
||||
this.rsDynamicStatistics.getClass().getSuperclass()
|
||||
|
@ -92,9 +95,9 @@ public class RegionServerDynamicMetrics implements Updater {
|
|||
}
|
||||
}
|
||||
|
||||
public static RegionServerDynamicMetrics newInstance() {
|
||||
public static RegionServerDynamicMetrics newInstance(HRegionServer regionServer) {
|
||||
RegionServerDynamicMetrics metrics =
|
||||
new RegionServerDynamicMetrics();
|
||||
new RegionServerDynamicMetrics(regionServer);
|
||||
return metrics;
|
||||
}
|
||||
|
||||
|
@ -184,6 +187,13 @@ public class RegionServerDynamicMetrics implements Updater {
|
|||
for (Entry<String, AtomicLong> entry : RegionMetricsStorage.getNumericMetrics().entrySet()) {
|
||||
this.setNumericMetric(entry.getKey(), entry.getValue().getAndSet(0));
|
||||
}
|
||||
|
||||
/* export estimated size of all response queues */
|
||||
if (regionServer != null) {
|
||||
long responseQueueSize = regionServer.getResponseQueueSize();
|
||||
this.setNumericMetric("responseQueuesSize", responseQueueSize);
|
||||
}
|
||||
|
||||
/* get dynamically created numeric metrics, and push the metrics.
|
||||
* These ones aren't to be reset; they are cumulative. */
|
||||
for (Entry<String, AtomicLong> entry : RegionMetricsStorage.getNumericPersistentMetrics().entrySet()) {
|
||||
|
|
|
@ -0,0 +1,134 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.util;
|
||||
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
|
||||
/**
|
||||
* Utility class that can be used to implement
|
||||
* queues with limited capacity (in terms of memory).
|
||||
* It maintains internal counter and provides
|
||||
* two operations: increase and decrease.
|
||||
* Increase blocks until internal counter is lower than
|
||||
* given threshold and then increases internal counter.
|
||||
* Decrease decreases internal counter and wakes up
|
||||
* waiting threads if counter is lower than threshold.
|
||||
*
|
||||
* This implementation allows you to set the value of internal
|
||||
* counter to be greater than threshold. It happens
|
||||
* when internal counter is lower than threshold and
|
||||
* increase method is called with parameter 'delta' big enough
|
||||
* so that sum of delta and internal counter is greater than
|
||||
* threshold. This is not a bug, this is a feature.
|
||||
* It solves some problems:
|
||||
* - thread calling increase with big parameter will not be
|
||||
* starved by other threads calling increase with small
|
||||
* arguments.
|
||||
* - thread calling increase with argument greater than
|
||||
* threshold won't deadlock. This is useful when throttling
|
||||
* queues - you can submit object that is bigger than limit.
|
||||
*
|
||||
* This implementation introduces small costs in terms of
|
||||
* synchronization (no synchronization in most cases at all), but is
|
||||
* vulnerable to races. For details see documentation of
|
||||
* increase method.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public class SizeBasedThrottler {
|
||||
|
||||
private final long threshold;
|
||||
private final AtomicLong currentSize;
|
||||
|
||||
/**
|
||||
* Creates SizeBoundary with provided threshold
|
||||
*
|
||||
* @param threshold threshold used by instance
|
||||
*/
|
||||
public SizeBasedThrottler(long threshold) {
|
||||
if (threshold <= 0) {
|
||||
throw new IllegalArgumentException("Treshold must be greater than 0");
|
||||
}
|
||||
this.threshold = threshold;
|
||||
this.currentSize = new AtomicLong(0);
|
||||
}
|
||||
|
||||
/**
|
||||
* Blocks until internal counter is lower than threshold
|
||||
* and then increases value of internal counter.
|
||||
*
|
||||
* THIS METHOD IS VULNERABLE TO RACES.
|
||||
* It may happen that increment operation will
|
||||
* succeed immediately, even if it should block. This happens when
|
||||
* at least two threads call increase at the some moment. The decision
|
||||
* whether to block is made at the beginning, without synchronization.
|
||||
* If value of currentSize is lower than threshold at that time, call
|
||||
* will succeed immediately. It is possible, that 2 threads will make
|
||||
* decision not to block, even if one of them should block.
|
||||
*
|
||||
* @param delta increase internal counter by this value
|
||||
* @return new value of internal counter
|
||||
* @throws InterruptedException when interrupted during waiting
|
||||
*/
|
||||
public synchronized long increase(long delta) throws InterruptedException{
|
||||
if (currentSize.get() >= threshold) {
|
||||
synchronized (this) {
|
||||
while (currentSize.get() >= threshold) {
|
||||
wait();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return currentSize.addAndGet(delta);
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Decreases value of internal counter. Wakes up waiting threads if required.
|
||||
*
|
||||
* @param delta decrease internal counter by this value
|
||||
* @return new value of internal counter
|
||||
*/
|
||||
public synchronized long decrease(long delta) {
|
||||
final long newSize = currentSize.addAndGet(-delta);
|
||||
|
||||
if (newSize < threshold && newSize + delta >= threshold) {
|
||||
synchronized (this) {
|
||||
notifyAll();
|
||||
}
|
||||
}
|
||||
|
||||
return newSize;
|
||||
}
|
||||
|
||||
/**
|
||||
*
|
||||
* @return current value of internal counter
|
||||
*/
|
||||
public synchronized long getCurrentValue(){
|
||||
return currentSize.get();
|
||||
}
|
||||
|
||||
/**
|
||||
* @return threshold
|
||||
*/
|
||||
public long getThreshold(){
|
||||
return threshold;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,135 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.util;
|
||||
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.fail;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
||||
import org.apache.hadoop.hbase.MediumTests;
|
||||
import org.junit.Test;
|
||||
import org.junit.experimental.categories.Category;
|
||||
|
||||
/**
|
||||
* This tests some race conditions that can happen
|
||||
* occasionally, but not every time.
|
||||
*/
|
||||
@Category(MediumTests.class)
|
||||
public class TestSizeBasedThrottler {
|
||||
|
||||
private static final int REPEATS = 100;
|
||||
|
||||
private Thread makeThread(final SizeBasedThrottler throttler,
|
||||
final AtomicBoolean failed, final int delta,
|
||||
final int limit, final CountDownLatch latch) {
|
||||
|
||||
Thread ret = new Thread(new Runnable() {
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
latch.await();
|
||||
if (throttler.increase(delta) > limit) {
|
||||
failed.set(true);
|
||||
}
|
||||
throttler.decrease(delta);
|
||||
} catch (Exception e) {
|
||||
failed.set(true);
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
ret.start();
|
||||
return ret;
|
||||
}
|
||||
|
||||
private void runGenericTest(int threshold, int delta, int maxValueAllowed,
|
||||
int numberOfThreads, long timeout) {
|
||||
SizeBasedThrottler throttler = new SizeBasedThrottler(threshold);
|
||||
AtomicBoolean failed = new AtomicBoolean(false);
|
||||
|
||||
ArrayList<Thread> threads = new ArrayList<Thread>(numberOfThreads);
|
||||
CountDownLatch latch = new CountDownLatch(1);
|
||||
long timeElapsed = 0;
|
||||
|
||||
for (int i = 0; i < numberOfThreads; ++i) {
|
||||
threads.add(makeThread(throttler, failed, delta, maxValueAllowed, latch));
|
||||
}
|
||||
|
||||
latch.countDown();
|
||||
for (Thread t : threads) {
|
||||
try {
|
||||
long beforeJoin = System.currentTimeMillis();
|
||||
t.join(timeout - timeElapsed);
|
||||
timeElapsed += System.currentTimeMillis() - beforeJoin;
|
||||
if (t.isAlive() || timeElapsed >= timeout) {
|
||||
fail("Timeout reached.");
|
||||
}
|
||||
} catch (InterruptedException e) {
|
||||
fail("Got InterruptedException");
|
||||
}
|
||||
}
|
||||
|
||||
assertFalse(failed.get());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSmallIncreases(){
|
||||
for (int i = 0; i < REPEATS; ++i) {
|
||||
runGenericTest(
|
||||
10, // threshold
|
||||
1, // delta
|
||||
15, // fail if throttler's value
|
||||
// exceeds 15
|
||||
1000, // use 1000 threads
|
||||
200 // wait for 200ms
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testBigIncreases() {
|
||||
for (int i = 0; i < REPEATS; ++i) {
|
||||
runGenericTest(
|
||||
1, // threshold
|
||||
2, // delta
|
||||
4, // fail if throttler's value
|
||||
// exceeds 4
|
||||
1000, // use 1000 threads
|
||||
200 // wait for 200ms
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testIncreasesEqualToThreshold(){
|
||||
for (int i = 0; i < REPEATS; ++i) {
|
||||
runGenericTest(
|
||||
1, // threshold
|
||||
1, // delta
|
||||
2, // fail if throttler's value
|
||||
// exceeds 2
|
||||
1000, // use 1000 threads
|
||||
200 // wait for 200ms
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue