From 040263659847b03e8ad2c03dd560dc4d34c44f19 Mon Sep 17 00:00:00 2001 From: Himanshu Gupta Date: Fri, 4 Mar 2016 18:53:44 -0600 Subject: [PATCH] configurable handoffConditionTimeout in realtime tasks for segment handoff wait --- docs/content/ingestion/stream-pull.md | 1 + .../druid/indexing/common/task/IndexTask.java | 4 +- .../common/task/RealtimeIndexTaskTest.java | 52 ++++++++++++- .../indexing/common/task/TaskSerdeTest.java | 3 +- .../indexing/overlord/TaskLifecycleTest.java | 1 + .../indexing/RealtimeTuningConfig.java | 26 ++++++- .../realtime/plumber/RealtimePlumber.java | 15 +++- .../indexing/RealtimeTuningConfigTest.java | 77 +++++++++++++++++++ .../segment/realtime/RealtimeManagerTest.java | 3 + .../plumber/RealtimePlumberSchoolTest.java | 3 +- .../segment/realtime/plumber/SinkTest.java | 1 + 11 files changed, 175 insertions(+), 11 deletions(-) diff --git a/docs/content/ingestion/stream-pull.md b/docs/content/ingestion/stream-pull.md index 1fed08d11c1..8785dcd427e 100644 --- a/docs/content/ingestion/stream-pull.md +++ b/docs/content/ingestion/stream-pull.md @@ -154,6 +154,7 @@ The tuningConfig is optional and default parameters will be used if no tuningCon |persistThreadPriority|int|If `-XX:+UseThreadPriorities` is properly enabled, this will set the thread priority of the persisting thread to `Thread.NORM_PRIORITY` plus this value within the bounds of `Thread.MIN_PRIORITY` and `Thread.MAX_PRIORITY`. A value of 0 indicates to not change the thread priority.|no (default = 0; inherit and do not override)| |mergeThreadPriority|int|If `-XX:+UseThreadPriorities` is properly enabled, this will set the thread priority of the merging thread to `Thread.NORM_PRIORITY` plus this value within the bounds of `Thread.MIN_PRIORITY` and `Thread.MAX_PRIORITY`. A value of 0 indicates to not change the thread priority.|no (default = 0; inherit and do not override)| |reportParseExceptions|Boolean|If true, exceptions encountered during parsing will be thrown and will halt ingestion. If false, unparseable rows and fields will be skipped. If an entire row is skipped, the "unparseable" counter will be incremented. If some fields in a row were parseable and some were not, the parseable fields will be indexed and the "unparseable" counter will not be incremented.|false| +|handoffConditionTimeout|long|Milliseconds to wait for segment handoff. It must be >= 0 and 0 means wait forerver.|0| Before enabling thread priority settings, users are highly encouraged to read the [original pull request](https://github.com/druid-io/druid/pull/984) and other documentation about proper use of `-XX:+UseThreadPriorities`. diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java index 70ea5d7036e..3ba44f90596 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java @@ -48,7 +48,6 @@ import io.druid.indexing.common.TaskStatus; import io.druid.indexing.common.TaskToolbox; import io.druid.indexing.common.index.YeOldePlumberSchool; import io.druid.query.aggregation.hyperloglog.HyperLogLogCollector; -import io.druid.segment.IndexMerger; import io.druid.segment.IndexSpec; import io.druid.segment.indexing.DataSchema; import io.druid.segment.indexing.IOConfig; @@ -144,7 +143,8 @@ public class IndexTask extends AbstractFixedIntervalTask buildV9Directly, 0, 0, - true + true, + null ); } diff --git a/indexing-service/src/test/java/io/druid/indexing/common/task/RealtimeIndexTaskTest.java b/indexing-service/src/test/java/io/druid/indexing/common/task/RealtimeIndexTaskTest.java index 9cefaa10822..85eb0a90418 100644 --- a/indexing-service/src/test/java/io/druid/indexing/common/task/RealtimeIndexTaskTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/common/task/RealtimeIndexTaskTest.java @@ -281,6 +281,48 @@ public class RealtimeIndexTaskTest Assert.assertEquals(task.getId(), task.getTaskResource().getAvailabilityGroup()); } + + @Test(timeout = 60_000L, expected = ExecutionException.class) + public void testHandoffTimeout() throws Exception + { + final TestIndexerMetadataStorageCoordinator mdc = new TestIndexerMetadataStorageCoordinator(); + final RealtimeIndexTask task = makeRealtimeTask(null, true, 100L); + final TaskToolbox taskToolbox = makeToolbox(task, mdc, tempFolder.newFolder()); + final ListenableFuture statusFuture = runTask(task, taskToolbox); + + // Wait for firehose to show up, it starts off null. + while (task.getFirehose() == null) { + Thread.sleep(50); + } + + final TestFirehose firehose = (TestFirehose) task.getFirehose(); + + firehose.addRows( + ImmutableList.of( + new MapBasedInputRow( + now, + ImmutableList.of("dim1"), + ImmutableMap.of("dim1", "foo", "met1", "1") + ) + ) + ); + + // Stop the firehose, this will drain out existing events. + firehose.close(); + + // Wait for publish. + while (mdc.getPublished().isEmpty()) { + Thread.sleep(50); + } + + Assert.assertEquals(1, task.getMetrics().processed()); + Assert.assertNotNull(Iterables.getOnlyElement(mdc.getPublished())); + + + // handoff would timeout, resulting in exception + statusFuture.get(); + } + @Test(timeout = 60_000L) public void testBasics() throws Exception { @@ -818,10 +860,15 @@ public class RealtimeIndexTaskTest private RealtimeIndexTask makeRealtimeTask(final String taskId) { - return makeRealtimeTask(taskId, true); + return makeRealtimeTask(taskId, true, 0); } private RealtimeIndexTask makeRealtimeTask(final String taskId, boolean reportParseExceptions) + { + return makeRealtimeTask(taskId, reportParseExceptions, 0); + } + + private RealtimeIndexTask makeRealtimeTask(final String taskId, boolean reportParseExceptions, long handoffTimeout) { ObjectMapper objectMapper = new DefaultObjectMapper(); DataSchema dataSchema = new DataSchema( @@ -849,7 +896,8 @@ public class RealtimeIndexTaskTest buildV9Directly, 0, 0, - reportParseExceptions + reportParseExceptions, + handoffTimeout ); return new RealtimeIndexTask( taskId, diff --git a/indexing-service/src/test/java/io/druid/indexing/common/task/TaskSerdeTest.java b/indexing-service/src/test/java/io/druid/indexing/common/task/TaskSerdeTest.java index 0c31d3df2ee..abc6fc8b4d9 100644 --- a/indexing-service/src/test/java/io/druid/indexing/common/task/TaskSerdeTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/common/task/TaskSerdeTest.java @@ -336,7 +336,8 @@ public class TaskSerdeTest null, 0, 0, - true + true, + null ) ), null diff --git a/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java b/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java index ff0e99f1383..f40b7357b1e 100644 --- a/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java @@ -1187,6 +1187,7 @@ public class TaskLifecycleTest null, 0, 0, + null, null ); FireDepartment fireDepartment = new FireDepartment(dataSchema, realtimeIOConfig, realtimeTuningConfig); diff --git a/server/src/main/java/io/druid/segment/indexing/RealtimeTuningConfig.java b/server/src/main/java/io/druid/segment/indexing/RealtimeTuningConfig.java index 1e9e4adc999..8e62d2a9a6d 100644 --- a/server/src/main/java/io/druid/segment/indexing/RealtimeTuningConfig.java +++ b/server/src/main/java/io/druid/segment/indexing/RealtimeTuningConfig.java @@ -21,6 +21,7 @@ package io.druid.segment.indexing; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.base.Preconditions; import com.google.common.io.Files; import io.druid.segment.IndexSpec; import io.druid.segment.realtime.plumber.IntervalStartVersioningPolicy; @@ -47,6 +48,7 @@ public class RealtimeTuningConfig implements TuningConfig private static final IndexSpec defaultIndexSpec = new IndexSpec(); private static final Boolean defaultBuildV9Directly = Boolean.FALSE; private static final Boolean defaultReportParseExceptions = Boolean.FALSE; + private static final long defaultHandoffConditionTimeout = 0; private static File createNewBasePersistDirectory() { @@ -69,7 +71,8 @@ public class RealtimeTuningConfig implements TuningConfig defaultBuildV9Directly, 0, 0, - defaultReportParseExceptions + defaultReportParseExceptions, + defaultHandoffConditionTimeout ); } @@ -86,6 +89,7 @@ public class RealtimeTuningConfig implements TuningConfig private final int persistThreadPriority; private final int mergeThreadPriority; private final boolean reportParseExceptions; + private final long handoffConditionTimeout; @JsonCreator public RealtimeTuningConfig( @@ -101,7 +105,8 @@ public class RealtimeTuningConfig implements TuningConfig @JsonProperty("buildV9Directly") Boolean buildV9Directly, @JsonProperty("persistThreadPriority") int persistThreadPriority, @JsonProperty("mergeThreadPriority") int mergeThreadPriority, - @JsonProperty("reportParseExceptions") Boolean reportParseExceptions + @JsonProperty("reportParseExceptions") Boolean reportParseExceptions, + @JsonProperty("handoffConditionTimeout") Long handoffConditionTimeout ) { this.maxRowsInMemory = maxRowsInMemory == null ? defaultMaxRowsInMemory : maxRowsInMemory; @@ -123,6 +128,11 @@ public class RealtimeTuningConfig implements TuningConfig this.reportParseExceptions = reportParseExceptions == null ? defaultReportParseExceptions : reportParseExceptions; + + this.handoffConditionTimeout = handoffConditionTimeout == null + ? defaultHandoffConditionTimeout + : handoffConditionTimeout; + Preconditions.checkArgument(this.handoffConditionTimeout >= 0, "handoffConditionTimeout must be >= 0"); } @JsonProperty @@ -203,6 +213,12 @@ public class RealtimeTuningConfig implements TuningConfig return reportParseExceptions; } + @JsonProperty + public long getHandoffConditionTimeout() + { + return handoffConditionTimeout; + } + public RealtimeTuningConfig withVersioningPolicy(VersioningPolicy policy) { return new RealtimeTuningConfig( @@ -218,7 +234,8 @@ public class RealtimeTuningConfig implements TuningConfig buildV9Directly, persistThreadPriority, mergeThreadPriority, - reportParseExceptions + reportParseExceptions, + handoffConditionTimeout ); } @@ -237,7 +254,8 @@ public class RealtimeTuningConfig implements TuningConfig buildV9Directly, persistThreadPriority, mergeThreadPriority, - reportParseExceptions + reportParseExceptions, + handoffConditionTimeout ); } } diff --git a/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java b/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java index a4a00a4fdb8..ad049a36802 100644 --- a/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java +++ b/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java @@ -606,6 +606,7 @@ public class RealtimePlumber implements Plumber persistAndMerge(entry.getKey(), entry.getValue()); } + final long forceEndWaitTime = System.currentTimeMillis() + config.getHandoffConditionTimeout(); while (!sinks.isEmpty()) { try { log.info( @@ -627,7 +628,19 @@ public class RealtimePlumber implements Plumber synchronized (handoffCondition) { while (!sinks.isEmpty()) { - handoffCondition.wait(); + if (config.getHandoffConditionTimeout() == 0) { + handoffCondition.wait(); + } else { + long curr = System.currentTimeMillis(); + if (forceEndWaitTime - curr > 0) { + handoffCondition.wait(forceEndWaitTime - curr); + } else { + throw new ISE( + "Segment handoff wait timeout. [%s] segments might not have completed handoff.", + sinks.size() + ); + } + } } } } diff --git a/server/src/test/java/io/druid/segment/indexing/RealtimeTuningConfigTest.java b/server/src/test/java/io/druid/segment/indexing/RealtimeTuningConfigTest.java index 0d9bc837719..3254deac80b 100644 --- a/server/src/test/java/io/druid/segment/indexing/RealtimeTuningConfigTest.java +++ b/server/src/test/java/io/druid/segment/indexing/RealtimeTuningConfigTest.java @@ -19,6 +19,11 @@ package io.druid.segment.indexing; +import com.fasterxml.jackson.databind.ObjectMapper; +import io.druid.segment.IndexSpec; +import io.druid.segment.TestHelper; +import io.druid.timeline.partition.NoneShardSpec; +import org.joda.time.Period; import org.junit.Assert; import org.junit.Test; @@ -42,4 +47,76 @@ public class RealtimeTuningConfigTest ); Assert.assertEquals(new File("/tmp/nonexistent"), tuningConfig.getBasePersistDirectory()); } + + @Test + public void testSerdeWithDefaults() throws Exception + { + String jsonStr = "{\"type\":\"realtime\"}"; + + ObjectMapper mapper = TestHelper.getObjectMapper(); + RealtimeTuningConfig config = (RealtimeTuningConfig) mapper.readValue( + mapper.writeValueAsString( + mapper.readValue( + jsonStr, + TuningConfig.class + ) + ), + TuningConfig.class + ); + + Assert.assertNotNull(config.getBasePersistDirectory()); + Assert.assertEquals(false, config.getBuildV9Directly()); + Assert.assertEquals(0, config.getHandoffConditionTimeout()); + Assert.assertEquals(new IndexSpec(), config.getIndexSpec()); + Assert.assertEquals(new Period("PT10M"), config.getIntermediatePersistPeriod()); + Assert.assertEquals(new NoneShardSpec(), config.getShardSpec()); + Assert.assertEquals(0, config.getMaxPendingPersists()); + Assert.assertEquals(75000, config.getMaxRowsInMemory()); + Assert.assertEquals(0, config.getMergeThreadPriority()); + Assert.assertEquals(0, config.getPersistThreadPriority()); + Assert.assertEquals(new Period("PT10M"), config.getWindowPeriod()); + Assert.assertEquals(false, config.isReportParseExceptions()); + } + + @Test + public void testSerdeWithNonDefaults() throws Exception + { + String jsonStr = "{\n" + + " \"type\": \"realtime\",\n" + + " \"maxRowsInMemory\": 100,\n" + + " \"intermediatePersistPeriod\": \"PT1H\",\n" + + " \"windowPeriod\": \"PT1H\",\n" + + " \"basePersistDirectory\": \"/tmp/xxx\",\n" + + " \"maxPendingPersists\": 100,\n" + + " \"buildV9Directly\": true,\n" + + " \"persistThreadPriority\": 100,\n" + + " \"mergeThreadPriority\": 100,\n" + + " \"reportParseExceptions\": true,\n" + + " \"handoffConditionTimeout\": 100\n" + + "}"; + + ObjectMapper mapper = TestHelper.getObjectMapper(); + RealtimeTuningConfig config = (RealtimeTuningConfig) mapper.readValue( + mapper.writeValueAsString( + mapper.readValue( + jsonStr, + TuningConfig.class + ) + ), + TuningConfig.class + ); + + Assert.assertEquals("/tmp/xxx", config.getBasePersistDirectory().toString()); + Assert.assertEquals(true, config.getBuildV9Directly()); + Assert.assertEquals(100, config.getHandoffConditionTimeout()); + Assert.assertEquals(new IndexSpec(), config.getIndexSpec()); + Assert.assertEquals(new Period("PT1H"), config.getIntermediatePersistPeriod()); + Assert.assertEquals(new NoneShardSpec(), config.getShardSpec()); + Assert.assertEquals(100, config.getMaxPendingPersists()); + Assert.assertEquals(100, config.getMaxRowsInMemory()); + Assert.assertEquals(100, config.getMergeThreadPriority()); + Assert.assertEquals(100, config.getPersistThreadPriority()); + Assert.assertEquals(new Period("PT1H"), config.getWindowPeriod()); + Assert.assertEquals(true, config.isReportParseExceptions()); + } } diff --git a/server/src/test/java/io/druid/segment/realtime/RealtimeManagerTest.java b/server/src/test/java/io/druid/segment/realtime/RealtimeManagerTest.java index 2d0bcac50ed..e74dbbe9cbb 100644 --- a/server/src/test/java/io/druid/segment/realtime/RealtimeManagerTest.java +++ b/server/src/test/java/io/druid/segment/realtime/RealtimeManagerTest.java @@ -200,6 +200,7 @@ public class RealtimeManagerTest null, 0, 0, + null, null ); plumber = new TestPlumber(new Sink(new Interval("0/P5000Y"), schema, tuningConfig, new DateTime().toString())); @@ -240,6 +241,7 @@ public class RealtimeManagerTest null, 0, 0, + null, null ); @@ -256,6 +258,7 @@ public class RealtimeManagerTest null, 0, 0, + null, null ); diff --git a/server/src/test/java/io/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java b/server/src/test/java/io/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java index 43dcf5e9041..2d3b0682dc8 100644 --- a/server/src/test/java/io/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java +++ b/server/src/test/java/io/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java @@ -197,7 +197,8 @@ public class RealtimePlumberSchoolTest buildV9Directly, 0, 0, - false + false, + null ); realtimePlumberSchool = new RealtimePlumberSchool( diff --git a/server/src/test/java/io/druid/segment/realtime/plumber/SinkTest.java b/server/src/test/java/io/druid/segment/realtime/plumber/SinkTest.java index 980cbf77d99..5225752aeaf 100644 --- a/server/src/test/java/io/druid/segment/realtime/plumber/SinkTest.java +++ b/server/src/test/java/io/druid/segment/realtime/plumber/SinkTest.java @@ -70,6 +70,7 @@ public class SinkTest null, 0, 0, + null, null ); final Sink sink = new Sink(interval, schema, tuningConfig, version);