Unit tests for KillTask and MetadataTaskStorage

Bingkun Guo 2015-06-18 13:46:30 -05:00
parent b5b9ca1446
commit 282a0f9760
2 changed files with 346 additions and 129 deletions
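Because MetadataTaskStorage persists each task as JSON in the metadata store, every object reachable from a Task, including its FirehoseFactory, must survive a Jackson round trip; that is why the anonymous mock firehose factories below become named, annotated classes registered as subtypes. A minimal sketch of the constraint, reusing names from this diff (the local variables here are illustrative only):

ObjectMapper mapper = new DefaultObjectMapper();
mapper.registerSubtypes(new NamedType(MockFirehoseFactory.class, "mockFirehoseFactory"));
// The storage layer writes the task out as JSON and reads it back later, so the
// factory must deserialize to an equivalent instance.
String json = mapper.writeValueAsString(new MockFirehoseFactory(false));
FirehoseFactory roundTripped = mapper.readValue(json, FirehoseFactory.class);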


@@ -67,6 +67,13 @@
<artifactId>hadoop-client</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.druid</groupId>
<artifactId>druid-server</artifactId>
<version>${project.parent.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
</dependencies>
</project>


@@ -17,9 +17,16 @@
package io.druid.indexing.overlord;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.InjectableValues;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.jsontype.NamedType;
import com.google.common.base.Function;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.base.Predicate;
import com.google.common.base.Suppliers;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
@@ -28,7 +35,6 @@ import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Ordering;
import com.google.common.collect.Sets;
import com.google.common.io.Files;
import com.metamx.common.Granularity;
import com.metamx.common.ISE;
import com.metamx.common.guava.Comparators;
@@ -68,6 +74,10 @@ import io.druid.indexing.common.task.TaskResource;
import io.druid.indexing.overlord.config.TaskQueueConfig;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.metadata.IndexerSQLMetadataStorageCoordinator;
import io.druid.metadata.MetadataStorageConnectorConfig;
import io.druid.metadata.MetadataStorageTablesConfig;
import io.druid.metadata.SQLMetadataStorageActionHandlerFactory;
import io.druid.metadata.TestDerbyConnector;
import io.druid.query.QueryRunnerFactoryConglomerate;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.DoubleSumAggregatorFactory;
@@ -78,9 +88,9 @@ import io.druid.segment.indexing.RealtimeIOConfig;
import io.druid.segment.indexing.RealtimeTuningConfig;
import io.druid.segment.indexing.granularity.UniformGranularitySpec;
import io.druid.segment.loading.DataSegmentArchiver;
import io.druid.segment.loading.DataSegmentKiller;
import io.druid.segment.loading.DataSegmentMover;
import io.druid.segment.loading.DataSegmentPusher;
import io.druid.segment.loading.LocalDataSegmentKiller;
import io.druid.segment.loading.SegmentLoaderConfig;
import io.druid.segment.loading.SegmentLoaderLocalCacheManager;
import io.druid.segment.loading.SegmentLoadingException;
@@ -88,7 +98,7 @@ import io.druid.segment.loading.StorageLocationConfig;
import io.druid.segment.realtime.FireDepartment;
import io.druid.server.coordination.DataSegmentAnnouncer;
import io.druid.timeline.DataSegment;
import org.apache.commons.io.FileUtils;
import io.druid.timeline.partition.NoneShardSpec;
import org.easymock.EasyMock;
import org.joda.time.DateTime;
import org.joda.time.Hours;
@@ -97,10 +107,16 @@ import org.joda.time.Period;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import java.io.File;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -109,8 +125,25 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
@RunWith(Parameterized.class)
public class TaskLifecycleTest
{
@Parameterized.Parameters(name = "taskStorageType={0}")
public static Collection<String[]> constructFeed()
{
return Arrays.asList(new String[][]{{"HeapMemoryTaskStorage"}, {"MetadataTaskStorage"}});
}
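// The Parameterized runner constructs the test class once per parameter set above, so
// every @Test method runs against both HeapMemoryTaskStorage and MetadataTaskStorage.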
public TaskLifecycleTest(String taskStorageType)
{
this.taskStorageType = taskStorageType;
}
public final
@Rule
TemporaryFolder temporaryFolder = new TemporaryFolder();
private static final Ordering<DataSegment> byIntervalOrdering = new Ordering<DataSegment>()
{
@Override
@@ -119,8 +152,24 @@ public class TaskLifecycleTest
return Comparators.intervalsByStartThenEnd().compare(dataSegment.getInterval(), dataSegment2.getInterval());
}
};
TaskStorageQueryAdapter tsqa = null;
private File tmp = null;
private static DateTime now = new DateTime();
private static final Iterable<InputRow> realtimeIdxTaskInputRows = ImmutableList.of(
IR(now.toString("YYYY-MM-dd'T'HH:mm:ss"), "test_dim1", "test_dim2", 1.0f),
IR(now.plus(new Period(Hours.ONE)).toString("YYYY-MM-dd'T'HH:mm:ss"), "test_dim1", "test_dim2", 2.0f),
IR(now.plus(new Period(Hours.TWO)).toString("YYYY-MM-dd'T'HH:mm:ss"), "test_dim1", "test_dim2", 3.0f)
);
private static final Iterable<InputRow> IdxTaskInputRows = ImmutableList.of(
IR("2010-01-01T01", "x", "y", 1),
IR("2010-01-01T01", "x", "z", 1),
IR("2010-01-02T01", "a", "b", 2),
IR("2010-01-02T01", "a", "c", 1)
);
private final String taskStorageType;
private ObjectMapper mapper;
private TaskStorageQueryAdapter tsqa = null;
private File tmpDir = null;
private TaskStorage ts = null;
private TaskLockbox tl = null;
private TaskQueue tq = null;
@@ -135,6 +184,7 @@ public class TaskLifecycleTest
private int pushedSegments;
private int announcedSinks;
private static CountDownLatch publishCountDown;
private TestDerbyConnector testDerbyConnector;
private static MockIndexerMetadataStorageCoordinator newMockMDC()
{
@@ -172,94 +222,99 @@ public class TaskLifecycleTest
);
}
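// A firehose whose nextRow() always throws, used to drive a consuming task to failure.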
private static class MockExceptionalFirehoseFactory implements FirehoseFactory
{
@Override
public Firehose connect(InputRowParser parser) throws IOException
{
return new Firehose()
{
@Override
public boolean hasMore()
{
return true;
}
@Override
public InputRow nextRow()
{
throw new RuntimeException("HA HA HA");
}
@Override
public Runnable commit()
{
return new Runnable()
{
@Override
public void run()
{
}
};
}
@Override
public void close() throws IOException
{
}
};
}
}
private static class MockFirehoseFactory implements FirehoseFactory
{
@JsonProperty
private boolean usedByRealtimeIdxTask;
@JsonCreator
public MockFirehoseFactory(@JsonProperty("usedByRealtimeIdxTask") boolean usedByRealtimeIdxTask)
{
this.usedByRealtimeIdxTask = usedByRealtimeIdxTask;
}
@Override
public Firehose connect(InputRowParser parser) throws IOException
{
final Iterator<InputRow> inputRowIterator = usedByRealtimeIdxTask
? realtimeIdxTaskInputRows.iterator()
: IdxTaskInputRows.iterator();
return new Firehose()
{
@Override
public boolean hasMore()
{
return inputRowIterator.hasNext();
}
@Override
public InputRow nextRow()
{
return inputRowIterator.next();
}
@Override
public Runnable commit()
{
return new Runnable()
{
@Override
public void run()
{
}
};
}
@Override
public void close() throws IOException
{
}
};
}
}
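// Design note: the factory carries only a boolean of state and reads rows from the
// static input lists declared above, which keeps it trivially JSON-serializable.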
@Before
@@ -273,24 +328,53 @@ public class TaskLifecycleTest
publishCountDown = new CountDownLatch(1);
announcedSinks = 0;
pushedSegments = 0;
tmp = Files.createTempDir();
tmpDir = temporaryFolder.newFolder();
final TaskQueueConfig tqc = new DefaultObjectMapper().readValue(
"{\"startDelay\":\"PT0S\", \"restartDelay\":\"PT1S\"}",
"{\"startDelay\":\"PT0S\", \"restartDelay\":\"PT1S\", \"storageSyncRate\":\"PT0.5S\"}",
TaskQueueConfig.class
);
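// The added "storageSyncRate" of PT0.5S makes the TaskQueue poll task storage twice a
// second; testResumeTasks below relies on this to pick up a manually inserted task.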
indexSpec = new IndexSpec();
ts = new HeapMemoryTaskStorage(
new TaskStorageConfig(null)
{
}
);
if (taskStorageType.equals("HeapMemoryTaskStorage")) {
ts = new HeapMemoryTaskStorage(
new TaskStorageConfig(null)
{
}
);
} else if (taskStorageType.equals("MetadataTaskStorage")) {
MetadataStorageTablesConfig tablesConfig = MetadataStorageTablesConfig.fromBase("test");
testDerbyConnector = new TestDerbyConnector(
Suppliers.ofInstance(
new MetadataStorageConnectorConfig()
),
Suppliers.ofInstance(
tablesConfig
)
);
mapper = new DefaultObjectMapper();
mapper.registerSubtypes(
new NamedType(MockExceptionalFirehoseFactory.class, "mockExcepFirehoseFactory"),
new NamedType(MockFirehoseFactory.class, "mockFirehoseFactory")
);
mapper.setInjectableValues(new InjectableValues.Std().addValue(ObjectMapper.class, mapper));
testDerbyConnector.createTaskTables();
testDerbyConnector.createSegmentTable();
ts = new MetadataTaskStorage(
testDerbyConnector,
new TaskStorageConfig(null),
new SQLMetadataStorageActionHandlerFactory(testDerbyConnector, tablesConfig, mapper)
);
} else {
throw new RuntimeException(String.format("Unknown task storage type [%s]", taskStorageType));
}
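// TestDerbyConnector stands up an embedded Derby database (in-memory, to the best of my
// knowledge), so this branch exercises real SQL-backed task storage with no external setup.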
tsqa = new TaskStorageQueryAdapter(ts);
tl = new TaskLockbox(ts);
mdc = newMockMDC();
tac = new LocalTaskActionClientFactory(ts, new TaskActionToolbox(tl, mdc, newMockEmitter()));
tb = new TaskToolboxFactory(
new TaskConfig(tmp.toString(), null, null, 50000, null),
new TaskConfig(tmpDir.toString(), null, null, 50000, null),
tac,
newMockEmitter(),
new DataSegmentPusher()
@@ -308,14 +392,7 @@ public class TaskLifecycleTest
return segment;
}
},
new DataSegmentKiller()
{
@Override
public void kill(DataSegment segments) throws SegmentLoadingException
{
}
},
new LocalDataSegmentKiller(),
new DataSegmentMover()
{
@Override
@@ -392,13 +469,10 @@ public class TaskLifecycleTest
@After
public void tearDown()
{
try {
FileUtils.deleteDirectory(tmp);
}
catch (Exception e) {
// suppress
}
tq.stop();
if (testDerbyConnector != null) {
testDerbyConnector.tearDown();
}
}
@Test
@@ -406,20 +480,20 @@
{
final Task indexTask = new IndexTask(
null,
new IndexTask.IndexIngestionSpec(new DataSchema("foo", null,new AggregatorFactory[]{new DoubleSumAggregatorFactory("met", "met")},new UniformGranularitySpec(
Granularity.DAY,
null,
ImmutableList.of(new Interval("2010-01-01/P2D"))
) ),
new IndexTask.IndexIOConfig(newMockFirehoseFactory(
ImmutableList.of(
IR("2010-01-01T01", "x", "y", 1),
IR("2010-01-01T01", "x", "z", 1),
IR("2010-01-02T01", "a", "b", 2),
IR("2010-01-02T01", "a", "c", 1)
)
)),
new IndexTask.IndexTuningConfig(10000, 10, -1, indexSpec)),
new IndexTask.IndexIngestionSpec(
new DataSchema(
"foo",
null,
new AggregatorFactory[]{new DoubleSumAggregatorFactory("met", "met")},
new UniformGranularitySpec(
Granularity.DAY,
null,
ImmutableList.of(new Interval("2010-01-01/P2D"))
)
),
new IndexTask.IndexIOConfig(new MockFirehoseFactory(false)),
new IndexTask.IndexTuningConfig(10000, 10, -1, indexSpec)
),
TestUtils.MAPPER
);
@@ -472,7 +546,7 @@ public class TaskLifecycleTest
ImmutableList.of(new Interval("2010-01-01/P1D"))
)
),
new IndexTask.IndexIOConfig(newMockExceptionalFirehoseFactory()),
new IndexTask.IndexIOConfig(new MockExceptionalFirehoseFactory()),
new IndexTask.IndexTuningConfig(10000, 10, -1, indexSpec)
),
TestUtils.MAPPER
@@ -488,14 +562,79 @@ public class TaskLifecycleTest
@Test
public void testKillTask() throws Exception
{
// This test doesn't actually do anything right now. We should actually put things into the Mocked coordinator
// Such that this test can test things...
final Task killTask = new KillTask(null, "foo", new Interval("2010-01-02/P2D"));
final File tmpSegmentDir = temporaryFolder.newFolder();
List<DataSegment> expectedUnusedSegments = Lists.transform(
ImmutableList.<String>of(
"2011-04-01/2011-04-02",
"2011-04-02/2011-04-03",
"2011-04-04/2011-04-05"
), new Function<String, DataSegment>()
{
@Override
public DataSegment apply(String input)
{
final Interval interval = new Interval(input);
try {
return DataSegment.builder()
.dataSource("test_kill_task")
.interval(interval)
.loadSpec(
ImmutableMap.<String, Object>of(
"type",
"local",
"path",
tmpSegmentDir.getCanonicalPath()
+ "/druid/localStorage/wikipedia/"
+ interval.getStart()
+ "-"
+ interval.getEnd()
+ "/"
+ "2011-04-6T16:52:46.119-05:00"
+ "/0/index.zip"
)
)
.version("2011-04-6T16:52:46.119-05:00")
.dimensions(ImmutableList.<String>of())
.metrics(ImmutableList.<String>of())
.shardSpec(new NoneShardSpec())
.binaryVersion(9)
.size(0)
.build();
}
catch (IOException e) {
throw new ISE(e, "Error creating segments");
}
}
}
);
mdc.setUnusedSegments(expectedUnusedSegments);
// manually create local segment files
List<File> segmentFiles = Lists.newArrayList();
for (DataSegment segment : mdc.getUnusedSegmentsForInterval("test_kill_task", new Interval("2011-04-01/P4D"))) {
File file = new File((String) segment.getLoadSpec().get("path"));
file.mkdirs();
segmentFiles.add(file);
}
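// KillTask is expected to fetch the unused segments for its interval from the
// coordinator, nuke them from the metadata store, and delete their files through the
// LocalDataSegmentKiller wired into the toolbox above.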
final Task killTask = new KillTask(null, "test_kill_task", new Interval("2011-04-01/P4D"));
final TaskStatus status = runTask(killTask);
Assert.assertEquals("merged statusCode", TaskStatus.Status.SUCCESS, status.getStatusCode());
Assert.assertEquals("num segments published", 0, mdc.getPublished().size());
Assert.assertEquals("num segments nuked", 0, mdc.getNuked().size());
Assert.assertEquals("num segments nuked", 3, mdc.getNuked().size());
Assert.assertTrue(
"expected unused segments get killed",
expectedUnusedSegments.containsAll(mdc.getNuked()) && mdc.getNuked().containsAll(
expectedUnusedSegments
)
);
for (File file : segmentFiles) {
Assert.assertFalse("unused segments files get deleted", file.exists());
}
}
@Test
@@ -649,10 +788,13 @@ public class TaskLifecycleTest
}
@Test
public void testRealtimeIndexTask() throws Exception{
serverView.registerSegmentCallback(EasyMock.anyObject(Executor.class),
EasyMock.anyObject(ServerView.SegmentCallback.class),
EasyMock.anyObject(Predicate.class));
public void testRealtimeIndexTask() throws Exception
{
serverView.registerSegmentCallback(
EasyMock.anyObject(Executor.class),
EasyMock.anyObject(ServerView.SegmentCallback.class),
EasyMock.anyObject(Predicate.class)
);
EasyMock.expectLastCall().atLeastOnce();
monitorScheduler.addMonitor(EasyMock.anyObject(Monitor.class));
EasyMock.expectLastCall().atLeastOnce();
@@ -661,21 +803,15 @@ public class TaskLifecycleTest
EasyMock.replay(monitorScheduler, serverView, queryRunnerFactoryConglomerate);
String taskId = "rt_task_1";
DataSchema dataSchema = new DataSchema(
"test_ds",
null,
new AggregatorFactory[]{new LongSumAggregatorFactory("count", "rows")},
new UniformGranularitySpec(Granularity.DAY, QueryGranularity.NONE, null)
"test_ds",
null,
new AggregatorFactory[]{new LongSumAggregatorFactory("count", "rows")},
new UniformGranularitySpec(Granularity.DAY, QueryGranularity.NONE, null)
);
DateTime now = new DateTime();
RealtimeIOConfig realtimeIOConfig = new RealtimeIOConfig(
newMockFirehoseFactory(
ImmutableList.of(
IR(now.toString("YYYY-MM-dd'T'HH:mm:ss"), "test_dim1", "test_dim2", 1.0f),
IR(now.plus(new Period(Hours.ONE)).toString("YYYY-MM-dd'T'HH:mm:ss"), "test_dim1", "test_dim2", 2.0f),
IR(now.plus(new Period(Hours.TWO)).toString("YYYY-MM-dd'T'HH:mm:ss"), "test_dim1", "test_dim2", 3.0f)
)
),
null // PlumberSchool - Realtime Index Task always uses RealtimePlumber which is hardcoded in RealtimeIndexTask class
new MockFirehoseFactory(true),
null
// PlumberSchool - Realtime Index Task always uses RealtimePlumber which is hardcoded in RealtimeIndexTask class
);
RealtimeTuningConfig realtimeTuningConfig = new RealtimeTuningConfig(
1000,
@@ -708,11 +844,77 @@ public class TaskLifecycleTest
DataSegment segment = mdc.getPublished().iterator().next();
Assert.assertEquals("test_ds", segment.getDataSource());
Assert.assertEquals(ImmutableList.of("dim1", "dim2"), segment.getDimensions());
Assert.assertEquals(new Interval(now.toString("YYYY-MM-dd")+"/"+now.plusDays(1).toString("YYYY-MM-dd")), segment.getInterval());
Assert.assertEquals(
new Interval(now.toString("YYYY-MM-dd") + "/" + now.plusDays(1).toString("YYYY-MM-dd")),
segment.getInterval()
);
Assert.assertEquals(ImmutableList.of("count"), segment.getMetrics());
EasyMock.verify(monitorScheduler, serverView, queryRunnerFactoryConglomerate);
}
@Test
public void testResumeTasks() throws Exception
{
final Task indexTask = new IndexTask(
null,
new IndexTask.IndexIngestionSpec(
new DataSchema(
"foo",
null,
new AggregatorFactory[]{new DoubleSumAggregatorFactory("met", "met")},
new UniformGranularitySpec(
Granularity.DAY,
null,
ImmutableList.of(new Interval("2010-01-01/P2D"))
)
),
new IndexTask.IndexIOConfig(new MockFirehoseFactory(false)),
new IndexTask.IndexTuningConfig(10000, 10, -1, indexSpec)
),
TestUtils.MAPPER
);
final long startTime = System.currentTimeMillis();
// manually insert the task into TaskStorage, waiting for TaskQueue to sync from storage
ts.insert(indexTask, TaskStatus.running(indexTask.getId()));
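// The TaskQueue's periodic storage sync should discover the inserted task and run it,
// mimicking how an overlord resumes tasks already present in storage after a restart.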
while (tsqa.getStatus(indexTask.getId()).get().isRunnable()) {
if (System.currentTimeMillis() > startTime + 10 * 1000) {
throw new ISE("Where did the task go?!: %s", indexTask.getId());
}
Thread.sleep(100);
}
final TaskStatus status = ts.getStatus(indexTask.getId()).get();
final List<DataSegment> publishedSegments = byIntervalOrdering.sortedCopy(mdc.getPublished());
final List<DataSegment> loggedSegments = byIntervalOrdering.sortedCopy(tsqa.getInsertedSegments(indexTask.getId()));
Assert.assertEquals("statusCode", TaskStatus.Status.SUCCESS, status.getStatusCode());
Assert.assertEquals("segments logged vs published", loggedSegments, publishedSegments);
Assert.assertEquals("num segments published", 2, mdc.getPublished().size());
Assert.assertEquals("num segments nuked", 0, mdc.getNuked().size());
Assert.assertEquals("segment1 datasource", "foo", publishedSegments.get(0).getDataSource());
Assert.assertEquals("segment1 interval", new Interval("2010-01-01/P1D"), publishedSegments.get(0).getInterval());
Assert.assertEquals(
"segment1 dimensions",
ImmutableList.of("dim1", "dim2"),
publishedSegments.get(0).getDimensions()
);
Assert.assertEquals("segment1 metrics", ImmutableList.of("met"), publishedSegments.get(0).getMetrics());
Assert.assertEquals("segment2 datasource", "foo", publishedSegments.get(1).getDataSource());
Assert.assertEquals("segment2 interval", new Interval("2010-01-02/P1D"), publishedSegments.get(1).getInterval());
Assert.assertEquals(
"segment2 dimensions",
ImmutableList.of("dim1", "dim2"),
publishedSegments.get(1).getDimensions()
);
Assert.assertEquals("segment2 metrics", ImmutableList.of("met"), publishedSegments.get(1).getMetrics());
}
private TaskStatus runTask(final Task task) throws Exception
{
final Task dummyTask = new DefaultObjectMapper().readValue(
@@ -755,9 +957,12 @@ public class TaskLifecycleTest
final private Set<DataSegment> published = Sets.newHashSet();
final private Set<DataSegment> nuked = Sets.newHashSet();
private List<DataSegment> unusedSegments;
private MockIndexerMetadataStorageCoordinator()
{
super(null, null, null);
unusedSegments = Lists.newArrayList();
}
@Override
@@ -769,7 +974,7 @@ public class TaskLifecycleTest
@Override
public List<DataSegment> getUnusedSegmentsForInterval(String dataSource, Interval interval)
{
return ImmutableList.of();
return unusedSegments;
}
@Override
@@ -800,5 +1005,10 @@ public class TaskLifecycleTest
{
return ImmutableSet.copyOf(nuked);
}
public void setUnusedSegments(List<DataSegment> unusedSegments)
{
this.unusedSegments = unusedSegments;
}
}
}