Merge pull request #2361 from pjain1/tasklife_test_refactor

clean up TaskLifecycleTest
This commit is contained in:
Nishant 2016-02-01 16:56:34 +05:30
commit 67b6556457
1 changed files with 222 additions and 135 deletions

View File

@ -135,19 +135,23 @@ public class TaskLifecycleTest
private static final IndexMerger INDEX_MERGER; private static final IndexMerger INDEX_MERGER;
private static final IndexMergerV9 INDEX_MERGER_V9; private static final IndexMergerV9 INDEX_MERGER_V9;
private static final IndexIO INDEX_IO; private static final IndexIO INDEX_IO;
private static final TestUtils TEST_UTILS;
static { static {
TestUtils testUtils = new TestUtils(); TEST_UTILS = new TestUtils();
MAPPER = testUtils.getTestObjectMapper(); MAPPER = TEST_UTILS.getTestObjectMapper();
INDEX_MERGER = testUtils.getTestIndexMerger(); INDEX_MERGER = TEST_UTILS.getTestIndexMerger();
INDEX_MERGER_V9 = testUtils.getTestIndexMergerV9(); INDEX_MERGER_V9 = TEST_UTILS.getTestIndexMergerV9();
INDEX_IO = testUtils.getTestIndexIO(); INDEX_IO = TEST_UTILS.getTestIndexIO();
} }
private static final String HEAP_TASK_STORAGE = "HeapMemoryTaskStorage";
private static final String METADATA_TASK_STORAGE = "MetadataTaskStorage";
@Parameterized.Parameters(name = "taskStorageType={0}") @Parameterized.Parameters(name = "taskStorageType={0}")
public static Collection<String[]> constructFeed() public static Collection<String[]> constructFeed()
{ {
return Arrays.asList(new String[][]{{"HeapMemoryTaskStorage"}, {"MetadataTaskStorage"}}); return Arrays.asList(new String[][]{{HEAP_TASK_STORAGE}, {METADATA_TASK_STORAGE}});
} }
public TaskLifecycleTest(String taskStorageType) public TaskLifecycleTest(String taskStorageType)
@ -168,11 +172,13 @@ public class TaskLifecycleTest
} }
}; };
private static DateTime now = new DateTime(); private static DateTime now = new DateTime();
private static final Iterable<InputRow> realtimeIdxTaskInputRows = ImmutableList.of( private static final Iterable<InputRow> realtimeIdxTaskInputRows = ImmutableList.of(
IR(now.toString("YYYY-MM-dd'T'HH:mm:ss"), "test_dim1", "test_dim2", 1.0f), IR(now.toString("YYYY-MM-dd'T'HH:mm:ss"), "test_dim1", "test_dim2", 1.0f),
IR(now.plus(new Period(Hours.ONE)).toString("YYYY-MM-dd'T'HH:mm:ss"), "test_dim1", "test_dim2", 2.0f), IR(now.plus(new Period(Hours.ONE)).toString("YYYY-MM-dd'T'HH:mm:ss"), "test_dim1", "test_dim2", 2.0f),
IR(now.plus(new Period(Hours.TWO)).toString("YYYY-MM-dd'T'HH:mm:ss"), "test_dim1", "test_dim2", 3.0f) IR(now.plus(new Period(Hours.TWO)).toString("YYYY-MM-dd'T'HH:mm:ss"), "test_dim1", "test_dim2", 3.0f)
); );
private static final Iterable<InputRow> IdxTaskInputRows = ImmutableList.of( private static final Iterable<InputRow> IdxTaskInputRows = ImmutableList.of(
IR("2010-01-01T01", "x", "y", 1), IR("2010-01-01T01", "x", "y", 1),
IR("2010-01-01T01", "x", "z", 1), IR("2010-01-01T01", "x", "z", 1),
@ -185,14 +191,12 @@ public class TaskLifecycleTest
private final String taskStorageType; private final String taskStorageType;
private ObjectMapper mapper; private ObjectMapper mapper;
private TaskStorageQueryAdapter tsqa = null; private TaskStorageQueryAdapter tsqa = null;
private File tmpDir = null; private TaskStorage taskStorage = null;
private TaskStorage ts = null; private TaskLockbox taskLockbox = null;
private TaskLockbox tl = null; private TaskQueue taskQueue = null;
private TaskQueue tq = null; private TaskRunner taskRunner = null;
private TaskRunner tr = null;
private TestIndexerMetadataStorageCoordinator mdc = null; private TestIndexerMetadataStorageCoordinator mdc = null;
private TaskActionClientFactory tac = null; private TaskActionClientFactory tac = null;
private TaskToolboxFactory tb = null; private TaskToolboxFactory tb = null;
@ -201,27 +205,15 @@ public class TaskLifecycleTest
private MonitorScheduler monitorScheduler; private MonitorScheduler monitorScheduler;
private ServiceEmitter emitter; private ServiceEmitter emitter;
private TaskQueueConfig tqc; private TaskQueueConfig tqc;
private TaskConfig taskConfig;
private DataSegmentPusher dataSegmentPusher;
private int pushedSegments; private int pushedSegments;
private int announcedSinks; private int announcedSinks;
private static CountDownLatch publishCountDown;
private TestDerbyConnector testDerbyConnector;
private SegmentHandoffNotifierFactory handoffNotifierFactory; private SegmentHandoffNotifierFactory handoffNotifierFactory;
private Map<SegmentDescriptor, Pair<Executor, Runnable>> handOffCallbacks; private Map<SegmentDescriptor, Pair<Executor, Runnable>> handOffCallbacks;
private static CountDownLatch publishCountDown;
private static TestIndexerMetadataStorageCoordinator newMockMDC()
{
return new TestIndexerMetadataStorageCoordinator()
{
@Override
public Set<DataSegment> announceHistoricalSegments(Set<DataSegment> segments)
{
Set<DataSegment> retVal = super.announceHistoricalSegments(segments);
publishCountDown.countDown();
return retVal;
}
};
}
private static ServiceEmitter newMockEmitter() private static ServiceEmitter newMockEmitter()
{ {
@ -352,38 +344,64 @@ public class TaskLifecycleTest
@Before @Before
public void setUp() throws Exception public void setUp() throws Exception
{ {
emitter = EasyMock.createMock(ServiceEmitter.class); // mock things
EmittingLogger.registerEmitter(emitter);
queryRunnerFactoryConglomerate = EasyMock.createStrictMock(QueryRunnerFactoryConglomerate.class); queryRunnerFactoryConglomerate = EasyMock.createStrictMock(QueryRunnerFactoryConglomerate.class);
monitorScheduler = EasyMock.createStrictMock(MonitorScheduler.class); monitorScheduler = EasyMock.createStrictMock(MonitorScheduler.class);
publishCountDown = new CountDownLatch(1);
// initialize variables
announcedSinks = 0; announcedSinks = 0;
pushedSegments = 0; pushedSegments = 0;
tmpDir = temporaryFolder.newFolder();
TestUtils testUtils = new TestUtils();
mapper = testUtils.getTestObjectMapper();
tqc = mapper.readValue(
"{\"startDelay\":\"PT0S\", \"restartDelay\":\"PT1S\", \"storageSyncRate\":\"PT0.5S\"}",
TaskQueueConfig.class
);
indexSpec = new IndexSpec(); indexSpec = new IndexSpec();
emitter = newMockEmitter();
EmittingLogger.registerEmitter(emitter);
mapper = TEST_UTILS.getTestObjectMapper();
handOffCallbacks = Maps.newConcurrentMap();
if (taskStorageType.equals("HeapMemoryTaskStorage")) { // Set up things, the order does matter as if it is messed up then the setUp
ts = new HeapMemoryTaskStorage( // should fail because of the Precondition checks in the respective setUp methods
// For creating a customized TaskQueue see testRealtimeIndexTaskFailure test
taskStorage = setUpTaskStorage();
handoffNotifierFactory = setUpSegmentHandOffNotifierFactory();
dataSegmentPusher = setUpDataSegmentPusher();
mdc = setUpMetadataStorageCoordinator();
tb = setUpTaskToolboxFactory(dataSegmentPusher, handoffNotifierFactory, mdc);
taskRunner = setUpThreadPoolTaskRunner(tb);
taskQueue = setUpTaskQueue(taskStorage, taskRunner);
}
private TaskStorage setUpTaskStorage()
{
Preconditions.checkNotNull(mapper);
Preconditions.checkNotNull(derbyConnectorRule);
TaskStorage taskStorage;
switch (taskStorageType) {
case HEAP_TASK_STORAGE: {
taskStorage = new HeapMemoryTaskStorage(
new TaskStorageConfig(null) new TaskStorageConfig(null)
{ {
} }
); );
} else if (taskStorageType.equals("MetadataTaskStorage")) { break;
testDerbyConnector = derbyConnectorRule.getConnector(); }
case METADATA_TASK_STORAGE: {
TestDerbyConnector testDerbyConnector = derbyConnectorRule.getConnector();
mapper.registerSubtypes( mapper.registerSubtypes(
new NamedType(MockExceptionalFirehoseFactory.class, "mockExcepFirehoseFactory"), new NamedType(MockExceptionalFirehoseFactory.class, "mockExcepFirehoseFactory"),
new NamedType(MockFirehoseFactory.class, "mockFirehoseFactory") new NamedType(MockFirehoseFactory.class, "mockFirehoseFactory")
); );
testDerbyConnector.createTaskTables(); testDerbyConnector.createTaskTables();
testDerbyConnector.createSegmentTable(); testDerbyConnector.createSegmentTable();
ts = new MetadataTaskStorage( taskStorage = new MetadataTaskStorage(
testDerbyConnector, testDerbyConnector,
new TaskStorageConfig(null), new TaskStorageConfig(null),
new SQLMetadataStorageActionHandlerFactory( new SQLMetadataStorageActionHandlerFactory(
@ -392,11 +410,22 @@ public class TaskLifecycleTest
mapper mapper
) )
); );
} else { break;
}
default: {
throw new RuntimeException(String.format("Unknown task storage type [%s]", taskStorageType)); throw new RuntimeException(String.format("Unknown task storage type [%s]", taskStorageType));
} }
handOffCallbacks = Maps.newConcurrentMap(); }
handoffNotifierFactory = new SegmentHandoffNotifierFactory() tsqa = new TaskStorageQueryAdapter(taskStorage);
return taskStorage;
}
private SegmentHandoffNotifierFactory setUpSegmentHandOffNotifierFactory()
{
Preconditions.checkNotNull(handOffCallbacks);
return new SegmentHandoffNotifierFactory()
{ {
@Override @Override
public SegmentHandoffNotifier createSegmentHandoffNotifier(String dataSource) public SegmentHandoffNotifier createSegmentHandoffNotifier(String dataSource)
@ -433,8 +462,11 @@ public class TaskLifecycleTest
}; };
} }
}; };
setUpTaskQueue( }
new DataSegmentPusher()
private DataSegmentPusher setUpDataSegmentPusher()
{
return new DataSegmentPusher()
{ {
@Override @Override
public String getPathForHadoop(String dataSource) public String getPathForHadoop(String dataSource)
@ -448,21 +480,45 @@ public class TaskLifecycleTest
pushedSegments++; pushedSegments++;
return segment; return segment;
} }
} };
);
} }
private void setUpTaskQueue(DataSegmentPusher dataSegmentPusher) throws Exception private TestIndexerMetadataStorageCoordinator setUpMetadataStorageCoordinator()
{ {
final TaskConfig taskConfig = new TaskConfig(tmpDir.toString(), null, null, 50000, null, false, null, null); return new TestIndexerMetadataStorageCoordinator()
tsqa = new TaskStorageQueryAdapter(ts); {
tl = new TaskLockbox(ts); @Override
mdc = newMockMDC(); public Set<DataSegment> announceHistoricalSegments(Set<DataSegment> segments)
tac = new LocalTaskActionClientFactory(ts, new TaskActionToolbox(tl, mdc, newMockEmitter())); {
tb = new TaskToolboxFactory( Set<DataSegment> retVal = super.announceHistoricalSegments(segments);
if (publishCountDown != null) {
publishCountDown.countDown();
}
return retVal;
}
};
}
private TaskToolboxFactory setUpTaskToolboxFactory(
DataSegmentPusher dataSegmentPusher,
SegmentHandoffNotifierFactory handoffNotifierFactory,
TestIndexerMetadataStorageCoordinator mdc
) throws IOException
{
Preconditions.checkNotNull(queryRunnerFactoryConglomerate);
Preconditions.checkNotNull(monitorScheduler);
Preconditions.checkNotNull(taskStorage);
Preconditions.checkNotNull(emitter);
taskLockbox = new TaskLockbox(taskStorage);
tac = new LocalTaskActionClientFactory(taskStorage, new TaskActionToolbox(taskLockbox, mdc, emitter));
File tmpDir = temporaryFolder.newFolder();
taskConfig = new TaskConfig(tmpDir.toString(), null, null, 50000, null, false, null, null);
return new TaskToolboxFactory(
taskConfig, taskConfig,
tac, tac,
newMockEmitter(), emitter,
dataSegmentPusher, dataSegmentPusher,
new LocalDataSegmentKiller(), new LocalDataSegmentKiller(),
new DataSegmentMover() new DataSegmentMover()
@ -544,15 +600,35 @@ public class TaskLifecycleTest
FireDepartmentTest.NO_CACHE_CONFIG, FireDepartmentTest.NO_CACHE_CONFIG,
INDEX_MERGER_V9 INDEX_MERGER_V9
); );
tr = new ThreadPoolTaskRunner(tb, taskConfig, emitter); }
tq = new TaskQueue(tqc, ts, tr, tac, tl, emitter);
private TaskRunner setUpThreadPoolTaskRunner(TaskToolboxFactory tb)
{
Preconditions.checkNotNull(taskConfig);
Preconditions.checkNotNull(emitter);
return new ThreadPoolTaskRunner(tb, taskConfig, emitter);
}
private TaskQueue setUpTaskQueue(TaskStorage ts, TaskRunner tr) throws Exception
{
Preconditions.checkNotNull(taskLockbox);
Preconditions.checkNotNull(tac);
Preconditions.checkNotNull(emitter);
tqc = mapper.readValue(
"{\"startDelay\":\"PT0S\", \"restartDelay\":\"PT1S\", \"storageSyncRate\":\"PT0.5S\"}",
TaskQueueConfig.class
);
return new TaskQueue(tqc, ts, tr, tac, taskLockbox, emitter);
} }
@After @After
public void tearDown() public void tearDown()
{ {
if (tq.isActive()) { if (taskQueue.isActive()) {
tq.stop(); taskQueue.stop();
} }
} }
@ -585,7 +661,7 @@ public class TaskLifecycleTest
Assert.assertTrue("pre run task status not present", !preRunTaskStatus.isPresent()); Assert.assertTrue("pre run task status not present", !preRunTaskStatus.isPresent());
final TaskStatus mergedStatus = runTask(indexTask); final TaskStatus mergedStatus = runTask(indexTask);
final TaskStatus status = ts.getStatus(indexTask.getId()).get(); final TaskStatus status = taskStorage.getStatus(indexTask.getId()).get();
final List<DataSegment> publishedSegments = byIntervalOrdering.sortedCopy(mdc.getPublished()); final List<DataSegment> publishedSegments = byIntervalOrdering.sortedCopy(mdc.getPublished());
final List<DataSegment> loggedSegments = byIntervalOrdering.sortedCopy(tsqa.getInsertedSegments(indexTask.getId())); final List<DataSegment> loggedSegments = byIntervalOrdering.sortedCopy(tsqa.getInsertedSegments(indexTask.getId()));
@ -878,17 +954,18 @@ public class TaskLifecycleTest
@Test(timeout = 60_000L) @Test(timeout = 60_000L)
public void testRealtimeIndexTask() throws Exception public void testRealtimeIndexTask() throws Exception
{ {
publishCountDown = new CountDownLatch(1);
monitorScheduler.addMonitor(EasyMock.anyObject(Monitor.class)); monitorScheduler.addMonitor(EasyMock.anyObject(Monitor.class));
EasyMock.expectLastCall().atLeastOnce(); EasyMock.expectLastCall().atLeastOnce();
monitorScheduler.removeMonitor(EasyMock.anyObject(Monitor.class)); monitorScheduler.removeMonitor(EasyMock.anyObject(Monitor.class));
EasyMock.expectLastCall().anyTimes(); EasyMock.expectLastCall().anyTimes();
EasyMock.replay(monitorScheduler, queryRunnerFactoryConglomerate); EasyMock.replay(monitorScheduler, queryRunnerFactoryConglomerate);
RealtimeIndexTask realtimeIndexTask = giveMeARealtimeIndexTask(); RealtimeIndexTask realtimeIndexTask = newRealtimeIndexTask();
final String taskId = realtimeIndexTask.getId(); final String taskId = realtimeIndexTask.getId();
tq.start(); taskQueue.start();
tq.add(realtimeIndexTask); taskQueue.add(realtimeIndexTask);
//wait for task to process events and publish segment //wait for task to process events and publish segment
publishCountDown.await(); publishCountDown.await();
@ -922,8 +999,7 @@ public class TaskLifecycleTest
@Test(timeout = 60_000L) @Test(timeout = 60_000L)
public void testRealtimeIndexTaskFailure() throws Exception public void testRealtimeIndexTaskFailure() throws Exception
{ {
setUpTaskQueue( dataSegmentPusher = new DataSegmentPusher()
new DataSegmentPusher()
{ {
@Override @Override
public String getPathForHadoop(String s) public String getPathForHadoop(String s)
@ -936,19 +1012,25 @@ public class TaskLifecycleTest
{ {
throw new RuntimeException("FAILURE"); throw new RuntimeException("FAILURE");
} }
} };
);
tb = setUpTaskToolboxFactory(dataSegmentPusher, handoffNotifierFactory, mdc);
taskRunner = setUpThreadPoolTaskRunner(tb);
taskQueue = setUpTaskQueue(taskStorage, taskRunner);
monitorScheduler.addMonitor(EasyMock.anyObject(Monitor.class)); monitorScheduler.addMonitor(EasyMock.anyObject(Monitor.class));
EasyMock.expectLastCall().atLeastOnce(); EasyMock.expectLastCall().atLeastOnce();
monitorScheduler.removeMonitor(EasyMock.anyObject(Monitor.class)); monitorScheduler.removeMonitor(EasyMock.anyObject(Monitor.class));
EasyMock.expectLastCall().anyTimes(); EasyMock.expectLastCall().anyTimes();
EasyMock.replay(monitorScheduler, queryRunnerFactoryConglomerate); EasyMock.replay(monitorScheduler, queryRunnerFactoryConglomerate);
RealtimeIndexTask realtimeIndexTask = giveMeARealtimeIndexTask(); RealtimeIndexTask realtimeIndexTask = newRealtimeIndexTask();
final String taskId = realtimeIndexTask.getId(); final String taskId = realtimeIndexTask.getId();
tq.start(); taskQueue.start();
tq.add(realtimeIndexTask); taskQueue.add(realtimeIndexTask);
// Wait for realtime index task to fail // Wait for realtime index task to fail
while (tsqa.getStatus(taskId).get().isRunnable()) { while (tsqa.getStatus(taskId).get().isRunnable()) {
@ -988,8 +1070,8 @@ public class TaskLifecycleTest
final long startTime = System.currentTimeMillis(); final long startTime = System.currentTimeMillis();
// manually insert the task into TaskStorage, waiting for TaskQueue to sync from storage // manually insert the task into TaskStorage, waiting for TaskQueue to sync from storage
tq.start(); taskQueue.start();
ts.insert(indexTask, TaskStatus.running(indexTask.getId())); taskStorage.insert(indexTask, TaskStatus.running(indexTask.getId()));
while (tsqa.getStatus(indexTask.getId()).get().isRunnable()) { while (tsqa.getStatus(indexTask.getId()).get().isRunnable()) {
if (System.currentTimeMillis() > startTime + 10 * 1000) { if (System.currentTimeMillis() > startTime + 10 * 1000) {
@ -999,7 +1081,7 @@ public class TaskLifecycleTest
Thread.sleep(100); Thread.sleep(100);
} }
final TaskStatus status = ts.getStatus(indexTask.getId()).get(); final TaskStatus status = taskStorage.getStatus(indexTask.getId()).get();
final List<DataSegment> publishedSegments = byIntervalOrdering.sortedCopy(mdc.getPublished()); final List<DataSegment> publishedSegments = byIntervalOrdering.sortedCopy(mdc.getPublished());
final List<DataSegment> loggedSegments = byIntervalOrdering.sortedCopy(tsqa.getInsertedSegments(indexTask.getId())); final List<DataSegment> loggedSegments = byIntervalOrdering.sortedCopy(tsqa.getInsertedSegments(indexTask.getId()));
@ -1037,9 +1119,14 @@ public class TaskLifecycleTest
Preconditions.checkArgument(!task.getId().equals(dummyTask.getId())); Preconditions.checkArgument(!task.getId().equals(dummyTask.getId()));
tq.start(); // Since multiple tasks can be run in a single unit test using runTask(), hence this check and synchronization
tq.add(dummyTask); synchronized (this) {
tq.add(task); if (!taskQueue.isActive()) {
taskQueue.start();
}
}
taskQueue.add(dummyTask);
taskQueue.add(task);
TaskStatus retVal = null; TaskStatus retVal = null;
@ -1065,7 +1152,7 @@ public class TaskLifecycleTest
return retVal; return retVal;
} }
private RealtimeIndexTask giveMeARealtimeIndexTask() private RealtimeIndexTask newRealtimeIndexTask()
{ {
String taskId = String.format("rt_task_%s", System.currentTimeMillis()); String taskId = String.format("rt_task_%s", System.currentTimeMillis());
DataSchema dataSchema = new DataSchema( DataSchema dataSchema = new DataSchema(