Merge pull request #1221 from metamx/AnnounceIdUUID

Change announcement ID to a UUID instead of Timestamp
This commit is contained in:
Fangjin Yang 2015-03-17 15:38:53 -07:00
commit 8d0dfd9af1
4 changed files with 279 additions and 6 deletions

View File

@ -0,0 +1,57 @@
/*
* Druid - a distributed column store.
* Copyright 2012 - 2015 Metamarkets Group Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.druid.common.utils;
import com.google.common.base.Joiner;
import com.google.common.base.Strings;
import java.util.ArrayList;
import java.util.UUID;
/**
*
*/
public class UUIDUtils
{
public static final String UUID_DELIM = "-";
/**
* Generates a universally unique identifier.
*
* @param extraData Extra data which often takes the form of debugging information
*
* @return A string which is a universally unique id (as determined by java.util.UUID) with extra data. It does not conform to a UUID variant standard.
*/
public static String generateUuid(String... extraData)
{
String extra = null;
if (extraData != null && extraData.length > 0) {
final ArrayList<String> extraStrings = new ArrayList<>(extraData.length);
for (String extraString : extraData) {
if (!Strings.isNullOrEmpty(extraString)) {
extraStrings.add(extraString);
}
}
if (!extraStrings.isEmpty()) {
extra = Joiner.on(UUID_DELIM).join(extraStrings);
}
}
final String uuid = UUID.randomUUID().toString();
return extra == null ? uuid : (extra + UUID_DELIM + uuid);
}
}

View File

@ -0,0 +1,104 @@
/*
* Druid - a distributed column store.
* Copyright 2012 - 2015 Metamarkets Group Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.druid.common.utils;
import com.google.common.base.Function;
import com.google.common.base.Joiner;
import com.google.common.base.Strings;
import com.google.common.collect.Collections2;
import com.google.common.collect.Lists;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.UUID;
/**
*
*/
@RunWith(Parameterized.class)
public class UUIDUtilsTest
{
@Parameterized.Parameters
public static Collection<Object[]> constructorFeeder()
{
final ArrayList<String[]> args = new ArrayList<>();
final List<String> possibleArgs = Lists.newArrayList("one", "two", null, "");
for (int i = 0; i < possibleArgs.size(); ++i) {
args.add(new String[]{possibleArgs.get(i)});
for (int j = 0; j < possibleArgs.size(); ++j) {
for (int k = 0; k < possibleArgs.size(); ++k) {
args.add(new String[]{possibleArgs.get(i), possibleArgs.get(j), possibleArgs.get(k)});
}
}
}
for(String possibleArg : possibleArgs){
args.add(new String[]{possibleArg});
}
return Collections2.transform(
args, new Function<String[], Object[]>()
{
@Override
public Object[] apply(String[] input)
{
final ArrayList<String> strings = new ArrayList<>(input.length);
for (String str : input) {
if (!Strings.isNullOrEmpty(str)) {
strings.add(str);
}
}
final String expected;
if (!strings.isEmpty()) {
expected = Joiner.on(UUIDUtils.UUID_DELIM).join(strings) + UUIDUtils.UUID_DELIM;
} else {
expected = "";
}
return new Object[]{input, expected};
}
}
);
}
private final String[] args;
private final String expectedBase;
public UUIDUtilsTest(String[] args, String expectedBase)
{
this.args = args;
this.expectedBase = expectedBase;
}
public static void validateIsStandardUUID(
String uuidString
)
{
UUID uuid = UUID.fromString(uuidString);
Assert.assertEquals(uuid.toString(), uuidString);
}
@Test
public void testUuid()
{
final String uuid = UUIDUtils.generateUuid(args);
validateIsStandardUUID(uuid.substring(expectedBase.length()));
}
}

View File

@ -25,6 +25,7 @@ import com.google.common.collect.Sets;
import com.google.inject.Inject;
import com.metamx.common.ISE;
import com.metamx.common.logger.Logger;
import io.druid.common.utils.UUIDUtils;
import io.druid.curator.announcement.Announcer;
import io.druid.server.initialization.BatchDataSegmentAnnouncerConfig;
import io.druid.server.initialization.ZkPathsConfig;
@ -49,6 +50,7 @@ public class BatchDataSegmentAnnouncer extends AbstractDataSegmentAnnouncer
private final Announcer announcer;
private final ObjectMapper jsonMapper;
private final String liveSegmentLocation;
private final DruidServerMetadata server;
private final Object lock = new Object();
private final AtomicLong counter = new AtomicLong(0);
@ -69,6 +71,7 @@ public class BatchDataSegmentAnnouncer extends AbstractDataSegmentAnnouncer
this.config = config;
this.announcer = announcer;
this.jsonMapper = jsonMapper;
this.server = server;
this.liveSegmentLocation = ZKPaths.makePath(zkPaths.getLiveSegmentsPath(), server.getName());
}
@ -84,7 +87,7 @@ public class BatchDataSegmentAnnouncer extends AbstractDataSegmentAnnouncer
synchronized (lock) {
// create new batch
if (availableZNodes.isEmpty()) {
SegmentZNode availableZNode = new SegmentZNode(makeServedSegmentPath(new DateTime().toString()));
SegmentZNode availableZNode = new SegmentZNode(makeServedSegmentPath());
availableZNode.addSegment(segment);
log.info("Announcing segment[%s] at path[%s]", segment.getIdentifier(), availableZNode.getPath());
@ -140,7 +143,7 @@ public class BatchDataSegmentAnnouncer extends AbstractDataSegmentAnnouncer
@Override
public void announceSegments(Iterable<DataSegment> segments) throws IOException
{
SegmentZNode segmentZNode = new SegmentZNode(makeServedSegmentPath(new DateTime().toString()));
SegmentZNode segmentZNode = new SegmentZNode(makeServedSegmentPath());
Set<DataSegment> batch = Sets.newHashSet();
int byteSize = 0;
int count = 0;
@ -156,7 +159,7 @@ public class BatchDataSegmentAnnouncer extends AbstractDataSegmentAnnouncer
segmentZNode.addSegments(batch);
announcer.announce(segmentZNode.getPath(), segmentZNode.getBytes());
segmentZNode = new SegmentZNode(makeServedSegmentPath(new DateTime().toString()));
segmentZNode = new SegmentZNode(makeServedSegmentPath());
batch = Sets.newHashSet();
count = 0;
byteSize = 0;
@ -181,6 +184,11 @@ public class BatchDataSegmentAnnouncer extends AbstractDataSegmentAnnouncer
}
}
private String makeServedSegmentPath(){
// server.getName() is already in the zk path
return makeServedSegmentPath(UUIDUtils.generateUuid(server.getHost(), server.getType(), server.getTier(), new DateTime().toString()));
}
private String makeServedSegmentPath(String zNode)
{
return ZKPaths.makePath(liveSegmentLocation, String.format("%s%s", zNode, counter.getAndIncrement()));

View File

@ -18,11 +18,15 @@
package io.druid.client.client;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.api.client.repackaged.com.google.common.base.Throwables;
import com.google.common.base.Predicate;
import com.google.common.base.Predicates;
import com.google.common.base.Stopwatch;
import com.google.common.collect.Iterables;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.metamx.common.ISE;
import io.druid.client.BatchServerInventoryView;
@ -31,6 +35,7 @@ import io.druid.client.ServerView;
import io.druid.curator.PotentiallyGzippedCompressionProvider;
import io.druid.curator.announcement.Announcer;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.segment.Segment;
import io.druid.server.coordination.BatchDataSegmentAnnouncer;
import io.druid.server.coordination.DruidServerMetadata;
import io.druid.server.initialization.BatchDataSegmentAnnouncerConfig;
@ -54,9 +59,15 @@ import org.junit.Test;
import org.junit.rules.ExpectedException;
import javax.annotation.Nullable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
/**
@ -132,7 +143,7 @@ public class BatchServerInventoryViewTest
);
segmentAnnouncer.start();
testSegments = Sets.newHashSet();
testSegments = Sets.newConcurrentHashSet();
for (int i = 0; i < INITIAL_SEGMENTS; i++) {
testSegments.add(makeSegment(i));
}
@ -331,7 +342,7 @@ public class BatchServerInventoryViewTest
waitForSync(filteredBatchServerInventoryView, testSegments);
timing.forWaiting().awaitLatch(removeCallbackLatch);
EasyMock.verify(callback);
}
@ -349,7 +360,8 @@ public class BatchServerInventoryViewTest
.build();
}
private static void waitForSync(BatchServerInventoryView batchServerInventoryView, Set<DataSegment> testSegments) throws Exception
private static void waitForSync(BatchServerInventoryView batchServerInventoryView, Set<DataSegment> testSegments)
throws Exception
{
final Timing forWaitingTiming = timing.forWaiting();
Stopwatch stopwatch = Stopwatch.createStarted();
@ -361,4 +373,96 @@ public class BatchServerInventoryViewTest
}
}
}
@Test
public void testSameTimeZnode() throws Exception
{
final int numThreads = INITIAL_SEGMENTS / 10;
final ListeningExecutorService executor = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(numThreads));
segmentAnnouncer.announceSegments(testSegments);
waitForSync(batchServerInventoryView, testSegments);
DruidServer server = Iterables.get(batchServerInventoryView.getInventory(), 0);
final Set<DataSegment> segments = Sets.newHashSet(server.getSegments().values());
Assert.assertEquals(testSegments, segments);
final CountDownLatch latch = new CountDownLatch(numThreads);
final List<ListenableFuture<BatchDataSegmentAnnouncer>> futures = new ArrayList<>();
for (int i = 0; i < numThreads; ++i) {
final int ii = i;
futures.add(
executor.submit(
new Callable<BatchDataSegmentAnnouncer>()
{
@Override
public BatchDataSegmentAnnouncer call()
{
BatchDataSegmentAnnouncer segmentAnnouncer = new BatchDataSegmentAnnouncer(
new DruidServerMetadata(
"id",
"host",
Long.MAX_VALUE,
"type",
"tier",
0
),
new BatchDataSegmentAnnouncerConfig()
{
@Override
public int getSegmentsPerNode()
{
return 50;
}
},
new ZkPathsConfig()
{
@Override
public String getBase()
{
return testBasePath;
}
},
announcer,
jsonMapper
);
segmentAnnouncer.start();
List<DataSegment> segments = new ArrayList<DataSegment>();
try {
for (int j = 0; j < INITIAL_SEGMENTS / numThreads; ++j) {
segments.add(makeSegment(INITIAL_SEGMENTS + ii + numThreads * j));
}
latch.countDown();
latch.await();
segmentAnnouncer.announceSegments(segments);
testSegments.addAll(segments);
}
catch (Exception e) {
throw Throwables.propagate(e);
}
return segmentAnnouncer;
}
}
)
);
}
final List<BatchDataSegmentAnnouncer> announcers = Futures.<BatchDataSegmentAnnouncer>allAsList(futures).get();
Assert.assertEquals(INITIAL_SEGMENTS * 2, testSegments.size());
waitForSync(batchServerInventoryView, testSegments);
Assert.assertEquals(testSegments, Sets.newHashSet(server.getSegments().values()));
for (int i = 0; i < INITIAL_SEGMENTS; ++i) {
final DataSegment segment = makeSegment(100 + i);
segmentAnnouncer.unannounceSegment(segment);
testSegments.remove(segment);
}
waitForSync(batchServerInventoryView, testSegments);
Assert.assertEquals(testSegments, Sets.newHashSet(server.getSegments().values()));
}
}