From 7853a9cc414b78d088405f455c12a402fe7165fa Mon Sep 17 00:00:00 2001 From: Parag Jain Date: Sat, 30 Jan 2016 20:44:07 -0600 Subject: [PATCH] clean up TaskLifecycleTest --- .../indexing/overlord/TaskLifecycleTest.java | 357 +++++++++++------- 1 file changed, 222 insertions(+), 135 deletions(-) diff --git a/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java b/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java index da2efc3391f..dbb3055bc74 100644 --- a/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java @@ -135,19 +135,23 @@ public class TaskLifecycleTest private static final IndexMerger INDEX_MERGER; private static final IndexMergerV9 INDEX_MERGER_V9; private static final IndexIO INDEX_IO; + private static final TestUtils TEST_UTILS; static { - TestUtils testUtils = new TestUtils(); - MAPPER = testUtils.getTestObjectMapper(); - INDEX_MERGER = testUtils.getTestIndexMerger(); - INDEX_MERGER_V9 = testUtils.getTestIndexMergerV9(); - INDEX_IO = testUtils.getTestIndexIO(); + TEST_UTILS = new TestUtils(); + MAPPER = TEST_UTILS.getTestObjectMapper(); + INDEX_MERGER = TEST_UTILS.getTestIndexMerger(); + INDEX_MERGER_V9 = TEST_UTILS.getTestIndexMergerV9(); + INDEX_IO = TEST_UTILS.getTestIndexIO(); } + private static final String HEAP_TASK_STORAGE = "HeapMemoryTaskStorage"; + private static final String METADATA_TASK_STORAGE = "MetadataTaskStorage"; + @Parameterized.Parameters(name = "taskStorageType={0}") public static Collection constructFeed() { - return Arrays.asList(new String[][]{{"HeapMemoryTaskStorage"}, {"MetadataTaskStorage"}}); + return Arrays.asList(new String[][]{{HEAP_TASK_STORAGE}, {METADATA_TASK_STORAGE}}); } public TaskLifecycleTest(String taskStorageType) @@ -168,11 +172,13 @@ public class TaskLifecycleTest } }; private static DateTime now = new DateTime(); + private static final Iterable realtimeIdxTaskInputRows = ImmutableList.of( IR(now.toString("YYYY-MM-dd'T'HH:mm:ss"), "test_dim1", "test_dim2", 1.0f), IR(now.plus(new Period(Hours.ONE)).toString("YYYY-MM-dd'T'HH:mm:ss"), "test_dim1", "test_dim2", 2.0f), IR(now.plus(new Period(Hours.TWO)).toString("YYYY-MM-dd'T'HH:mm:ss"), "test_dim1", "test_dim2", 3.0f) ); + private static final Iterable IdxTaskInputRows = ImmutableList.of( IR("2010-01-01T01", "x", "y", 1), IR("2010-01-01T01", "x", "z", 1), @@ -185,14 +191,12 @@ public class TaskLifecycleTest private final String taskStorageType; - private ObjectMapper mapper; private TaskStorageQueryAdapter tsqa = null; - private File tmpDir = null; - private TaskStorage ts = null; - private TaskLockbox tl = null; - private TaskQueue tq = null; - private TaskRunner tr = null; + private TaskStorage taskStorage = null; + private TaskLockbox taskLockbox = null; + private TaskQueue taskQueue = null; + private TaskRunner taskRunner = null; private TestIndexerMetadataStorageCoordinator mdc = null; private TaskActionClientFactory tac = null; private TaskToolboxFactory tb = null; @@ -201,44 +205,32 @@ public class TaskLifecycleTest private MonitorScheduler monitorScheduler; private ServiceEmitter emitter; private TaskQueueConfig tqc; + private TaskConfig taskConfig; + private DataSegmentPusher dataSegmentPusher; + private int pushedSegments; private int announcedSinks; - private static CountDownLatch publishCountDown; - private TestDerbyConnector testDerbyConnector; private SegmentHandoffNotifierFactory handoffNotifierFactory; private Map> handOffCallbacks; - - private static TestIndexerMetadataStorageCoordinator newMockMDC() - { - return new TestIndexerMetadataStorageCoordinator() - { - @Override - public Set announceHistoricalSegments(Set segments) - { - Set retVal = super.announceHistoricalSegments(segments); - publishCountDown.countDown(); - return retVal; - } - }; - } + private static CountDownLatch publishCountDown; private static ServiceEmitter newMockEmitter() { - return new ServiceEmitter(null, null, null) - { - @Override - public void emit(Event event) + return new ServiceEmitter(null, null, null) { + @Override + public void emit(Event event) + { - } + } - @Override - public void emit(ServiceEventBuilder builder) - { + @Override + public void emit(ServiceEventBuilder builder) + { - } - }; + } + }; } private static InputRow IR(String dt, String dim1, String dim2, float met) @@ -352,51 +344,88 @@ public class TaskLifecycleTest @Before public void setUp() throws Exception { - emitter = EasyMock.createMock(ServiceEmitter.class); - EmittingLogger.registerEmitter(emitter); + // mock things queryRunnerFactoryConglomerate = EasyMock.createStrictMock(QueryRunnerFactoryConglomerate.class); monitorScheduler = EasyMock.createStrictMock(MonitorScheduler.class); - publishCountDown = new CountDownLatch(1); + + // initialize variables announcedSinks = 0; pushedSegments = 0; - tmpDir = temporaryFolder.newFolder(); - TestUtils testUtils = new TestUtils(); - mapper = testUtils.getTestObjectMapper(); - - tqc = mapper.readValue( - "{\"startDelay\":\"PT0S\", \"restartDelay\":\"PT1S\", \"storageSyncRate\":\"PT0.5S\"}", - TaskQueueConfig.class - ); indexSpec = new IndexSpec(); - - if (taskStorageType.equals("HeapMemoryTaskStorage")) { - ts = new HeapMemoryTaskStorage( - new TaskStorageConfig(null) - { - } - ); - } else if (taskStorageType.equals("MetadataTaskStorage")) { - testDerbyConnector = derbyConnectorRule.getConnector(); - mapper.registerSubtypes( - new NamedType(MockExceptionalFirehoseFactory.class, "mockExcepFirehoseFactory"), - new NamedType(MockFirehoseFactory.class, "mockFirehoseFactory") - ); - testDerbyConnector.createTaskTables(); - testDerbyConnector.createSegmentTable(); - ts = new MetadataTaskStorage( - testDerbyConnector, - new TaskStorageConfig(null), - new SQLMetadataStorageActionHandlerFactory( - testDerbyConnector, - derbyConnectorRule.metadataTablesConfigSupplier().get(), - mapper - ) - ); - } else { - throw new RuntimeException(String.format("Unknown task storage type [%s]", taskStorageType)); - } + emitter = newMockEmitter(); + EmittingLogger.registerEmitter(emitter); + mapper = TEST_UTILS.getTestObjectMapper(); handOffCallbacks = Maps.newConcurrentMap(); - handoffNotifierFactory = new SegmentHandoffNotifierFactory() + + // Set up things, the order does matter as if it is messed up then the setUp + // should fail because of the Precondition checks in the respective setUp methods + // For creating a customized TaskQueue see testRealtimeIndexTaskFailure test + + taskStorage = setUpTaskStorage(); + + handoffNotifierFactory = setUpSegmentHandOffNotifierFactory(); + + dataSegmentPusher = setUpDataSegmentPusher(); + + mdc = setUpMetadataStorageCoordinator(); + + tb = setUpTaskToolboxFactory(dataSegmentPusher, handoffNotifierFactory, mdc); + + taskRunner = setUpThreadPoolTaskRunner(tb); + + taskQueue = setUpTaskQueue(taskStorage, taskRunner); + } + + private TaskStorage setUpTaskStorage() + { + Preconditions.checkNotNull(mapper); + Preconditions.checkNotNull(derbyConnectorRule); + + TaskStorage taskStorage; + + switch (taskStorageType) { + case HEAP_TASK_STORAGE: { + taskStorage = new HeapMemoryTaskStorage( + new TaskStorageConfig(null) + { + } + ); + break; + } + + case METADATA_TASK_STORAGE: { + TestDerbyConnector testDerbyConnector = derbyConnectorRule.getConnector(); + mapper.registerSubtypes( + new NamedType(MockExceptionalFirehoseFactory.class, "mockExcepFirehoseFactory"), + new NamedType(MockFirehoseFactory.class, "mockFirehoseFactory") + ); + testDerbyConnector.createTaskTables(); + testDerbyConnector.createSegmentTable(); + taskStorage = new MetadataTaskStorage( + testDerbyConnector, + new TaskStorageConfig(null), + new SQLMetadataStorageActionHandlerFactory( + testDerbyConnector, + derbyConnectorRule.metadataTablesConfigSupplier().get(), + mapper + ) + ); + break; + } + + default: { + throw new RuntimeException(String.format("Unknown task storage type [%s]", taskStorageType)); + } + } + tsqa = new TaskStorageQueryAdapter(taskStorage); + return taskStorage; + } + + private SegmentHandoffNotifierFactory setUpSegmentHandOffNotifierFactory() + { + Preconditions.checkNotNull(handOffCallbacks); + + return new SegmentHandoffNotifierFactory() { @Override public SegmentHandoffNotifier createSegmentHandoffNotifier(String dataSource) @@ -433,36 +462,63 @@ public class TaskLifecycleTest }; } }; - setUpTaskQueue( - new DataSegmentPusher() - { - @Override - public String getPathForHadoop(String dataSource) - { - throw new UnsupportedOperationException(); - } - - @Override - public DataSegment push(File file, DataSegment segment) throws IOException - { - pushedSegments++; - return segment; - } - } - ); } - private void setUpTaskQueue(DataSegmentPusher dataSegmentPusher) throws Exception + private DataSegmentPusher setUpDataSegmentPusher() { - final TaskConfig taskConfig = new TaskConfig(tmpDir.toString(), null, null, 50000, null, false, null, null); - tsqa = new TaskStorageQueryAdapter(ts); - tl = new TaskLockbox(ts); - mdc = newMockMDC(); - tac = new LocalTaskActionClientFactory(ts, new TaskActionToolbox(tl, mdc, newMockEmitter())); - tb = new TaskToolboxFactory( + return new DataSegmentPusher() + { + @Override + public String getPathForHadoop(String dataSource) + { + throw new UnsupportedOperationException(); + } + + @Override + public DataSegment push(File file, DataSegment segment) throws IOException + { + pushedSegments++; + return segment; + } + }; + } + + private TestIndexerMetadataStorageCoordinator setUpMetadataStorageCoordinator() + { + return new TestIndexerMetadataStorageCoordinator() + { + @Override + public Set announceHistoricalSegments(Set segments) + { + Set retVal = super.announceHistoricalSegments(segments); + if (publishCountDown != null) { + publishCountDown.countDown(); + } + return retVal; + } + }; + } + + private TaskToolboxFactory setUpTaskToolboxFactory( + DataSegmentPusher dataSegmentPusher, + SegmentHandoffNotifierFactory handoffNotifierFactory, + TestIndexerMetadataStorageCoordinator mdc + ) throws IOException + { + Preconditions.checkNotNull(queryRunnerFactoryConglomerate); + Preconditions.checkNotNull(monitorScheduler); + Preconditions.checkNotNull(taskStorage); + Preconditions.checkNotNull(emitter); + + taskLockbox = new TaskLockbox(taskStorage); + tac = new LocalTaskActionClientFactory(taskStorage, new TaskActionToolbox(taskLockbox, mdc, emitter)); + File tmpDir = temporaryFolder.newFolder(); + taskConfig = new TaskConfig(tmpDir.toString(), null, null, 50000, null, false, null, null); + + return new TaskToolboxFactory( taskConfig, tac, - newMockEmitter(), + emitter, dataSegmentPusher, new LocalDataSegmentKiller(), new DataSegmentMover() @@ -544,15 +600,35 @@ public class TaskLifecycleTest FireDepartmentTest.NO_CACHE_CONFIG, INDEX_MERGER_V9 ); - tr = new ThreadPoolTaskRunner(tb, taskConfig, emitter); - tq = new TaskQueue(tqc, ts, tr, tac, tl, emitter); + } + + private TaskRunner setUpThreadPoolTaskRunner(TaskToolboxFactory tb) + { + Preconditions.checkNotNull(taskConfig); + Preconditions.checkNotNull(emitter); + + return new ThreadPoolTaskRunner(tb, taskConfig, emitter); + } + + private TaskQueue setUpTaskQueue(TaskStorage ts, TaskRunner tr) throws Exception + { + Preconditions.checkNotNull(taskLockbox); + Preconditions.checkNotNull(tac); + Preconditions.checkNotNull(emitter); + + tqc = mapper.readValue( + "{\"startDelay\":\"PT0S\", \"restartDelay\":\"PT1S\", \"storageSyncRate\":\"PT0.5S\"}", + TaskQueueConfig.class + ); + + return new TaskQueue(tqc, ts, tr, tac, taskLockbox, emitter); } @After public void tearDown() { - if (tq.isActive()) { - tq.stop(); + if (taskQueue.isActive()) { + taskQueue.stop(); } } @@ -585,7 +661,7 @@ public class TaskLifecycleTest Assert.assertTrue("pre run task status not present", !preRunTaskStatus.isPresent()); final TaskStatus mergedStatus = runTask(indexTask); - final TaskStatus status = ts.getStatus(indexTask.getId()).get(); + final TaskStatus status = taskStorage.getStatus(indexTask.getId()).get(); final List publishedSegments = byIntervalOrdering.sortedCopy(mdc.getPublished()); final List loggedSegments = byIntervalOrdering.sortedCopy(tsqa.getInsertedSegments(indexTask.getId())); @@ -878,17 +954,18 @@ public class TaskLifecycleTest @Test(timeout = 60_000L) public void testRealtimeIndexTask() throws Exception { + publishCountDown = new CountDownLatch(1); monitorScheduler.addMonitor(EasyMock.anyObject(Monitor.class)); EasyMock.expectLastCall().atLeastOnce(); monitorScheduler.removeMonitor(EasyMock.anyObject(Monitor.class)); EasyMock.expectLastCall().anyTimes(); EasyMock.replay(monitorScheduler, queryRunnerFactoryConglomerate); - RealtimeIndexTask realtimeIndexTask = giveMeARealtimeIndexTask(); + RealtimeIndexTask realtimeIndexTask = newRealtimeIndexTask(); final String taskId = realtimeIndexTask.getId(); - tq.start(); - tq.add(realtimeIndexTask); + taskQueue.start(); + taskQueue.add(realtimeIndexTask); //wait for task to process events and publish segment publishCountDown.await(); @@ -922,33 +999,38 @@ public class TaskLifecycleTest @Test(timeout = 60_000L) public void testRealtimeIndexTaskFailure() throws Exception { - setUpTaskQueue( - new DataSegmentPusher() - { - @Override - public String getPathForHadoop(String s) - { - throw new UnsupportedOperationException(); - } + dataSegmentPusher = new DataSegmentPusher() + { + @Override + public String getPathForHadoop(String s) + { + throw new UnsupportedOperationException(); + } + + @Override + public DataSegment push(File file, DataSegment dataSegment) throws IOException + { + throw new RuntimeException("FAILURE"); + } + }; + + tb = setUpTaskToolboxFactory(dataSegmentPusher, handoffNotifierFactory, mdc); + + taskRunner = setUpThreadPoolTaskRunner(tb); + + taskQueue = setUpTaskQueue(taskStorage, taskRunner); - @Override - public DataSegment push(File file, DataSegment dataSegment) throws IOException - { - throw new RuntimeException("FAILURE"); - } - } - ); monitorScheduler.addMonitor(EasyMock.anyObject(Monitor.class)); EasyMock.expectLastCall().atLeastOnce(); monitorScheduler.removeMonitor(EasyMock.anyObject(Monitor.class)); EasyMock.expectLastCall().anyTimes(); EasyMock.replay(monitorScheduler, queryRunnerFactoryConglomerate); - RealtimeIndexTask realtimeIndexTask = giveMeARealtimeIndexTask(); + RealtimeIndexTask realtimeIndexTask = newRealtimeIndexTask(); final String taskId = realtimeIndexTask.getId(); - tq.start(); - tq.add(realtimeIndexTask); + taskQueue.start(); + taskQueue.add(realtimeIndexTask); // Wait for realtime index task to fail while (tsqa.getStatus(taskId).get().isRunnable()) { @@ -988,8 +1070,8 @@ public class TaskLifecycleTest final long startTime = System.currentTimeMillis(); // manually insert the task into TaskStorage, waiting for TaskQueue to sync from storage - tq.start(); - ts.insert(indexTask, TaskStatus.running(indexTask.getId())); + taskQueue.start(); + taskStorage.insert(indexTask, TaskStatus.running(indexTask.getId())); while (tsqa.getStatus(indexTask.getId()).get().isRunnable()) { if (System.currentTimeMillis() > startTime + 10 * 1000) { @@ -999,7 +1081,7 @@ public class TaskLifecycleTest Thread.sleep(100); } - final TaskStatus status = ts.getStatus(indexTask.getId()).get(); + final TaskStatus status = taskStorage.getStatus(indexTask.getId()).get(); final List publishedSegments = byIntervalOrdering.sortedCopy(mdc.getPublished()); final List loggedSegments = byIntervalOrdering.sortedCopy(tsqa.getInsertedSegments(indexTask.getId())); @@ -1037,9 +1119,14 @@ public class TaskLifecycleTest Preconditions.checkArgument(!task.getId().equals(dummyTask.getId())); - tq.start(); - tq.add(dummyTask); - tq.add(task); + // Since multiple tasks can be run in a single unit test using runTask(), hence this check and synchronization + synchronized (this) { + if (!taskQueue.isActive()) { + taskQueue.start(); + } + } + taskQueue.add(dummyTask); + taskQueue.add(task); TaskStatus retVal = null; @@ -1065,7 +1152,7 @@ public class TaskLifecycleTest return retVal; } - private RealtimeIndexTask giveMeARealtimeIndexTask() + private RealtimeIndexTask newRealtimeIndexTask() { String taskId = String.format("rt_task_%s", System.currentTimeMillis()); DataSchema dataSchema = new DataSchema(