address another node exists failure

This commit is contained in:
Bingkun Guo 2015-12-22 13:06:20 -06:00
parent 2ffeda5d25
commit 455980d659
3 changed files with 70 additions and 58 deletions

View File

@ -25,7 +25,6 @@ import com.fasterxml.jackson.dataformat.smile.SmileGenerator;
import com.google.common.base.Function;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.metamx.common.Pair;
@ -47,8 +46,6 @@ import io.druid.timeline.TimelineObjectHolder;
import io.druid.timeline.partition.NoneShardSpec;
import io.druid.timeline.partition.PartitionHolder;
import io.druid.timeline.partition.SingleElementPartitionChunk;
import org.apache.curator.utils.ZKPaths;
import org.apache.zookeeper.CreateMode;
import org.easymock.EasyMock;
import org.joda.time.Interval;
import org.junit.After;
@ -65,8 +62,6 @@ public class BrokerServerViewTest extends CuratorTestBase
{
private final ObjectMapper jsonMapper;
private final ZkPathsConfig zkPathsConfig;
private final String announcementsPath;
private final String inventoryPath;
private CountDownLatch segmentViewInitLatch;
private CountDownLatch segmentAddedLatch;
@ -79,8 +74,6 @@ public class BrokerServerViewTest extends CuratorTestBase
{
jsonMapper = new DefaultObjectMapper();
zkPathsConfig = new ZkPathsConfig();
announcementsPath = zkPathsConfig.getAnnouncementsPath();
inventoryPath = zkPathsConfig.getLiveSegmentsPath();
}
@Before
@ -111,7 +104,7 @@ public class BrokerServerViewTest extends CuratorTestBase
setupZNodeForServer(druidServer, zkPathsConfig, jsonMapper);
final DataSegment segment = dataSegmentWithIntervalAndVersion("2014-10-20T00:00:00Z/P1D", "v1");
announceSegmentForServer(druidServer, segment);
announceSegmentForServer(druidServer, segment, zkPathsConfig, jsonMapper);
Assert.assertTrue(timing.forWaiting().awaitLatch(segmentViewInitLatch));
Assert.assertTrue(timing.forWaiting().awaitLatch(segmentAddedLatch));
@ -137,7 +130,7 @@ public class BrokerServerViewTest extends CuratorTestBase
Assert.assertEquals(segment, selector.getSegment());
Assert.assertEquals(druidServer, selector.pick().getServer());
unannounceSegmentForServer(druidServer, segment);
unannounceSegmentForServer(druidServer, segment, zkPathsConfig);
Assert.assertTrue(timing.forWaiting().awaitLatch(segmentRemovedLatch));
Assert.assertEquals(
@ -199,7 +192,7 @@ public class BrokerServerViewTest extends CuratorTestBase
);
for (int i = 0; i < 5; ++i) {
announceSegmentForServer(druidServers.get(i), segments.get(i));
announceSegmentForServer(druidServers.get(i), segments.get(i), zkPathsConfig, jsonMapper);
}
Assert.assertTrue(timing.forWaiting().awaitLatch(segmentViewInitLatch));
Assert.assertTrue(timing.forWaiting().awaitLatch(segmentAddedLatch));
@ -219,7 +212,7 @@ public class BrokerServerViewTest extends CuratorTestBase
);
// unannounce the segment created by dataSegmentWithIntervalAndVersion("2011-04-01/2011-04-09", "v2")
unannounceSegmentForServer(druidServers.get(2), segments.get(2));
unannounceSegmentForServer(druidServers.get(2), segments.get(2), zkPathsConfig);
Assert.assertTrue(timing.forWaiting().awaitLatch(segmentRemovedLatch));
// renew segmentRemovedLatch since we still have 4 segments to unannounce
@ -244,7 +237,7 @@ public class BrokerServerViewTest extends CuratorTestBase
for (int i = 0; i < 5; ++i) {
// skip the one that was previously unannounced
if (i != 2) {
unannounceSegmentForServer(druidServers.get(i), segments.get(i));
unannounceSegmentForServer(druidServers.get(i), segments.get(i), zkPathsConfig);
}
}
Assert.assertTrue(timing.forWaiting().awaitLatch(segmentRemovedLatch));
@ -255,29 +248,6 @@ public class BrokerServerViewTest extends CuratorTestBase
);
}
private void announceSegmentForServer(DruidServer druidServer, DataSegment segment) throws Exception
{
curator.create()
.compressed()
.withMode(CreateMode.EPHEMERAL)
.forPath(
ZKPaths.makePath(ZKPaths.makePath(inventoryPath, druidServer.getHost()), segment.getIdentifier()),
jsonMapper.writeValueAsBytes(
ImmutableSet.<DataSegment>of(segment)
)
);
}
private void unannounceSegmentForServer(DruidServer druidServer, DataSegment segment) throws Exception
{
curator.delete().guaranteed().forPath(
ZKPaths.makePath(
ZKPaths.makePath(inventoryPath, druidServer.getHost()),
segment.getIdentifier()
)
);
}
private Pair<Interval, Pair<String, Pair<DruidServer, DataSegment>>> createExpected(
String intervalStr,
String version,

View File

@ -23,7 +23,6 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Function;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.metamx.common.Pair;
@ -38,7 +37,6 @@ import io.druid.timeline.TimelineObjectHolder;
import io.druid.timeline.partition.NoneShardSpec;
import io.druid.timeline.partition.PartitionHolder;
import org.apache.curator.utils.ZKPaths;
import org.apache.zookeeper.CreateMode;
import org.joda.time.Interval;
import org.junit.After;
import org.junit.Assert;
@ -54,7 +52,6 @@ public class CoordinatorServerViewTest extends CuratorTestBase
{
private final ObjectMapper jsonMapper;
private final ZkPathsConfig zkPathsConfig;
private final String announcementsPath;
private final String inventoryPath;
private CountDownLatch segmentViewInitLatch;
@ -68,7 +65,6 @@ public class CoordinatorServerViewTest extends CuratorTestBase
{
jsonMapper = new DefaultObjectMapper();
zkPathsConfig = new ZkPathsConfig();
announcementsPath = zkPathsConfig.getAnnouncementsPath();
inventoryPath = zkPathsConfig.getLiveSegmentsPath();
}
@ -100,7 +96,7 @@ public class CoordinatorServerViewTest extends CuratorTestBase
setupZNodeForServer(druidServer, zkPathsConfig, jsonMapper);
final DataSegment segment = dataSegmentWithIntervalAndVersion("2014-10-20T00:00:00Z/P1D", "v1");
announceSegmentForServer(druidServer, segment);
announceSegmentForServer(druidServer, segment, zkPathsConfig, jsonMapper);
Assert.assertTrue(timing.forWaiting().awaitLatch(segmentViewInitLatch));
Assert.assertTrue(timing.forWaiting().awaitLatch(segmentAddedLatch));
@ -122,7 +118,10 @@ public class CoordinatorServerViewTest extends CuratorTestBase
SegmentLoadInfo segmentLoadInfo = actualPartitionHolder.iterator().next().getObject();
Assert.assertFalse(segmentLoadInfo.isEmpty());
Assert.assertEquals(druidServer.getMetadata(), Iterables.getOnlyElement(segmentLoadInfo.toImmutableSegmentLoadInfo().getServers()));
Assert.assertEquals(
druidServer.getMetadata(),
Iterables.getOnlyElement(segmentLoadInfo.toImmutableSegmentLoadInfo().getServers())
);
unannounceSegmentForServer(druidServer, segment);
Assert.assertTrue(timing.forWaiting().awaitLatch(segmentRemovedLatch));
@ -186,7 +185,7 @@ public class CoordinatorServerViewTest extends CuratorTestBase
);
for (int i = 0; i < 5; ++i) {
announceSegmentForServer(druidServers.get(i), segments.get(i));
announceSegmentForServer(druidServers.get(i), segments.get(i), zkPathsConfig, jsonMapper);
}
Assert.assertTrue(timing.forWaiting().awaitLatch(segmentViewInitLatch));
Assert.assertTrue(timing.forWaiting().awaitLatch(segmentAddedLatch));
@ -242,19 +241,6 @@ public class CoordinatorServerViewTest extends CuratorTestBase
);
}
private void announceSegmentForServer(DruidServer druidServer, DataSegment segment) throws Exception
{
curator.create()
.compressed()
.withMode(CreateMode.EPHEMERAL)
.forPath(
ZKPaths.makePath(ZKPaths.makePath(inventoryPath, druidServer.getHost()), segment.getIdentifier()),
jsonMapper.writeValueAsBytes(
ImmutableSet.<DataSegment>of(segment)
)
);
}
private void unannounceSegmentForServer(DruidServer druidServer, DataSegment segment) throws Exception
{
curator.delete().guaranteed().forPath(
@ -283,7 +269,7 @@ public class CoordinatorServerViewTest extends CuratorTestBase
for (int i = 0; i < expected.size(); ++i) {
Pair<Interval, Pair<String, Pair<DruidServer, DataSegment>>> expectedPair = expected.get(i);
TimelineObjectHolder<String,SegmentLoadInfo> actualTimelineObjectHolder = actual.get(i);
TimelineObjectHolder<String, SegmentLoadInfo> actualTimelineObjectHolder = actual.get(i);
Assert.assertEquals(expectedPair.lhs, actualTimelineObjectHolder.getInterval());
Assert.assertEquals(expectedPair.rhs.lhs, actualTimelineObjectHolder.getVersion());
@ -294,7 +280,8 @@ public class CoordinatorServerViewTest extends CuratorTestBase
SegmentLoadInfo segmentLoadInfo = actualPartitionHolder.iterator().next().getObject();
Assert.assertFalse(segmentLoadInfo.isEmpty());
Assert.assertEquals(expectedPair.rhs.rhs.lhs.getMetadata(),Iterables.getOnlyElement(segmentLoadInfo.toImmutableSegmentLoadInfo().getServers()));
Assert.assertEquals(expectedPair.rhs.rhs.lhs.getMetadata(),
Iterables.getOnlyElement(segmentLoadInfo.toImmutableSegmentLoadInfo().getServers()));
}
}

View File

@ -21,15 +21,18 @@ package io.druid.curator;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.api.client.repackaged.com.google.common.base.Throwables;
import com.google.common.collect.ImmutableSet;
import com.metamx.common.guava.CloseQuietly;
import io.druid.client.DruidServer;
import io.druid.server.initialization.ZkPathsConfig;
import io.druid.timeline.DataSegment;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.RetryOneTime;
import org.apache.curator.test.TestingServer;
import org.apache.curator.test.Timing;
import org.apache.curator.utils.ZKPaths;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
/**
@ -94,7 +97,57 @@ public class CuratorTestBase
catch (Exception e) {
Throwables.propagate(e);
}
}
protected void announceSegmentForServer(
DruidServer druidServer,
DataSegment segment,
ZkPathsConfig zkPathsConfig,
ObjectMapper jsonMapper
)
{
final String segmentAnnouncementPath = ZKPaths.makePath(ZKPaths.makePath(
zkPathsConfig.getLiveSegmentsPath(),
druidServer.getHost()
), segment.getIdentifier());
try {
curator.create()
.compressed()
.withMode(CreateMode.EPHEMERAL)
.forPath(
segmentAnnouncementPath,
jsonMapper.writeValueAsBytes(
ImmutableSet.<DataSegment>of(segment)
)
);
}
catch (KeeperException.NodeExistsException e) {
try {
curator.setData()
.forPath(
segmentAnnouncementPath,
jsonMapper.writeValueAsBytes(ImmutableSet.<DataSegment>of(segment))
);
}
catch (Exception e1) {
Throwables.propagate(e1);
}
}
catch (Exception e) {
Throwables.propagate(e);
}
}
protected void unannounceSegmentForServer(DruidServer druidServer, DataSegment segment, ZkPathsConfig zkPathsConfig)
throws Exception
{
curator.delete().guaranteed().forPath(
ZKPaths.makePath(
ZKPaths.makePath(zkPathsConfig.getLiveSegmentsPath(), druidServer.getHost()),
segment.getIdentifier()
)
);
}
protected void tearDownServerAndCurator()
@ -104,3 +157,5 @@ public class CuratorTestBase
}
}
// build #13