diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java
index d9470387399..204b24ae276 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java
@@ -26,7 +26,6 @@ import java.io.ByteArrayOutputStream;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
-import java.io.InterruptedIOException;
 import java.net.BindException;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
@@ -53,12 +52,12 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.atomic.AtomicInteger;

 import javax.security.sasl.Sasl;
 import javax.security.sasl.SaslException;
@@ -69,29 +68,28 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
-import org.apache.hadoop.hbase.monitoring.TaskMonitor;
 import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.ConnectionHeader;
 import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcException;
 import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcRequestBody;
 import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcRequestHeader;
 import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcResponseHeader;
-import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcResponseHeader.Status;
 import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.UserInformation;
+import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcResponseHeader.Status;
+import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
+import org.apache.hadoop.hbase.monitoring.TaskMonitor;
+import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.security.HBaseSaslRpcServer;
 import org.apache.hadoop.hbase.security.HBaseSaslRpcServer.AuthMethod;
 import org.apache.hadoop.hbase.security.HBaseSaslRpcServer.SaslDigestCallbackHandler;
 import org.apache.hadoop.hbase.security.HBaseSaslRpcServer.SaslGssCallbackHandler;
 import org.apache.hadoop.hbase.security.HBaseSaslRpcServer.SaslStatus;
-import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.util.ByteBufferOutputStream;
-import org.apache.hadoop.hbase.util.SizeBasedThrottler;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableUtils;
-import org.apache.hadoop.ipc.RPC.VersionMismatch;
 import org.apache.hadoop.ipc.Server;
+import org.apache.hadoop.ipc.RPC.VersionMismatch;
 import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
@@ -99,20 +97,21 @@ import org.apache.hadoop.security.authorize.AuthorizationException;
 import org.apache.hadoop.security.authorize.ProxyUsers;
 import org.apache.hadoop.security.authorize.ServiceAuthorizationManager;
 import org.apache.hadoop.security.token.SecretManager;
-import org.apache.hadoop.security.token.SecretManager.InvalidToken;
 import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.security.token.SecretManager.InvalidToken;
 import org.apache.hadoop.util.StringUtils;
-import org.cliffc.high_scale_lib.Counter;
-import org.cloudera.htrace.Sampler;
-import org.cloudera.htrace.Span;
-import org.cloudera.htrace.Trace;
-import org.cloudera.htrace.TraceInfo;
-import org.cloudera.htrace.impl.NullSpan;

 import com.google.common.base.Function;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import com.google.protobuf.Message;
+import org.cliffc.high_scale_lib.Counter;
+import org.cloudera.htrace.Sampler;
+import org.cloudera.htrace.Span;
+import org.cloudera.htrace.TraceInfo;
+import org.cloudera.htrace.impl.NullSpan;
+import org.cloudera.htrace.Trace;
+
 /** A client for an IPC service. IPC calls take a single Protobuf message as a
  * parameter, and return a single Protobuf message as their value. A service runs on
  * a port and is defined by a parameter class and a value class.
@@ -257,14 +256,6 @@ public abstract class HBaseServer implements RpcServer {
   protected final boolean tcpKeepAlive; // if T then use keepalives
   protected final long purgeTimeout;    // in milliseconds

-  // responseQueuesSizeThrottler is shared among all responseQueues,
-  // it bounds memory occupied by responses in all responseQueues
-  final SizeBasedThrottler responseQueuesSizeThrottler;
-
-  // RESPONSE_QUEUE_MAX_SIZE limits total size of responses in every response queue
-  private static final long DEFAULT_RESPONSE_QUEUES_MAX_SIZE = 1024 * 1024 * 1024; // 1G
-  private static final String RESPONSE_QUEUES_MAX_SIZE = "ipc.server.response.queue.maxsize";
-
   volatile protected boolean running = true; // true while server runs
   protected BlockingQueue callQueue; // queued calls
   protected final Counter callQueueSize = new Counter();
@@ -996,7 +987,7 @@ public abstract class HBaseServer implements RpcServer {
         //
         // Extract the first call
         //
-        call = responseQueue.peek();
+        call = responseQueue.removeFirst();
         SocketChannel channel = call.connection.channel;
         if (LOG.isDebugEnabled()) {
           LOG.debug(getName() + ": responding to #" + call.id + " from " +
@@ -1007,13 +998,9 @@
         //
         int numBytes = channelWrite(channel, call.response);
         if (numBytes < 0) {
-          // Error flag is set, so returning here closes connection and
-          // clears responseQueue.
           return true;
         }
         if (!call.response.hasRemaining()) {
-          responseQueue.poll();
-          responseQueuesSizeThrottler.decrease(call.response.limit());
           responseQueueLen--;
           call.connection.decRpcCount();
           //noinspection RedundantIfStatement
@@ -1027,6 +1014,12 @@ public abstract class HBaseServer implements RpcServer {
               call.connection + " Wrote " + numBytes + " bytes.");
           }
         } else {
+          //
+          // If we were unable to write the entire response out, then
+          // insert in Selector queue.
+          //
+          call.connection.responseQueue.addFirst(call);
+
           if (inHandler) {
             // set the serve time when the response has to be sent later
             call.timestamp = System.currentTimeMillis();
@@ -1081,31 +1074,15 @@
       responseQueueLen++;
       boolean doRegister = false;
-      boolean closed;
-      try {
-        responseQueuesSizeThrottler.increase(call.response.remaining());
-      } catch (InterruptedException ie) {
-        throw new InterruptedIOException(ie.getMessage());
-      }
       synchronized (call.connection.responseQueue) {
-        closed = call.connection.closed;
-        if (!closed) {
-          call.connection.responseQueue.addLast(call);
-
-          if (call.connection.responseQueue.size() == 1) {
-            doRegister = !processResponse(call.connection.responseQueue, false);
-          }
+        call.connection.responseQueue.addLast(call);
+        if (call.connection.responseQueue.size() == 1) {
+          doRegister = !processResponse(call.connection.responseQueue, false);
         }
       }
       if (doRegister) {
         enqueueInSelector(call);
       }
-      if (closed) {
-        // Connection was closed when we tried to submit response, but we
-        // increased responseQueues size already. It shoud be
-        // decreased here.
-        responseQueuesSizeThrottler.decrease(call.response.remaining());
-      }
     }

     private synchronized void incPending() {  // call waiting to be enqueued.
@@ -1130,8 +1107,6 @@
                                          //version are read
     private boolean headerRead = false;  //if the connection header that
                                          //follows version is read.
-
-    protected volatile boolean closed = false; // indicates if connection was closed
     protected SocketChannel channel;
     private ByteBuffer data;
     private ByteBuffer dataLengthBuffer;
@@ -1716,7 +1691,6 @@
     }

     protected synchronized void close() {
-      closed = true;
       disposeSasl();
       data = null;
       dataLengthBuffer = null;
@@ -1972,9 +1946,6 @@

     this.delayedCalls = new AtomicInteger(0);

-    this.responseQueuesSizeThrottler = new SizeBasedThrottler(
-        conf.getLong(RESPONSE_QUEUES_MAX_SIZE, DEFAULT_RESPONSE_QUEUES_MAX_SIZE));
-
     // Create the responder here
     responder = new Responder();
     this.authorize =
@@ -2019,14 +1990,6 @@
       }
     }
     connection.close();
-    long bytes = 0;
-    synchronized (connection.responseQueue) {
-      for (Call c : connection.responseQueue) {
-        bytes += c.response.limit();
-      }
-      connection.responseQueue.clear();
-    }
-    responseQueuesSizeThrottler.decrease(bytes);
     rpcMetrics.numOpenConnections.set(numConnections);
   }
@@ -2281,8 +2244,4 @@
   public static RpcCallContext getCurrentCall() {
     return CurCall.get();
   }
-
-  public long getResponseQueueSize(){
-    return responseQueuesSizeThrottler.getCurrentValue();
-  }
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index 7779fb639d1..51e03a76437 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -53,6 +53,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;

 import javax.management.ObjectName;

+import com.google.protobuf.Message;
 import org.apache.commons.lang.mutable.MutableDouble;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -102,8 +103,8 @@ import org.apache.hadoop.hbase.client.coprocessor.ExecResult;
 import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
 import org.apache.hadoop.hbase.executor.ExecutorService;
 import org.apache.hadoop.hbase.executor.ExecutorService.ExecutorType;
-import org.apache.hadoop.hbase.filter.ByteArrayComparable;
 import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
+import org.apache.hadoop.hbase.filter.ByteArrayComparable;
 import org.apache.hadoop.hbase.fs.HFileSystem;
 import org.apache.hadoop.hbase.io.hfile.BlockCache;
 import org.apache.hadoop.hbase.io.hfile.CacheConfig;
@@ -112,7 +113,6 @@ import org.apache.hadoop.hbase.ipc.CoprocessorProtocol;
 import org.apache.hadoop.hbase.ipc.HBaseRPC;
 import org.apache.hadoop.hbase.ipc.HBaseRPCErrorHandler;
 import org.apache.hadoop.hbase.ipc.HBaseRpcMetrics;
-import org.apache.hadoop.hbase.ipc.HBaseServer;
 import org.apache.hadoop.hbase.ipc.ProtocolSignature;
 import org.apache.hadoop.hbase.ipc.RpcServer;
 import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
@@ -152,8 +152,6 @@ import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileRequ
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileRequest.FamilyPath;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileResponse;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Condition;
-import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceRequest;
-import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceResponse;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ExecCoprocessorRequest;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ExecCoprocessorResponse;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.GetRequest;
@@ -198,8 +196,8 @@ import org.apache.hadoop.hbase.regionserver.metrics.RegionServerMetrics;
 import org.apache.hadoop.hbase.regionserver.metrics.SchemaMetrics;
 import org.apache.hadoop.hbase.regionserver.metrics.SchemaMetrics.StoreMetricType;
 import org.apache.hadoop.hbase.regionserver.wal.HLog;
-import org.apache.hadoop.hbase.regionserver.wal.HLogFactory;
 import org.apache.hadoop.hbase.regionserver.wal.HLogUtil;
+import org.apache.hadoop.hbase.regionserver.wal.HLogFactory;
 import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
 import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -230,10 +228,12 @@ import org.codehaus.jackson.map.ObjectMapper;

 import com.google.common.base.Function;
 import com.google.protobuf.ByteString;
-import com.google.protobuf.Message;
 import com.google.protobuf.RpcController;
 import com.google.protobuf.ServiceException;

+import static org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceRequest;
+import static org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceResponse;
+
 /**
  * HRegionServer makes a set of HRegions available to clients. It checks in with
  * the HMaster. There are many HRegionServers in a single HBase deployment.
@@ -262,9 +262,6 @@ public class HRegionServer implements ClientProtocol,

   protected long maxScannerResultSize;

-  // Server to handle client requests.
-  private HBaseServer server;
-
   // Cache flushing
   protected MemStoreFlusher cacheFlusher;

@@ -521,7 +518,6 @@
       conf.getInt("hbase.regionserver.metahandler.count", 10),
       conf.getBoolean("hbase.rpc.verbose", false),
       conf, HConstants.QOS_THRESHOLD);
-    if (rpcServer instanceof HBaseServer) server = (HBaseServer) rpcServer;

     // Set our address.
     this.isa = this.rpcServer.getListenerAddress();
@@ -1201,7 +1197,7 @@
     this.hlog = setupWALAndReplication();
     // Init in here rather than in constructor after thread name has been set
     this.metrics = new RegionServerMetrics();
-    this.dynamicMetrics = RegionServerDynamicMetrics.newInstance(this);
+    this.dynamicMetrics = RegionServerDynamicMetrics.newInstance();
     startServiceThreads();
     LOG.info("Serving as " + this.serverNameFromMasterPOV +
       ", RPC listening on " + this.isa +
@@ -4135,11 +4131,4 @@
   private String getMyEphemeralNodePath() {
     return ZKUtil.joinZNode(this.zooKeeper.rsZNode, getServerName().toString());
   }
-
-  public long getResponseQueueSize(){
-    if (server != null) {
-      return server.getResponseQueueSize();
-    }
-    return 0;
-  }
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/metrics/RegionServerDynamicMetrics.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/metrics/RegionServerDynamicMetrics.java
index ba0bf7bef82..bb06a10d3f4 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/metrics/RegionServerDynamicMetrics.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/metrics/RegionServerDynamicMetrics.java
@@ -30,7 +30,6 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.regionserver.HRegion;
-import org.apache.hadoop.hbase.regionserver.HRegionServer;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.metrics.MetricsContext;
 import org.apache.hadoop.metrics.MetricsRecord;
@@ -60,7 +59,6 @@ public class RegionServerDynamicMetrics implements Updater {
   private MetricsContext context;
   private final RegionServerDynamicStatistics rsDynamicStatistics;
   private Method updateMbeanInfoIfMetricsListChanged = null;
-  private HRegionServer regionServer;
   private static final Log LOG =
     LogFactory.getLog(RegionServerDynamicStatistics.class);

@@ -76,14 +74,13 @@ public class RegionServerDynamicMetrics implements Updater {
    */
  public final MetricsRegistry registry = new MetricsRegistry();

-  private RegionServerDynamicMetrics(HRegionServer regionServer) {
+  private RegionServerDynamicMetrics() {
     this.context = MetricsUtil.getContext("hbase-dynamic");
     this.metricsRecord = MetricsUtil.createRecord(
                             this.context, "RegionServerDynamicStatistics");
     context.registerUpdater(this);
     this.rsDynamicStatistics = new RegionServerDynamicStatistics(this.registry);
-    this.regionServer = regionServer;
     try {
       updateMbeanInfoIfMetricsListChanged =
         this.rsDynamicStatistics.getClass().getSuperclass()
@@ -95,9 +92,9 @@ public class RegionServerDynamicMetrics implements Updater {
     }
   }

-  public static RegionServerDynamicMetrics newInstance(HRegionServer regionServer) {
+  public static RegionServerDynamicMetrics newInstance() {
     RegionServerDynamicMetrics metrics =
-      new RegionServerDynamicMetrics(regionServer);
+      new RegionServerDynamicMetrics();
     return metrics;
   }

@@ -187,13 +184,6 @@ public class RegionServerDynamicMetrics implements Updater {
     for (Entry entry : RegionMetricsStorage.getNumericMetrics().entrySet()) {
       this.setNumericMetric(entry.getKey(), entry.getValue().getAndSet(0));
     }
-
-    /* export estimated size of all response queues */
-    if (regionServer != null) {
-      long responseQueueSize = regionServer.getResponseQueueSize();
-      this.setNumericMetric("responseQueuesSize", responseQueueSize);
-    }
-
     /* get dynamically created numeric metrics, and push the metrics.
      * These ones aren't to be reset; they are cumulative. */
     for (Entry entry : RegionMetricsStorage.getNumericPersistentMetrics().entrySet()) {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/SizeBasedThrottler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/SizeBasedThrottler.java
deleted file mode 100644
index 6638998f591..00000000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/SizeBasedThrottler.java
+++ /dev/null
@@ -1,134 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.util;
-
-import java.util.concurrent.atomic.AtomicLong;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-
-/**
- * Utility class that can be used to implement
- * queues with limited capacity (in terms of memory).
- * It maintains internal counter and provides
- * two operations: increase and decrease.
- * Increase blocks until internal counter is lower than
- * given threshold and then increases internal counter.
- * Decrease decreases internal counter and wakes up
- * waiting threads if counter is lower than threshold.
- *
- * This implementation allows you to set the value of internal
- * counter to be greater than threshold. It happens
- * when internal counter is lower than threshold and
- * increase method is called with parameter 'delta' big enough
- * so that sum of delta and internal counter is greater than
- * threshold. This is not a bug, this is a feature.
- * It solves some problems:
- *   - thread calling increase with big parameter will not be
- *     starved by other threads calling increase with small
- *     arguments.
- *   - thread calling increase with argument greater than
- *     threshold won't deadlock. This is useful when throttling
- *     queues - you can submit object that is bigger than limit.
- *
- * This implementation introduces small costs in terms of
- * synchronization (no synchronization in most cases at all), but is
- * vulnerable to races. For details see documentation of
- * increase method.
- */
-@InterfaceAudience.Private
-public class SizeBasedThrottler {
-
-  private final long threshold;
-  private final AtomicLong currentSize;
-
-  /**
-   * Creates SizeBoundary with provided threshold
-   *
-   * @param threshold threshold used by instance
-   */
-  public SizeBasedThrottler(long threshold) {
-    if (threshold <= 0) {
-      throw new IllegalArgumentException("Treshold must be greater than 0");
-    }
-    this.threshold = threshold;
-    this.currentSize = new AtomicLong(0);
-  }
-
-  /**
-   * Blocks until internal counter is lower than threshold
-   * and then increases value of internal counter.
-   *
-   * THIS METHOD IS VULNERABLE TO RACES.
-   * It may happen that increment operation will
-   * succeed immediately, even if it should block. This happens when
-   * at least two threads call increase at the some moment. The decision
-   * whether to block is made at the beginning, without synchronization.
-   * If value of currentSize is lower than threshold at that time, call
-   * will succeed immediately. It is possible, that 2 threads will make
-   * decision not to block, even if one of them should block.
-   *
-   * @param delta increase internal counter by this value
-   * @return new value of internal counter
-   * @throws InterruptedException when interrupted during waiting
-   */
-  public synchronized long increase(long delta) throws InterruptedException{
-    if (currentSize.get() >= threshold) {
-      synchronized (this) {
-        while (currentSize.get() >= threshold) {
-          wait();
-        }
-      }
-    }
-
-    return currentSize.addAndGet(delta);
-  }
-
-
-  /**
-   * Decreases value of internal counter. Wakes up waiting threads if required.
-   *
-   * @param delta decrease internal counter by this value
-   * @return new value of internal counter
-   */
-  public synchronized long decrease(long delta) {
-    final long newSize = currentSize.addAndGet(-delta);
-
-    if (newSize < threshold && newSize + delta >= threshold) {
-      synchronized (this) {
-        notifyAll();
-      }
-    }
-
-    return newSize;
-  }
-
-  /**
-   *
-   * @return current value of internal counter
-   */
-  public synchronized long getCurrentValue(){
-    return currentSize.get();
-  }
-
-  /**
-   * @return threshold
-   */
-  public long getThreshold(){
-    return threshold;
-  }
-}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestSizeBasedThrottler.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestSizeBasedThrottler.java
deleted file mode 100644
index 971c0247a6b..00000000000
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestSizeBasedThrottler.java
+++ /dev/null
@@ -1,135 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.util;
-
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.fail;
-
-import java.util.ArrayList;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import org.apache.hadoop.hbase.MediumTests;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-
-/**
- * This tests some race conditions that can happen
- * occasionally, but not every time.
- */
-@Category(MediumTests.class)
-public class TestSizeBasedThrottler {
-
-  private static final int REPEATS = 100;
-
-  private Thread makeThread(final SizeBasedThrottler throttler,
-      final AtomicBoolean failed, final int delta,
-      final int limit, final CountDownLatch latch) {
-
-    Thread ret = new Thread(new Runnable() {
-
-      @Override
-      public void run() {
-        try {
-          latch.await();
-          if (throttler.increase(delta) > limit) {
-            failed.set(true);
-          }
-          throttler.decrease(delta);
-        } catch (Exception e) {
-          failed.set(true);
-        }
-      }
-    });
-
-    ret.start();
-    return ret;
-  }
-
-  private void runGenericTest(int threshold, int delta, int maxValueAllowed,
-      int numberOfThreads, long timeout) {
-    SizeBasedThrottler throttler = new SizeBasedThrottler(threshold);
-    AtomicBoolean failed = new AtomicBoolean(false);
-
-    ArrayList threads = new ArrayList(numberOfThreads);
-    CountDownLatch latch = new CountDownLatch(1);
-    long timeElapsed = 0;
-
-    for (int i = 0; i < numberOfThreads; ++i) {
-      threads.add(makeThread(throttler, failed, delta, maxValueAllowed, latch));
-    }
-
-    latch.countDown();
-    for (Thread t : threads) {
-      try {
-        long beforeJoin = System.currentTimeMillis();
-        t.join(timeout - timeElapsed);
-        timeElapsed += System.currentTimeMillis() - beforeJoin;
-        if (t.isAlive() || timeElapsed >= timeout) {
-          fail("Timeout reached.");
-        }
-      } catch (InterruptedException e) {
-        fail("Got InterruptedException");
-      }
-    }
-
-    assertFalse(failed.get());
-  }
-
-  @Test
-  public void testSmallIncreases(){
-    for (int i = 0; i < REPEATS; ++i) {
-      runGenericTest(
-          10,   // threshold
-          1,    // delta
-          15,   // fail if throttler's value
-                // exceeds 15
-          1000, // use 1000 threads
-          200   // wait for 200ms
-          );
-    }
-  }
-
-  @Test
-  public void testBigIncreases() {
-    for (int i = 0; i < REPEATS; ++i) {
-      runGenericTest(
-          1,    // threshold
-          2,    // delta
-          4,    // fail if throttler's value
-                // exceeds 4
-          1000, // use 1000 threads
-          200   // wait for 200ms
-          );
-    }
-  }
-
-  @Test
-  public void testIncreasesEqualToThreshold(){
-    for (int i = 0; i < REPEATS; ++i) {
-      runGenericTest(
-          1,    // threshold
-          1,    // delta
-          2,    // fail if throttler's value
-                // exceeds 2
-          1000, // use 1000 threads
-          200   // wait for 200ms
-          );
-    }
-  }
-}