diff --git a/docs/content/configuration/index.md b/docs/content/configuration/index.md index b0796ace573..5aa9f16af6d 100644 --- a/docs/content/configuration/index.md +++ b/docs/content/configuration/index.md @@ -222,8 +222,8 @@ The Druid servers [emit various metrics](../operations/metrics.html) and alerts |`druid.emitter.http.basicAuthentication`|Login and password for authentification in "login:password" form, e. g. `druid.emitter.http.basicAuthentication=admin:adminpassword`|not specified = no authentification| |`druid.emitter.http.flushTimeOut`|The timeout after which an event should be sent to the endpoint, even if internal buffers are not filled, in milliseconds.|not specified = no timeout| |`druid.emitter.http.batchingStrategy`|The strategy of how the batch is formatted. "ARRAY" means `[event1,event2]`, "NEWLINES" means `event1\nevent2`, ONLY_EVENTS means `event1event2`.|ARRAY| -|`druid.emitter.http.maxBatchSize`|The maximum batch size, in bytes.|5191680 (i. e. 5 MB)| -|`druid.emitter.http.batchQueueSizeLimit`|The maximum number of batches in emitter queue, if there are problems with emitting.|50| +|`druid.emitter.http.maxBatchSize`|The maximum batch size, in bytes.|the minimum of (10% of JVM heap size divided by 2) or (5191680 (i. e. 5 MB))| +|`druid.emitter.http.batchQueueSizeLimit`|The maximum number of batches in emitter queue, if there are problems with emitting.|the maximum of (2) or (10% of the JVM heap size divided by 5MB)| |`druid.emitter.http.minHttpTimeoutMillis`|If the speed of filling batches imposes timeout smaller than that, not even trying to send batch to endpoint, because it will likely fail, not being able to send the data that fast. Configure this depending based on emitter/successfulSending/minTimeMs metric. Reasonable values are 10ms..100ms.|0| |`druid.emitter.http.recipientBaseUrl`|The base URL to emit messages to. Druid will POST JSON to be consumed at the HTTP endpoint specified by this property.|none, required config| diff --git a/java-util/src/main/java/io/druid/java/util/emitter/core/BaseHttpEmittingConfig.java b/java-util/src/main/java/io/druid/java/util/emitter/core/BaseHttpEmittingConfig.java index 7078a3bf679..9838bfb1903 100644 --- a/java-util/src/main/java/io/druid/java/util/emitter/core/BaseHttpEmittingConfig.java +++ b/java-util/src/main/java/io/druid/java/util/emitter/core/BaseHttpEmittingConfig.java @@ -20,6 +20,7 @@ package io.druid.java.util.emitter.core; import com.fasterxml.jackson.annotation.JsonProperty; +import io.druid.java.util.common.Pair; import javax.validation.constraints.Min; @@ -27,7 +28,16 @@ public class BaseHttpEmittingConfig { public static final long DEFAULT_FLUSH_MILLIS = 60 * 1000; public static final int DEFAULT_FLUSH_COUNTS = 500; - public static final int DEFAULT_MAX_BATCH_SIZE = 5 * 1024 * 1024; + + /** ensure the event buffers don't use more than 10% of memory by default */ + public static final int DEFAULT_MAX_BATCH_SIZE; + public static final int DEFAULT_BATCH_QUEUE_SIZE_LIMIT; + static { + Pair batchConfigPair = getDefaultBatchSizeAndLimit(Runtime.getRuntime().maxMemory()); + DEFAULT_MAX_BATCH_SIZE = batchConfigPair.lhs; + DEFAULT_BATCH_QUEUE_SIZE_LIMIT = batchConfigPair.rhs; + } + /** * Do not time out in case flushTimeOut is not set */ @@ -35,13 +45,31 @@ public class BaseHttpEmittingConfig public static final String DEFAULT_BASIC_AUTHENTICATION = null; public static final BatchingStrategy DEFAULT_BATCHING_STRATEGY = BatchingStrategy.ARRAY; public static final ContentEncoding DEFAULT_CONTENT_ENCODING = null; - public static final int DEFAULT_BATCH_QUEUE_SIZE_LIMIT = 50; public static final float DEFAULT_HTTP_TIMEOUT_ALLOWANCE_FACTOR = 2.0f; /** * The default value effective doesn't set the min timeout */ public static final int DEFAULT_MIN_HTTP_TIMEOUT_MILLIS = 0; + public static Pair getDefaultBatchSizeAndLimit(long maxMemory) + { + long memoryLimit = maxMemory / 10; + long batchSize = 5 * 1024 * 1024; + long queueLimit = 50; + + if (batchSize * queueLimit > memoryLimit) { + queueLimit = memoryLimit / batchSize; + } + + // make room for at least two queue items + if (queueLimit < 2) { + queueLimit = 2; + batchSize = memoryLimit / queueLimit; + } + + return new Pair<>((int) batchSize, (int) queueLimit); + } + @Min(1) @JsonProperty long flushMillis = DEFAULT_FLUSH_MILLIS; diff --git a/java-util/src/main/java/io/druid/java/util/emitter/core/HttpPostEmitter.java b/java-util/src/main/java/io/druid/java/util/emitter/core/HttpPostEmitter.java index 008386cc09f..20f29443da4 100644 --- a/java-util/src/main/java/io/druid/java/util/emitter/core/HttpPostEmitter.java +++ b/java-util/src/main/java/io/druid/java/util/emitter/core/HttpPostEmitter.java @@ -279,6 +279,7 @@ public class HttpPostEmitter implements Flushable, Closeable, Emitter ); } else { largeEventsToEmit.add(eventBytes); + approximateBuffersToEmitCount.incrementAndGet(); approximateLargeEventsToEmitCount.incrementAndGet(); approximateEventsToEmitCount.incrementAndGet(); } @@ -553,6 +554,7 @@ public class HttpPostEmitter implements Flushable, Closeable, Emitter largeEventsToEmit.add(LARGE_EVENTS_STOP); for (byte[] largeEvent; (largeEvent = largeEventsToEmit.poll()) != LARGE_EVENTS_STOP; ) { emitLargeEvent(largeEvent); + approximateBuffersToEmitCount.decrementAndGet(); approximateLargeEventsToEmitCount.decrementAndGet(); approximateEventsToEmitCount.decrementAndGet(); } diff --git a/java-util/src/test/java/io/druid/java/util/emitter/core/EmitterTest.java b/java-util/src/test/java/io/druid/java/util/emitter/core/EmitterTest.java index b4eca7ab827..ac537fb4b19 100644 --- a/java-util/src/test/java/io/druid/java/util/emitter/core/EmitterTest.java +++ b/java-util/src/test/java/io/druid/java/util/emitter/core/EmitterTest.java @@ -65,6 +65,10 @@ public class EmitterTest .accumulate(new EagerResponseBodyPart(Unpooled.wrappedBuffer("Yay".getBytes(StandardCharsets.UTF_8)), true)) .build(); + public static final Response BAD_RESPONSE = responseBuilder(HttpVersion.HTTP_1_1, HttpResponseStatus.FORBIDDEN) + .accumulate(new EagerResponseBodyPart(Unpooled.wrappedBuffer("Not yay".getBytes(StandardCharsets.UTF_8)), true)) + .build(); + private static Response.ResponseBuilder responseBuilder(HttpVersion version, HttpResponseStatus status) { return new Response.ResponseBuilder() diff --git a/java-util/src/test/java/io/druid/java/util/emitter/core/HttpEmitterConfigTest.java b/java-util/src/test/java/io/druid/java/util/emitter/core/HttpEmitterConfigTest.java index a80740f9eb4..b94f92accda 100644 --- a/java-util/src/test/java/io/druid/java/util/emitter/core/HttpEmitterConfigTest.java +++ b/java-util/src/test/java/io/druid/java/util/emitter/core/HttpEmitterConfigTest.java @@ -20,6 +20,7 @@ package io.druid.java.util.emitter.core; import com.fasterxml.jackson.databind.ObjectMapper; +import io.druid.java.util.common.Pair; import org.junit.Assert; import org.junit.Test; @@ -44,9 +45,12 @@ public class HttpEmitterConfigTest Assert.assertEquals("http://example.com/", config.getRecipientBaseUrl()); Assert.assertEquals(null, config.getBasicAuthentication()); Assert.assertEquals(BatchingStrategy.ARRAY, config.getBatchingStrategy()); - Assert.assertEquals(5 * 1024 * 1024, config.getMaxBatchSize()); + Pair batchConfigPair = BaseHttpEmittingConfig.getDefaultBatchSizeAndLimit( + Runtime.getRuntime().maxMemory() + ); + Assert.assertEquals(batchConfigPair.lhs.intValue(), config.getMaxBatchSize()); + Assert.assertEquals(batchConfigPair.rhs.intValue(), config.getBatchQueueSizeLimit()); Assert.assertEquals(Long.MAX_VALUE, config.getFlushTimeOut()); - Assert.assertEquals(50, config.getBatchQueueSizeLimit()); Assert.assertEquals(2.0f, config.getHttpTimeoutAllowanceFactor(), 0.0f); Assert.assertEquals(0, config.getMinHttpTimeoutMillis()); } @@ -65,9 +69,12 @@ public class HttpEmitterConfigTest Assert.assertEquals("http://example.com/", config.getRecipientBaseUrl()); Assert.assertEquals(null, config.getBasicAuthentication()); Assert.assertEquals(BatchingStrategy.ARRAY, config.getBatchingStrategy()); - Assert.assertEquals(5 * 1024 * 1024, config.getMaxBatchSize()); + Pair batchConfigPair = BaseHttpEmittingConfig.getDefaultBatchSizeAndLimit( + Runtime.getRuntime().maxMemory() + ); + Assert.assertEquals(batchConfigPair.lhs.intValue(), config.getMaxBatchSize()); + Assert.assertEquals(batchConfigPair.rhs.intValue(), config.getBatchQueueSizeLimit()); Assert.assertEquals(Long.MAX_VALUE, config.getFlushTimeOut()); - Assert.assertEquals(50, config.getBatchQueueSizeLimit()); Assert.assertEquals(2.0f, config.getHttpTimeoutAllowanceFactor(), 0.0f); Assert.assertEquals(0, config.getMinHttpTimeoutMillis()); } @@ -134,4 +141,32 @@ public class HttpEmitterConfigTest Assert.assertEquals(3.0f, config.getHttpTimeoutAllowanceFactor(), 0.0f); Assert.assertEquals(100, config.getMinHttpTimeoutMillis()); } + + @Test + public void testMemoryLimits() + { + Pair batchConfigPair = BaseHttpEmittingConfig.getDefaultBatchSizeAndLimit( + 64 * 1024 * 1024 + ); + Assert.assertEquals(3355443, batchConfigPair.lhs.intValue()); + Assert.assertEquals(2, batchConfigPair.rhs.intValue()); + + Pair batchConfigPair2 = BaseHttpEmittingConfig.getDefaultBatchSizeAndLimit( + 128 * 1024 * 1024 + ); + Assert.assertEquals(5242880, batchConfigPair2.lhs.intValue()); + Assert.assertEquals(2, batchConfigPair2.rhs.intValue()); + + Pair batchConfigPair3 = BaseHttpEmittingConfig.getDefaultBatchSizeAndLimit( + 256 * 1024 * 1024 + ); + Assert.assertEquals(5242880, batchConfigPair3.lhs.intValue()); + Assert.assertEquals(5, batchConfigPair3.rhs.intValue()); + + Pair batchConfigPair4 = BaseHttpEmittingConfig.getDefaultBatchSizeAndLimit( + Long.MAX_VALUE + ); + Assert.assertEquals(5242880, batchConfigPair4.lhs.intValue()); + Assert.assertEquals(50, batchConfigPair4.rhs.intValue()); + } } diff --git a/java-util/src/test/java/io/druid/java/util/emitter/core/HttpPostEmitterStressTest.java b/java-util/src/test/java/io/druid/java/util/emitter/core/HttpPostEmitterStressTest.java index 06ad38c9751..a467d366a00 100644 --- a/java-util/src/test/java/io/druid/java/util/emitter/core/HttpPostEmitterStressTest.java +++ b/java-util/src/test/java/io/druid/java/util/emitter/core/HttpPostEmitterStressTest.java @@ -23,16 +23,19 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.primitives.Ints; import com.google.common.util.concurrent.Futures; +import io.druid.java.util.emitter.service.ServiceMetricEvent; import it.unimi.dsi.fastutil.ints.IntArrayList; import it.unimi.dsi.fastutil.ints.IntList; import org.asynchttpclient.ListenableFuture; import org.asynchttpclient.Request; import org.asynchttpclient.Response; +import org.junit.Assert; import org.junit.Test; import java.io.IOException; import java.nio.ByteBuffer; import java.util.ArrayList; +import java.util.Arrays; import java.util.BitSet; import java.util.List; import java.util.concurrent.CountDownLatch; @@ -140,4 +143,117 @@ public class HttpPostEmitterStressTest } } } + + @Test + public void testLargeEventsQueueLimit() throws InterruptedException, IOException + { + ObjectMapper mapper = new ObjectMapper(); + + HttpEmitterConfig config = new HttpEmitterConfig.Builder("http://foo.bar") + .setFlushMillis(100) + .setFlushCount(4) + .setBatchingStrategy(BatchingStrategy.ONLY_EVENTS) + .setMaxBatchSize(1024 * 1024) + .setBatchQueueSizeLimit(10) + .build(); + final HttpPostEmitter emitter = new HttpPostEmitter(config, httpClient, new ObjectMapper()); + + emitter.start(); + + httpClient.setGoHandler(new GoHandler() { + @Override + protected ListenableFuture go(Request request) throws X + { + return GoHandlers.immediateFuture(EmitterTest.BAD_RESPONSE); + } + }); + + char[] chars = new char[600000]; + Arrays.fill(chars, '*'); + String bigString = new String(chars); + + Event bigEvent = ServiceMetricEvent.builder() + .setFeed("bigEvents") + .setDimension("test", bigString) + .build("metric", 10) + .build("qwerty", "asdfgh"); + + for (int i = 0; i < 1000; i++) { + emitter.emit(bigEvent); + Assert.assertTrue(emitter.getLargeEventsToEmit() <= 11); + } + + emitter.flush(); + } + + @Test + public void testLargeAndSmallEventsQueueLimit() throws InterruptedException, IOException + { + HttpEmitterConfig config = new HttpEmitterConfig.Builder("http://foo.bar") + .setFlushMillis(100) + .setFlushCount(4) + .setBatchingStrategy(BatchingStrategy.ONLY_EVENTS) + .setMaxBatchSize(1024 * 1024) + .setBatchQueueSizeLimit(10) + .build(); + final HttpPostEmitter emitter = new HttpPostEmitter(config, httpClient, new ObjectMapper()); + + emitter.start(); + + httpClient.setGoHandler(new GoHandler() { + @Override + protected ListenableFuture go(Request request) throws X + { + return GoHandlers.immediateFuture(EmitterTest.BAD_RESPONSE); + } + }); + + char[] chars = new char[600000]; + Arrays.fill(chars, '*'); + String bigString = new String(chars); + + Event smallEvent = ServiceMetricEvent.builder() + .setFeed("smallEvents") + .setDimension("test", "hi") + .build("metric", 10) + .build("qwerty", "asdfgh"); + + Event bigEvent = ServiceMetricEvent.builder() + .setFeed("bigEvents") + .setDimension("test", bigString) + .build("metric", 10) + .build("qwerty", "asdfgh"); + + final CountDownLatch threadsCompleted = new CountDownLatch(2); + new Thread() { + @Override + public void run() + { + for (int i = 0; i < 1000; i++) { + + emitter.emit(smallEvent); + + Assert.assertTrue(emitter.getFailedBuffers() <= 10); + Assert.assertTrue(emitter.getBuffersToEmit() <= 12); + } + threadsCompleted.countDown(); + } + }.start(); + new Thread() { + @Override + public void run() + { + for (int i = 0; i < 1000; i++) { + + emitter.emit(bigEvent); + + Assert.assertTrue(emitter.getFailedBuffers() <= 10); + Assert.assertTrue(emitter.getBuffersToEmit() <= 12); + } + threadsCompleted.countDown(); + } + }.start(); + threadsCompleted.await(); + emitter.flush(); + } } diff --git a/java-util/src/test/java/io/druid/java/util/emitter/core/ParametrizedUriEmitterConfigTest.java b/java-util/src/test/java/io/druid/java/util/emitter/core/ParametrizedUriEmitterConfigTest.java index 63f7c63a19d..c860483a157 100644 --- a/java-util/src/test/java/io/druid/java/util/emitter/core/ParametrizedUriEmitterConfigTest.java +++ b/java-util/src/test/java/io/druid/java/util/emitter/core/ParametrizedUriEmitterConfigTest.java @@ -20,6 +20,7 @@ package io.druid.java.util.emitter.core; import com.fasterxml.jackson.databind.ObjectMapper; +import io.druid.java.util.common.Pair; import org.junit.Assert; import org.junit.Test; @@ -41,7 +42,11 @@ public class ParametrizedUriEmitterConfigTest Assert.assertEquals("http://example.com/topic", config.getRecipientBaseUrl()); Assert.assertEquals(null, config.getBasicAuthentication()); Assert.assertEquals(BatchingStrategy.ARRAY, config.getBatchingStrategy()); - Assert.assertEquals(5 * 1024 * 1024, config.getMaxBatchSize()); + Pair batchConfigPair = BaseHttpEmittingConfig.getDefaultBatchSizeAndLimit( + Runtime.getRuntime().maxMemory() + ); + Assert.assertEquals(batchConfigPair.lhs.intValue(), config.getMaxBatchSize()); + Assert.assertEquals(batchConfigPair.rhs.intValue(), config.getBatchQueueSizeLimit()); Assert.assertEquals(Long.MAX_VALUE, config.getFlushTimeOut()); }