Merge pull request #696 from metamx/add-test

Add test for MessageTimeRejectionPolicy
This commit is contained in:
fjy 2014-08-21 13:44:52 -06:00
commit 2c5792e2ef
1 changed files with 48 additions and 12 deletions

View File

@ -30,10 +30,10 @@ import com.metamx.emitter.service.ServiceEmitter;
import io.druid.client.FilteredServerView; import io.druid.client.FilteredServerView;
import io.druid.client.ServerView; import io.druid.client.ServerView;
import io.druid.data.input.InputRow; import io.druid.data.input.InputRow;
import io.druid.data.input.impl.ParseSpec;
import io.druid.data.input.impl.DimensionsSpec; import io.druid.data.input.impl.DimensionsSpec;
import io.druid.data.input.impl.InputRowParser; import io.druid.data.input.impl.InputRowParser;
import io.druid.data.input.impl.JSONParseSpec; import io.druid.data.input.impl.JSONParseSpec;
import io.druid.data.input.impl.ParseSpec;
import io.druid.data.input.impl.TimestampSpec; import io.druid.data.input.impl.TimestampSpec;
import io.druid.granularity.QueryGranularity; import io.druid.granularity.QueryGranularity;
import io.druid.query.DefaultQueryRunnerFactoryConglomerate; import io.druid.query.DefaultQueryRunnerFactoryConglomerate;
@ -41,7 +41,6 @@ import io.druid.query.Query;
import io.druid.query.QueryRunnerFactory; import io.druid.query.QueryRunnerFactory;
import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.CountAggregatorFactory; import io.druid.query.aggregation.CountAggregatorFactory;
import io.druid.segment.column.ColumnConfig;
import io.druid.segment.indexing.DataSchema; import io.druid.segment.indexing.DataSchema;
import io.druid.segment.indexing.RealtimeTuningConfig; import io.druid.segment.indexing.RealtimeTuningConfig;
import io.druid.segment.indexing.granularity.UniformGranularitySpec; import io.druid.segment.indexing.granularity.UniformGranularitySpec;
@ -52,26 +51,57 @@ import io.druid.server.coordination.DataSegmentAnnouncer;
import io.druid.timeline.DataSegment; import io.druid.timeline.DataSegment;
import org.apache.commons.lang.mutable.MutableBoolean; import org.apache.commons.lang.mutable.MutableBoolean;
import org.easymock.EasyMock; import org.easymock.EasyMock;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.joda.time.Period; import org.joda.time.Period;
import org.junit.After; import org.junit.After;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import java.io.File; import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
/** /**
*/ */
@RunWith(Parameterized.class)
public class RealtimePlumberSchoolTest public class RealtimePlumberSchoolTest
{ {
private Plumber plumber; private final RejectionPolicyFactory rejectionPolicy;
private RealtimePlumber plumber;
private DataSegmentAnnouncer announcer; private DataSegmentAnnouncer announcer;
private SegmentPublisher segmentPublisher; private SegmentPublisher segmentPublisher;
private DataSegmentPusher dataSegmentPusher; private DataSegmentPusher dataSegmentPusher;
private FilteredServerView serverView; private FilteredServerView serverView;
private ServiceEmitter emitter; private ServiceEmitter emitter;
private RealtimeTuningConfig tuningConfig;
private DataSchema schema;
public RealtimePlumberSchoolTest(RejectionPolicyFactory rejectionPolicy)
{
this.rejectionPolicy = rejectionPolicy;
}
@Parameterized.Parameters
public static Collection<?> constructorFeeder() throws IOException
{
return Arrays.asList(
new Object[][]{
{
new NoopRejectionPolicyFactory()
},
{
new MessageTimeRejectionPolicyFactory()
}
}
);
}
@Before @Before
public void setUp() throws Exception public void setUp() throws Exception
@ -80,7 +110,7 @@ public class RealtimePlumberSchoolTest
final File tmpDir = Files.createTempDir(); final File tmpDir = Files.createTempDir();
tmpDir.deleteOnExit(); tmpDir.deleteOnExit();
final DataSchema schema = new DataSchema( schema = new DataSchema(
"test", "test",
new InputRowParser() new InputRowParser()
{ {
@ -110,9 +140,8 @@ public class RealtimePlumberSchoolTest
announcer.announceSegment(EasyMock.<DataSegment>anyObject()); announcer.announceSegment(EasyMock.<DataSegment>anyObject());
EasyMock.expectLastCall().anyTimes(); EasyMock.expectLastCall().anyTimes();
segmentPublisher = EasyMock.createMock(SegmentPublisher.class); segmentPublisher = EasyMock.createNiceMock(SegmentPublisher.class);
dataSegmentPusher = EasyMock.createMock(DataSegmentPusher.class); dataSegmentPusher = EasyMock.createNiceMock(DataSegmentPusher.class);
serverView = EasyMock.createMock(FilteredServerView.class); serverView = EasyMock.createMock(FilteredServerView.class);
serverView.registerSegmentCallback( serverView.registerSegmentCallback(
EasyMock.<Executor>anyObject(), EasyMock.<Executor>anyObject(),
@ -125,13 +154,13 @@ public class RealtimePlumberSchoolTest
EasyMock.replay(announcer, segmentPublisher, dataSegmentPusher, serverView, emitter); EasyMock.replay(announcer, segmentPublisher, dataSegmentPusher, serverView, emitter);
RealtimeTuningConfig tuningConfig = new RealtimeTuningConfig( tuningConfig = new RealtimeTuningConfig(
1, 1,
null, null,
null, null,
null, null,
new IntervalStartVersioningPolicy(), new IntervalStartVersioningPolicy(),
new NoopRejectionPolicyFactory(), rejectionPolicy,
null, null,
null null
); );
@ -148,12 +177,12 @@ public class RealtimePlumberSchoolTest
tmpDir, tmpDir,
Granularity.HOUR, Granularity.HOUR,
new IntervalStartVersioningPolicy(), new IntervalStartVersioningPolicy(),
new NoopRejectionPolicyFactory(), rejectionPolicy,
null, null,
0 0
); );
plumber = realtimePlumberSchool.findPlumber(schema, tuningConfig, new FireDepartmentMetrics()); plumber = (RealtimePlumber) realtimePlumberSchool.findPlumber(schema, tuningConfig, new FireDepartmentMetrics());
} }
@After @After
@ -166,7 +195,13 @@ public class RealtimePlumberSchoolTest
public void testPersist() throws Exception public void testPersist() throws Exception
{ {
final MutableBoolean committed = new MutableBoolean(false); final MutableBoolean committed = new MutableBoolean(false);
plumber.getSinks().put(0L, new Sink(new Interval(0, TimeUnit.HOURS.toMillis(1)),schema, tuningConfig, DateTime.now().toString()));
plumber.startJob(); plumber.startJob();
final InputRow row = EasyMock.createNiceMock(InputRow.class);
EasyMock.expect(row.getTimestampFromEpoch()).andReturn(0L);
EasyMock.expect(row.getDimensions()).andReturn(new ArrayList<String>());
EasyMock.replay(row);
plumber.add(row);
plumber.persist( plumber.persist(
new Runnable() new Runnable()
{ {
@ -185,6 +220,7 @@ public class RealtimePlumberSchoolTest
throw new ISE("Taking too long to set perist value"); throw new ISE("Taking too long to set perist value");
} }
} }
plumber.getSinks().clear();
plumber.finishJob(); plumber.finishJob();
} }
} }