diff --git a/server/src/test/java/io/druid/client/BrokerServerViewTest.java b/server/src/test/java/io/druid/client/BrokerServerViewTest.java index 08b44f8de62..6298ecba363 100644 --- a/server/src/test/java/io/druid/client/BrokerServerViewTest.java +++ b/server/src/test/java/io/druid/client/BrokerServerViewTest.java @@ -25,7 +25,6 @@ import com.fasterxml.jackson.dataformat.smile.SmileGenerator; import com.google.common.base.Function; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; -import com.google.common.collect.ImmutableSet; import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import com.metamx.common.Pair; @@ -47,8 +46,6 @@ import io.druid.timeline.TimelineObjectHolder; import io.druid.timeline.partition.NoneShardSpec; import io.druid.timeline.partition.PartitionHolder; import io.druid.timeline.partition.SingleElementPartitionChunk; -import org.apache.curator.utils.ZKPaths; -import org.apache.zookeeper.CreateMode; import org.easymock.EasyMock; import org.joda.time.Interval; import org.junit.After; @@ -65,8 +62,6 @@ public class BrokerServerViewTest extends CuratorTestBase { private final ObjectMapper jsonMapper; private final ZkPathsConfig zkPathsConfig; - private final String announcementsPath; - private final String inventoryPath; private CountDownLatch segmentViewInitLatch; private CountDownLatch segmentAddedLatch; @@ -79,8 +74,6 @@ public class BrokerServerViewTest extends CuratorTestBase { jsonMapper = new DefaultObjectMapper(); zkPathsConfig = new ZkPathsConfig(); - announcementsPath = zkPathsConfig.getAnnouncementsPath(); - inventoryPath = zkPathsConfig.getLiveSegmentsPath(); } @Before @@ -111,7 +104,7 @@ public class BrokerServerViewTest extends CuratorTestBase setupZNodeForServer(druidServer, zkPathsConfig, jsonMapper); final DataSegment segment = dataSegmentWithIntervalAndVersion("2014-10-20T00:00:00Z/P1D", "v1"); - announceSegmentForServer(druidServer, segment); + announceSegmentForServer(druidServer, segment, zkPathsConfig, jsonMapper); Assert.assertTrue(timing.forWaiting().awaitLatch(segmentViewInitLatch)); Assert.assertTrue(timing.forWaiting().awaitLatch(segmentAddedLatch)); @@ -137,7 +130,7 @@ public class BrokerServerViewTest extends CuratorTestBase Assert.assertEquals(segment, selector.getSegment()); Assert.assertEquals(druidServer, selector.pick().getServer()); - unannounceSegmentForServer(druidServer, segment); + unannounceSegmentForServer(druidServer, segment, zkPathsConfig); Assert.assertTrue(timing.forWaiting().awaitLatch(segmentRemovedLatch)); Assert.assertEquals( @@ -199,7 +192,7 @@ public class BrokerServerViewTest extends CuratorTestBase ); for (int i = 0; i < 5; ++i) { - announceSegmentForServer(druidServers.get(i), segments.get(i)); + announceSegmentForServer(druidServers.get(i), segments.get(i), zkPathsConfig, jsonMapper); } Assert.assertTrue(timing.forWaiting().awaitLatch(segmentViewInitLatch)); Assert.assertTrue(timing.forWaiting().awaitLatch(segmentAddedLatch)); @@ -219,7 +212,7 @@ public class BrokerServerViewTest extends CuratorTestBase ); // unannounce the segment created by dataSegmentWithIntervalAndVersion("2011-04-01/2011-04-09", "v2") - unannounceSegmentForServer(druidServers.get(2), segments.get(2)); + unannounceSegmentForServer(druidServers.get(2), segments.get(2), zkPathsConfig); Assert.assertTrue(timing.forWaiting().awaitLatch(segmentRemovedLatch)); // renew segmentRemovedLatch since we still have 4 segments to unannounce @@ -244,7 +237,7 @@ public class BrokerServerViewTest extends CuratorTestBase for (int i = 0; i < 5; ++i) { // skip the one that was previously unannounced if (i != 2) { - unannounceSegmentForServer(druidServers.get(i), segments.get(i)); + unannounceSegmentForServer(druidServers.get(i), segments.get(i), zkPathsConfig); } } Assert.assertTrue(timing.forWaiting().awaitLatch(segmentRemovedLatch)); @@ -255,29 +248,6 @@ public class BrokerServerViewTest extends CuratorTestBase ); } - private void announceSegmentForServer(DruidServer druidServer, DataSegment segment) throws Exception - { - curator.create() - .compressed() - .withMode(CreateMode.EPHEMERAL) - .forPath( - ZKPaths.makePath(ZKPaths.makePath(inventoryPath, druidServer.getHost()), segment.getIdentifier()), - jsonMapper.writeValueAsBytes( - ImmutableSet.of(segment) - ) - ); - } - - private void unannounceSegmentForServer(DruidServer druidServer, DataSegment segment) throws Exception - { - curator.delete().guaranteed().forPath( - ZKPaths.makePath( - ZKPaths.makePath(inventoryPath, druidServer.getHost()), - segment.getIdentifier() - ) - ); - } - private Pair>> createExpected( String intervalStr, String version, diff --git a/server/src/test/java/io/druid/client/CoordinatorServerViewTest.java b/server/src/test/java/io/druid/client/CoordinatorServerViewTest.java index bcfba7453be..3e01804df09 100644 --- a/server/src/test/java/io/druid/client/CoordinatorServerViewTest.java +++ b/server/src/test/java/io/druid/client/CoordinatorServerViewTest.java @@ -23,7 +23,6 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Function; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; -import com.google.common.collect.ImmutableSet; import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import com.metamx.common.Pair; @@ -38,7 +37,6 @@ import io.druid.timeline.TimelineObjectHolder; import io.druid.timeline.partition.NoneShardSpec; import io.druid.timeline.partition.PartitionHolder; import org.apache.curator.utils.ZKPaths; -import org.apache.zookeeper.CreateMode; import org.joda.time.Interval; import org.junit.After; import org.junit.Assert; @@ -54,7 +52,6 @@ public class CoordinatorServerViewTest extends CuratorTestBase { private final ObjectMapper jsonMapper; private final ZkPathsConfig zkPathsConfig; - private final String announcementsPath; private final String inventoryPath; private CountDownLatch segmentViewInitLatch; @@ -68,7 +65,6 @@ public class CoordinatorServerViewTest extends CuratorTestBase { jsonMapper = new DefaultObjectMapper(); zkPathsConfig = new ZkPathsConfig(); - announcementsPath = zkPathsConfig.getAnnouncementsPath(); inventoryPath = zkPathsConfig.getLiveSegmentsPath(); } @@ -100,7 +96,7 @@ public class CoordinatorServerViewTest extends CuratorTestBase setupZNodeForServer(druidServer, zkPathsConfig, jsonMapper); final DataSegment segment = dataSegmentWithIntervalAndVersion("2014-10-20T00:00:00Z/P1D", "v1"); - announceSegmentForServer(druidServer, segment); + announceSegmentForServer(druidServer, segment, zkPathsConfig, jsonMapper); Assert.assertTrue(timing.forWaiting().awaitLatch(segmentViewInitLatch)); Assert.assertTrue(timing.forWaiting().awaitLatch(segmentAddedLatch)); @@ -122,7 +118,10 @@ public class CoordinatorServerViewTest extends CuratorTestBase SegmentLoadInfo segmentLoadInfo = actualPartitionHolder.iterator().next().getObject(); Assert.assertFalse(segmentLoadInfo.isEmpty()); - Assert.assertEquals(druidServer.getMetadata(), Iterables.getOnlyElement(segmentLoadInfo.toImmutableSegmentLoadInfo().getServers())); + Assert.assertEquals( + druidServer.getMetadata(), + Iterables.getOnlyElement(segmentLoadInfo.toImmutableSegmentLoadInfo().getServers()) + ); unannounceSegmentForServer(druidServer, segment); Assert.assertTrue(timing.forWaiting().awaitLatch(segmentRemovedLatch)); @@ -186,7 +185,7 @@ public class CoordinatorServerViewTest extends CuratorTestBase ); for (int i = 0; i < 5; ++i) { - announceSegmentForServer(druidServers.get(i), segments.get(i)); + announceSegmentForServer(druidServers.get(i), segments.get(i), zkPathsConfig, jsonMapper); } Assert.assertTrue(timing.forWaiting().awaitLatch(segmentViewInitLatch)); Assert.assertTrue(timing.forWaiting().awaitLatch(segmentAddedLatch)); @@ -242,19 +241,6 @@ public class CoordinatorServerViewTest extends CuratorTestBase ); } - private void announceSegmentForServer(DruidServer druidServer, DataSegment segment) throws Exception - { - curator.create() - .compressed() - .withMode(CreateMode.EPHEMERAL) - .forPath( - ZKPaths.makePath(ZKPaths.makePath(inventoryPath, druidServer.getHost()), segment.getIdentifier()), - jsonMapper.writeValueAsBytes( - ImmutableSet.of(segment) - ) - ); - } - private void unannounceSegmentForServer(DruidServer druidServer, DataSegment segment) throws Exception { curator.delete().guaranteed().forPath( @@ -283,7 +269,7 @@ public class CoordinatorServerViewTest extends CuratorTestBase for (int i = 0; i < expected.size(); ++i) { Pair>> expectedPair = expected.get(i); - TimelineObjectHolder actualTimelineObjectHolder = actual.get(i); + TimelineObjectHolder actualTimelineObjectHolder = actual.get(i); Assert.assertEquals(expectedPair.lhs, actualTimelineObjectHolder.getInterval()); Assert.assertEquals(expectedPair.rhs.lhs, actualTimelineObjectHolder.getVersion()); @@ -292,9 +278,10 @@ public class CoordinatorServerViewTest extends CuratorTestBase Assert.assertTrue(actualPartitionHolder.isComplete()); Assert.assertEquals(1, Iterables.size(actualPartitionHolder)); - SegmentLoadInfo segmentLoadInfo = actualPartitionHolder.iterator().next().getObject(); + SegmentLoadInfo segmentLoadInfo = actualPartitionHolder.iterator().next().getObject(); Assert.assertFalse(segmentLoadInfo.isEmpty()); - Assert.assertEquals(expectedPair.rhs.rhs.lhs.getMetadata(),Iterables.getOnlyElement(segmentLoadInfo.toImmutableSegmentLoadInfo().getServers())); + Assert.assertEquals(expectedPair.rhs.rhs.lhs.getMetadata(), + Iterables.getOnlyElement(segmentLoadInfo.toImmutableSegmentLoadInfo().getServers())); } } diff --git a/server/src/test/java/io/druid/curator/CuratorTestBase.java b/server/src/test/java/io/druid/curator/CuratorTestBase.java index a8aab93f650..165f0bde741 100644 --- a/server/src/test/java/io/druid/curator/CuratorTestBase.java +++ b/server/src/test/java/io/druid/curator/CuratorTestBase.java @@ -20,15 +20,20 @@ package io.druid.curator; import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.api.client.repackaged.com.google.common.base.Throwables; +import com.google.common.collect.ImmutableSet; import com.metamx.common.guava.CloseQuietly; import io.druid.client.DruidServer; import io.druid.server.initialization.ZkPathsConfig; +import io.druid.timeline.DataSegment; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.retry.RetryOneTime; import org.apache.curator.test.TestingServer; import org.apache.curator.test.Timing; import org.apache.curator.utils.ZKPaths; +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.KeeperException; /** */ @@ -54,35 +59,95 @@ public class CuratorTestBase } protected void setupZNodeForServer(DruidServer server, ZkPathsConfig zkPathsConfig, ObjectMapper jsonMapper) - throws Exception { final String announcementsPath = zkPathsConfig.getAnnouncementsPath(); final String inventoryPath = zkPathsConfig.getLiveSegmentsPath(); - final String zNodePathAnnounce = ZKPaths.makePath(announcementsPath, server.getHost()); - final String zNodePathSegment = ZKPaths.makePath(inventoryPath, server.getHost()); - - /* - * Explicitly check whether the zNodes we are about to create exist or not, - * if exist, delete them to make sure we have a clean state on zookeeper. - * Address issue: https://github.com/druid-io/druid/issues/1512 - */ - if (curator.checkExists().forPath(zNodePathAnnounce) != null) { - curator.delete().guaranteed().forPath(zNodePathAnnounce); + try { + curator.create() + .creatingParentsIfNeeded() + .forPath( + ZKPaths.makePath(announcementsPath, server.getHost()), + jsonMapper.writeValueAsBytes(server.getMetadata()) + ); + curator.create() + .creatingParentsIfNeeded() + .forPath(ZKPaths.makePath(inventoryPath, server.getHost())); } - if (curator.checkExists().forPath(zNodePathSegment) != null) { - curator.delete().guaranteed().forPath(zNodePathSegment); + catch (KeeperException.NodeExistsException e) { + /* + * For some reason, Travis build sometimes fails here because of + * org.apache.zookeeper.KeeperException$NodeExistsException: KeeperErrorCode = NodeExists, though it should never + * happen because zookeeper should be in a clean state for each run of tests. + * Address issue: https://github.com/druid-io/druid/issues/1512 + */ + try { + curator.setData() + .forPath( + ZKPaths.makePath(announcementsPath, server.getHost()), + jsonMapper.writeValueAsBytes(server.getMetadata()) + ); + curator.setData() + .forPath(ZKPaths.makePath(inventoryPath, server.getHost())); + } + catch (Exception e1) { + Throwables.propagate(e1); + } } + catch (Exception e) { + Throwables.propagate(e); + } + } - curator.create() - .creatingParentsIfNeeded() - .forPath( - ZKPaths.makePath(announcementsPath, server.getHost()), - jsonMapper.writeValueAsBytes(server.getMetadata()) - ); - curator.create() - .creatingParentsIfNeeded() - .forPath(ZKPaths.makePath(inventoryPath, server.getHost())); + protected void announceSegmentForServer( + DruidServer druidServer, + DataSegment segment, + ZkPathsConfig zkPathsConfig, + ObjectMapper jsonMapper + ) + { + final String segmentAnnouncementPath = ZKPaths.makePath(ZKPaths.makePath( + zkPathsConfig.getLiveSegmentsPath(), + druidServer.getHost() + ), segment.getIdentifier()); + + try { + curator.create() + .compressed() + .withMode(CreateMode.EPHEMERAL) + .forPath( + segmentAnnouncementPath, + jsonMapper.writeValueAsBytes( + ImmutableSet.of(segment) + ) + ); + } + catch (KeeperException.NodeExistsException e) { + try { + curator.setData() + .forPath( + segmentAnnouncementPath, + jsonMapper.writeValueAsBytes(ImmutableSet.of(segment)) + ); + } + catch (Exception e1) { + Throwables.propagate(e1); + } + } + catch (Exception e) { + Throwables.propagate(e); + } + } + + protected void unannounceSegmentForServer(DruidServer druidServer, DataSegment segment, ZkPathsConfig zkPathsConfig) + throws Exception + { + curator.delete().guaranteed().forPath( + ZKPaths.makePath( + ZKPaths.makePath(zkPathsConfig.getLiveSegmentsPath(), druidServer.getHost()), + segment.getIdentifier() + ) + ); } protected void tearDownServerAndCurator() @@ -90,4 +155,7 @@ public class CuratorTestBase CloseQuietly.close(curator); CloseQuietly.close(server); } + } + +//Build at Tue Dec 22 21:30:00 CST 2015