Merge pull request #1777 from pjain1/fail_realtime

fail task if finishjob throws any exception
This commit is contained in:
Gian Merlino 2015-09-25 09:00:02 -07:00
commit 696f5336e0
2 changed files with 138 additions and 59 deletions

View File

@ -55,12 +55,12 @@ import io.druid.segment.realtime.plumber.RealtimePlumberSchool;
import io.druid.segment.realtime.plumber.VersioningPolicy;
import io.druid.server.coordination.DataSegmentAnnouncer;
import io.druid.timeline.DataSegment;
import java.util.Map;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import java.io.File;
import java.io.IOException;
import java.util.Map;
import java.util.Random;
public class RealtimeIndexTask extends AbstractTask
@ -337,6 +337,7 @@ public class RealtimeIndexTask extends AbstractTask
}
catch (Exception e) {
log.makeAlert(e, "Failed to finish realtime task").emit();
throw e;
}
finally {
// firehose will be non-null since normalExit is true

View File

@ -94,6 +94,7 @@ import io.druid.segment.loading.SegmentLoadingException;
import io.druid.segment.loading.StorageLocationConfig;
import io.druid.segment.realtime.FireDepartment;
import io.druid.server.coordination.DataSegmentAnnouncer;
import io.druid.server.coordination.DruidServerMetadata;
import io.druid.timeline.DataSegment;
import io.druid.timeline.partition.NoneShardSpec;
import org.easymock.EasyMock;
@ -112,6 +113,7 @@ import org.junit.runners.Parameterized;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
@ -181,10 +183,13 @@ public class TaskLifecycleTest
private QueryRunnerFactoryConglomerate queryRunnerFactoryConglomerate;
private FilteredServerView serverView;
private MonitorScheduler monitorScheduler;
private ServiceEmitter emitter;
private TaskQueueConfig tqc;
private int pushedSegments;
private int announcedSinks;
private static CountDownLatch publishCountDown;
private TestDerbyConnector testDerbyConnector;
private List<ServerView.SegmentCallback> segmentCallbacks = new ArrayList<>();
private static MockIndexerMetadataStorageCoordinator newMockMDC()
{
@ -320,17 +325,16 @@ public class TaskLifecycleTest
@Before
public void setUp() throws Exception
{
final ServiceEmitter emitter = EasyMock.createMock(ServiceEmitter.class);
emitter = EasyMock.createMock(ServiceEmitter.class);
EmittingLogger.registerEmitter(emitter);
queryRunnerFactoryConglomerate = EasyMock.createStrictMock(QueryRunnerFactoryConglomerate.class);
serverView = EasyMock.createStrictMock(FilteredServerView.class);
monitorScheduler = EasyMock.createStrictMock(MonitorScheduler.class);
publishCountDown = new CountDownLatch(1);
announcedSinks = 0;
pushedSegments = 0;
tmpDir = temporaryFolder.newFolder();
final TaskQueueConfig tqc = new DefaultObjectMapper().readValue(
tqc = new DefaultObjectMapper().readValue(
"{\"startDelay\":\"PT0S\", \"restartDelay\":\"PT1S\", \"storageSyncRate\":\"PT0.5S\"}",
TaskQueueConfig.class
);
@ -361,14 +365,17 @@ public class TaskLifecycleTest
throw new RuntimeException(String.format("Unknown task storage type [%s]", taskStorageType));
}
tsqa = new TaskStorageQueryAdapter(ts);
tl = new TaskLockbox(ts);
mdc = newMockMDC();
tac = new LocalTaskActionClientFactory(ts, new TaskActionToolbox(tl, mdc, newMockEmitter()));
tb = new TaskToolboxFactory(
new TaskConfig(tmpDir.toString(), null, null, 50000, null),
tac,
newMockEmitter(),
serverView = new FilteredServerView()
{
@Override
public void registerSegmentCallback(
Executor exec, ServerView.SegmentCallback callback, Predicate<DataSegment> filter
)
{
segmentCallbacks.add(callback);
}
};
setUpAndStartTaskQueue(
new DataSegmentPusher()
{
@Override
@ -383,7 +390,20 @@ public class TaskLifecycleTest
pushedSegments++;
return segment;
}
},
}
);
}
private void setUpAndStartTaskQueue(DataSegmentPusher dataSegmentPusher) {
tsqa = new TaskStorageQueryAdapter(ts);
tl = new TaskLockbox(ts);
mdc = newMockMDC();
tac = new LocalTaskActionClientFactory(ts, new TaskActionToolbox(tl, mdc, newMockEmitter()));
tb = new TaskToolboxFactory(
new TaskConfig(tmpDir.toString(), null, null, 50000, null),
tac,
newMockEmitter(),
dataSegmentPusher,
new LocalDataSegmentKiller(),
new DataSegmentMover()
{
@ -783,60 +803,41 @@ public class TaskLifecycleTest
Assert.assertEquals("segments nuked", 0, mdc.getNuked().size());
}
@Test
@Test (timeout = 4000L)
public void testRealtimeIndexTask() throws Exception
{
serverView.registerSegmentCallback(
EasyMock.anyObject(Executor.class),
EasyMock.anyObject(ServerView.SegmentCallback.class),
EasyMock.anyObject(Predicate.class)
);
EasyMock.expectLastCall().atLeastOnce();
monitorScheduler.addMonitor(EasyMock.anyObject(Monitor.class));
EasyMock.expectLastCall().atLeastOnce();
monitorScheduler.removeMonitor(EasyMock.anyObject(Monitor.class));
EasyMock.expectLastCall().anyTimes();
EasyMock.replay(monitorScheduler, serverView, queryRunnerFactoryConglomerate);
String taskId = "rt_task_1";
DataSchema dataSchema = new DataSchema(
"test_ds",
null,
new AggregatorFactory[]{new LongSumAggregatorFactory("count", "rows")},
new UniformGranularitySpec(Granularity.DAY, QueryGranularity.NONE, null),
mapper
);
RealtimeIOConfig realtimeIOConfig = new RealtimeIOConfig(
new MockFirehoseFactory(true),
null, // PlumberSchool - Realtime Index Task always uses RealtimePlumber which is hardcoded in RealtimeIndexTask class
null
);
RealtimeTuningConfig realtimeTuningConfig = new RealtimeTuningConfig(
1000,
new Period("P1Y"),
null, //default window period of 10 minutes
null, // base persist dir ignored by Realtime Index task
null,
null,
null,
null,
null,
null,
null,
null,
null
);
FireDepartment fireDepartment = new FireDepartment(dataSchema, realtimeIOConfig, realtimeTuningConfig);
RealtimeIndexTask realtimeIndexTask = new RealtimeIndexTask(
taskId,
new TaskResource(taskId, 1),
fireDepartment,
null
);
EasyMock.replay(monitorScheduler, queryRunnerFactoryConglomerate);
RealtimeIndexTask realtimeIndexTask = giveMeARealtimeIndexTask();
final String taskId = realtimeIndexTask.getId();
tq.add(realtimeIndexTask);
//wait for task to process events and publish segment
Assert.assertTrue(publishCountDown.await(1000, TimeUnit.MILLISECONDS));
// Task will not finish until the segment is handed off and since we are not simulating hand off, the status should be running
Assert.assertTrue("Task should be running", tsqa.getStatus(taskId).get().isRunnable());
// Realtime Task has published the segment, simulate loading of segment to a historical node so that task finishes with SUCCESS status
segmentCallbacks.get(0).segmentAdded(
new DruidServerMetadata(
"dummy",
"dummy_host",
0,
"historical",
"dummy_tier",
0
), mdc.getPublished().iterator().next()
);
// Wait for realtime index task to handle callback in plumber and succeed
while (tsqa.getStatus(taskId).get().isRunnable()) {
Thread.sleep(10);
}
Assert.assertTrue("Task should be in Success state", tsqa.getStatus(taskId).get().isSuccess());
Assert.assertEquals(1, announcedSinks);
Assert.assertEquals(1, pushedSegments);
Assert.assertEquals(1, mdc.getPublished().size());
@ -848,7 +849,46 @@ public class TaskLifecycleTest
segment.getInterval()
);
Assert.assertEquals(ImmutableList.of("count"), segment.getMetrics());
EasyMock.verify(monitorScheduler, serverView, queryRunnerFactoryConglomerate);
EasyMock.verify(monitorScheduler, queryRunnerFactoryConglomerate);
}
@Test (timeout = 4000L)
public void testRealtimeIndexTaskFailure() throws Exception
{
setUpAndStartTaskQueue(
new DataSegmentPusher()
{
@Override
public String getPathForHadoop(String s)
{
throw new UnsupportedOperationException();
}
@Override
public DataSegment push(File file, DataSegment dataSegment) throws IOException
{
throw new RuntimeException("FAILURE");
}
}
);
monitorScheduler.addMonitor(EasyMock.anyObject(Monitor.class));
EasyMock.expectLastCall().atLeastOnce();
monitorScheduler.removeMonitor(EasyMock.anyObject(Monitor.class));
EasyMock.expectLastCall().anyTimes();
EasyMock.replay(monitorScheduler, queryRunnerFactoryConglomerate);
RealtimeIndexTask realtimeIndexTask = giveMeARealtimeIndexTask();
final String taskId = realtimeIndexTask.getId();
tq.add(realtimeIndexTask);
// Wait for realtime index task to fail
while (tsqa.getStatus(taskId).get().isRunnable()) {
Thread.sleep(10);
}
Assert.assertTrue("Task should be in Failure state", tsqa.getStatus(taskId).get().isFailure());
EasyMock.verify(monitorScheduler, queryRunnerFactoryConglomerate);
}
@Test
@ -954,6 +994,44 @@ public class TaskLifecycleTest
return retVal;
}
private RealtimeIndexTask giveMeARealtimeIndexTask() {
String taskId = String.format("rt_task_%s", System.currentTimeMillis());
DataSchema dataSchema = new DataSchema(
"test_ds",
null,
new AggregatorFactory[]{new LongSumAggregatorFactory("count", "rows")},
new UniformGranularitySpec(Granularity.DAY, QueryGranularity.NONE, null),
mapper
);
RealtimeIOConfig realtimeIOConfig = new RealtimeIOConfig(
new MockFirehoseFactory(true),
null, // PlumberSchool - Realtime Index Task always uses RealtimePlumber which is hardcoded in RealtimeIndexTask class
null
);
RealtimeTuningConfig realtimeTuningConfig = new RealtimeTuningConfig(
1000,
new Period("P1Y"),
null, //default window period of 10 minutes
null, // base persist dir ignored by Realtime Index task
null,
null,
null,
null,
null,
null,
null,
null,
null
);
FireDepartment fireDepartment = new FireDepartment(dataSchema, realtimeIOConfig, realtimeTuningConfig);
return new RealtimeIndexTask(
taskId,
new TaskResource(taskId, 1),
fireDepartment,
null
);
}
private static class MockIndexerMetadataStorageCoordinator extends IndexerSQLMetadataStorageCoordinator
{
final private Set<DataSegment> published = Sets.newHashSet();