mirror of https://github.com/apache/druid.git
Merge pull request #2031 from liuqiyun/1914-v2
Druid Issue #1914: Added an injection test for ZkCoordinator.
This commit is contained in:
commit
b9760e1219
|
@ -17,14 +17,6 @@
|
|||
|
||||
package io.druid.server.coordination;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import com.google.common.collect.Iterables;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.util.concurrent.MoreExecutors;
|
||||
import com.metamx.common.concurrent.ScheduledExecutors;
|
||||
import com.metamx.common.lifecycle.Lifecycle;
|
||||
import com.metamx.common.logger.Logger;
|
||||
import io.druid.client.cache.CacheConfig;
|
||||
import io.druid.client.cache.LocalCacheProvider;
|
||||
import io.druid.concurrent.Execs;
|
||||
|
@ -39,11 +31,6 @@ import io.druid.server.initialization.ZkPathsConfig;
|
|||
import io.druid.server.metrics.NoopServiceEmitter;
|
||||
import io.druid.timeline.DataSegment;
|
||||
import io.druid.timeline.partition.NoneShardSpec;
|
||||
import org.joda.time.Interval;
|
||||
import org.junit.After;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
|
@ -52,6 +39,27 @@ import java.util.Collections;
|
|||
import java.util.List;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import org.apache.curator.framework.CuratorFramework;
|
||||
import org.joda.time.Interval;
|
||||
import org.junit.After;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import com.google.common.collect.Iterables;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.util.concurrent.MoreExecutors;
|
||||
import com.google.inject.Binder;
|
||||
import com.google.inject.Guice;
|
||||
import com.google.inject.Injector;
|
||||
import com.google.inject.Module;
|
||||
import com.metamx.common.concurrent.ScheduledExecutorFactory;
|
||||
import com.metamx.common.concurrent.ScheduledExecutors;
|
||||
import com.metamx.common.lifecycle.Lifecycle;
|
||||
import com.metamx.common.logger.Logger;
|
||||
|
||||
/**
|
||||
*/
|
||||
public class ZkCoordinatorTest extends CuratorTestBase
|
||||
|
@ -283,4 +291,93 @@ public class ZkCoordinatorTest extends CuratorTestBase
|
|||
Assert.assertEquals(segments.get(i), segment);
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testInjector() throws Exception
|
||||
{
|
||||
Injector injector = Guice.createInjector(
|
||||
new Module()
|
||||
{
|
||||
@Override
|
||||
public void configure(Binder binder)
|
||||
{
|
||||
binder.bind(ObjectMapper.class).toInstance(jsonMapper);
|
||||
binder.bind(SegmentLoaderConfig.class).toInstance(
|
||||
new SegmentLoaderConfig()
|
||||
{
|
||||
@Override
|
||||
public File getInfoDir()
|
||||
{
|
||||
return infoDir;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getNumLoadingThreads()
|
||||
{
|
||||
return 5;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getAnnounceIntervalMillis()
|
||||
{
|
||||
return 50;
|
||||
}
|
||||
}
|
||||
);
|
||||
binder.bind(ZkPathsConfig.class).toInstance(
|
||||
new ZkPathsConfig()
|
||||
{
|
||||
@Override
|
||||
public String getBase()
|
||||
{
|
||||
return "/druid";
|
||||
}
|
||||
}
|
||||
);
|
||||
binder.bind(DruidServerMetadata.class)
|
||||
.toInstance(new DruidServerMetadata("dummyServer", "dummyHost", 0, "dummyType", "normal", 0));
|
||||
binder.bind(DataSegmentAnnouncer.class).toInstance(announcer);
|
||||
binder.bind(CuratorFramework.class).toInstance(curator);
|
||||
binder.bind(ServerManager.class).toInstance(serverManager);
|
||||
binder.bind(ScheduledExecutorFactory.class).toInstance(ScheduledExecutors.createFactory(new Lifecycle()));
|
||||
}
|
||||
|
||||
}
|
||||
);
|
||||
|
||||
ZkCoordinator zkCoordinator = injector.getInstance(ZkCoordinator.class);
|
||||
|
||||
List<DataSegment> segments = Lists.newLinkedList();
|
||||
for (int i = 0; i < COUNT; ++i) {
|
||||
segments.add(makeSegment("test" + i, "1", new Interval("P1d/2011-04-01")));
|
||||
segments.add(makeSegment("test" + i, "1", new Interval("P1d/2011-04-02")));
|
||||
segments.add(makeSegment("test" + i, "2", new Interval("P1d/2011-04-02")));
|
||||
segments.add(makeSegment("test_two" + i, "1", new Interval("P1d/2011-04-01")));
|
||||
segments.add(makeSegment("test_two" + i, "1", new Interval("P1d/2011-04-02")));
|
||||
}
|
||||
Collections.sort(segments);
|
||||
|
||||
for (DataSegment segment : segments) {
|
||||
writeSegmentToCache(segment);
|
||||
}
|
||||
|
||||
checkCache(segments);
|
||||
Assert.assertTrue(serverManager.getDataSourceCounts().isEmpty());
|
||||
|
||||
zkCoordinator.start();
|
||||
Assert.assertTrue(!serverManager.getDataSourceCounts().isEmpty());
|
||||
for (int i = 0; i < COUNT; ++i) {
|
||||
Assert.assertEquals(3L, serverManager.getDataSourceCounts().get("test" + i).longValue());
|
||||
Assert.assertEquals(2L, serverManager.getDataSourceCounts().get("test_two" + i).longValue());
|
||||
}
|
||||
Assert.assertEquals(5 * COUNT, announceCount.get());
|
||||
zkCoordinator.stop();
|
||||
|
||||
for (DataSegment segment : segments) {
|
||||
deleteSegmentFromCache(segment);
|
||||
}
|
||||
|
||||
Assert.assertEquals(0, infoDir.listFiles().length);
|
||||
Assert.assertTrue(infoDir.delete());
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue