mirror of
https://github.com/apache/druid.git
synced 2025-03-08 10:30:38 +00:00
more flush timeout for emitter tests (#8991)
* more flush timeout for emitter tests * share constant
This commit is contained in:
parent
c949a25210
commit
ca2a7a1f08
@ -25,6 +25,7 @@ import org.apache.druid.metadata.PasswordProvider;
|
||||
import org.apache.druid.utils.JvmUtils;
|
||||
|
||||
import javax.validation.constraints.Min;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
public class BaseHttpEmittingConfig
|
||||
{
|
||||
@ -35,6 +36,8 @@ public class BaseHttpEmittingConfig
|
||||
public static final int DEFAULT_MAX_BATCH_SIZE;
|
||||
public static final int DEFAULT_BATCH_QUEUE_SIZE_LIMIT;
|
||||
|
||||
public static final long TEST_FLUSH_TIMEOUT_MILLIS = TimeUnit.MILLISECONDS.convert(10, TimeUnit.SECONDS);
|
||||
|
||||
static {
|
||||
Pair<Integer, Integer> batchConfigPair =
|
||||
getDefaultBatchSizeAndLimit(JvmUtils.getRuntimeInfo().getMaxHeapSizeBytes());
|
||||
|
@ -52,7 +52,6 @@ import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.Properties;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
@ -118,7 +117,7 @@ public class EmitterTest
|
||||
{
|
||||
HttpEmitterConfig config = new HttpEmitterConfig.Builder(TARGET_URL)
|
||||
.setFlushMillis(timeInMillis)
|
||||
.setFlushTimeout(TimeUnit.MILLISECONDS.convert(10, TimeUnit.SECONDS))
|
||||
.setFlushTimeout(BaseHttpEmittingConfig.TEST_FLUSH_TIMEOUT_MILLIS)
|
||||
.setFlushCount(Integer.MAX_VALUE)
|
||||
.build();
|
||||
HttpPostEmitter emitter = new HttpPostEmitter(
|
||||
@ -134,7 +133,7 @@ public class EmitterTest
|
||||
{
|
||||
HttpEmitterConfig config = new HttpEmitterConfig.Builder(TARGET_URL)
|
||||
.setFlushMillis(Long.MAX_VALUE)
|
||||
.setFlushTimeout(TimeUnit.MILLISECONDS.convert(10, TimeUnit.SECONDS))
|
||||
.setFlushTimeout(BaseHttpEmittingConfig.TEST_FLUSH_TIMEOUT_MILLIS)
|
||||
.setFlushCount(size)
|
||||
.build();
|
||||
HttpPostEmitter emitter = new HttpPostEmitter(
|
||||
@ -153,6 +152,10 @@ public class EmitterTest
|
||||
props.setProperty("org.apache.druid.java.util.emitter.recipientBaseUrl", TARGET_URL);
|
||||
props.setProperty("org.apache.druid.java.util.emitter.flushMillis", String.valueOf(Long.MAX_VALUE));
|
||||
props.setProperty("org.apache.druid.java.util.emitter.flushCount", String.valueOf(size));
|
||||
props.setProperty(
|
||||
"org.apache.druid.java.util.emitter.flushTimeOut",
|
||||
String.valueOf(BaseHttpEmittingConfig.TEST_FLUSH_TIMEOUT_MILLIS)
|
||||
);
|
||||
|
||||
Lifecycle lifecycle = new Lifecycle();
|
||||
Emitter emitter = Emitters.create(props, httpClient, JSON_MAPPER, lifecycle);
|
||||
@ -170,6 +173,7 @@ public class EmitterTest
|
||||
.setFlushMillis(Long.MAX_VALUE)
|
||||
.setFlushCount(size)
|
||||
.setContentEncoding(encoding)
|
||||
.setFlushTimeout(BaseHttpEmittingConfig.TEST_FLUSH_TIMEOUT_MILLIS)
|
||||
.build();
|
||||
HttpPostEmitter emitter = new HttpPostEmitter(
|
||||
config,
|
||||
@ -185,6 +189,7 @@ public class EmitterTest
|
||||
HttpEmitterConfig config = new HttpEmitterConfig.Builder(TARGET_URL)
|
||||
.setFlushMillis(Long.MAX_VALUE)
|
||||
.setFlushCount(Integer.MAX_VALUE)
|
||||
.setFlushTimeout(BaseHttpEmittingConfig.TEST_FLUSH_TIMEOUT_MILLIS)
|
||||
.setBasicAuthentication(authentication)
|
||||
.setBatchingStrategy(BatchingStrategy.NEWLINES)
|
||||
.setMaxBatchSize(1024 * 1024)
|
||||
@ -203,6 +208,7 @@ public class EmitterTest
|
||||
HttpEmitterConfig config = new HttpEmitterConfig.Builder(TARGET_URL)
|
||||
.setFlushMillis(Long.MAX_VALUE)
|
||||
.setFlushCount(Integer.MAX_VALUE)
|
||||
.setFlushTimeout(BaseHttpEmittingConfig.TEST_FLUSH_TIMEOUT_MILLIS)
|
||||
.setMaxBatchSize(batchSize)
|
||||
.build();
|
||||
HttpPostEmitter emitter = new HttpPostEmitter(
|
||||
|
@ -70,6 +70,7 @@ public class HttpEmitterTest
|
||||
final HttpEmitterConfig config = new HttpEmitterConfig.Builder("http://foo.bar")
|
||||
.setBatchingStrategy(BatchingStrategy.ONLY_EVENTS)
|
||||
.setHttpTimeoutAllowanceFactor(timeoutAllowanceFactor)
|
||||
.setFlushTimeout(BaseHttpEmittingConfig.TEST_FLUSH_TIMEOUT_MILLIS)
|
||||
.build();
|
||||
final HttpPostEmitter emitter = new HttpPostEmitter(config, httpClient, OBJECT_MAPPER);
|
||||
|
||||
|
@ -59,6 +59,7 @@ public class HttpPostEmitterStressTest
|
||||
HttpEmitterConfig config = new HttpEmitterConfig.Builder("http://foo.bar")
|
||||
.setFlushMillis(100)
|
||||
.setFlushCount(4)
|
||||
.setFlushTimeout(BaseHttpEmittingConfig.TEST_FLUSH_TIMEOUT_MILLIS)
|
||||
.setBatchingStrategy(BatchingStrategy.ONLY_EVENTS)
|
||||
.setMaxBatchSize(1024 * 1024)
|
||||
// For this test, we don't need any batches to be dropped, i. e. "gaps" in data
|
||||
|
@ -68,6 +68,7 @@ public class HttpPostEmitterTest
|
||||
HttpEmitterConfig config = new HttpEmitterConfig.Builder("http://foo.bar")
|
||||
.setFlushMillis(100)
|
||||
.setFlushCount(4)
|
||||
.setFlushTimeout(BaseHttpEmittingConfig.TEST_FLUSH_TIMEOUT_MILLIS)
|
||||
.setBatchingStrategy(BatchingStrategy.ONLY_EVENTS)
|
||||
.setMaxBatchSize(1024 * 1024)
|
||||
.setBatchQueueSizeLimit(1000)
|
||||
@ -94,5 +95,4 @@ public class HttpPostEmitterTest
|
||||
|
||||
Assert.assertEquals(2, emitter.getTotalEmittedEvents());
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -66,6 +66,10 @@ public class ParametrizedUriEmitterTest
|
||||
final Properties props = new Properties();
|
||||
props.setProperty("org.apache.druid.java.util.emitter.type", "parametrized");
|
||||
props.setProperty("org.apache.druid.java.util.emitter.recipientBaseUrlPattern", uriPattern);
|
||||
props.setProperty(
|
||||
"org.apache.druid.java.util.emitter.httpEmitting.flushTimeOut",
|
||||
String.valueOf(BaseHttpEmittingConfig.TEST_FLUSH_TIMEOUT_MILLIS)
|
||||
);
|
||||
lifecycle = new Lifecycle();
|
||||
Emitter emitter = Emitters.create(props, httpClient, lifecycle);
|
||||
Assert.assertEquals(ParametrizedUriEmitter.class, emitter.getClass());
|
||||
|
Loading…
x
Reference in New Issue
Block a user