fix broken unit tests are a result of the last merge

This commit is contained in:
fjy 2013-09-23 12:56:01 -07:00
parent 98c663e75c
commit dc8a119787
4 changed files with 134 additions and 32 deletions

View File

@ -25,6 +25,7 @@ import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Ordering;
import com.google.common.collect.Sets;
import com.google.common.io.Files;
@ -41,6 +42,7 @@ import io.druid.data.input.InputRow;
import io.druid.data.input.MapBasedInputRow;
import io.druid.granularity.QueryGranularity;
import io.druid.indexer.granularity.UniformGranularitySpec;
import io.druid.indexing.common.SegmentLoaderFactory;
import io.druid.indexing.common.TaskLock;
import io.druid.indexing.common.TaskStatus;
import io.druid.indexing.common.TaskToolbox;
@ -61,8 +63,13 @@ import io.druid.jackson.DefaultObjectMapper;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.DoubleSumAggregatorFactory;
import io.druid.segment.loading.DataSegmentKiller;
import io.druid.segment.loading.DataSegmentPuller;
import io.druid.segment.loading.DataSegmentPusher;
import io.druid.segment.loading.LocalDataSegmentPuller;
import io.druid.segment.loading.OmniSegmentLoader;
import io.druid.segment.loading.SegmentLoaderConfig;
import io.druid.segment.loading.SegmentLoadingException;
import io.druid.segment.loading.StorageLocationConfig;
import io.druid.timeline.DataSegment;
import org.apache.commons.io.FileUtils;
import org.easymock.EasyMock;
@ -139,7 +146,23 @@ public class TaskLifecycleTest
null, // query runner factory conglomerate corporation unionized collective
null, // query executor service
null, // monitor scheduler
null, // segment loader
new SegmentLoaderFactory(
new OmniSegmentLoader(
ImmutableMap.<String, DataSegmentPuller>of(
"local",
new LocalDataSegmentPuller()
),
null,
new SegmentLoaderConfig()
{
@Override
public List<StorageLocationConfig> getLocations()
{
return Lists.newArrayList();
}
}
)
),
new DefaultObjectMapper()
);
@ -157,7 +180,8 @@ public class TaskLifecycleTest
{
try {
FileUtils.deleteDirectory(tmp);
} catch(Exception e) {
}
catch (Exception e) {
// suppress
}
tc.stop();
@ -202,12 +226,20 @@ public class TaskLifecycleTest
Assert.assertEquals("segment1 datasource", "foo", publishedSegments.get(0).getDataSource());
Assert.assertEquals("segment1 interval", new Interval("2010-01-01/P1D"), publishedSegments.get(0).getInterval());
Assert.assertEquals("segment1 dimensions", ImmutableList.of("dim1", "dim2"), publishedSegments.get(0).getDimensions());
Assert.assertEquals(
"segment1 dimensions",
ImmutableList.of("dim1", "dim2"),
publishedSegments.get(0).getDimensions()
);
Assert.assertEquals("segment1 metrics", ImmutableList.of("met"), publishedSegments.get(0).getMetrics());
Assert.assertEquals("segment2 datasource", "foo", publishedSegments.get(1).getDataSource());
Assert.assertEquals("segment2 interval", new Interval("2010-01-02/P1D"), publishedSegments.get(1).getInterval());
Assert.assertEquals("segment2 dimensions", ImmutableList.of("dim1", "dim2"), publishedSegments.get(1).getDimensions());
Assert.assertEquals(
"segment2 dimensions",
ImmutableList.of("dim1", "dim2"),
publishedSegments.get(1).getDimensions()
);
Assert.assertEquals("segment2 metrics", ImmutableList.of("met"), publishedSegments.get(1).getMetrics());
}
@ -373,8 +405,8 @@ public class TaskLifecycleTest
TaskStatus status;
try {
while ( (status = tsqa.getSameGroupMergedStatus(task.getId()).get()).isRunnable()) {
if(System.currentTimeMillis() > startTime + 10 * 1000) {
while ((status = tsqa.getSameGroupMergedStatus(task.getId()).get()).isRunnable()) {
if (System.currentTimeMillis() > startTime + 10 * 1000) {
throw new ISE("Where did the task go?!: %s", task.getId());
}
@ -414,8 +446,8 @@ public class TaskLifecycleTest
public Set<DataSegment> announceHistoricalSegments(Set<DataSegment> segments)
{
Set<DataSegment> added = Sets.newHashSet();
for(final DataSegment segment : segments) {
if(published.add(segment)) {
for (final DataSegment segment : segments) {
if (published.add(segment)) {
added.add(segment);
}
}
@ -500,7 +532,8 @@ public class TaskLifecycleTest
@Override
public Runnable commit()
{
return new Runnable() {
return new Runnable()
{
@Override
public void run()
{

View File

@ -20,9 +20,11 @@
package io.druid.indexing.coordinator;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import io.druid.indexing.common.SegmentLoaderFactory;
import io.druid.indexing.common.TaskLock;
import io.druid.indexing.common.TaskStatus;
import io.druid.indexing.common.TaskToolbox;
@ -30,8 +32,14 @@ import io.druid.indexing.common.TaskToolboxFactory;
import io.druid.indexing.common.actions.LocalTaskActionClientFactory;
import io.druid.indexing.common.actions.SpawnTasksAction;
import io.druid.indexing.common.actions.TaskActionToolbox;
import io.druid.indexing.common.config.TaskConfig;
import io.druid.indexing.common.task.AbstractTask;
import io.druid.indexing.common.task.Task;
import io.druid.segment.loading.DataSegmentPuller;
import io.druid.segment.loading.LocalDataSegmentPuller;
import io.druid.segment.loading.OmniSegmentLoader;
import io.druid.segment.loading.SegmentLoaderConfig;
import io.druid.segment.loading.StorageLocationConfig;
import org.joda.time.Interval;
import org.junit.Assert;
import org.junit.Test;
@ -81,18 +89,23 @@ public class TaskQueueTest
Throwable thrown;
for(Task task : tasks) {
for (Task task : tasks) {
tq.add(task);
}
// get task status for in-progress task
Assert.assertEquals("T2 status (before finishing)", TaskStatus.Status.RUNNING, ts.getStatus(tasks[2].getId()).get().getStatusCode());
Assert.assertEquals(
"T2 status (before finishing)",
TaskStatus.Status.RUNNING,
ts.getStatus(tasks[2].getId()).get().getStatusCode()
);
// Can't add tasks with the same id
thrown = null;
try {
tq.add(newTask("T5", "G5", "baz", new Interval("2013-02-01/PT1H")));
} catch(TaskExistsException e) {
}
catch (TaskExistsException e) {
thrown = e;
}
@ -102,7 +115,7 @@ public class TaskQueueTest
final List<Task> taken = Lists.newArrayList();
while (true) {
final Task task = tq.poll();
if(task != null) {
if (task != null) {
taken.add(task);
} else {
break;
@ -132,7 +145,7 @@ public class TaskQueueTest
taken.clear();
while (true) {
final Task task = tq.poll();
if(task != null) {
if (task != null) {
taken.add(task);
} else {
break;
@ -159,7 +172,7 @@ public class TaskQueueTest
final TaskLockbox tl = new TaskLockbox(ts);
final TaskQueue tq = newTaskQueue(ts, tl);
final TaskToolboxFactory tb = new TaskToolboxFactory(
null,
new TaskConfig(null, null, null, null),
new LocalTaskActionClientFactory(ts, new TaskActionToolbox(tq, tl, null, null)),
null,
null,
@ -169,7 +182,23 @@ public class TaskQueueTest
null,
null,
null,
null,
new SegmentLoaderFactory(
new OmniSegmentLoader(
ImmutableMap.<String, DataSegmentPuller>of(
"local",
new LocalDataSegmentPuller()
),
null,
new SegmentLoaderConfig()
{
@Override
public List<StorageLocationConfig> getLocations()
{
return Lists.newArrayList();
}
}
)
),
null
);
@ -177,8 +206,8 @@ public class TaskQueueTest
final Task t1 = newContinuedTask("T1", "G1", "bar", new Interval("2013/P1Y"), Lists.newArrayList(t0));
tq.add(t1);
Assert.assertTrue("T0 isPresent (#1)", !ts.getStatus("T0").isPresent());
Assert.assertTrue("T1 isPresent (#1)", ts.getStatus("T1").isPresent());
Assert.assertTrue("T0 isPresent (#1)", !ts.getStatus("T0").isPresent());
Assert.assertTrue("T1 isPresent (#1)", ts.getStatus("T1").isPresent());
Assert.assertTrue("T1 isRunnable (#1)", ts.getStatus("T1").get().isRunnable());
Assert.assertTrue("T1 isComplete (#1)", !ts.getStatus("T1").get().isComplete());
@ -190,11 +219,11 @@ public class TaskQueueTest
tq.notify(t1, t1.run(tb.build(t1)));
Assert.assertTrue("T0 isPresent (#2)", ts.getStatus("T0").isPresent());
Assert.assertTrue("T0 isRunnable (#2)", ts.getStatus("T0").get().isRunnable());
Assert.assertTrue("T0 isRunnable (#2)", ts.getStatus("T0").get().isRunnable());
Assert.assertTrue("T0 isComplete (#2)", !ts.getStatus("T0").get().isComplete());
Assert.assertTrue("T1 isPresent (#2)", ts.getStatus("T1").isPresent());
Assert.assertTrue("T1 isPresent (#2)", ts.getStatus("T1").isPresent());
Assert.assertTrue("T1 isRunnable (#2)", !ts.getStatus("T1").get().isRunnable());
Assert.assertTrue("T1 isComplete (#2)", ts.getStatus("T1").get().isComplete());
Assert.assertTrue("T1 isComplete (#2)", ts.getStatus("T1").get().isComplete());
// should be able to get t0 out
Assert.assertEquals("poll #3", "T0", tq.poll().getId());
@ -205,10 +234,10 @@ public class TaskQueueTest
Assert.assertTrue("T0 isPresent (#3)", ts.getStatus("T0").isPresent());
Assert.assertTrue("T0 isRunnable (#3)", !ts.getStatus("T0").get().isRunnable());
Assert.assertTrue("T0 isComplete (#3)", ts.getStatus("T0").get().isComplete());
Assert.assertTrue("T1 isPresent (#3)", ts.getStatus("T1").isPresent());
Assert.assertTrue("T0 isComplete (#3)", ts.getStatus("T0").get().isComplete());
Assert.assertTrue("T1 isPresent (#3)", ts.getStatus("T1").isPresent());
Assert.assertTrue("T1 isRunnable (#3)", !ts.getStatus("T1").get().isRunnable());
Assert.assertTrue("T1 isComplete (#3)", ts.getStatus("T1").get().isComplete());
Assert.assertTrue("T1 isComplete (#3)", ts.getStatus("T1").get().isComplete());
// should be no more events available for polling
Assert.assertNull("poll #5", tq.poll());
@ -221,7 +250,7 @@ public class TaskQueueTest
final TaskLockbox tl = new TaskLockbox(ts);
final TaskQueue tq = newTaskQueue(ts, tl);
final TaskToolboxFactory tb = new TaskToolboxFactory(
null,
new TaskConfig(null, null, null, null),
new LocalTaskActionClientFactory(ts, new TaskActionToolbox(tq, tl, null, null)),
null,
null,
@ -231,7 +260,23 @@ public class TaskQueueTest
null,
null,
null,
null,
new SegmentLoaderFactory(
new OmniSegmentLoader(
ImmutableMap.<String, DataSegmentPuller>of(
"local",
new LocalDataSegmentPuller()
),
null,
new SegmentLoaderConfig()
{
@Override
public List<StorageLocationConfig> getLocations()
{
return Lists.newArrayList();
}
}
)
),
null
);
@ -270,7 +315,7 @@ public class TaskQueueTest
final Task task = tq.poll();
if(task != null) {
if (task != null) {
final TaskLock taskLock = Iterables.getOnlyElement(tl.findLocksForTask(task));
Assert.assertEquals(
String.format("%s version", task.getId()),

View File

@ -22,9 +22,12 @@ package io.druid.indexing.worker;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.jsontype.NamedType;
import com.google.common.base.Joiner;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.io.Files;
import io.druid.curator.PotentiallyGzippedCompressionProvider;
import io.druid.indexing.common.IndexingServiceCondition;
import io.druid.indexing.common.SegmentLoaderFactory;
import io.druid.indexing.common.TaskStatus;
import io.druid.indexing.common.TaskToolboxFactory;
import io.druid.indexing.common.TestMergeTask;
@ -35,6 +38,11 @@ import io.druid.indexing.coordinator.TestRemoteTaskRunnerConfig;
import io.druid.indexing.coordinator.ThreadPoolTaskRunner;
import io.druid.indexing.worker.config.WorkerConfig;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.segment.loading.DataSegmentPuller;
import io.druid.segment.loading.LocalDataSegmentPuller;
import io.druid.segment.loading.OmniSegmentLoader;
import io.druid.segment.loading.SegmentLoaderConfig;
import io.druid.segment.loading.StorageLocationConfig;
import io.druid.server.initialization.ZkPathsConfig;
import junit.framework.Assert;
import org.apache.curator.framework.CuratorFramework;
@ -46,6 +54,7 @@ import org.junit.Before;
import org.junit.Test;
import java.io.File;
import java.util.List;
/**
*/
@ -79,8 +88,6 @@ public class WorkerTaskMonitorTest
.build();
cf.start();
cf.create().creatingParentsIfNeeded().forPath(basePath);
//cf.create().creatingParentsIfNeeded().forPath(tasksPath);
//cf.create().creatingParentsIfNeeded().forPath(statusPath);
worker = new Worker(
"worker",
@ -115,7 +122,23 @@ public class WorkerTaskMonitorTest
new ThreadPoolTaskRunner(
new TaskToolboxFactory(
new TaskConfig(tmp.toString(), null, null, 0),
null, null, null, null, null, null, null, null, null, null, jsonMapper
null, null, null, null, null, null, null, null, null, new SegmentLoaderFactory(
new OmniSegmentLoader(
ImmutableMap.<String, DataSegmentPuller>of(
"local",
new LocalDataSegmentPuller()
),
null,
new SegmentLoaderConfig()
{
@Override
public List<StorageLocationConfig> getLocations()
{
return Lists.newArrayList();
}
}
)
), jsonMapper
)
),
new WorkerConfig().setCapacity(1)
@ -180,7 +203,7 @@ public class WorkerTaskMonitorTest
);
TaskAnnouncement taskAnnouncement = jsonMapper.readValue(
cf.getData().forPath(joiner.join(statusPath, task.getId())), TaskAnnouncement.class
cf.getData().forPath(joiner.join(statusPath, task.getId())), TaskAnnouncement.class
);
Assert.assertEquals(task.getId(), taskAnnouncement.getTaskStatus().getId());

View File

@ -81,7 +81,8 @@ import java.util.Set;
/**
*/
public class Initialization
public class
Initialization
{
private static final Logger log = new Logger(Initialization.class);
private static final Map<String, ClassLoader> loadersMap = Maps.newHashMap();