configurable handoffConditionTimeout in realtime tasks for segment handoff wait

This commit is contained in:
Himanshu Gupta 2016-03-04 18:53:44 -06:00
parent 4fa08a1329
commit 0402636598
11 changed files with 175 additions and 11 deletions

View File

@ -154,6 +154,7 @@ The tuningConfig is optional and default parameters will be used if no tuningCon
|persistThreadPriority|int|If `-XX:+UseThreadPriorities` is properly enabled, this will set the thread priority of the persisting thread to `Thread.NORM_PRIORITY` plus this value within the bounds of `Thread.MIN_PRIORITY` and `Thread.MAX_PRIORITY`. A value of 0 indicates to not change the thread priority.|no (default = 0; inherit and do not override)|
|mergeThreadPriority|int|If `-XX:+UseThreadPriorities` is properly enabled, this will set the thread priority of the merging thread to `Thread.NORM_PRIORITY` plus this value within the bounds of `Thread.MIN_PRIORITY` and `Thread.MAX_PRIORITY`. A value of 0 indicates to not change the thread priority.|no (default = 0; inherit and do not override)|
|reportParseExceptions|Boolean|If true, exceptions encountered during parsing will be thrown and will halt ingestion. If false, unparseable rows and fields will be skipped. If an entire row is skipped, the "unparseable" counter will be incremented. If some fields in a row were parseable and some were not, the parseable fields will be indexed and the "unparseable" counter will not be incremented.|false|
|handoffConditionTimeout|long|Milliseconds to wait for segment handoff. It must be >= 0 and 0 means wait forerver.|0|
Before enabling thread priority settings, users are highly encouraged to read the [original pull request](https://github.com/druid-io/druid/pull/984) and other documentation about proper use of `-XX:+UseThreadPriorities`.

View File

@ -48,7 +48,6 @@ import io.druid.indexing.common.TaskStatus;
import io.druid.indexing.common.TaskToolbox;
import io.druid.indexing.common.index.YeOldePlumberSchool;
import io.druid.query.aggregation.hyperloglog.HyperLogLogCollector;
import io.druid.segment.IndexMerger;
import io.druid.segment.IndexSpec;
import io.druid.segment.indexing.DataSchema;
import io.druid.segment.indexing.IOConfig;
@ -144,7 +143,8 @@ public class IndexTask extends AbstractFixedIntervalTask
buildV9Directly,
0,
0,
true
true,
null
);
}

View File

@ -281,6 +281,48 @@ public class RealtimeIndexTaskTest
Assert.assertEquals(task.getId(), task.getTaskResource().getAvailabilityGroup());
}
@Test(timeout = 60_000L, expected = ExecutionException.class)
public void testHandoffTimeout() throws Exception
{
final TestIndexerMetadataStorageCoordinator mdc = new TestIndexerMetadataStorageCoordinator();
final RealtimeIndexTask task = makeRealtimeTask(null, true, 100L);
final TaskToolbox taskToolbox = makeToolbox(task, mdc, tempFolder.newFolder());
final ListenableFuture<TaskStatus> statusFuture = runTask(task, taskToolbox);
// Wait for firehose to show up, it starts off null.
while (task.getFirehose() == null) {
Thread.sleep(50);
}
final TestFirehose firehose = (TestFirehose) task.getFirehose();
firehose.addRows(
ImmutableList.<InputRow>of(
new MapBasedInputRow(
now,
ImmutableList.of("dim1"),
ImmutableMap.<String, Object>of("dim1", "foo", "met1", "1")
)
)
);
// Stop the firehose, this will drain out existing events.
firehose.close();
// Wait for publish.
while (mdc.getPublished().isEmpty()) {
Thread.sleep(50);
}
Assert.assertEquals(1, task.getMetrics().processed());
Assert.assertNotNull(Iterables.getOnlyElement(mdc.getPublished()));
// handoff would timeout, resulting in exception
statusFuture.get();
}
@Test(timeout = 60_000L)
public void testBasics() throws Exception
{
@ -818,10 +860,15 @@ public class RealtimeIndexTaskTest
private RealtimeIndexTask makeRealtimeTask(final String taskId)
{
return makeRealtimeTask(taskId, true);
return makeRealtimeTask(taskId, true, 0);
}
private RealtimeIndexTask makeRealtimeTask(final String taskId, boolean reportParseExceptions)
{
return makeRealtimeTask(taskId, reportParseExceptions, 0);
}
private RealtimeIndexTask makeRealtimeTask(final String taskId, boolean reportParseExceptions, long handoffTimeout)
{
ObjectMapper objectMapper = new DefaultObjectMapper();
DataSchema dataSchema = new DataSchema(
@ -849,7 +896,8 @@ public class RealtimeIndexTaskTest
buildV9Directly,
0,
0,
reportParseExceptions
reportParseExceptions,
handoffTimeout
);
return new RealtimeIndexTask(
taskId,

View File

@ -336,7 +336,8 @@ public class TaskSerdeTest
null,
0,
0,
true
true,
null
)
),
null

View File

@ -1187,6 +1187,7 @@ public class TaskLifecycleTest
null,
0,
0,
null,
null
);
FireDepartment fireDepartment = new FireDepartment(dataSchema, realtimeIOConfig, realtimeTuningConfig);

View File

@ -21,6 +21,7 @@ package io.druid.segment.indexing;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import com.google.common.io.Files;
import io.druid.segment.IndexSpec;
import io.druid.segment.realtime.plumber.IntervalStartVersioningPolicy;
@ -47,6 +48,7 @@ public class RealtimeTuningConfig implements TuningConfig
private static final IndexSpec defaultIndexSpec = new IndexSpec();
private static final Boolean defaultBuildV9Directly = Boolean.FALSE;
private static final Boolean defaultReportParseExceptions = Boolean.FALSE;
private static final long defaultHandoffConditionTimeout = 0;
private static File createNewBasePersistDirectory()
{
@ -69,7 +71,8 @@ public class RealtimeTuningConfig implements TuningConfig
defaultBuildV9Directly,
0,
0,
defaultReportParseExceptions
defaultReportParseExceptions,
defaultHandoffConditionTimeout
);
}
@ -86,6 +89,7 @@ public class RealtimeTuningConfig implements TuningConfig
private final int persistThreadPriority;
private final int mergeThreadPriority;
private final boolean reportParseExceptions;
private final long handoffConditionTimeout;
@JsonCreator
public RealtimeTuningConfig(
@ -101,7 +105,8 @@ public class RealtimeTuningConfig implements TuningConfig
@JsonProperty("buildV9Directly") Boolean buildV9Directly,
@JsonProperty("persistThreadPriority") int persistThreadPriority,
@JsonProperty("mergeThreadPriority") int mergeThreadPriority,
@JsonProperty("reportParseExceptions") Boolean reportParseExceptions
@JsonProperty("reportParseExceptions") Boolean reportParseExceptions,
@JsonProperty("handoffConditionTimeout") Long handoffConditionTimeout
)
{
this.maxRowsInMemory = maxRowsInMemory == null ? defaultMaxRowsInMemory : maxRowsInMemory;
@ -123,6 +128,11 @@ public class RealtimeTuningConfig implements TuningConfig
this.reportParseExceptions = reportParseExceptions == null
? defaultReportParseExceptions
: reportParseExceptions;
this.handoffConditionTimeout = handoffConditionTimeout == null
? defaultHandoffConditionTimeout
: handoffConditionTimeout;
Preconditions.checkArgument(this.handoffConditionTimeout >= 0, "handoffConditionTimeout must be >= 0");
}
@JsonProperty
@ -203,6 +213,12 @@ public class RealtimeTuningConfig implements TuningConfig
return reportParseExceptions;
}
@JsonProperty
public long getHandoffConditionTimeout()
{
return handoffConditionTimeout;
}
public RealtimeTuningConfig withVersioningPolicy(VersioningPolicy policy)
{
return new RealtimeTuningConfig(
@ -218,7 +234,8 @@ public class RealtimeTuningConfig implements TuningConfig
buildV9Directly,
persistThreadPriority,
mergeThreadPriority,
reportParseExceptions
reportParseExceptions,
handoffConditionTimeout
);
}
@ -237,7 +254,8 @@ public class RealtimeTuningConfig implements TuningConfig
buildV9Directly,
persistThreadPriority,
mergeThreadPriority,
reportParseExceptions
reportParseExceptions,
handoffConditionTimeout
);
}
}

View File

@ -606,6 +606,7 @@ public class RealtimePlumber implements Plumber
persistAndMerge(entry.getKey(), entry.getValue());
}
final long forceEndWaitTime = System.currentTimeMillis() + config.getHandoffConditionTimeout();
while (!sinks.isEmpty()) {
try {
log.info(
@ -627,7 +628,19 @@ public class RealtimePlumber implements Plumber
synchronized (handoffCondition) {
while (!sinks.isEmpty()) {
handoffCondition.wait();
if (config.getHandoffConditionTimeout() == 0) {
handoffCondition.wait();
} else {
long curr = System.currentTimeMillis();
if (forceEndWaitTime - curr > 0) {
handoffCondition.wait(forceEndWaitTime - curr);
} else {
throw new ISE(
"Segment handoff wait timeout. [%s] segments might not have completed handoff.",
sinks.size()
);
}
}
}
}
}

View File

@ -19,6 +19,11 @@
package io.druid.segment.indexing;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.druid.segment.IndexSpec;
import io.druid.segment.TestHelper;
import io.druid.timeline.partition.NoneShardSpec;
import org.joda.time.Period;
import org.junit.Assert;
import org.junit.Test;
@ -42,4 +47,76 @@ public class RealtimeTuningConfigTest
);
Assert.assertEquals(new File("/tmp/nonexistent"), tuningConfig.getBasePersistDirectory());
}
@Test
public void testSerdeWithDefaults() throws Exception
{
String jsonStr = "{\"type\":\"realtime\"}";
ObjectMapper mapper = TestHelper.getObjectMapper();
RealtimeTuningConfig config = (RealtimeTuningConfig) mapper.readValue(
mapper.writeValueAsString(
mapper.readValue(
jsonStr,
TuningConfig.class
)
),
TuningConfig.class
);
Assert.assertNotNull(config.getBasePersistDirectory());
Assert.assertEquals(false, config.getBuildV9Directly());
Assert.assertEquals(0, config.getHandoffConditionTimeout());
Assert.assertEquals(new IndexSpec(), config.getIndexSpec());
Assert.assertEquals(new Period("PT10M"), config.getIntermediatePersistPeriod());
Assert.assertEquals(new NoneShardSpec(), config.getShardSpec());
Assert.assertEquals(0, config.getMaxPendingPersists());
Assert.assertEquals(75000, config.getMaxRowsInMemory());
Assert.assertEquals(0, config.getMergeThreadPriority());
Assert.assertEquals(0, config.getPersistThreadPriority());
Assert.assertEquals(new Period("PT10M"), config.getWindowPeriod());
Assert.assertEquals(false, config.isReportParseExceptions());
}
@Test
public void testSerdeWithNonDefaults() throws Exception
{
String jsonStr = "{\n"
+ " \"type\": \"realtime\",\n"
+ " \"maxRowsInMemory\": 100,\n"
+ " \"intermediatePersistPeriod\": \"PT1H\",\n"
+ " \"windowPeriod\": \"PT1H\",\n"
+ " \"basePersistDirectory\": \"/tmp/xxx\",\n"
+ " \"maxPendingPersists\": 100,\n"
+ " \"buildV9Directly\": true,\n"
+ " \"persistThreadPriority\": 100,\n"
+ " \"mergeThreadPriority\": 100,\n"
+ " \"reportParseExceptions\": true,\n"
+ " \"handoffConditionTimeout\": 100\n"
+ "}";
ObjectMapper mapper = TestHelper.getObjectMapper();
RealtimeTuningConfig config = (RealtimeTuningConfig) mapper.readValue(
mapper.writeValueAsString(
mapper.readValue(
jsonStr,
TuningConfig.class
)
),
TuningConfig.class
);
Assert.assertEquals("/tmp/xxx", config.getBasePersistDirectory().toString());
Assert.assertEquals(true, config.getBuildV9Directly());
Assert.assertEquals(100, config.getHandoffConditionTimeout());
Assert.assertEquals(new IndexSpec(), config.getIndexSpec());
Assert.assertEquals(new Period("PT1H"), config.getIntermediatePersistPeriod());
Assert.assertEquals(new NoneShardSpec(), config.getShardSpec());
Assert.assertEquals(100, config.getMaxPendingPersists());
Assert.assertEquals(100, config.getMaxRowsInMemory());
Assert.assertEquals(100, config.getMergeThreadPriority());
Assert.assertEquals(100, config.getPersistThreadPriority());
Assert.assertEquals(new Period("PT1H"), config.getWindowPeriod());
Assert.assertEquals(true, config.isReportParseExceptions());
}
}

View File

@ -200,6 +200,7 @@ public class RealtimeManagerTest
null,
0,
0,
null,
null
);
plumber = new TestPlumber(new Sink(new Interval("0/P5000Y"), schema, tuningConfig, new DateTime().toString()));
@ -240,6 +241,7 @@ public class RealtimeManagerTest
null,
0,
0,
null,
null
);
@ -256,6 +258,7 @@ public class RealtimeManagerTest
null,
0,
0,
null,
null
);

View File

@ -197,7 +197,8 @@ public class RealtimePlumberSchoolTest
buildV9Directly,
0,
0,
false
false,
null
);
realtimePlumberSchool = new RealtimePlumberSchool(

View File

@ -70,6 +70,7 @@ public class SinkTest
null,
0,
0,
null,
null
);
final Sink sink = new Sink(interval, schema, tuningConfig, version);