mirror of https://github.com/apache/druid.git
HttpPostEmitter back off send() busy-loop (#12102)
* HttpPostEmitter back off send() busy-loop The HttpPostEmitter gets in a loop until the flush timeout can be triggered, OR until some new events arrive that reset the minimum batch fill timeout delay. As a tactical fix, this introduces a simple backoff delay to the send loop to prevent spamming logs. * Update core/src/main/java/org/apache/druid/java/util/emitter/core/HttpPostEmitter.java Co-authored-by: Frank Chen <frankchen@apache.org> Co-authored-by: Frank Chen <frankchen@apache.org>
This commit is contained in:
parent
3e2bb4cf10
commit
0040042863
|
@ -129,6 +129,12 @@ public class HttpEmitterConfig extends BaseHttpEmittingConfig
|
|||
return this;
|
||||
}
|
||||
|
||||
public Builder setMinHttpTimeoutMillis(int minHttpTimeoutMillis)
|
||||
{
|
||||
this.minHttpTimeoutMillis = minHttpTimeoutMillis;
|
||||
return this;
|
||||
}
|
||||
|
||||
public HttpEmitterConfig build()
|
||||
{
|
||||
return new HttpEmitterConfig(this, recipientBaseUrl);
|
||||
|
|
|
@ -693,6 +693,7 @@ public class HttpPostEmitter implements Flushable, Closeable, Emitter
|
|||
catch (Exception e) {
|
||||
if (e == timeoutLessThanMinimumException) {
|
||||
log.debug(e, "Failed to send events to url[%s] with timeout less than minimum", config.getRecipientBaseUrl());
|
||||
sendBackoffDelay();
|
||||
} else {
|
||||
log.error(e, "Failed to send events to url[%s]", config.getRecipientBaseUrl());
|
||||
}
|
||||
|
@ -700,6 +701,34 @@ public class HttpPostEmitter implements Flushable, Closeable, Emitter
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Back-off the sender for a period of time.
|
||||
*
|
||||
* Call only from the send() loop thread when it sees a timeout.
|
||||
*/
|
||||
void sendBackoffDelay()
|
||||
{
|
||||
// The backoff delay is quite simple in favour of something more complicated.
|
||||
|
||||
// This is called when the send() call hits a timeoutLessThanMinimumException
|
||||
// condition. Ideally, what we should do is delay until either the timeout increases
|
||||
// to above the minHttpTimeout, and so the send() will be issued, or, until a Batch
|
||||
// wants to flush based on the configured timeout.
|
||||
|
||||
// Making this technically correct, across all possible Batches, including large events,
|
||||
// etc is complicated in this codebase, since batch flush timing is not tracked here,
|
||||
// so rather than introduce new bugs, let's keep it simple.
|
||||
|
||||
final long backoffCheckDelayMillis = config.getMinHttpTimeoutMillis() / 5;
|
||||
|
||||
try {
|
||||
Thread.sleep(backoffCheckDelayMillis);
|
||||
}
|
||||
catch (InterruptedException ignored) {
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
private void send(byte[] buffer, int length) throws Exception
|
||||
{
|
||||
long lastFillTimeMillis = HttpPostEmitter.this.lastBatchFillTimeMillis;
|
||||
|
|
|
@ -0,0 +1,112 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.apache.druid.java.util.emitter.core;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
|
||||
import org.apache.druid.testing.junit.LoggerCaptureRule;
|
||||
import org.apache.logging.log4j.Level;
|
||||
import org.asynchttpclient.ListenableFuture;
|
||||
import org.asynchttpclient.Request;
|
||||
import org.asynchttpclient.Response;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Locale;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
|
||||
public class HttpPostEmitterLoggerStressTest
|
||||
{
|
||||
@Rule
|
||||
public LoggerCaptureRule logCapture = new LoggerCaptureRule(HttpPostEmitter.class);
|
||||
|
||||
private final MockHttpClient httpClient = new MockHttpClient();
|
||||
|
||||
@Test(timeout = 20_000L)
|
||||
public void testBurstFollowedByQuietPeriod() throws InterruptedException, IOException
|
||||
{
|
||||
HttpEmitterConfig config = new HttpEmitterConfig.Builder("http://foo.bar")
|
||||
.setFlushMillis(5000)
|
||||
.setFlushCount(3)
|
||||
.setBatchingStrategy(BatchingStrategy.ONLY_EVENTS)
|
||||
.setMaxBatchSize(1024 * 1024)
|
||||
.setBatchQueueSizeLimit(10)
|
||||
.setMinHttpTimeoutMillis(100)
|
||||
.build();
|
||||
final HttpPostEmitter emitter = new HttpPostEmitter(config, httpClient, new ObjectMapper());
|
||||
|
||||
emitter.start();
|
||||
|
||||
httpClient.setGoHandler(new GoHandler() {
|
||||
@Override
|
||||
protected <X extends Exception> ListenableFuture<Response> go(Request request) throws X
|
||||
{
|
||||
return GoHandlers.immediateFuture(EmitterTest.okResponse());
|
||||
}
|
||||
});
|
||||
|
||||
Event smallEvent = ServiceMetricEvent.builder()
|
||||
.setFeed("smallEvents")
|
||||
.setDimension("test", "hi")
|
||||
.build("metric", 10)
|
||||
.build("qwerty", "asdfgh");
|
||||
|
||||
for (int i = 0; i < 1000; i++) {
|
||||
emitter.emit(smallEvent);
|
||||
|
||||
Assert.assertTrue(emitter.getTotalFailedBuffers() <= 10);
|
||||
Assert.assertTrue(emitter.getBuffersToEmit() <= 12);
|
||||
}
|
||||
|
||||
// by the end of this test, there should be no outstanding failed buffers
|
||||
|
||||
// with a flush time of 5s, min timeout of 100ms, 20s should be
|
||||
// easily enough to get through all of the events
|
||||
|
||||
while (emitter.getTotalFailedBuffers() > 0) {
|
||||
Thread.sleep(500);
|
||||
}
|
||||
|
||||
// there is also no reason to have too many log events
|
||||
// refer to: https://github.com/apache/druid/issues/11279;
|
||||
|
||||
long countOfTimeouts = logCapture.getLogEvents().stream()
|
||||
.filter(ev -> ev.getLevel() == Level.DEBUG)
|
||||
.filter(ev -> ev.getThrown() instanceof TimeoutException)
|
||||
.count();
|
||||
|
||||
// 1000 events limit, implies we should have no more than
|
||||
// 1000 rejected send events within the expected 20sec
|
||||
// duration of the test
|
||||
long limitTimeoutEvents = 1000;
|
||||
|
||||
Assert.assertTrue(
|
||||
String.format(
|
||||
Locale.getDefault(),
|
||||
"too many timeouts (%d), expect less than (%d)",
|
||||
countOfTimeouts,
|
||||
limitTimeoutEvents),
|
||||
countOfTimeouts < limitTimeoutEvents);
|
||||
|
||||
emitter.close();
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue