From b630720164882ab0f875ac0871ca2f8c77b8548c Mon Sep 17 00:00:00 2001 From: Parag Jain Date: Fri, 25 Sep 2015 09:01:53 -0500 Subject: [PATCH] fail task if finishjob throws any exception add realtime task failure test --- .../common/task/RealtimeIndexTask.java | 3 +- .../indexing/overlord/TaskLifecycleTest.java | 194 ++++++++++++------ 2 files changed, 138 insertions(+), 59 deletions(-) diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java index 123820c0f6a..b8a2c7e4190 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java @@ -55,12 +55,12 @@ import io.druid.segment.realtime.plumber.RealtimePlumberSchool; import io.druid.segment.realtime.plumber.VersioningPolicy; import io.druid.server.coordination.DataSegmentAnnouncer; import io.druid.timeline.DataSegment; -import java.util.Map; import org.joda.time.DateTime; import org.joda.time.Interval; import java.io.File; import java.io.IOException; +import java.util.Map; import java.util.Random; public class RealtimeIndexTask extends AbstractTask @@ -337,6 +337,7 @@ public class RealtimeIndexTask extends AbstractTask } catch (Exception e) { log.makeAlert(e, "Failed to finish realtime task").emit(); + throw e; } finally { // firehose will be non-null since normalExit is true diff --git a/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java b/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java index 76a2f052ab3..606c8cc2f36 100644 --- a/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java @@ -94,6 +94,7 @@ import io.druid.segment.loading.SegmentLoadingException; import io.druid.segment.loading.StorageLocationConfig; import io.druid.segment.realtime.FireDepartment; import io.druid.server.coordination.DataSegmentAnnouncer; +import io.druid.server.coordination.DruidServerMetadata; import io.druid.timeline.DataSegment; import io.druid.timeline.partition.NoneShardSpec; import org.easymock.EasyMock; @@ -112,6 +113,7 @@ import org.junit.runners.Parameterized; import java.io.File; import java.io.IOException; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Iterator; @@ -181,10 +183,13 @@ public class TaskLifecycleTest private QueryRunnerFactoryConglomerate queryRunnerFactoryConglomerate; private FilteredServerView serverView; private MonitorScheduler monitorScheduler; + private ServiceEmitter emitter; + private TaskQueueConfig tqc; private int pushedSegments; private int announcedSinks; private static CountDownLatch publishCountDown; private TestDerbyConnector testDerbyConnector; + private List segmentCallbacks = new ArrayList<>(); private static MockIndexerMetadataStorageCoordinator newMockMDC() { @@ -320,17 +325,16 @@ public class TaskLifecycleTest @Before public void setUp() throws Exception { - final ServiceEmitter emitter = EasyMock.createMock(ServiceEmitter.class); + emitter = EasyMock.createMock(ServiceEmitter.class); EmittingLogger.registerEmitter(emitter); queryRunnerFactoryConglomerate = EasyMock.createStrictMock(QueryRunnerFactoryConglomerate.class); - serverView = EasyMock.createStrictMock(FilteredServerView.class); monitorScheduler = EasyMock.createStrictMock(MonitorScheduler.class); publishCountDown = new CountDownLatch(1); announcedSinks = 0; pushedSegments = 0; tmpDir = temporaryFolder.newFolder(); - final TaskQueueConfig tqc = new DefaultObjectMapper().readValue( + tqc = new DefaultObjectMapper().readValue( "{\"startDelay\":\"PT0S\", \"restartDelay\":\"PT1S\", \"storageSyncRate\":\"PT0.5S\"}", TaskQueueConfig.class ); @@ -361,14 +365,17 @@ public class TaskLifecycleTest throw new RuntimeException(String.format("Unknown task storage type [%s]", taskStorageType)); } - tsqa = new TaskStorageQueryAdapter(ts); - tl = new TaskLockbox(ts); - mdc = newMockMDC(); - tac = new LocalTaskActionClientFactory(ts, new TaskActionToolbox(tl, mdc, newMockEmitter())); - tb = new TaskToolboxFactory( - new TaskConfig(tmpDir.toString(), null, null, 50000, null), - tac, - newMockEmitter(), + serverView = new FilteredServerView() + { + @Override + public void registerSegmentCallback( + Executor exec, ServerView.SegmentCallback callback, Predicate filter + ) + { + segmentCallbacks.add(callback); + } + }; + setUpAndStartTaskQueue( new DataSegmentPusher() { @Override @@ -383,7 +390,20 @@ public class TaskLifecycleTest pushedSegments++; return segment; } - }, + } + ); + } + + private void setUpAndStartTaskQueue(DataSegmentPusher dataSegmentPusher) { + tsqa = new TaskStorageQueryAdapter(ts); + tl = new TaskLockbox(ts); + mdc = newMockMDC(); + tac = new LocalTaskActionClientFactory(ts, new TaskActionToolbox(tl, mdc, newMockEmitter())); + tb = new TaskToolboxFactory( + new TaskConfig(tmpDir.toString(), null, null, 50000, null), + tac, + newMockEmitter(), + dataSegmentPusher, new LocalDataSegmentKiller(), new DataSegmentMover() { @@ -783,60 +803,41 @@ public class TaskLifecycleTest Assert.assertEquals("segments nuked", 0, mdc.getNuked().size()); } - @Test + @Test (timeout = 4000L) public void testRealtimeIndexTask() throws Exception { - serverView.registerSegmentCallback( - EasyMock.anyObject(Executor.class), - EasyMock.anyObject(ServerView.SegmentCallback.class), - EasyMock.anyObject(Predicate.class) - ); - EasyMock.expectLastCall().atLeastOnce(); monitorScheduler.addMonitor(EasyMock.anyObject(Monitor.class)); EasyMock.expectLastCall().atLeastOnce(); monitorScheduler.removeMonitor(EasyMock.anyObject(Monitor.class)); EasyMock.expectLastCall().anyTimes(); - EasyMock.replay(monitorScheduler, serverView, queryRunnerFactoryConglomerate); - String taskId = "rt_task_1"; - DataSchema dataSchema = new DataSchema( - "test_ds", - null, - new AggregatorFactory[]{new LongSumAggregatorFactory("count", "rows")}, - new UniformGranularitySpec(Granularity.DAY, QueryGranularity.NONE, null), - mapper - ); - RealtimeIOConfig realtimeIOConfig = new RealtimeIOConfig( - new MockFirehoseFactory(true), - null, // PlumberSchool - Realtime Index Task always uses RealtimePlumber which is hardcoded in RealtimeIndexTask class - null - ); - RealtimeTuningConfig realtimeTuningConfig = new RealtimeTuningConfig( - 1000, - new Period("P1Y"), - null, //default window period of 10 minutes - null, // base persist dir ignored by Realtime Index task - null, - null, - null, - null, - null, - null, - null, - null, - null - ); - FireDepartment fireDepartment = new FireDepartment(dataSchema, realtimeIOConfig, realtimeTuningConfig); - RealtimeIndexTask realtimeIndexTask = new RealtimeIndexTask( - taskId, - new TaskResource(taskId, 1), - fireDepartment, - null - ); + EasyMock.replay(monitorScheduler, queryRunnerFactoryConglomerate); + + RealtimeIndexTask realtimeIndexTask = giveMeARealtimeIndexTask(); + final String taskId = realtimeIndexTask.getId(); + tq.add(realtimeIndexTask); //wait for task to process events and publish segment Assert.assertTrue(publishCountDown.await(1000, TimeUnit.MILLISECONDS)); - // Task will not finish until the segment is handed off and since we are not simulating hand off, the status should be running - Assert.assertTrue("Task should be running", tsqa.getStatus(taskId).get().isRunnable()); + + // Realtime Task has published the segment, simulate loading of segment to a historical node so that task finishes with SUCCESS status + segmentCallbacks.get(0).segmentAdded( + new DruidServerMetadata( + "dummy", + "dummy_host", + 0, + "historical", + "dummy_tier", + 0 + ), mdc.getPublished().iterator().next() + ); + + // Wait for realtime index task to handle callback in plumber and succeed + while (tsqa.getStatus(taskId).get().isRunnable()) { + Thread.sleep(10); + } + + Assert.assertTrue("Task should be in Success state", tsqa.getStatus(taskId).get().isSuccess()); + Assert.assertEquals(1, announcedSinks); Assert.assertEquals(1, pushedSegments); Assert.assertEquals(1, mdc.getPublished().size()); @@ -848,7 +849,46 @@ public class TaskLifecycleTest segment.getInterval() ); Assert.assertEquals(ImmutableList.of("count"), segment.getMetrics()); - EasyMock.verify(monitorScheduler, serverView, queryRunnerFactoryConglomerate); + EasyMock.verify(monitorScheduler, queryRunnerFactoryConglomerate); + } + + @Test (timeout = 4000L) + public void testRealtimeIndexTaskFailure() throws Exception + { + setUpAndStartTaskQueue( + new DataSegmentPusher() + { + @Override + public String getPathForHadoop(String s) + { + throw new UnsupportedOperationException(); + } + + @Override + public DataSegment push(File file, DataSegment dataSegment) throws IOException + { + throw new RuntimeException("FAILURE"); + } + } + ); + monitorScheduler.addMonitor(EasyMock.anyObject(Monitor.class)); + EasyMock.expectLastCall().atLeastOnce(); + monitorScheduler.removeMonitor(EasyMock.anyObject(Monitor.class)); + EasyMock.expectLastCall().anyTimes(); + EasyMock.replay(monitorScheduler, queryRunnerFactoryConglomerate); + + RealtimeIndexTask realtimeIndexTask = giveMeARealtimeIndexTask(); + final String taskId = realtimeIndexTask.getId(); + tq.add(realtimeIndexTask); + + // Wait for realtime index task to fail + while (tsqa.getStatus(taskId).get().isRunnable()) { + Thread.sleep(10); + } + + Assert.assertTrue("Task should be in Failure state", tsqa.getStatus(taskId).get().isFailure()); + + EasyMock.verify(monitorScheduler, queryRunnerFactoryConglomerate); } @Test @@ -954,6 +994,44 @@ public class TaskLifecycleTest return retVal; } + private RealtimeIndexTask giveMeARealtimeIndexTask() { + String taskId = String.format("rt_task_%s", System.currentTimeMillis()); + DataSchema dataSchema = new DataSchema( + "test_ds", + null, + new AggregatorFactory[]{new LongSumAggregatorFactory("count", "rows")}, + new UniformGranularitySpec(Granularity.DAY, QueryGranularity.NONE, null), + mapper + ); + RealtimeIOConfig realtimeIOConfig = new RealtimeIOConfig( + new MockFirehoseFactory(true), + null, // PlumberSchool - Realtime Index Task always uses RealtimePlumber which is hardcoded in RealtimeIndexTask class + null + ); + RealtimeTuningConfig realtimeTuningConfig = new RealtimeTuningConfig( + 1000, + new Period("P1Y"), + null, //default window period of 10 minutes + null, // base persist dir ignored by Realtime Index task + null, + null, + null, + null, + null, + null, + null, + null, + null + ); + FireDepartment fireDepartment = new FireDepartment(dataSchema, realtimeIOConfig, realtimeTuningConfig); + return new RealtimeIndexTask( + taskId, + new TaskResource(taskId, 1), + fireDepartment, + null + ); + } + private static class MockIndexerMetadataStorageCoordinator extends IndexerSQLMetadataStorageCoordinator { final private Set published = Sets.newHashSet();