RealtimeIndexTask to support alertTimeout in context (#4089)

* RealtimeIndexTask to support alertTimeout in context and raise alert if task process exists after the timeout

* move alertTimeout config to tuningConfig and document
This commit is contained in:
Himanshu 2017-03-24 14:48:12 -05:00 committed by Fangjin Yang
parent b4289c0004
commit de081c711b
14 changed files with 63 additions and 6 deletions

View File

@ -155,6 +155,7 @@ The tuningConfig is optional and default parameters will be used if no tuningCon
|mergeThreadPriority|int|If `-XX:+UseThreadPriorities` is properly enabled, this will set the thread priority of the merging thread to `Thread.NORM_PRIORITY` plus this value within the bounds of `Thread.MIN_PRIORITY` and `Thread.MAX_PRIORITY`. A value of 0 indicates to not change the thread priority.|no (default == 0; inherit and do not override)|
|reportParseExceptions|Boolean|If true, exceptions encountered during parsing will be thrown and will halt ingestion. If false, unparseable rows and fields will be skipped. If an entire row is skipped, the "unparseable" counter will be incremented. If some fields in a row were parseable and some were not, the parseable fields will be indexed and the "unparseable" counter will not be incremented.|no (default == false)|
|handoffConditionTimeout|long|Milliseconds to wait for segment handoff. It must be >= 0, where 0 means to wait forever.|no (default == 0)|
|alertTimeout|long|Milliseconds timeout after which an alert is created if the task isn't finished by then. This allows users to monitor tasks that are failing to finish and give up the worker slot for any unexpected errors.|no (default == 0)|
|indexSpec|Object|Tune how data is indexed. See below for more information.|no|
Before enabling thread priority settings, users are highly encouraged to read the [original pull request](https://github.com/druid-io/druid/pull/984) and other documentation about proper use of `-XX:+UseThreadPriorities`.

View File

@ -70,6 +70,8 @@ import java.io.File;
import java.io.IOException;
import java.util.Map;
import java.util.Random;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.CountDownLatch;
public class RealtimeIndexTask extends AbstractTask
@ -189,6 +191,8 @@ public class RealtimeIndexTask extends AbstractTask
throw new IllegalStateException("WTF?!? run with non-null plumber??!");
}
setupTimeoutAlert();
boolean normalExit = true;
// It would be nice to get the PlumberSchool in the constructor. Although that will need jackson injectables for
@ -518,4 +522,26 @@ public class RealtimeIndexTask extends AbstractTask
taskToolbox.publishSegments(ImmutableList.of(segment));
}
}
private void setupTimeoutAlert()
{
if (spec.getTuningConfig().getAlertTimeout() > 0) {
Timer timer = new Timer("RealtimeIndexTask-Timer", true);
timer.schedule(
new TimerTask()
{
@Override
public void run()
{
log.makeAlert(
"RealtimeIndexTask for dataSource [%s] hasn't finished in configured time [%d] ms.",
spec.getDataSchema().getDataSource(),
spec.getTuningConfig().getAlertTimeout()
).emit();
}
},
spec.getTuningConfig().getAlertTimeout()
);
}
}
}

View File

@ -910,7 +910,8 @@ public class RealtimeIndexTaskTest
0,
0,
reportParseExceptions,
handoffTimeout
handoffTimeout,
null
);
return new RealtimeIndexTask(
taskId,

View File

@ -503,6 +503,7 @@ public class TaskSerdeTest
0,
0,
true,
null,
null
)
),

View File

@ -1191,6 +1191,7 @@ public class TaskLifecycleTest
0,
0,
null,
null,
null
);
FireDepartment fireDepartment = new FireDepartment(dataSchema, realtimeIOConfig, realtimeTuningConfig);

View File

@ -50,6 +50,7 @@ public class RealtimeTuningConfig implements TuningConfig, AppenderatorConfig
private static final Boolean defaultBuildV9Directly = Boolean.TRUE;
private static final Boolean defaultReportParseExceptions = Boolean.FALSE;
private static final long defaultHandoffConditionTimeout = 0;
private static final long defaultAlertTimeout = 0;
private static File createNewBasePersistDirectory()
{
@ -73,7 +74,8 @@ public class RealtimeTuningConfig implements TuningConfig, AppenderatorConfig
0,
0,
defaultReportParseExceptions,
defaultHandoffConditionTimeout
defaultHandoffConditionTimeout,
defaultAlertTimeout
);
}
@ -91,6 +93,7 @@ public class RealtimeTuningConfig implements TuningConfig, AppenderatorConfig
private final int mergeThreadPriority;
private final boolean reportParseExceptions;
private final long handoffConditionTimeout;
private final long alertTimeout;
@JsonCreator
public RealtimeTuningConfig(
@ -107,7 +110,8 @@ public class RealtimeTuningConfig implements TuningConfig, AppenderatorConfig
@JsonProperty("persistThreadPriority") int persistThreadPriority,
@JsonProperty("mergeThreadPriority") int mergeThreadPriority,
@JsonProperty("reportParseExceptions") Boolean reportParseExceptions,
@JsonProperty("handoffConditionTimeout") Long handoffConditionTimeout
@JsonProperty("handoffConditionTimeout") Long handoffConditionTimeout,
@JsonProperty("alertTimeout") Long alertTimeout
)
{
this.maxRowsInMemory = maxRowsInMemory == null ? defaultMaxRowsInMemory : maxRowsInMemory;
@ -134,6 +138,9 @@ public class RealtimeTuningConfig implements TuningConfig, AppenderatorConfig
? defaultHandoffConditionTimeout
: handoffConditionTimeout;
Preconditions.checkArgument(this.handoffConditionTimeout >= 0, "handoffConditionTimeout must be >= 0");
this.alertTimeout = alertTimeout == null ? defaultAlertTimeout : alertTimeout;
Preconditions.checkArgument(this.alertTimeout >= 0, "alertTimeout must be >= 0");
}
@JsonProperty
@ -220,6 +227,12 @@ public class RealtimeTuningConfig implements TuningConfig, AppenderatorConfig
return handoffConditionTimeout;
}
@JsonProperty
public long getAlertTimeout()
{
return alertTimeout;
}
public RealtimeTuningConfig withVersioningPolicy(VersioningPolicy policy)
{
return new RealtimeTuningConfig(
@ -236,7 +249,8 @@ public class RealtimeTuningConfig implements TuningConfig, AppenderatorConfig
persistThreadPriority,
mergeThreadPriority,
reportParseExceptions,
handoffConditionTimeout
handoffConditionTimeout,
alertTimeout
);
}
@ -256,7 +270,8 @@ public class RealtimeTuningConfig implements TuningConfig, AppenderatorConfig
persistThreadPriority,
mergeThreadPriority,
reportParseExceptions,
handoffConditionTimeout
handoffConditionTimeout,
alertTimeout
);
}
}

View File

@ -67,6 +67,7 @@ public class RealtimeTuningConfigTest
Assert.assertNotNull(config.getBasePersistDirectory());
Assert.assertEquals(true, config.getBuildV9Directly());
Assert.assertEquals(0, config.getHandoffConditionTimeout());
Assert.assertEquals(0, config.getAlertTimeout());
Assert.assertEquals(new IndexSpec(), config.getIndexSpec());
Assert.assertEquals(new Period("PT10M"), config.getIntermediatePersistPeriod());
Assert.assertEquals(NoneShardSpec.instance(), config.getShardSpec());
@ -92,7 +93,8 @@ public class RealtimeTuningConfigTest
+ " \"persistThreadPriority\": 100,\n"
+ " \"mergeThreadPriority\": 100,\n"
+ " \"reportParseExceptions\": true,\n"
+ " \"handoffConditionTimeout\": 100\n"
+ " \"handoffConditionTimeout\": 100,\n"
+ " \"alertTimeout\": 70\n"
+ "}";
ObjectMapper mapper = TestHelper.getObjectMapper();
@ -109,6 +111,7 @@ public class RealtimeTuningConfigTest
Assert.assertEquals("/tmp/xxx", config.getBasePersistDirectory().toString());
Assert.assertEquals(false, config.getBuildV9Directly());
Assert.assertEquals(100, config.getHandoffConditionTimeout());
Assert.assertEquals(70, config.getAlertTimeout());
Assert.assertEquals(new IndexSpec(), config.getIndexSpec());
Assert.assertEquals(new Period("PT1H"), config.getIntermediatePersistPeriod());
Assert.assertEquals(NoneShardSpec.instance(), config.getShardSpec());

View File

@ -195,6 +195,7 @@ public class RealtimeManagerTest
0,
0,
null,
null,
null
);
plumber = new TestPlumber(new Sink(
@ -250,6 +251,7 @@ public class RealtimeManagerTest
0,
0,
null,
null,
null
);
@ -267,6 +269,7 @@ public class RealtimeManagerTest
0,
0,
null,
null,
null
);

View File

@ -80,6 +80,7 @@ public class AppenderatorPlumberTest
0,
0,
false,
null,
null
);

View File

@ -134,6 +134,7 @@ public class AppenderatorTester implements AutoCloseable
0,
0,
null,
null,
null
);

View File

@ -143,6 +143,7 @@ public class DefaultOfflineAppenderatorFactoryTest
0,
0,
null,
null,
null
);

View File

@ -203,6 +203,7 @@ public class RealtimePlumberSchoolTest
0,
0,
false,
null,
null
);

View File

@ -70,6 +70,7 @@ public class SinkTest
0,
0,
null,
null,
null
);
final Sink sink = new Sink(

View File

@ -180,6 +180,7 @@ public class DruidJsonValidatorTest
0,
0,
true,
null,
null
)
),