Realtime Index Task test

This commit is contained in:
Parag Jain 2015-06-18 16:28:14 -05:00
parent 6763e3780a
commit 284b80b09e
1 changed files with 127 additions and 7 deletions

View File

@ -19,6 +19,7 @@ package io.druid.indexing.overlord;
import com.google.common.base.Optional; import com.google.common.base.Optional;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
import com.google.common.base.Predicate;
import com.google.common.base.Throwables; import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableMap;
@ -35,11 +36,16 @@ import com.metamx.emitter.EmittingLogger;
import com.metamx.emitter.core.Event; import com.metamx.emitter.core.Event;
import com.metamx.emitter.service.ServiceEmitter; import com.metamx.emitter.service.ServiceEmitter;
import com.metamx.emitter.service.ServiceEventBuilder; import com.metamx.emitter.service.ServiceEventBuilder;
import com.metamx.metrics.Monitor;
import com.metamx.metrics.MonitorScheduler;
import io.druid.client.FilteredServerView;
import io.druid.client.ServerView;
import io.druid.data.input.Firehose; import io.druid.data.input.Firehose;
import io.druid.data.input.FirehoseFactory; import io.druid.data.input.FirehoseFactory;
import io.druid.data.input.InputRow; import io.druid.data.input.InputRow;
import io.druid.data.input.MapBasedInputRow; import io.druid.data.input.MapBasedInputRow;
import io.druid.data.input.impl.InputRowParser; import io.druid.data.input.impl.InputRowParser;
import io.druid.granularity.QueryGranularity;
import io.druid.indexing.common.SegmentLoaderFactory; import io.druid.indexing.common.SegmentLoaderFactory;
import io.druid.indexing.common.TaskLock; import io.druid.indexing.common.TaskLock;
import io.druid.indexing.common.TaskStatus; import io.druid.indexing.common.TaskStatus;
@ -56,29 +62,38 @@ import io.druid.indexing.common.config.TaskStorageConfig;
import io.druid.indexing.common.task.AbstractFixedIntervalTask; import io.druid.indexing.common.task.AbstractFixedIntervalTask;
import io.druid.indexing.common.task.IndexTask; import io.druid.indexing.common.task.IndexTask;
import io.druid.indexing.common.task.KillTask; import io.druid.indexing.common.task.KillTask;
import io.druid.indexing.common.task.RealtimeIndexTask;
import io.druid.indexing.common.task.Task; import io.druid.indexing.common.task.Task;
import io.druid.indexing.common.task.TaskResource; import io.druid.indexing.common.task.TaskResource;
import io.druid.indexing.overlord.config.TaskQueueConfig; import io.druid.indexing.overlord.config.TaskQueueConfig;
import io.druid.jackson.DefaultObjectMapper; import io.druid.jackson.DefaultObjectMapper;
import io.druid.metadata.IndexerSQLMetadataStorageCoordinator; import io.druid.metadata.IndexerSQLMetadataStorageCoordinator;
import io.druid.query.QueryRunnerFactoryConglomerate;
import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.DoubleSumAggregatorFactory; import io.druid.query.aggregation.DoubleSumAggregatorFactory;
import io.druid.query.aggregation.LongSumAggregatorFactory;
import io.druid.segment.IndexSpec; import io.druid.segment.IndexSpec;
import io.druid.segment.indexing.DataSchema; import io.druid.segment.indexing.DataSchema;
import io.druid.segment.indexing.RealtimeIOConfig;
import io.druid.segment.indexing.RealtimeTuningConfig;
import io.druid.segment.indexing.granularity.UniformGranularitySpec; import io.druid.segment.indexing.granularity.UniformGranularitySpec;
import io.druid.segment.loading.DataSegmentArchiver; import io.druid.segment.loading.DataSegmentArchiver;
import io.druid.segment.loading.DataSegmentKiller; import io.druid.segment.loading.DataSegmentKiller;
import io.druid.segment.loading.DataSegmentMover; import io.druid.segment.loading.DataSegmentMover;
import io.druid.segment.loading.DataSegmentPusher; import io.druid.segment.loading.DataSegmentPusher;
import io.druid.segment.loading.SegmentLoaderLocalCacheManager;
import io.druid.segment.loading.SegmentLoaderConfig; import io.druid.segment.loading.SegmentLoaderConfig;
import io.druid.segment.loading.SegmentLoaderLocalCacheManager;
import io.druid.segment.loading.SegmentLoadingException; import io.druid.segment.loading.SegmentLoadingException;
import io.druid.segment.loading.StorageLocationConfig; import io.druid.segment.loading.StorageLocationConfig;
import io.druid.segment.realtime.FireDepartment;
import io.druid.server.coordination.DataSegmentAnnouncer;
import io.druid.timeline.DataSegment; import io.druid.timeline.DataSegment;
import org.apache.commons.io.FileUtils; import org.apache.commons.io.FileUtils;
import org.easymock.EasyMock; import org.easymock.EasyMock;
import org.joda.time.DateTime; import org.joda.time.DateTime;
import org.joda.time.Hours;
import org.joda.time.Interval; import org.joda.time.Interval;
import org.joda.time.Period;
import org.junit.After; import org.junit.After;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Before; import org.junit.Before;
@ -90,6 +105,9 @@ import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
public class TaskLifecycleTest public class TaskLifecycleTest
{ {
@ -111,6 +129,12 @@ public class TaskLifecycleTest
private TaskActionClientFactory tac = null; private TaskActionClientFactory tac = null;
private TaskToolboxFactory tb = null; private TaskToolboxFactory tb = null;
private IndexSpec indexSpec; private IndexSpec indexSpec;
private QueryRunnerFactoryConglomerate queryRunnerFactoryConglomerate;
private FilteredServerView serverView;
private MonitorScheduler monitorScheduler;
private int pushedSegments;
private int announcedSinks;
private static CountDownLatch publishCountDown;
private static MockIndexerMetadataStorageCoordinator newMockMDC() private static MockIndexerMetadataStorageCoordinator newMockMDC()
{ {
@ -243,7 +267,12 @@ public class TaskLifecycleTest
{ {
final ServiceEmitter emitter = EasyMock.createMock(ServiceEmitter.class); final ServiceEmitter emitter = EasyMock.createMock(ServiceEmitter.class);
EmittingLogger.registerEmitter(emitter); EmittingLogger.registerEmitter(emitter);
queryRunnerFactoryConglomerate = EasyMock.createStrictMock(QueryRunnerFactoryConglomerate.class);
serverView = EasyMock.createStrictMock(FilteredServerView.class);
monitorScheduler = EasyMock.createStrictMock(MonitorScheduler.class);
publishCountDown = new CountDownLatch(1);
announcedSinks = 0;
pushedSegments = 0;
tmp = Files.createTempDir(); tmp = Files.createTempDir();
final TaskQueueConfig tqc = new DefaultObjectMapper().readValue( final TaskQueueConfig tqc = new DefaultObjectMapper().readValue(
@ -275,6 +304,7 @@ public class TaskLifecycleTest
@Override @Override
public DataSegment push(File file, DataSegment segment) throws IOException public DataSegment push(File file, DataSegment segment) throws IOException
{ {
pushedSegments++;
return segment; return segment;
} }
}, },
@ -309,11 +339,36 @@ public class TaskLifecycleTest
return segment; return segment;
} }
}, },
null, // segment announcer new DataSegmentAnnouncer()
null, // new segment server view {
null, // query runner factory conglomerate corporation unionized collective @Override
public void announceSegment(DataSegment segment) throws IOException
{
announcedSinks++;
}
@Override
public void unannounceSegment(DataSegment segment) throws IOException
{
}
@Override
public void announceSegments(Iterable<DataSegment> segments) throws IOException
{
}
@Override
public void unannounceSegments(Iterable<DataSegment> segments) throws IOException
{
}
}, // segment announcer
serverView, // new segment server view
queryRunnerFactoryConglomerate, // query runner factory conglomerate corporation unionized collective
null, // query executor service null, // query executor service
null, // monitor scheduler monitorScheduler, // monitor scheduler
new SegmentLoaderFactory( new SegmentLoaderFactory(
new SegmentLoaderLocalCacheManager( new SegmentLoaderLocalCacheManager(
null, null,
@ -593,6 +648,71 @@ public class TaskLifecycleTest
Assert.assertEquals("segments nuked", 0, mdc.getNuked().size()); Assert.assertEquals("segments nuked", 0, mdc.getNuked().size());
} }
@Test
public void testRealtimeIndexTask() throws Exception{
serverView.registerSegmentCallback(EasyMock.anyObject(Executor.class),
EasyMock.anyObject(ServerView.SegmentCallback.class),
EasyMock.anyObject(Predicate.class));
EasyMock.expectLastCall().atLeastOnce();
monitorScheduler.addMonitor(EasyMock.anyObject(Monitor.class));
EasyMock.expectLastCall().atLeastOnce();
monitorScheduler.removeMonitor(EasyMock.anyObject(Monitor.class));
EasyMock.expectLastCall().anyTimes();
EasyMock.replay(monitorScheduler, serverView, queryRunnerFactoryConglomerate);
String taskId = "rt_task_1";
DataSchema dataSchema = new DataSchema(
"test_ds",
null,
new AggregatorFactory[]{new LongSumAggregatorFactory("count", "rows")},
new UniformGranularitySpec(Granularity.DAY, QueryGranularity.NONE, null)
);
DateTime now = new DateTime();
RealtimeIOConfig realtimeIOConfig = new RealtimeIOConfig(
newMockFirehoseFactory(
ImmutableList.of(
IR(now.toString("YYYY-MM-dd'T'HH:mm:ss"), "test_dim1", "test_dim2", 1.0f),
IR(now.plus(new Period(Hours.ONE)).toString("YYYY-MM-dd'T'HH:mm:ss"), "test_dim1", "test_dim2", 2.0f),
IR(now.plus(new Period(Hours.TWO)).toString("YYYY-MM-dd'T'HH:mm:ss"), "test_dim1", "test_dim2", 3.0f)
)
),
null // PlumberSchool - Realtime Index Task always uses RealtimePlumber which is hardcoded in RealtimeIndexTask class
);
RealtimeTuningConfig realtimeTuningConfig = new RealtimeTuningConfig(
1000,
new Period("P1Y"),
null, //default window period of 10 minutes
null, // base persist dir ignored by Realtime Index task
null,
null,
null,
null,
null,
null,
null,
null
);
FireDepartment fireDepartment = new FireDepartment(dataSchema, realtimeIOConfig, realtimeTuningConfig);
RealtimeIndexTask realtimeIndexTask = new RealtimeIndexTask(
taskId,
new TaskResource(taskId, 1),
fireDepartment
);
tq.add(realtimeIndexTask);
//wait for task to process events and publish segment
Assert.assertTrue(publishCountDown.await(1000, TimeUnit.MILLISECONDS));
// Task will not finish until the segment is handed off and since we are not simulating hand off, the status should be running
Assert.assertTrue("Task should be running", tsqa.getStatus(taskId).get().isRunnable());
Assert.assertEquals(1, announcedSinks);
Assert.assertEquals(1, pushedSegments);
Assert.assertEquals(1, mdc.getPublished().size());
DataSegment segment = mdc.getPublished().iterator().next();
Assert.assertEquals("test_ds", segment.getDataSource());
Assert.assertEquals(ImmutableList.of("dim1", "dim2"), segment.getDimensions());
Assert.assertEquals(new Interval(now.toString("YYYY-MM-dd")+"/"+now.plusDays(1).toString("YYYY-MM-dd")), segment.getInterval());
Assert.assertEquals(ImmutableList.of("count"), segment.getMetrics());
EasyMock.verify(monitorScheduler, serverView, queryRunnerFactoryConglomerate);
}
private TaskStatus runTask(final Task task) throws Exception private TaskStatus runTask(final Task task) throws Exception
{ {
final Task dummyTask = new DefaultObjectMapper().readValue( final Task dummyTask = new DefaultObjectMapper().readValue(
@ -661,7 +781,7 @@ public class TaskLifecycleTest
added.add(segment); added.add(segment);
} }
} }
TaskLifecycleTest.publishCountDown.countDown();
return ImmutableSet.copyOf(added); return ImmutableSet.copyOf(added);
} }