mirror of https://github.com/apache/druid.git
Change announcement ID to a UUID instead of Timestamp
* Also add a UUIDUtils to the common package
This commit is contained in:
parent
36b4c6a371
commit
79b1443cc3
|
@ -0,0 +1,57 @@
|
|||
/*
|
||||
* Druid - a distributed column store.
|
||||
* Copyright 2012 - 2015 Metamarkets Group Inc.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package io.druid.common.utils;
|
||||
|
||||
import com.google.common.base.Joiner;
|
||||
import com.google.common.base.Strings;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.UUID;
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
public class UUIDUtils
|
||||
{
|
||||
public static final String UUID_DELIM = "-";
|
||||
|
||||
/**
|
||||
* Generates a universally unique identifier.
|
||||
*
|
||||
* @param extraData Extra data which often takes the form of debugging information
|
||||
*
|
||||
* @return A string which is a universally unique id (as determined by java.util.UUID) with extra data. It does not conform to a UUID variant standard.
|
||||
*/
|
||||
public static String generateUuid(String... extraData)
|
||||
{
|
||||
String extra = null;
|
||||
if (extraData != null && extraData.length > 0) {
|
||||
final ArrayList<String> extraStrings = new ArrayList<>(extraData.length);
|
||||
for (String extraString : extraData) {
|
||||
if (!Strings.isNullOrEmpty(extraString)) {
|
||||
extraStrings.add(extraString);
|
||||
}
|
||||
}
|
||||
if (!extraStrings.isEmpty()) {
|
||||
extra = Joiner.on(UUID_DELIM).join(extraStrings);
|
||||
}
|
||||
}
|
||||
final String uuid = UUID.randomUUID().toString();
|
||||
return extra == null ? uuid : (extra + UUID_DELIM + uuid);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,104 @@
|
|||
/*
|
||||
* Druid - a distributed column store.
|
||||
* Copyright 2012 - 2015 Metamarkets Group Inc.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package io.druid.common.utils;
|
||||
|
||||
import com.google.common.base.Function;
|
||||
import com.google.common.base.Joiner;
|
||||
import com.google.common.base.Strings;
|
||||
import com.google.common.collect.Collections2;
|
||||
import com.google.common.collect.Lists;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.junit.runners.Parameterized;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
import java.util.UUID;
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
@RunWith(Parameterized.class)
|
||||
public class UUIDUtilsTest
|
||||
{
|
||||
@Parameterized.Parameters
|
||||
public static Collection<Object[]> constructorFeeder()
|
||||
{
|
||||
final ArrayList<String[]> args = new ArrayList<>();
|
||||
final List<String> possibleArgs = Lists.newArrayList("one", "two", null, "");
|
||||
for (int i = 0; i < possibleArgs.size(); ++i) {
|
||||
args.add(new String[]{possibleArgs.get(i)});
|
||||
for (int j = 0; j < possibleArgs.size(); ++j) {
|
||||
for (int k = 0; k < possibleArgs.size(); ++k) {
|
||||
args.add(new String[]{possibleArgs.get(i), possibleArgs.get(j), possibleArgs.get(k)});
|
||||
}
|
||||
}
|
||||
}
|
||||
for(String possibleArg : possibleArgs){
|
||||
args.add(new String[]{possibleArg});
|
||||
}
|
||||
return Collections2.transform(
|
||||
args, new Function<String[], Object[]>()
|
||||
{
|
||||
@Override
|
||||
public Object[] apply(String[] input)
|
||||
{
|
||||
final ArrayList<String> strings = new ArrayList<>(input.length);
|
||||
for (String str : input) {
|
||||
if (!Strings.isNullOrEmpty(str)) {
|
||||
strings.add(str);
|
||||
}
|
||||
}
|
||||
final String expected;
|
||||
if (!strings.isEmpty()) {
|
||||
expected = Joiner.on(UUIDUtils.UUID_DELIM).join(strings) + UUIDUtils.UUID_DELIM;
|
||||
} else {
|
||||
expected = "";
|
||||
}
|
||||
return new Object[]{input, expected};
|
||||
}
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
private final String[] args;
|
||||
private final String expectedBase;
|
||||
|
||||
public UUIDUtilsTest(String[] args, String expectedBase)
|
||||
{
|
||||
this.args = args;
|
||||
this.expectedBase = expectedBase;
|
||||
}
|
||||
|
||||
public static void validateIsStandardUUID(
|
||||
String uuidString
|
||||
)
|
||||
{
|
||||
UUID uuid = UUID.fromString(uuidString);
|
||||
Assert.assertEquals(uuid.toString(), uuidString);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testUuid()
|
||||
{
|
||||
final String uuid = UUIDUtils.generateUuid(args);
|
||||
validateIsStandardUUID(uuid.substring(expectedBase.length()));
|
||||
}
|
||||
}
|
|
@ -25,6 +25,7 @@ import com.google.common.collect.Sets;
|
|||
import com.google.inject.Inject;
|
||||
import com.metamx.common.ISE;
|
||||
import com.metamx.common.logger.Logger;
|
||||
import io.druid.common.utils.UUIDUtils;
|
||||
import io.druid.curator.announcement.Announcer;
|
||||
import io.druid.server.initialization.BatchDataSegmentAnnouncerConfig;
|
||||
import io.druid.server.initialization.ZkPathsConfig;
|
||||
|
@ -49,6 +50,7 @@ public class BatchDataSegmentAnnouncer extends AbstractDataSegmentAnnouncer
|
|||
private final Announcer announcer;
|
||||
private final ObjectMapper jsonMapper;
|
||||
private final String liveSegmentLocation;
|
||||
private final DruidServerMetadata server;
|
||||
|
||||
private final Object lock = new Object();
|
||||
private final AtomicLong counter = new AtomicLong(0);
|
||||
|
@ -69,6 +71,7 @@ public class BatchDataSegmentAnnouncer extends AbstractDataSegmentAnnouncer
|
|||
this.config = config;
|
||||
this.announcer = announcer;
|
||||
this.jsonMapper = jsonMapper;
|
||||
this.server = server;
|
||||
|
||||
this.liveSegmentLocation = ZKPaths.makePath(zkPaths.getLiveSegmentsPath(), server.getName());
|
||||
}
|
||||
|
@ -84,7 +87,7 @@ public class BatchDataSegmentAnnouncer extends AbstractDataSegmentAnnouncer
|
|||
synchronized (lock) {
|
||||
// create new batch
|
||||
if (availableZNodes.isEmpty()) {
|
||||
SegmentZNode availableZNode = new SegmentZNode(makeServedSegmentPath(new DateTime().toString()));
|
||||
SegmentZNode availableZNode = new SegmentZNode(makeServedSegmentPath());
|
||||
availableZNode.addSegment(segment);
|
||||
|
||||
log.info("Announcing segment[%s] at path[%s]", segment.getIdentifier(), availableZNode.getPath());
|
||||
|
@ -140,7 +143,7 @@ public class BatchDataSegmentAnnouncer extends AbstractDataSegmentAnnouncer
|
|||
@Override
|
||||
public void announceSegments(Iterable<DataSegment> segments) throws IOException
|
||||
{
|
||||
SegmentZNode segmentZNode = new SegmentZNode(makeServedSegmentPath(new DateTime().toString()));
|
||||
SegmentZNode segmentZNode = new SegmentZNode(makeServedSegmentPath());
|
||||
Set<DataSegment> batch = Sets.newHashSet();
|
||||
int byteSize = 0;
|
||||
int count = 0;
|
||||
|
@ -156,7 +159,7 @@ public class BatchDataSegmentAnnouncer extends AbstractDataSegmentAnnouncer
|
|||
segmentZNode.addSegments(batch);
|
||||
announcer.announce(segmentZNode.getPath(), segmentZNode.getBytes());
|
||||
|
||||
segmentZNode = new SegmentZNode(makeServedSegmentPath(new DateTime().toString()));
|
||||
segmentZNode = new SegmentZNode(makeServedSegmentPath());
|
||||
batch = Sets.newHashSet();
|
||||
count = 0;
|
||||
byteSize = 0;
|
||||
|
@ -181,6 +184,11 @@ public class BatchDataSegmentAnnouncer extends AbstractDataSegmentAnnouncer
|
|||
}
|
||||
}
|
||||
|
||||
private String makeServedSegmentPath(){
|
||||
// server.getName() is already in the zk path
|
||||
return makeServedSegmentPath(UUIDUtils.generateUuid(server.getHost(), server.getType(), server.getTier(), new DateTime().toString()));
|
||||
}
|
||||
|
||||
private String makeServedSegmentPath(String zNode)
|
||||
{
|
||||
return ZKPaths.makePath(liveSegmentLocation, String.format("%s%s", zNode, counter.getAndIncrement()));
|
||||
|
|
|
@ -18,11 +18,15 @@
|
|||
package io.druid.client.client;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.google.api.client.repackaged.com.google.common.base.Throwables;
|
||||
import com.google.common.base.Predicate;
|
||||
import com.google.common.base.Predicates;
|
||||
import com.google.common.base.Stopwatch;
|
||||
import com.google.common.collect.Iterables;
|
||||
import com.google.common.collect.Sets;
|
||||
import com.google.common.util.concurrent.Futures;
|
||||
import com.google.common.util.concurrent.ListenableFuture;
|
||||
import com.google.common.util.concurrent.ListeningExecutorService;
|
||||
import com.google.common.util.concurrent.MoreExecutors;
|
||||
import com.metamx.common.ISE;
|
||||
import io.druid.client.BatchServerInventoryView;
|
||||
|
@ -31,6 +35,7 @@ import io.druid.client.ServerView;
|
|||
import io.druid.curator.PotentiallyGzippedCompressionProvider;
|
||||
import io.druid.curator.announcement.Announcer;
|
||||
import io.druid.jackson.DefaultObjectMapper;
|
||||
import io.druid.segment.Segment;
|
||||
import io.druid.server.coordination.BatchDataSegmentAnnouncer;
|
||||
import io.druid.server.coordination.DruidServerMetadata;
|
||||
import io.druid.server.initialization.BatchDataSegmentAnnouncerConfig;
|
||||
|
@ -54,9 +59,15 @@ import org.junit.Test;
|
|||
import org.junit.rules.ExpectedException;
|
||||
|
||||
import javax.annotation.Nullable;
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Comparator;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.Callable;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.Executor;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
/**
|
||||
|
@ -132,7 +143,7 @@ public class BatchServerInventoryViewTest
|
|||
);
|
||||
segmentAnnouncer.start();
|
||||
|
||||
testSegments = Sets.newHashSet();
|
||||
testSegments = Sets.newConcurrentHashSet();
|
||||
for (int i = 0; i < INITIAL_SEGMENTS; i++) {
|
||||
testSegments.add(makeSegment(i));
|
||||
}
|
||||
|
@ -331,7 +342,7 @@ public class BatchServerInventoryViewTest
|
|||
|
||||
waitForSync(filteredBatchServerInventoryView, testSegments);
|
||||
timing.forWaiting().awaitLatch(removeCallbackLatch);
|
||||
|
||||
|
||||
EasyMock.verify(callback);
|
||||
}
|
||||
|
||||
|
@ -349,7 +360,8 @@ public class BatchServerInventoryViewTest
|
|||
.build();
|
||||
}
|
||||
|
||||
private static void waitForSync(BatchServerInventoryView batchServerInventoryView, Set<DataSegment> testSegments) throws Exception
|
||||
private static void waitForSync(BatchServerInventoryView batchServerInventoryView, Set<DataSegment> testSegments)
|
||||
throws Exception
|
||||
{
|
||||
final Timing forWaitingTiming = timing.forWaiting();
|
||||
Stopwatch stopwatch = Stopwatch.createStarted();
|
||||
|
@ -361,4 +373,96 @@ public class BatchServerInventoryViewTest
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSameTimeZnode() throws Exception
|
||||
{
|
||||
final int numThreads = INITIAL_SEGMENTS / 10;
|
||||
final ListeningExecutorService executor = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(numThreads));
|
||||
|
||||
segmentAnnouncer.announceSegments(testSegments);
|
||||
|
||||
waitForSync(batchServerInventoryView, testSegments);
|
||||
|
||||
DruidServer server = Iterables.get(batchServerInventoryView.getInventory(), 0);
|
||||
final Set<DataSegment> segments = Sets.newHashSet(server.getSegments().values());
|
||||
|
||||
Assert.assertEquals(testSegments, segments);
|
||||
|
||||
final CountDownLatch latch = new CountDownLatch(numThreads);
|
||||
|
||||
final List<ListenableFuture<BatchDataSegmentAnnouncer>> futures = new ArrayList<>();
|
||||
for (int i = 0; i < numThreads; ++i) {
|
||||
final int ii = i;
|
||||
futures.add(
|
||||
executor.submit(
|
||||
new Callable<BatchDataSegmentAnnouncer>()
|
||||
{
|
||||
@Override
|
||||
public BatchDataSegmentAnnouncer call()
|
||||
{
|
||||
BatchDataSegmentAnnouncer segmentAnnouncer = new BatchDataSegmentAnnouncer(
|
||||
new DruidServerMetadata(
|
||||
"id",
|
||||
"host",
|
||||
Long.MAX_VALUE,
|
||||
"type",
|
||||
"tier",
|
||||
0
|
||||
),
|
||||
new BatchDataSegmentAnnouncerConfig()
|
||||
{
|
||||
@Override
|
||||
public int getSegmentsPerNode()
|
||||
{
|
||||
return 50;
|
||||
}
|
||||
},
|
||||
new ZkPathsConfig()
|
||||
{
|
||||
@Override
|
||||
public String getBase()
|
||||
{
|
||||
return testBasePath;
|
||||
}
|
||||
},
|
||||
announcer,
|
||||
jsonMapper
|
||||
);
|
||||
segmentAnnouncer.start();
|
||||
List<DataSegment> segments = new ArrayList<DataSegment>();
|
||||
try {
|
||||
for (int j = 0; j < INITIAL_SEGMENTS / numThreads; ++j) {
|
||||
segments.add(makeSegment(INITIAL_SEGMENTS + ii + numThreads * j));
|
||||
}
|
||||
latch.countDown();
|
||||
latch.await();
|
||||
segmentAnnouncer.announceSegments(segments);
|
||||
testSegments.addAll(segments);
|
||||
}
|
||||
catch (Exception e) {
|
||||
throw Throwables.propagate(e);
|
||||
}
|
||||
return segmentAnnouncer;
|
||||
}
|
||||
}
|
||||
)
|
||||
);
|
||||
}
|
||||
final List<BatchDataSegmentAnnouncer> announcers = Futures.<BatchDataSegmentAnnouncer>allAsList(futures).get();
|
||||
Assert.assertEquals(INITIAL_SEGMENTS * 2, testSegments.size());
|
||||
waitForSync(batchServerInventoryView, testSegments);
|
||||
|
||||
Assert.assertEquals(testSegments, Sets.newHashSet(server.getSegments().values()));
|
||||
|
||||
for (int i = 0; i < INITIAL_SEGMENTS; ++i) {
|
||||
final DataSegment segment = makeSegment(100 + i);
|
||||
segmentAnnouncer.unannounceSegment(segment);
|
||||
testSegments.remove(segment);
|
||||
}
|
||||
|
||||
waitForSync(batchServerInventoryView, testSegments);
|
||||
|
||||
Assert.assertEquals(testSegments, Sets.newHashSet(server.getSegments().values()));
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue