From 284b80b09e4dc9db2e1c082a05ac37e6c0980c38 Mon Sep 17 00:00:00 2001 From: Parag Jain Date: Thu, 18 Jun 2015 16:28:14 -0500 Subject: [PATCH] Realtime Index Task test --- .../indexing/overlord/TaskLifecycleTest.java | 134 +++++++++++++++++- 1 file changed, 127 insertions(+), 7 deletions(-) diff --git a/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java b/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java index bb2971b98fc..803179a3e11 100644 --- a/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java @@ -19,6 +19,7 @@ package io.druid.indexing.overlord; import com.google.common.base.Optional; import com.google.common.base.Preconditions; +import com.google.common.base.Predicate; import com.google.common.base.Throwables; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; @@ -35,11 +36,16 @@ import com.metamx.emitter.EmittingLogger; import com.metamx.emitter.core.Event; import com.metamx.emitter.service.ServiceEmitter; import com.metamx.emitter.service.ServiceEventBuilder; +import com.metamx.metrics.Monitor; +import com.metamx.metrics.MonitorScheduler; +import io.druid.client.FilteredServerView; +import io.druid.client.ServerView; import io.druid.data.input.Firehose; import io.druid.data.input.FirehoseFactory; import io.druid.data.input.InputRow; import io.druid.data.input.MapBasedInputRow; import io.druid.data.input.impl.InputRowParser; +import io.druid.granularity.QueryGranularity; import io.druid.indexing.common.SegmentLoaderFactory; import io.druid.indexing.common.TaskLock; import io.druid.indexing.common.TaskStatus; @@ -56,29 +62,38 @@ import io.druid.indexing.common.config.TaskStorageConfig; import io.druid.indexing.common.task.AbstractFixedIntervalTask; import io.druid.indexing.common.task.IndexTask; import io.druid.indexing.common.task.KillTask; +import io.druid.indexing.common.task.RealtimeIndexTask; import io.druid.indexing.common.task.Task; import io.druid.indexing.common.task.TaskResource; import io.druid.indexing.overlord.config.TaskQueueConfig; import io.druid.jackson.DefaultObjectMapper; import io.druid.metadata.IndexerSQLMetadataStorageCoordinator; +import io.druid.query.QueryRunnerFactoryConglomerate; import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.DoubleSumAggregatorFactory; +import io.druid.query.aggregation.LongSumAggregatorFactory; import io.druid.segment.IndexSpec; import io.druid.segment.indexing.DataSchema; +import io.druid.segment.indexing.RealtimeIOConfig; +import io.druid.segment.indexing.RealtimeTuningConfig; import io.druid.segment.indexing.granularity.UniformGranularitySpec; import io.druid.segment.loading.DataSegmentArchiver; import io.druid.segment.loading.DataSegmentKiller; import io.druid.segment.loading.DataSegmentMover; import io.druid.segment.loading.DataSegmentPusher; -import io.druid.segment.loading.SegmentLoaderLocalCacheManager; import io.druid.segment.loading.SegmentLoaderConfig; +import io.druid.segment.loading.SegmentLoaderLocalCacheManager; import io.druid.segment.loading.SegmentLoadingException; import io.druid.segment.loading.StorageLocationConfig; +import io.druid.segment.realtime.FireDepartment; +import io.druid.server.coordination.DataSegmentAnnouncer; import io.druid.timeline.DataSegment; import org.apache.commons.io.FileUtils; import org.easymock.EasyMock; import org.joda.time.DateTime; +import org.joda.time.Hours; import org.joda.time.Interval; +import org.joda.time.Period; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -90,6 +105,9 @@ import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executor; +import java.util.concurrent.TimeUnit; public class TaskLifecycleTest { @@ -111,6 +129,12 @@ public class TaskLifecycleTest private TaskActionClientFactory tac = null; private TaskToolboxFactory tb = null; private IndexSpec indexSpec; + private QueryRunnerFactoryConglomerate queryRunnerFactoryConglomerate; + private FilteredServerView serverView; + private MonitorScheduler monitorScheduler; + private int pushedSegments; + private int announcedSinks; + private static CountDownLatch publishCountDown; private static MockIndexerMetadataStorageCoordinator newMockMDC() { @@ -243,7 +267,12 @@ public class TaskLifecycleTest { final ServiceEmitter emitter = EasyMock.createMock(ServiceEmitter.class); EmittingLogger.registerEmitter(emitter); - + queryRunnerFactoryConglomerate = EasyMock.createStrictMock(QueryRunnerFactoryConglomerate.class); + serverView = EasyMock.createStrictMock(FilteredServerView.class); + monitorScheduler = EasyMock.createStrictMock(MonitorScheduler.class); + publishCountDown = new CountDownLatch(1); + announcedSinks = 0; + pushedSegments = 0; tmp = Files.createTempDir(); final TaskQueueConfig tqc = new DefaultObjectMapper().readValue( @@ -275,6 +304,7 @@ public class TaskLifecycleTest @Override public DataSegment push(File file, DataSegment segment) throws IOException { + pushedSegments++; return segment; } }, @@ -309,11 +339,36 @@ public class TaskLifecycleTest return segment; } }, - null, // segment announcer - null, // new segment server view - null, // query runner factory conglomerate corporation unionized collective + new DataSegmentAnnouncer() + { + @Override + public void announceSegment(DataSegment segment) throws IOException + { + announcedSinks++; + } + + @Override + public void unannounceSegment(DataSegment segment) throws IOException + { + + } + + @Override + public void announceSegments(Iterable segments) throws IOException + { + + } + + @Override + public void unannounceSegments(Iterable segments) throws IOException + { + + } + }, // segment announcer + serverView, // new segment server view + queryRunnerFactoryConglomerate, // query runner factory conglomerate corporation unionized collective null, // query executor service - null, // monitor scheduler + monitorScheduler, // monitor scheduler new SegmentLoaderFactory( new SegmentLoaderLocalCacheManager( null, @@ -593,6 +648,71 @@ public class TaskLifecycleTest Assert.assertEquals("segments nuked", 0, mdc.getNuked().size()); } + @Test + public void testRealtimeIndexTask() throws Exception{ + serverView.registerSegmentCallback(EasyMock.anyObject(Executor.class), + EasyMock.anyObject(ServerView.SegmentCallback.class), + EasyMock.anyObject(Predicate.class)); + EasyMock.expectLastCall().atLeastOnce(); + monitorScheduler.addMonitor(EasyMock.anyObject(Monitor.class)); + EasyMock.expectLastCall().atLeastOnce(); + monitorScheduler.removeMonitor(EasyMock.anyObject(Monitor.class)); + EasyMock.expectLastCall().anyTimes(); + EasyMock.replay(monitorScheduler, serverView, queryRunnerFactoryConglomerate); + String taskId = "rt_task_1"; + DataSchema dataSchema = new DataSchema( + "test_ds", + null, + new AggregatorFactory[]{new LongSumAggregatorFactory("count", "rows")}, + new UniformGranularitySpec(Granularity.DAY, QueryGranularity.NONE, null) + ); + DateTime now = new DateTime(); + RealtimeIOConfig realtimeIOConfig = new RealtimeIOConfig( + newMockFirehoseFactory( + ImmutableList.of( + IR(now.toString("YYYY-MM-dd'T'HH:mm:ss"), "test_dim1", "test_dim2", 1.0f), + IR(now.plus(new Period(Hours.ONE)).toString("YYYY-MM-dd'T'HH:mm:ss"), "test_dim1", "test_dim2", 2.0f), + IR(now.plus(new Period(Hours.TWO)).toString("YYYY-MM-dd'T'HH:mm:ss"), "test_dim1", "test_dim2", 3.0f) + ) + ), + null // PlumberSchool - Realtime Index Task always uses RealtimePlumber which is hardcoded in RealtimeIndexTask class + ); + RealtimeTuningConfig realtimeTuningConfig = new RealtimeTuningConfig( + 1000, + new Period("P1Y"), + null, //default window period of 10 minutes + null, // base persist dir ignored by Realtime Index task + null, + null, + null, + null, + null, + null, + null, + null + ); + FireDepartment fireDepartment = new FireDepartment(dataSchema, realtimeIOConfig, realtimeTuningConfig); + RealtimeIndexTask realtimeIndexTask = new RealtimeIndexTask( + taskId, + new TaskResource(taskId, 1), + fireDepartment + ); + tq.add(realtimeIndexTask); + //wait for task to process events and publish segment + Assert.assertTrue(publishCountDown.await(1000, TimeUnit.MILLISECONDS)); + // Task will not finish until the segment is handed off and since we are not simulating hand off, the status should be running + Assert.assertTrue("Task should be running", tsqa.getStatus(taskId).get().isRunnable()); + Assert.assertEquals(1, announcedSinks); + Assert.assertEquals(1, pushedSegments); + Assert.assertEquals(1, mdc.getPublished().size()); + DataSegment segment = mdc.getPublished().iterator().next(); + Assert.assertEquals("test_ds", segment.getDataSource()); + Assert.assertEquals(ImmutableList.of("dim1", "dim2"), segment.getDimensions()); + Assert.assertEquals(new Interval(now.toString("YYYY-MM-dd")+"/"+now.plusDays(1).toString("YYYY-MM-dd")), segment.getInterval()); + Assert.assertEquals(ImmutableList.of("count"), segment.getMetrics()); + EasyMock.verify(monitorScheduler, serverView, queryRunnerFactoryConglomerate); + } + private TaskStatus runTask(final Task task) throws Exception { final Task dummyTask = new DefaultObjectMapper().readValue( @@ -661,7 +781,7 @@ public class TaskLifecycleTest added.add(segment); } } - + TaskLifecycleTest.publishCountDown.countDown(); return ImmutableSet.copyOf(added); }