add test for parallel loading

This commit is contained in:
Xavier Léauté 2014-09-23 16:05:32 -07:00
parent 05d4f71ddc
commit 35fb210cfa
1 changed files with 73 additions and 18 deletions

View File

@ -21,6 +21,7 @@ package io.druid.server.coordination;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.MoreExecutors;
import com.metamx.common.concurrent.ScheduledExecutors; import com.metamx.common.concurrent.ScheduledExecutors;
@ -50,18 +51,22 @@ import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collections; import java.util.Collections;
import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
/** /**
*/ */
public class ZkCoordinatorTest extends CuratorTestBase public class ZkCoordinatorTest extends CuratorTestBase
{ {
private static final Logger log = new Logger(ZkCoordinatorTest.class); private static final Logger log = new Logger(ZkCoordinatorTest.class);
public static final int COUNT = 50;
private final ObjectMapper jsonMapper = new DefaultObjectMapper(); private final ObjectMapper jsonMapper = new DefaultObjectMapper();
private ZkCoordinator zkCoordinator; private ZkCoordinator zkCoordinator;
private ServerManager serverManager; private ServerManager serverManager;
private DataSegmentAnnouncer announcer; private DataSegmentAnnouncer announcer;
private File infoDir; private File infoDir;
private AtomicInteger announceCount;
@Before @Before
public void setUp() throws Exception public void setUp() throws Exception
@ -101,10 +106,42 @@ public class ZkCoordinatorTest extends CuratorTestBase
} }
}; };
announcer = new SingleDataSegmentAnnouncer( announceCount = new AtomicInteger(0);
announcer = new DataSegmentAnnouncer()
{
private final DataSegmentAnnouncer delegate = new SingleDataSegmentAnnouncer(
me, zkPaths, new Announcer(curator, Execs.singleThreaded("blah")), jsonMapper me, zkPaths, new Announcer(curator, Execs.singleThreaded("blah")), jsonMapper
); );
@Override
public void announceSegment(DataSegment segment) throws IOException
{
announceCount.incrementAndGet();
delegate.announceSegment(segment);
}
@Override
public void unannounceSegment(DataSegment segment) throws IOException
{
announceCount.decrementAndGet();
delegate.unannounceSegment(segment);
}
@Override
public void announceSegments(Iterable<DataSegment> segments) throws IOException
{
announceCount.addAndGet(Iterables.size(segments));
delegate.announceSegments(segments);
}
@Override
public void unannounceSegments(Iterable<DataSegment> segments) throws IOException
{
announceCount.addAndGet(-Iterables.size(segments));
delegate.unannounceSegments(segments);
}
};
zkCoordinator = new ZkCoordinator( zkCoordinator = new ZkCoordinator(
jsonMapper, jsonMapper,
new SegmentLoaderConfig() new SegmentLoaderConfig()
@ -114,6 +151,18 @@ public class ZkCoordinatorTest extends CuratorTestBase
{ {
return infoDir; return infoDir;
} }
@Override
public int getNumLoadingThreads()
{
return 5;
}
@Override
public int getAnnounceIntervalMillis()
{
return 50;
}
}, },
zkPaths, zkPaths,
me, me,
@ -133,21 +182,22 @@ public class ZkCoordinatorTest extends CuratorTestBase
@Test @Test
public void testLoadCache() throws Exception public void testLoadCache() throws Exception
{ {
List<DataSegment> segments = Lists.newArrayList( List<DataSegment> segments = Lists.newLinkedList();
makeSegment("test", "1", new Interval("P1d/2011-04-01")), for(int i = 0; i < COUNT; ++i) {
makeSegment("test", "1", new Interval("P1d/2011-04-02")), segments.add(makeSegment("test" + i, "1", new Interval("P1d/2011-04-01")));
makeSegment("test", "2", new Interval("P1d/2011-04-02")), segments.add(makeSegment("test" + i, "1", new Interval("P1d/2011-04-02")));
makeSegment("test", "1", new Interval("P1d/2011-04-03")), segments.add(makeSegment("test" + i, "2", new Interval("P1d/2011-04-02")));
makeSegment("test", "1", new Interval("P1d/2011-04-04")), segments.add(makeSegment("test" + i, "1", new Interval("P1d/2011-04-03")));
makeSegment("test", "1", new Interval("P1d/2011-04-05")), segments.add(makeSegment("test" + i, "1", new Interval("P1d/2011-04-04")));
makeSegment("test", "2", new Interval("PT1h/2011-04-04T01")), segments.add(makeSegment("test" + i, "1", new Interval("P1d/2011-04-05")));
makeSegment("test", "2", new Interval("PT1h/2011-04-04T02")), segments.add(makeSegment("test" + i, "2", new Interval("PT1h/2011-04-04T01")));
makeSegment("test", "2", new Interval("PT1h/2011-04-04T03")), segments.add(makeSegment("test" + i, "2", new Interval("PT1h/2011-04-04T02")));
makeSegment("test", "2", new Interval("PT1h/2011-04-04T05")), segments.add(makeSegment("test" + i, "2", new Interval("PT1h/2011-04-04T03")));
makeSegment("test", "2", new Interval("PT1h/2011-04-04T06")), segments.add(makeSegment("test" + i, "2", new Interval("PT1h/2011-04-04T05")));
makeSegment("test2", "1", new Interval("P1d/2011-04-01")), segments.add(makeSegment("test" + i, "2", new Interval("PT1h/2011-04-04T06")));
makeSegment("test2", "1", new Interval("P1d/2011-04-02")) segments.add(makeSegment("test_two" + i, "1", new Interval("P1d/2011-04-01")));
); segments.add(makeSegment("test_two" + i, "1", new Interval("P1d/2011-04-02")));
}
Collections.sort(segments); Collections.sort(segments);
for (DataSegment segment : segments) { for (DataSegment segment : segments) {
@ -158,6 +208,11 @@ public class ZkCoordinatorTest extends CuratorTestBase
Assert.assertTrue(serverManager.getDataSourceCounts().isEmpty()); Assert.assertTrue(serverManager.getDataSourceCounts().isEmpty());
zkCoordinator.start(); zkCoordinator.start();
Assert.assertTrue(!serverManager.getDataSourceCounts().isEmpty()); Assert.assertTrue(!serverManager.getDataSourceCounts().isEmpty());
for(int i = 0; i < COUNT; ++i) {
Assert.assertEquals(11L, serverManager.getDataSourceCounts().get("test" + i).longValue());
Assert.assertEquals(2L, serverManager.getDataSourceCounts().get("test_two" + i).longValue());
}
Assert.assertEquals(13 * COUNT, announceCount.get());
zkCoordinator.stop(); zkCoordinator.stop();
for (DataSegment segment : segments) { for (DataSegment segment : segments) {