diff --git a/core/src/main/java/org/apache/druid/java/util/emitter/core/HttpEmitterConfig.java b/core/src/main/java/org/apache/druid/java/util/emitter/core/HttpEmitterConfig.java index 91afe071e23..bf1817fd5c5 100644 --- a/core/src/main/java/org/apache/druid/java/util/emitter/core/HttpEmitterConfig.java +++ b/core/src/main/java/org/apache/druid/java/util/emitter/core/HttpEmitterConfig.java @@ -129,6 +129,12 @@ public class HttpEmitterConfig extends BaseHttpEmittingConfig return this; } + public Builder setMinHttpTimeoutMillis(int minHttpTimeoutMillis) + { + this.minHttpTimeoutMillis = minHttpTimeoutMillis; + return this; + } + public HttpEmitterConfig build() { return new HttpEmitterConfig(this, recipientBaseUrl); diff --git a/core/src/main/java/org/apache/druid/java/util/emitter/core/HttpPostEmitter.java b/core/src/main/java/org/apache/druid/java/util/emitter/core/HttpPostEmitter.java index fd2387d977f..f70b7e7d3fd 100644 --- a/core/src/main/java/org/apache/druid/java/util/emitter/core/HttpPostEmitter.java +++ b/core/src/main/java/org/apache/druid/java/util/emitter/core/HttpPostEmitter.java @@ -693,6 +693,7 @@ public class HttpPostEmitter implements Flushable, Closeable, Emitter catch (Exception e) { if (e == timeoutLessThanMinimumException) { log.debug(e, "Failed to send events to url[%s] with timeout less than minimum", config.getRecipientBaseUrl()); + sendBackoffDelay(); } else { log.error(e, "Failed to send events to url[%s]", config.getRecipientBaseUrl()); } @@ -700,6 +701,34 @@ public class HttpPostEmitter implements Flushable, Closeable, Emitter } } + /** + * Back-off the sender for a period of time. + * + * Call only from the send() loop thread when it sees a timeout. + */ + void sendBackoffDelay() + { + // The backoff delay is quite simple in favour of something more complicated. + + // This is called when the send() call hits a timeoutLessThanMinimumException + // condition. Ideally, what we should do is delay until either the timeout increases + // to above the minHttpTimeout, and so the send() will be issued, or, until a Batch + // wants to flush based on the configured timeout. + + // Making this technically correct, across all possible Batches, including large events, + // etc is complicated in this codebase, since batch flush timing is not tracked here, + // so rather than introduce new bugs, let's keep it simple. + + final long backoffCheckDelayMillis = config.getMinHttpTimeoutMillis() / 5; + + try { + Thread.sleep(backoffCheckDelayMillis); + } + catch (InterruptedException ignored) { + return; + } + } + private void send(byte[] buffer, int length) throws Exception { long lastFillTimeMillis = HttpPostEmitter.this.lastBatchFillTimeMillis; diff --git a/core/src/test/java/org/apache/druid/java/util/emitter/core/HttpPostEmitterLoggerStressTest.java b/core/src/test/java/org/apache/druid/java/util/emitter/core/HttpPostEmitterLoggerStressTest.java new file mode 100644 index 00000000000..2a1515e9331 --- /dev/null +++ b/core/src/test/java/org/apache/druid/java/util/emitter/core/HttpPostEmitterLoggerStressTest.java @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.java.util.emitter.core; + +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.druid.java.util.emitter.service.ServiceMetricEvent; +import org.apache.druid.testing.junit.LoggerCaptureRule; +import org.apache.logging.log4j.Level; +import org.asynchttpclient.ListenableFuture; +import org.asynchttpclient.Request; +import org.asynchttpclient.Response; +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; + +import java.io.IOException; +import java.util.Locale; +import java.util.concurrent.TimeoutException; + +public class HttpPostEmitterLoggerStressTest +{ + @Rule + public LoggerCaptureRule logCapture = new LoggerCaptureRule(HttpPostEmitter.class); + + private final MockHttpClient httpClient = new MockHttpClient(); + + @Test(timeout = 20_000L) + public void testBurstFollowedByQuietPeriod() throws InterruptedException, IOException + { + HttpEmitterConfig config = new HttpEmitterConfig.Builder("http://foo.bar") + .setFlushMillis(5000) + .setFlushCount(3) + .setBatchingStrategy(BatchingStrategy.ONLY_EVENTS) + .setMaxBatchSize(1024 * 1024) + .setBatchQueueSizeLimit(10) + .setMinHttpTimeoutMillis(100) + .build(); + final HttpPostEmitter emitter = new HttpPostEmitter(config, httpClient, new ObjectMapper()); + + emitter.start(); + + httpClient.setGoHandler(new GoHandler() { + @Override + protected ListenableFuture go(Request request) throws X + { + return GoHandlers.immediateFuture(EmitterTest.okResponse()); + } + }); + + Event smallEvent = ServiceMetricEvent.builder() + .setFeed("smallEvents") + .setDimension("test", "hi") + .build("metric", 10) + .build("qwerty", "asdfgh"); + + for (int i = 0; i < 1000; i++) { + emitter.emit(smallEvent); + + Assert.assertTrue(emitter.getTotalFailedBuffers() <= 10); + Assert.assertTrue(emitter.getBuffersToEmit() <= 12); + } + + // by the end of this test, there should be no outstanding failed buffers + + // with a flush time of 5s, min timeout of 100ms, 20s should be + // easily enough to get through all of the events + + while (emitter.getTotalFailedBuffers() > 0) { + Thread.sleep(500); + } + + // there is also no reason to have too many log events + // refer to: https://github.com/apache/druid/issues/11279; + + long countOfTimeouts = logCapture.getLogEvents().stream() + .filter(ev -> ev.getLevel() == Level.DEBUG) + .filter(ev -> ev.getThrown() instanceof TimeoutException) + .count(); + + // 1000 events limit, implies we should have no more than + // 1000 rejected send events within the expected 20sec + // duration of the test + long limitTimeoutEvents = 1000; + + Assert.assertTrue( + String.format( + Locale.getDefault(), + "too many timeouts (%d), expect less than (%d)", + countOfTimeouts, + limitTimeoutEvents), + countOfTimeouts < limitTimeoutEvents); + + emitter.close(); + } +}