mirror of https://github.com/apache/druid.git
filtered batch inventory view tests
This commit is contained in:
parent
f30d58ad31
commit
d6f58bdced
|
@ -20,7 +20,8 @@
|
|||
package io.druid.client.client;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.google.common.base.Joiner;
|
||||
import com.google.common.base.Predicate;
|
||||
import com.google.common.base.Predicates;
|
||||
import com.google.common.base.Stopwatch;
|
||||
import com.google.common.collect.Iterables;
|
||||
import com.google.common.collect.Sets;
|
||||
|
@ -28,6 +29,7 @@ import com.google.common.util.concurrent.MoreExecutors;
|
|||
import com.metamx.common.ISE;
|
||||
import io.druid.client.BatchServerInventoryView;
|
||||
import io.druid.client.DruidServer;
|
||||
import io.druid.client.ServerView;
|
||||
import io.druid.curator.PotentiallyGzippedCompressionProvider;
|
||||
import io.druid.curator.announcement.Announcer;
|
||||
import io.druid.jackson.DefaultObjectMapper;
|
||||
|
@ -36,17 +38,23 @@ import io.druid.server.coordination.DruidServerMetadata;
|
|||
import io.druid.server.initialization.BatchDataSegmentAnnouncerConfig;
|
||||
import io.druid.server.initialization.ZkPathsConfig;
|
||||
import io.druid.timeline.DataSegment;
|
||||
import junit.framework.Assert;
|
||||
import org.apache.curator.framework.CuratorFramework;
|
||||
import org.apache.curator.framework.CuratorFrameworkFactory;
|
||||
import org.apache.curator.retry.ExponentialBackoffRetry;
|
||||
import org.apache.curator.test.TestingCluster;
|
||||
import org.easymock.EasyMock;
|
||||
import org.easymock.LogicalOperator;
|
||||
import org.joda.time.DateTime;
|
||||
import org.joda.time.Interval;
|
||||
import org.junit.After;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
import org.junit.rules.ExpectedException;
|
||||
|
||||
import javax.annotation.Nullable;
|
||||
import java.util.Comparator;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
|
@ -55,6 +63,8 @@ import java.util.concurrent.TimeUnit;
|
|||
public class BatchServerInventoryViewTest
|
||||
{
|
||||
private static final String testBasePath = "/test";
|
||||
public static final DateTime SEGMENT_INTERVAL_START = new DateTime("2013-01-01");
|
||||
public static final int INITIAL_SEGMENTS = 100;
|
||||
|
||||
private TestingCluster testingCluster;
|
||||
private CuratorFramework cf;
|
||||
|
@ -63,6 +73,10 @@ public class BatchServerInventoryViewTest
|
|||
private BatchDataSegmentAnnouncer segmentAnnouncer;
|
||||
private Set<DataSegment> testSegments;
|
||||
private BatchServerInventoryView batchServerInventoryView;
|
||||
private BatchServerInventoryView filteredBatchServerInventoryView;
|
||||
|
||||
@Rule
|
||||
public ExpectedException exception = ExpectedException.none();
|
||||
|
||||
@Before
|
||||
public void setUp() throws Exception
|
||||
|
@ -117,7 +131,7 @@ public class BatchServerInventoryViewTest
|
|||
segmentAnnouncer.start();
|
||||
|
||||
testSegments = Sets.newHashSet();
|
||||
for (int i = 0; i < 100; i++) {
|
||||
for (int i = 0; i < INITIAL_SEGMENTS; i++) {
|
||||
testSegments.add(makeSegment(i));
|
||||
}
|
||||
|
||||
|
@ -136,12 +150,36 @@ public class BatchServerInventoryViewTest
|
|||
);
|
||||
|
||||
batchServerInventoryView.start();
|
||||
|
||||
filteredBatchServerInventoryView = new BatchServerInventoryView(
|
||||
new ZkPathsConfig()
|
||||
{
|
||||
@Override
|
||||
public String getZkBasePath()
|
||||
{
|
||||
return testBasePath;
|
||||
}
|
||||
},
|
||||
cf,
|
||||
jsonMapper,
|
||||
new Predicate<DataSegment>()
|
||||
{
|
||||
@Override
|
||||
public boolean apply(@Nullable DataSegment dataSegment)
|
||||
{
|
||||
return dataSegment.getInterval().getStart().isBefore(SEGMENT_INTERVAL_START.plusDays(INITIAL_SEGMENTS));
|
||||
}
|
||||
}
|
||||
);
|
||||
|
||||
filteredBatchServerInventoryView.start();
|
||||
}
|
||||
|
||||
@After
|
||||
public void tearDown() throws Exception
|
||||
{
|
||||
batchServerInventoryView.stop();
|
||||
filteredBatchServerInventoryView.stop();
|
||||
segmentAnnouncer.stop();
|
||||
announcer.stop();
|
||||
cf.close();
|
||||
|
@ -153,7 +191,7 @@ public class BatchServerInventoryViewTest
|
|||
{
|
||||
segmentAnnouncer.announceSegments(testSegments);
|
||||
|
||||
waitForSync();
|
||||
waitForSync(batchServerInventoryView, testSegments);
|
||||
|
||||
DruidServer server = Iterables.get(batchServerInventoryView.getInventory(), 0);
|
||||
Set<DataSegment> segments = Sets.newHashSet(server.getSegments().values());
|
||||
|
@ -168,7 +206,7 @@ public class BatchServerInventoryViewTest
|
|||
testSegments.add(segment1);
|
||||
testSegments.add(segment2);
|
||||
|
||||
waitForSync();
|
||||
waitForSync(batchServerInventoryView, testSegments);
|
||||
|
||||
Assert.assertEquals(testSegments, Sets.newHashSet(server.getSegments().values()));
|
||||
|
||||
|
@ -177,32 +215,131 @@ public class BatchServerInventoryViewTest
|
|||
testSegments.remove(segment1);
|
||||
testSegments.remove(segment2);
|
||||
|
||||
waitForSync();
|
||||
waitForSync(batchServerInventoryView, testSegments);
|
||||
|
||||
Assert.assertEquals(testSegments, Sets.newHashSet(server.getSegments().values()));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRunWithFilter() throws Exception
|
||||
{
|
||||
segmentAnnouncer.announceSegments(testSegments);
|
||||
|
||||
waitForSync(filteredBatchServerInventoryView, testSegments);
|
||||
|
||||
DruidServer server = Iterables.get(filteredBatchServerInventoryView.getInventory(), 0);
|
||||
Set<DataSegment> segments = Sets.newHashSet(server.getSegments().values());
|
||||
|
||||
Assert.assertEquals(testSegments, segments);
|
||||
|
||||
// segment outside the range of default filter
|
||||
DataSegment segment1 = makeSegment(101);
|
||||
segmentAnnouncer.announceSegment(segment1);
|
||||
testSegments.add(segment1);
|
||||
|
||||
exception.expect(ISE.class);
|
||||
waitForSync(filteredBatchServerInventoryView, testSegments);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRunWithFilterCallback() throws Exception
|
||||
{
|
||||
segmentAnnouncer.announceSegments(testSegments);
|
||||
|
||||
waitForSync(filteredBatchServerInventoryView, testSegments);
|
||||
|
||||
DruidServer server = Iterables.get(filteredBatchServerInventoryView.getInventory(), 0);
|
||||
Set<DataSegment> segments = Sets.newHashSet(server.getSegments().values());
|
||||
|
||||
Assert.assertEquals(testSegments, segments);
|
||||
|
||||
ServerView.SegmentCallback callback = EasyMock.createStrictMock(ServerView.SegmentCallback.class);
|
||||
Comparator<DataSegment> dataSegmentComparator = new Comparator<DataSegment>()
|
||||
{
|
||||
@Override
|
||||
public int compare(DataSegment o1, DataSegment o2)
|
||||
{
|
||||
return o1.getInterval().equals(o2.getInterval()) ? 0 : -1;
|
||||
}
|
||||
};
|
||||
|
||||
EasyMock
|
||||
.expect(
|
||||
callback.segmentAdded(
|
||||
EasyMock.<DruidServerMetadata>anyObject(),
|
||||
EasyMock.cmp(makeSegment(INITIAL_SEGMENTS + 2), dataSegmentComparator, LogicalOperator.EQUAL)
|
||||
)
|
||||
)
|
||||
.andReturn(ServerView.CallbackAction.CONTINUE)
|
||||
.times(1);
|
||||
|
||||
EasyMock
|
||||
.expect(
|
||||
callback.segmentRemoved(
|
||||
EasyMock.<DruidServerMetadata>anyObject(),
|
||||
EasyMock.cmp(makeSegment(INITIAL_SEGMENTS + 2), dataSegmentComparator, LogicalOperator.EQUAL)
|
||||
)
|
||||
)
|
||||
.andReturn(ServerView.CallbackAction.CONTINUE)
|
||||
.times(1);
|
||||
|
||||
|
||||
EasyMock.replay(callback);
|
||||
|
||||
filteredBatchServerInventoryView.registerSegmentCallback(
|
||||
MoreExecutors.sameThreadExecutor(),
|
||||
callback,
|
||||
new Predicate<DataSegment>()
|
||||
{
|
||||
@Override
|
||||
public boolean apply(@Nullable DataSegment dataSegment)
|
||||
{
|
||||
return dataSegment.getInterval().getStart().equals(SEGMENT_INTERVAL_START.plusDays(INITIAL_SEGMENTS + 2));
|
||||
}
|
||||
}
|
||||
);
|
||||
|
||||
DataSegment segment2 = makeSegment(INITIAL_SEGMENTS + 2);
|
||||
segmentAnnouncer.announceSegment(segment2);
|
||||
testSegments.add(segment2);
|
||||
|
||||
DataSegment oldSegment = makeSegment(-1);
|
||||
segmentAnnouncer.announceSegment(oldSegment);
|
||||
testSegments.add(oldSegment);
|
||||
|
||||
segmentAnnouncer.unannounceSegment(oldSegment);
|
||||
testSegments.remove(oldSegment);
|
||||
|
||||
waitForSync(filteredBatchServerInventoryView, testSegments);
|
||||
|
||||
segmentAnnouncer.unannounceSegment(segment2);
|
||||
testSegments.remove(segment2);
|
||||
|
||||
waitForSync(filteredBatchServerInventoryView, testSegments);
|
||||
EasyMock.verify(callback);
|
||||
}
|
||||
|
||||
private DataSegment makeSegment(int offset)
|
||||
{
|
||||
return DataSegment.builder()
|
||||
.dataSource("foo")
|
||||
.interval(
|
||||
new Interval(
|
||||
new DateTime("2013-01-01").plusDays(offset),
|
||||
new DateTime("2013-01-02").plusDays(offset)
|
||||
SEGMENT_INTERVAL_START.plusDays(offset),
|
||||
SEGMENT_INTERVAL_START.plusDays(offset + 1)
|
||||
)
|
||||
)
|
||||
.version(new DateTime().toString())
|
||||
.build();
|
||||
}
|
||||
|
||||
private void waitForSync() throws Exception
|
||||
private static void waitForSync(BatchServerInventoryView batchServerInventoryView, Set<DataSegment> testSegments) throws Exception
|
||||
{
|
||||
Stopwatch stopwatch = new Stopwatch().start();
|
||||
while (Iterables.isEmpty(batchServerInventoryView.getInventory())
|
||||
|| Iterables.get(batchServerInventoryView.getInventory(), 0).getSegments().size() != testSegments.size()) {
|
||||
Thread.sleep(500);
|
||||
if (stopwatch.elapsed(TimeUnit.MILLISECONDS) > 5000) {
|
||||
if (stopwatch.elapsed(TimeUnit.MILLISECONDS) > 2000) {
|
||||
throw new ISE("BatchServerInventoryView is not updating");
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue