From 79b1443cc3f79f29d9ba7b9f54f495539e3a20b9 Mon Sep 17 00:00:00 2001 From: Charles Allen Date: Tue, 17 Mar 2015 14:22:12 -0700 Subject: [PATCH] Change announcement ID to a UUID instead of Timestamp * Also add a UUIDUtils to the common package --- .../java/io/druid/common/utils/UUIDUtils.java | 57 +++++++++ .../io/druid/common/utils/UUIDUtilsTest.java | 104 +++++++++++++++++ .../BatchDataSegmentAnnouncer.java | 14 ++- .../client/BatchServerInventoryViewTest.java | 110 +++++++++++++++++- 4 files changed, 279 insertions(+), 6 deletions(-) create mode 100644 common/src/main/java/io/druid/common/utils/UUIDUtils.java create mode 100644 common/src/test/java/io/druid/common/utils/UUIDUtilsTest.java diff --git a/common/src/main/java/io/druid/common/utils/UUIDUtils.java b/common/src/main/java/io/druid/common/utils/UUIDUtils.java new file mode 100644 index 00000000000..a8412f07e37 --- /dev/null +++ b/common/src/main/java/io/druid/common/utils/UUIDUtils.java @@ -0,0 +1,57 @@ +/* + * Druid - a distributed column store. + * Copyright 2012 - 2015 Metamarkets Group Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.druid.common.utils; + +import com.google.common.base.Joiner; +import com.google.common.base.Strings; + +import java.util.ArrayList; +import java.util.UUID; + +/** + * + */ +public class UUIDUtils +{ + public static final String UUID_DELIM = "-"; + + /** + * Generates a universally unique identifier. + * + * @param extraData Extra data which often takes the form of debugging information + * + * @return A string which is a universally unique id (as determined by java.util.UUID) with extra data. It does not conform to a UUID variant standard. + */ + public static String generateUuid(String... extraData) + { + String extra = null; + if (extraData != null && extraData.length > 0) { + final ArrayList extraStrings = new ArrayList<>(extraData.length); + for (String extraString : extraData) { + if (!Strings.isNullOrEmpty(extraString)) { + extraStrings.add(extraString); + } + } + if (!extraStrings.isEmpty()) { + extra = Joiner.on(UUID_DELIM).join(extraStrings); + } + } + final String uuid = UUID.randomUUID().toString(); + return extra == null ? uuid : (extra + UUID_DELIM + uuid); + } +} diff --git a/common/src/test/java/io/druid/common/utils/UUIDUtilsTest.java b/common/src/test/java/io/druid/common/utils/UUIDUtilsTest.java new file mode 100644 index 00000000000..06656de73b5 --- /dev/null +++ b/common/src/test/java/io/druid/common/utils/UUIDUtilsTest.java @@ -0,0 +1,104 @@ +/* + * Druid - a distributed column store. + * Copyright 2012 - 2015 Metamarkets Group Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.druid.common.utils; + +import com.google.common.base.Function; +import com.google.common.base.Joiner; +import com.google.common.base.Strings; +import com.google.common.collect.Collections2; +import com.google.common.collect.Lists; +import org.junit.Assert; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.UUID; + +/** + * + */ +@RunWith(Parameterized.class) +public class UUIDUtilsTest +{ + @Parameterized.Parameters + public static Collection constructorFeeder() + { + final ArrayList args = new ArrayList<>(); + final List possibleArgs = Lists.newArrayList("one", "two", null, ""); + for (int i = 0; i < possibleArgs.size(); ++i) { + args.add(new String[]{possibleArgs.get(i)}); + for (int j = 0; j < possibleArgs.size(); ++j) { + for (int k = 0; k < possibleArgs.size(); ++k) { + args.add(new String[]{possibleArgs.get(i), possibleArgs.get(j), possibleArgs.get(k)}); + } + } + } + for(String possibleArg : possibleArgs){ + args.add(new String[]{possibleArg}); + } + return Collections2.transform( + args, new Function() + { + @Override + public Object[] apply(String[] input) + { + final ArrayList strings = new ArrayList<>(input.length); + for (String str : input) { + if (!Strings.isNullOrEmpty(str)) { + strings.add(str); + } + } + final String expected; + if (!strings.isEmpty()) { + expected = Joiner.on(UUIDUtils.UUID_DELIM).join(strings) + UUIDUtils.UUID_DELIM; + } else { + expected = ""; + } + return new Object[]{input, expected}; + } + } + ); + } + + private final String[] args; + private final String expectedBase; + + public UUIDUtilsTest(String[] args, String expectedBase) + { + this.args = args; + this.expectedBase = expectedBase; + } + + public static void validateIsStandardUUID( + String uuidString + ) + { + UUID uuid = UUID.fromString(uuidString); + Assert.assertEquals(uuid.toString(), uuidString); + } + + @Test + public void testUuid() + { + final String uuid = UUIDUtils.generateUuid(args); + validateIsStandardUUID(uuid.substring(expectedBase.length())); + } +} diff --git a/server/src/main/java/io/druid/server/coordination/BatchDataSegmentAnnouncer.java b/server/src/main/java/io/druid/server/coordination/BatchDataSegmentAnnouncer.java index 7b255cf7138..0ab6ab3c9cd 100644 --- a/server/src/main/java/io/druid/server/coordination/BatchDataSegmentAnnouncer.java +++ b/server/src/main/java/io/druid/server/coordination/BatchDataSegmentAnnouncer.java @@ -25,6 +25,7 @@ import com.google.common.collect.Sets; import com.google.inject.Inject; import com.metamx.common.ISE; import com.metamx.common.logger.Logger; +import io.druid.common.utils.UUIDUtils; import io.druid.curator.announcement.Announcer; import io.druid.server.initialization.BatchDataSegmentAnnouncerConfig; import io.druid.server.initialization.ZkPathsConfig; @@ -49,6 +50,7 @@ public class BatchDataSegmentAnnouncer extends AbstractDataSegmentAnnouncer private final Announcer announcer; private final ObjectMapper jsonMapper; private final String liveSegmentLocation; + private final DruidServerMetadata server; private final Object lock = new Object(); private final AtomicLong counter = new AtomicLong(0); @@ -69,6 +71,7 @@ public class BatchDataSegmentAnnouncer extends AbstractDataSegmentAnnouncer this.config = config; this.announcer = announcer; this.jsonMapper = jsonMapper; + this.server = server; this.liveSegmentLocation = ZKPaths.makePath(zkPaths.getLiveSegmentsPath(), server.getName()); } @@ -84,7 +87,7 @@ public class BatchDataSegmentAnnouncer extends AbstractDataSegmentAnnouncer synchronized (lock) { // create new batch if (availableZNodes.isEmpty()) { - SegmentZNode availableZNode = new SegmentZNode(makeServedSegmentPath(new DateTime().toString())); + SegmentZNode availableZNode = new SegmentZNode(makeServedSegmentPath()); availableZNode.addSegment(segment); log.info("Announcing segment[%s] at path[%s]", segment.getIdentifier(), availableZNode.getPath()); @@ -140,7 +143,7 @@ public class BatchDataSegmentAnnouncer extends AbstractDataSegmentAnnouncer @Override public void announceSegments(Iterable segments) throws IOException { - SegmentZNode segmentZNode = new SegmentZNode(makeServedSegmentPath(new DateTime().toString())); + SegmentZNode segmentZNode = new SegmentZNode(makeServedSegmentPath()); Set batch = Sets.newHashSet(); int byteSize = 0; int count = 0; @@ -156,7 +159,7 @@ public class BatchDataSegmentAnnouncer extends AbstractDataSegmentAnnouncer segmentZNode.addSegments(batch); announcer.announce(segmentZNode.getPath(), segmentZNode.getBytes()); - segmentZNode = new SegmentZNode(makeServedSegmentPath(new DateTime().toString())); + segmentZNode = new SegmentZNode(makeServedSegmentPath()); batch = Sets.newHashSet(); count = 0; byteSize = 0; @@ -181,6 +184,11 @@ public class BatchDataSegmentAnnouncer extends AbstractDataSegmentAnnouncer } } + private String makeServedSegmentPath(){ + // server.getName() is already in the zk path + return makeServedSegmentPath(UUIDUtils.generateUuid(server.getHost(), server.getType(), server.getTier(), new DateTime().toString())); + } + private String makeServedSegmentPath(String zNode) { return ZKPaths.makePath(liveSegmentLocation, String.format("%s%s", zNode, counter.getAndIncrement())); diff --git a/server/src/test/java/io/druid/client/client/BatchServerInventoryViewTest.java b/server/src/test/java/io/druid/client/client/BatchServerInventoryViewTest.java index 9dc65e84d00..aac61d552b6 100644 --- a/server/src/test/java/io/druid/client/client/BatchServerInventoryViewTest.java +++ b/server/src/test/java/io/druid/client/client/BatchServerInventoryViewTest.java @@ -18,11 +18,15 @@ package io.druid.client.client; import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.api.client.repackaged.com.google.common.base.Throwables; import com.google.common.base.Predicate; import com.google.common.base.Predicates; import com.google.common.base.Stopwatch; import com.google.common.collect.Iterables; import com.google.common.collect.Sets; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; import com.metamx.common.ISE; import io.druid.client.BatchServerInventoryView; @@ -31,6 +35,7 @@ import io.druid.client.ServerView; import io.druid.curator.PotentiallyGzippedCompressionProvider; import io.druid.curator.announcement.Announcer; import io.druid.jackson.DefaultObjectMapper; +import io.druid.segment.Segment; import io.druid.server.coordination.BatchDataSegmentAnnouncer; import io.druid.server.coordination.DruidServerMetadata; import io.druid.server.initialization.BatchDataSegmentAnnouncerConfig; @@ -54,9 +59,15 @@ import org.junit.Test; import org.junit.rules.ExpectedException; import javax.annotation.Nullable; +import java.io.IOException; +import java.util.ArrayList; import java.util.Comparator; +import java.util.List; import java.util.Set; +import java.util.concurrent.Callable; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executor; +import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; /** @@ -132,7 +143,7 @@ public class BatchServerInventoryViewTest ); segmentAnnouncer.start(); - testSegments = Sets.newHashSet(); + testSegments = Sets.newConcurrentHashSet(); for (int i = 0; i < INITIAL_SEGMENTS; i++) { testSegments.add(makeSegment(i)); } @@ -331,7 +342,7 @@ public class BatchServerInventoryViewTest waitForSync(filteredBatchServerInventoryView, testSegments); timing.forWaiting().awaitLatch(removeCallbackLatch); - + EasyMock.verify(callback); } @@ -349,7 +360,8 @@ public class BatchServerInventoryViewTest .build(); } - private static void waitForSync(BatchServerInventoryView batchServerInventoryView, Set testSegments) throws Exception + private static void waitForSync(BatchServerInventoryView batchServerInventoryView, Set testSegments) + throws Exception { final Timing forWaitingTiming = timing.forWaiting(); Stopwatch stopwatch = Stopwatch.createStarted(); @@ -361,4 +373,96 @@ public class BatchServerInventoryViewTest } } } + + @Test + public void testSameTimeZnode() throws Exception + { + final int numThreads = INITIAL_SEGMENTS / 10; + final ListeningExecutorService executor = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(numThreads)); + + segmentAnnouncer.announceSegments(testSegments); + + waitForSync(batchServerInventoryView, testSegments); + + DruidServer server = Iterables.get(batchServerInventoryView.getInventory(), 0); + final Set segments = Sets.newHashSet(server.getSegments().values()); + + Assert.assertEquals(testSegments, segments); + + final CountDownLatch latch = new CountDownLatch(numThreads); + + final List> futures = new ArrayList<>(); + for (int i = 0; i < numThreads; ++i) { + final int ii = i; + futures.add( + executor.submit( + new Callable() + { + @Override + public BatchDataSegmentAnnouncer call() + { + BatchDataSegmentAnnouncer segmentAnnouncer = new BatchDataSegmentAnnouncer( + new DruidServerMetadata( + "id", + "host", + Long.MAX_VALUE, + "type", + "tier", + 0 + ), + new BatchDataSegmentAnnouncerConfig() + { + @Override + public int getSegmentsPerNode() + { + return 50; + } + }, + new ZkPathsConfig() + { + @Override + public String getBase() + { + return testBasePath; + } + }, + announcer, + jsonMapper + ); + segmentAnnouncer.start(); + List segments = new ArrayList(); + try { + for (int j = 0; j < INITIAL_SEGMENTS / numThreads; ++j) { + segments.add(makeSegment(INITIAL_SEGMENTS + ii + numThreads * j)); + } + latch.countDown(); + latch.await(); + segmentAnnouncer.announceSegments(segments); + testSegments.addAll(segments); + } + catch (Exception e) { + throw Throwables.propagate(e); + } + return segmentAnnouncer; + } + } + ) + ); + } + final List announcers = Futures.allAsList(futures).get(); + Assert.assertEquals(INITIAL_SEGMENTS * 2, testSegments.size()); + waitForSync(batchServerInventoryView, testSegments); + + Assert.assertEquals(testSegments, Sets.newHashSet(server.getSegments().values())); + + for (int i = 0; i < INITIAL_SEGMENTS; ++i) { + final DataSegment segment = makeSegment(100 + i); + segmentAnnouncer.unannounceSegment(segment); + testSegments.remove(segment); + } + + waitForSync(batchServerInventoryView, testSegments); + + Assert.assertEquals(testSegments, Sets.newHashSet(server.getSegments().values())); + } }