More memory limiting for HttpPostEmitter (#5300)

* More memory limiting for HttpPostEmitter

* Less aggressive large events test

* Fix tests

* Restrict batch queue size first, keep minimum of 2 queue items
This commit is contained in:
Jonathan Wei 2018-01-26 15:48:45 -08:00 committed by Gian Merlino
parent 59250cf19b
commit 2a892709e8
7 changed files with 199 additions and 9 deletions

View File

@ -222,8 +222,8 @@ The Druid servers [emit various metrics](../operations/metrics.html) and alerts
|`druid.emitter.http.basicAuthentication`|Login and password for authentification in "login:password" form, e. g. `druid.emitter.http.basicAuthentication=admin:adminpassword`|not specified = no authentification|
|`druid.emitter.http.flushTimeOut`|The timeout after which an event should be sent to the endpoint, even if internal buffers are not filled, in milliseconds.|not specified = no timeout|
|`druid.emitter.http.batchingStrategy`|The strategy of how the batch is formatted. "ARRAY" means `[event1,event2]`, "NEWLINES" means `event1\nevent2`, ONLY_EVENTS means `event1event2`.|ARRAY|
|`druid.emitter.http.maxBatchSize`|The maximum batch size, in bytes.|5191680 (i. e. 5 MB)|
|`druid.emitter.http.batchQueueSizeLimit`|The maximum number of batches in emitter queue, if there are problems with emitting.|50|
|`druid.emitter.http.maxBatchSize`|The maximum batch size, in bytes.|the minimum of (10% of JVM heap size divided by 2) or (5191680 (i. e. 5 MB))|
|`druid.emitter.http.batchQueueSizeLimit`|The maximum number of batches in emitter queue, if there are problems with emitting.|the maximum of (2) or (10% of the JVM heap size divided by 5MB)|
|`druid.emitter.http.minHttpTimeoutMillis`|If the speed of filling batches imposes timeout smaller than that, not even trying to send batch to endpoint, because it will likely fail, not being able to send the data that fast. Configure this depending based on emitter/successfulSending/minTimeMs metric. Reasonable values are 10ms..100ms.|0|
|`druid.emitter.http.recipientBaseUrl`|The base URL to emit messages to. Druid will POST JSON to be consumed at the HTTP endpoint specified by this property.|none, required config|

View File

@ -20,6 +20,7 @@
package io.druid.java.util.emitter.core;
import com.fasterxml.jackson.annotation.JsonProperty;
import io.druid.java.util.common.Pair;
import javax.validation.constraints.Min;
@ -27,7 +28,16 @@ public class BaseHttpEmittingConfig
{
public static final long DEFAULT_FLUSH_MILLIS = 60 * 1000;
public static final int DEFAULT_FLUSH_COUNTS = 500;
public static final int DEFAULT_MAX_BATCH_SIZE = 5 * 1024 * 1024;
/** ensure the event buffers don't use more than 10% of memory by default */
public static final int DEFAULT_MAX_BATCH_SIZE;
public static final int DEFAULT_BATCH_QUEUE_SIZE_LIMIT;
static {
Pair<Integer, Integer> batchConfigPair = getDefaultBatchSizeAndLimit(Runtime.getRuntime().maxMemory());
DEFAULT_MAX_BATCH_SIZE = batchConfigPair.lhs;
DEFAULT_BATCH_QUEUE_SIZE_LIMIT = batchConfigPair.rhs;
}
/**
* Do not time out in case flushTimeOut is not set
*/
@ -35,13 +45,31 @@ public class BaseHttpEmittingConfig
public static final String DEFAULT_BASIC_AUTHENTICATION = null;
public static final BatchingStrategy DEFAULT_BATCHING_STRATEGY = BatchingStrategy.ARRAY;
public static final ContentEncoding DEFAULT_CONTENT_ENCODING = null;
public static final int DEFAULT_BATCH_QUEUE_SIZE_LIMIT = 50;
public static final float DEFAULT_HTTP_TIMEOUT_ALLOWANCE_FACTOR = 2.0f;
/**
* The default value effective doesn't set the min timeout
*/
public static final int DEFAULT_MIN_HTTP_TIMEOUT_MILLIS = 0;
public static Pair<Integer, Integer> getDefaultBatchSizeAndLimit(long maxMemory)
{
long memoryLimit = maxMemory / 10;
long batchSize = 5 * 1024 * 1024;
long queueLimit = 50;
if (batchSize * queueLimit > memoryLimit) {
queueLimit = memoryLimit / batchSize;
}
// make room for at least two queue items
if (queueLimit < 2) {
queueLimit = 2;
batchSize = memoryLimit / queueLimit;
}
return new Pair<>((int) batchSize, (int) queueLimit);
}
@Min(1)
@JsonProperty
long flushMillis = DEFAULT_FLUSH_MILLIS;

View File

@ -279,6 +279,7 @@ public class HttpPostEmitter implements Flushable, Closeable, Emitter
);
} else {
largeEventsToEmit.add(eventBytes);
approximateBuffersToEmitCount.incrementAndGet();
approximateLargeEventsToEmitCount.incrementAndGet();
approximateEventsToEmitCount.incrementAndGet();
}
@ -553,6 +554,7 @@ public class HttpPostEmitter implements Flushable, Closeable, Emitter
largeEventsToEmit.add(LARGE_EVENTS_STOP);
for (byte[] largeEvent; (largeEvent = largeEventsToEmit.poll()) != LARGE_EVENTS_STOP; ) {
emitLargeEvent(largeEvent);
approximateBuffersToEmitCount.decrementAndGet();
approximateLargeEventsToEmitCount.decrementAndGet();
approximateEventsToEmitCount.decrementAndGet();
}

View File

@ -65,6 +65,10 @@ public class EmitterTest
.accumulate(new EagerResponseBodyPart(Unpooled.wrappedBuffer("Yay".getBytes(StandardCharsets.UTF_8)), true))
.build();
public static final Response BAD_RESPONSE = responseBuilder(HttpVersion.HTTP_1_1, HttpResponseStatus.FORBIDDEN)
.accumulate(new EagerResponseBodyPart(Unpooled.wrappedBuffer("Not yay".getBytes(StandardCharsets.UTF_8)), true))
.build();
private static Response.ResponseBuilder responseBuilder(HttpVersion version, HttpResponseStatus status)
{
return new Response.ResponseBuilder()

View File

@ -20,6 +20,7 @@
package io.druid.java.util.emitter.core;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.druid.java.util.common.Pair;
import org.junit.Assert;
import org.junit.Test;
@ -44,9 +45,12 @@ public class HttpEmitterConfigTest
Assert.assertEquals("http://example.com/", config.getRecipientBaseUrl());
Assert.assertEquals(null, config.getBasicAuthentication());
Assert.assertEquals(BatchingStrategy.ARRAY, config.getBatchingStrategy());
Assert.assertEquals(5 * 1024 * 1024, config.getMaxBatchSize());
Pair<Integer, Integer> batchConfigPair = BaseHttpEmittingConfig.getDefaultBatchSizeAndLimit(
Runtime.getRuntime().maxMemory()
);
Assert.assertEquals(batchConfigPair.lhs.intValue(), config.getMaxBatchSize());
Assert.assertEquals(batchConfigPair.rhs.intValue(), config.getBatchQueueSizeLimit());
Assert.assertEquals(Long.MAX_VALUE, config.getFlushTimeOut());
Assert.assertEquals(50, config.getBatchQueueSizeLimit());
Assert.assertEquals(2.0f, config.getHttpTimeoutAllowanceFactor(), 0.0f);
Assert.assertEquals(0, config.getMinHttpTimeoutMillis());
}
@ -65,9 +69,12 @@ public class HttpEmitterConfigTest
Assert.assertEquals("http://example.com/", config.getRecipientBaseUrl());
Assert.assertEquals(null, config.getBasicAuthentication());
Assert.assertEquals(BatchingStrategy.ARRAY, config.getBatchingStrategy());
Assert.assertEquals(5 * 1024 * 1024, config.getMaxBatchSize());
Pair<Integer, Integer> batchConfigPair = BaseHttpEmittingConfig.getDefaultBatchSizeAndLimit(
Runtime.getRuntime().maxMemory()
);
Assert.assertEquals(batchConfigPair.lhs.intValue(), config.getMaxBatchSize());
Assert.assertEquals(batchConfigPair.rhs.intValue(), config.getBatchQueueSizeLimit());
Assert.assertEquals(Long.MAX_VALUE, config.getFlushTimeOut());
Assert.assertEquals(50, config.getBatchQueueSizeLimit());
Assert.assertEquals(2.0f, config.getHttpTimeoutAllowanceFactor(), 0.0f);
Assert.assertEquals(0, config.getMinHttpTimeoutMillis());
}
@ -134,4 +141,32 @@ public class HttpEmitterConfigTest
Assert.assertEquals(3.0f, config.getHttpTimeoutAllowanceFactor(), 0.0f);
Assert.assertEquals(100, config.getMinHttpTimeoutMillis());
}
@Test
public void testMemoryLimits()
{
Pair<Integer, Integer> batchConfigPair = BaseHttpEmittingConfig.getDefaultBatchSizeAndLimit(
64 * 1024 * 1024
);
Assert.assertEquals(3355443, batchConfigPair.lhs.intValue());
Assert.assertEquals(2, batchConfigPair.rhs.intValue());
Pair<Integer, Integer> batchConfigPair2 = BaseHttpEmittingConfig.getDefaultBatchSizeAndLimit(
128 * 1024 * 1024
);
Assert.assertEquals(5242880, batchConfigPair2.lhs.intValue());
Assert.assertEquals(2, batchConfigPair2.rhs.intValue());
Pair<Integer, Integer> batchConfigPair3 = BaseHttpEmittingConfig.getDefaultBatchSizeAndLimit(
256 * 1024 * 1024
);
Assert.assertEquals(5242880, batchConfigPair3.lhs.intValue());
Assert.assertEquals(5, batchConfigPair3.rhs.intValue());
Pair<Integer, Integer> batchConfigPair4 = BaseHttpEmittingConfig.getDefaultBatchSizeAndLimit(
Long.MAX_VALUE
);
Assert.assertEquals(5242880, batchConfigPair4.lhs.intValue());
Assert.assertEquals(50, batchConfigPair4.rhs.intValue());
}
}

View File

@ -23,16 +23,19 @@ import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.primitives.Ints;
import com.google.common.util.concurrent.Futures;
import io.druid.java.util.emitter.service.ServiceMetricEvent;
import it.unimi.dsi.fastutil.ints.IntArrayList;
import it.unimi.dsi.fastutil.ints.IntList;
import org.asynchttpclient.ListenableFuture;
import org.asynchttpclient.Request;
import org.asynchttpclient.Response;
import org.junit.Assert;
import org.junit.Test;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.BitSet;
import java.util.List;
import java.util.concurrent.CountDownLatch;
@ -140,4 +143,117 @@ public class HttpPostEmitterStressTest
}
}
}
@Test
public void testLargeEventsQueueLimit() throws InterruptedException, IOException
{
ObjectMapper mapper = new ObjectMapper();
HttpEmitterConfig config = new HttpEmitterConfig.Builder("http://foo.bar")
.setFlushMillis(100)
.setFlushCount(4)
.setBatchingStrategy(BatchingStrategy.ONLY_EVENTS)
.setMaxBatchSize(1024 * 1024)
.setBatchQueueSizeLimit(10)
.build();
final HttpPostEmitter emitter = new HttpPostEmitter(config, httpClient, new ObjectMapper());
emitter.start();
httpClient.setGoHandler(new GoHandler() {
@Override
protected <X extends Exception> ListenableFuture<Response> go(Request request) throws X
{
return GoHandlers.immediateFuture(EmitterTest.BAD_RESPONSE);
}
});
char[] chars = new char[600000];
Arrays.fill(chars, '*');
String bigString = new String(chars);
Event bigEvent = ServiceMetricEvent.builder()
.setFeed("bigEvents")
.setDimension("test", bigString)
.build("metric", 10)
.build("qwerty", "asdfgh");
for (int i = 0; i < 1000; i++) {
emitter.emit(bigEvent);
Assert.assertTrue(emitter.getLargeEventsToEmit() <= 11);
}
emitter.flush();
}
@Test
public void testLargeAndSmallEventsQueueLimit() throws InterruptedException, IOException
{
HttpEmitterConfig config = new HttpEmitterConfig.Builder("http://foo.bar")
.setFlushMillis(100)
.setFlushCount(4)
.setBatchingStrategy(BatchingStrategy.ONLY_EVENTS)
.setMaxBatchSize(1024 * 1024)
.setBatchQueueSizeLimit(10)
.build();
final HttpPostEmitter emitter = new HttpPostEmitter(config, httpClient, new ObjectMapper());
emitter.start();
httpClient.setGoHandler(new GoHandler() {
@Override
protected <X extends Exception> ListenableFuture<Response> go(Request request) throws X
{
return GoHandlers.immediateFuture(EmitterTest.BAD_RESPONSE);
}
});
char[] chars = new char[600000];
Arrays.fill(chars, '*');
String bigString = new String(chars);
Event smallEvent = ServiceMetricEvent.builder()
.setFeed("smallEvents")
.setDimension("test", "hi")
.build("metric", 10)
.build("qwerty", "asdfgh");
Event bigEvent = ServiceMetricEvent.builder()
.setFeed("bigEvents")
.setDimension("test", bigString)
.build("metric", 10)
.build("qwerty", "asdfgh");
final CountDownLatch threadsCompleted = new CountDownLatch(2);
new Thread() {
@Override
public void run()
{
for (int i = 0; i < 1000; i++) {
emitter.emit(smallEvent);
Assert.assertTrue(emitter.getFailedBuffers() <= 10);
Assert.assertTrue(emitter.getBuffersToEmit() <= 12);
}
threadsCompleted.countDown();
}
}.start();
new Thread() {
@Override
public void run()
{
for (int i = 0; i < 1000; i++) {
emitter.emit(bigEvent);
Assert.assertTrue(emitter.getFailedBuffers() <= 10);
Assert.assertTrue(emitter.getBuffersToEmit() <= 12);
}
threadsCompleted.countDown();
}
}.start();
threadsCompleted.await();
emitter.flush();
}
}

View File

@ -20,6 +20,7 @@
package io.druid.java.util.emitter.core;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.druid.java.util.common.Pair;
import org.junit.Assert;
import org.junit.Test;
@ -41,7 +42,11 @@ public class ParametrizedUriEmitterConfigTest
Assert.assertEquals("http://example.com/topic", config.getRecipientBaseUrl());
Assert.assertEquals(null, config.getBasicAuthentication());
Assert.assertEquals(BatchingStrategy.ARRAY, config.getBatchingStrategy());
Assert.assertEquals(5 * 1024 * 1024, config.getMaxBatchSize());
Pair<Integer, Integer> batchConfigPair = BaseHttpEmittingConfig.getDefaultBatchSizeAndLimit(
Runtime.getRuntime().maxMemory()
);
Assert.assertEquals(batchConfigPair.lhs.intValue(), config.getMaxBatchSize());
Assert.assertEquals(batchConfigPair.rhs.intValue(), config.getBatchQueueSizeLimit());
Assert.assertEquals(Long.MAX_VALUE, config.getFlushTimeOut());
}