diff --git a/server/src/test/java/io/druid/client/BrokerServerViewTest.java b/server/src/test/java/io/druid/client/BrokerServerViewTest.java index 08b44f8de62..6298ecba363 100644 --- a/server/src/test/java/io/druid/client/BrokerServerViewTest.java +++ b/server/src/test/java/io/druid/client/BrokerServerViewTest.java @@ -25,7 +25,6 @@ import com.fasterxml.jackson.dataformat.smile.SmileGenerator; import com.google.common.base.Function; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; -import com.google.common.collect.ImmutableSet; import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import com.metamx.common.Pair; @@ -47,8 +46,6 @@ import io.druid.timeline.TimelineObjectHolder; import io.druid.timeline.partition.NoneShardSpec; import io.druid.timeline.partition.PartitionHolder; import io.druid.timeline.partition.SingleElementPartitionChunk; -import org.apache.curator.utils.ZKPaths; -import org.apache.zookeeper.CreateMode; import org.easymock.EasyMock; import org.joda.time.Interval; import org.junit.After; @@ -65,8 +62,6 @@ public class BrokerServerViewTest extends CuratorTestBase { private final ObjectMapper jsonMapper; private final ZkPathsConfig zkPathsConfig; - private final String announcementsPath; - private final String inventoryPath; private CountDownLatch segmentViewInitLatch; private CountDownLatch segmentAddedLatch; @@ -79,8 +74,6 @@ public class BrokerServerViewTest extends CuratorTestBase { jsonMapper = new DefaultObjectMapper(); zkPathsConfig = new ZkPathsConfig(); - announcementsPath = zkPathsConfig.getAnnouncementsPath(); - inventoryPath = zkPathsConfig.getLiveSegmentsPath(); } @Before @@ -111,7 +104,7 @@ public class BrokerServerViewTest extends CuratorTestBase setupZNodeForServer(druidServer, zkPathsConfig, jsonMapper); final DataSegment segment = dataSegmentWithIntervalAndVersion("2014-10-20T00:00:00Z/P1D", "v1"); - announceSegmentForServer(druidServer, segment); + announceSegmentForServer(druidServer, segment, zkPathsConfig, jsonMapper); Assert.assertTrue(timing.forWaiting().awaitLatch(segmentViewInitLatch)); Assert.assertTrue(timing.forWaiting().awaitLatch(segmentAddedLatch)); @@ -137,7 +130,7 @@ public class BrokerServerViewTest extends CuratorTestBase Assert.assertEquals(segment, selector.getSegment()); Assert.assertEquals(druidServer, selector.pick().getServer()); - unannounceSegmentForServer(druidServer, segment); + unannounceSegmentForServer(druidServer, segment, zkPathsConfig); Assert.assertTrue(timing.forWaiting().awaitLatch(segmentRemovedLatch)); Assert.assertEquals( @@ -199,7 +192,7 @@ public class BrokerServerViewTest extends CuratorTestBase ); for (int i = 0; i < 5; ++i) { - announceSegmentForServer(druidServers.get(i), segments.get(i)); + announceSegmentForServer(druidServers.get(i), segments.get(i), zkPathsConfig, jsonMapper); } Assert.assertTrue(timing.forWaiting().awaitLatch(segmentViewInitLatch)); Assert.assertTrue(timing.forWaiting().awaitLatch(segmentAddedLatch)); @@ -219,7 +212,7 @@ public class BrokerServerViewTest extends CuratorTestBase ); // unannounce the segment created by dataSegmentWithIntervalAndVersion("2011-04-01/2011-04-09", "v2") - unannounceSegmentForServer(druidServers.get(2), segments.get(2)); + unannounceSegmentForServer(druidServers.get(2), segments.get(2), zkPathsConfig); Assert.assertTrue(timing.forWaiting().awaitLatch(segmentRemovedLatch)); // renew segmentRemovedLatch since we still have 4 segments to unannounce @@ -244,7 +237,7 @@ public class BrokerServerViewTest extends CuratorTestBase for (int i = 0; i < 5; ++i) { // skip the one that was previously unannounced if (i != 2) { - unannounceSegmentForServer(druidServers.get(i), segments.get(i)); + unannounceSegmentForServer(druidServers.get(i), segments.get(i), zkPathsConfig); } } Assert.assertTrue(timing.forWaiting().awaitLatch(segmentRemovedLatch)); @@ -255,29 +248,6 @@ public class BrokerServerViewTest extends CuratorTestBase ); } - private void announceSegmentForServer(DruidServer druidServer, DataSegment segment) throws Exception - { - curator.create() - .compressed() - .withMode(CreateMode.EPHEMERAL) - .forPath( - ZKPaths.makePath(ZKPaths.makePath(inventoryPath, druidServer.getHost()), segment.getIdentifier()), - jsonMapper.writeValueAsBytes( - ImmutableSet.of(segment) - ) - ); - } - - private void unannounceSegmentForServer(DruidServer druidServer, DataSegment segment) throws Exception - { - curator.delete().guaranteed().forPath( - ZKPaths.makePath( - ZKPaths.makePath(inventoryPath, druidServer.getHost()), - segment.getIdentifier() - ) - ); - } - private Pair>> createExpected( String intervalStr, String version, diff --git a/server/src/test/java/io/druid/client/CoordinatorServerViewTest.java b/server/src/test/java/io/druid/client/CoordinatorServerViewTest.java index bcfba7453be..3e01804df09 100644 --- a/server/src/test/java/io/druid/client/CoordinatorServerViewTest.java +++ b/server/src/test/java/io/druid/client/CoordinatorServerViewTest.java @@ -23,7 +23,6 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Function; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; -import com.google.common.collect.ImmutableSet; import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import com.metamx.common.Pair; @@ -38,7 +37,6 @@ import io.druid.timeline.TimelineObjectHolder; import io.druid.timeline.partition.NoneShardSpec; import io.druid.timeline.partition.PartitionHolder; import org.apache.curator.utils.ZKPaths; -import org.apache.zookeeper.CreateMode; import org.joda.time.Interval; import org.junit.After; import org.junit.Assert; @@ -54,7 +52,6 @@ public class CoordinatorServerViewTest extends CuratorTestBase { private final ObjectMapper jsonMapper; private final ZkPathsConfig zkPathsConfig; - private final String announcementsPath; private final String inventoryPath; private CountDownLatch segmentViewInitLatch; @@ -68,7 +65,6 @@ public class CoordinatorServerViewTest extends CuratorTestBase { jsonMapper = new DefaultObjectMapper(); zkPathsConfig = new ZkPathsConfig(); - announcementsPath = zkPathsConfig.getAnnouncementsPath(); inventoryPath = zkPathsConfig.getLiveSegmentsPath(); } @@ -100,7 +96,7 @@ public class CoordinatorServerViewTest extends CuratorTestBase setupZNodeForServer(druidServer, zkPathsConfig, jsonMapper); final DataSegment segment = dataSegmentWithIntervalAndVersion("2014-10-20T00:00:00Z/P1D", "v1"); - announceSegmentForServer(druidServer, segment); + announceSegmentForServer(druidServer, segment, zkPathsConfig, jsonMapper); Assert.assertTrue(timing.forWaiting().awaitLatch(segmentViewInitLatch)); Assert.assertTrue(timing.forWaiting().awaitLatch(segmentAddedLatch)); @@ -122,7 +118,10 @@ public class CoordinatorServerViewTest extends CuratorTestBase SegmentLoadInfo segmentLoadInfo = actualPartitionHolder.iterator().next().getObject(); Assert.assertFalse(segmentLoadInfo.isEmpty()); - Assert.assertEquals(druidServer.getMetadata(), Iterables.getOnlyElement(segmentLoadInfo.toImmutableSegmentLoadInfo().getServers())); + Assert.assertEquals( + druidServer.getMetadata(), + Iterables.getOnlyElement(segmentLoadInfo.toImmutableSegmentLoadInfo().getServers()) + ); unannounceSegmentForServer(druidServer, segment); Assert.assertTrue(timing.forWaiting().awaitLatch(segmentRemovedLatch)); @@ -186,7 +185,7 @@ public class CoordinatorServerViewTest extends CuratorTestBase ); for (int i = 0; i < 5; ++i) { - announceSegmentForServer(druidServers.get(i), segments.get(i)); + announceSegmentForServer(druidServers.get(i), segments.get(i), zkPathsConfig, jsonMapper); } Assert.assertTrue(timing.forWaiting().awaitLatch(segmentViewInitLatch)); Assert.assertTrue(timing.forWaiting().awaitLatch(segmentAddedLatch)); @@ -242,19 +241,6 @@ public class CoordinatorServerViewTest extends CuratorTestBase ); } - private void announceSegmentForServer(DruidServer druidServer, DataSegment segment) throws Exception - { - curator.create() - .compressed() - .withMode(CreateMode.EPHEMERAL) - .forPath( - ZKPaths.makePath(ZKPaths.makePath(inventoryPath, druidServer.getHost()), segment.getIdentifier()), - jsonMapper.writeValueAsBytes( - ImmutableSet.of(segment) - ) - ); - } - private void unannounceSegmentForServer(DruidServer druidServer, DataSegment segment) throws Exception { curator.delete().guaranteed().forPath( @@ -283,7 +269,7 @@ public class CoordinatorServerViewTest extends CuratorTestBase for (int i = 0; i < expected.size(); ++i) { Pair>> expectedPair = expected.get(i); - TimelineObjectHolder actualTimelineObjectHolder = actual.get(i); + TimelineObjectHolder actualTimelineObjectHolder = actual.get(i); Assert.assertEquals(expectedPair.lhs, actualTimelineObjectHolder.getInterval()); Assert.assertEquals(expectedPair.rhs.lhs, actualTimelineObjectHolder.getVersion()); @@ -292,9 +278,10 @@ public class CoordinatorServerViewTest extends CuratorTestBase Assert.assertTrue(actualPartitionHolder.isComplete()); Assert.assertEquals(1, Iterables.size(actualPartitionHolder)); - SegmentLoadInfo segmentLoadInfo = actualPartitionHolder.iterator().next().getObject(); + SegmentLoadInfo segmentLoadInfo = actualPartitionHolder.iterator().next().getObject(); Assert.assertFalse(segmentLoadInfo.isEmpty()); - Assert.assertEquals(expectedPair.rhs.rhs.lhs.getMetadata(),Iterables.getOnlyElement(segmentLoadInfo.toImmutableSegmentLoadInfo().getServers())); + Assert.assertEquals(expectedPair.rhs.rhs.lhs.getMetadata(), + Iterables.getOnlyElement(segmentLoadInfo.toImmutableSegmentLoadInfo().getServers())); } } diff --git a/server/src/test/java/io/druid/curator/CuratorTestBase.java b/server/src/test/java/io/druid/curator/CuratorTestBase.java index cc15daeb0e1..5d8b35a3e1a 100644 --- a/server/src/test/java/io/druid/curator/CuratorTestBase.java +++ b/server/src/test/java/io/druid/curator/CuratorTestBase.java @@ -21,15 +21,18 @@ package io.druid.curator; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.api.client.repackaged.com.google.common.base.Throwables; +import com.google.common.collect.ImmutableSet; import com.metamx.common.guava.CloseQuietly; import io.druid.client.DruidServer; import io.druid.server.initialization.ZkPathsConfig; +import io.druid.timeline.DataSegment; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.retry.RetryOneTime; import org.apache.curator.test.TestingServer; import org.apache.curator.test.Timing; import org.apache.curator.utils.ZKPaths; +import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; /** @@ -94,7 +97,57 @@ public class CuratorTestBase catch (Exception e) { Throwables.propagate(e); } + } + protected void announceSegmentForServer( + DruidServer druidServer, + DataSegment segment, + ZkPathsConfig zkPathsConfig, + ObjectMapper jsonMapper + ) + { + final String segmentAnnouncementPath = ZKPaths.makePath(ZKPaths.makePath( + zkPathsConfig.getLiveSegmentsPath(), + druidServer.getHost() + ), segment.getIdentifier()); + + try { + curator.create() + .compressed() + .withMode(CreateMode.EPHEMERAL) + .forPath( + segmentAnnouncementPath, + jsonMapper.writeValueAsBytes( + ImmutableSet.of(segment) + ) + ); + } + catch (KeeperException.NodeExistsException e) { + try { + curator.setData() + .forPath( + segmentAnnouncementPath, + jsonMapper.writeValueAsBytes(ImmutableSet.of(segment)) + ); + } + catch (Exception e1) { + Throwables.propagate(e1); + } + } + catch (Exception e) { + Throwables.propagate(e); + } + } + + protected void unannounceSegmentForServer(DruidServer druidServer, DataSegment segment, ZkPathsConfig zkPathsConfig) + throws Exception + { + curator.delete().guaranteed().forPath( + ZKPaths.makePath( + ZKPaths.makePath(zkPathsConfig.getLiveSegmentsPath(), druidServer.getHost()), + segment.getIdentifier() + ) + ); } protected void tearDownServerAndCurator() @@ -104,3 +157,5 @@ public class CuratorTestBase } } + +// build #13 \ No newline at end of file