Fix race in historical when loading segments in parallel (#7203)

* Fix race in historical when loading segments in parallel

* revert unnecessary change

* remove synchronized

* add reference counting locking

* fix build

* fix comment
This commit is contained in:
Jihoon Son 2019-03-08 17:54:05 -08:00 committed by GitHub
parent 62f0de9b89
commit 9bebf113ba
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 561 additions and 123 deletions

View File

@ -19,7 +19,6 @@
package org.apache.druid.segment;
import com.google.common.base.Preconditions;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.timeline.SegmentId;
import org.joda.time.Interval;
@ -45,8 +44,10 @@ public class ReferenceCountingSegment extends AbstractSegment
@Override
protected boolean onAdvance(int phase, int registeredParties)
{
Preconditions.checkState(registeredParties == 0);
// Ensure that onAdvance() doesn't throw exception, otherwise termination won't happen
if (registeredParties != 0) {
log.error("registeredParties[%s] is not 0", registeredParties);
}
try {
baseSegment.close();
}

View File

@ -25,6 +25,8 @@ import org.apache.druid.timeline.DataSegment;
import java.io.File;
/**
* Loading segments from deep storage to local storage.
* Implementations must be thread-safe.
*/
public interface SegmentLoader
{

View File

@ -20,10 +20,12 @@
package org.apache.druid.segment.loading;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.primitives.Longs;
import com.google.inject.Inject;
import org.apache.commons.io.FileUtils;
import org.apache.druid.guice.annotations.Json;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.segment.IndexIO;
import org.apache.druid.segment.Segment;
@ -32,15 +34,17 @@ import org.apache.druid.timeline.DataSegment;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
/**
*/
public class SegmentLoaderLocalCacheManager implements SegmentLoader
{
private static final EmittingLogger log = new EmittingLogger(SegmentLoaderLocalCacheManager.class);
private static final Comparator<StorageLocation> COMPARATOR = (left, right) ->
Longs.compare(right.available(), left.available());
private final IndexIO indexIO;
private final SegmentLoaderConfig config;
@ -48,15 +52,30 @@ public class SegmentLoaderLocalCacheManager implements SegmentLoader
private final List<StorageLocation> locations;
private final Object lock = new Object();
// This directoryWriteRemoveLock is used when creating or removing a directory
private final Object directoryWriteRemoveLock = new Object();
private static final Comparator<StorageLocation> COMPARATOR = new Comparator<StorageLocation>()
{
@Override public int compare(StorageLocation left, StorageLocation right)
{
return Longs.compare(right.available(), left.available());
}
};
/**
* A map between segment and referenceCountingLocks.
*
* These locks should be acquired whenever getting or deleting files for a segment.
* If different threads try to get or delete files simultaneously, one of them creates a lock first using
* {@link #createOrGetLock}. And then, all threads compete with each other to get the lock.
* Finally, the lock should be released using {@link #unlock}.
*
* An example usage is:
*
* final ReferenceCountingLock lock = createOrGetLock(segment);
* synchronized (lock) {
* try {
* doSomething();
* }
* finally {
* unlock(lock);
* }
* }
*/
private final ConcurrentHashMap<DataSegment, ReferenceCountingLock> segmentLocks = new ConcurrentHashMap<>();
// Note that we only create this via injection in historical and realtime nodes. Peons create these
// objects via SegmentLoaderFactory objects, so that they can store segments in task-specific
@ -74,12 +93,15 @@ public class SegmentLoaderLocalCacheManager implements SegmentLoader
this.locations = new ArrayList<>();
for (StorageLocationConfig locationConfig : config.getLocations()) {
locations.add(new StorageLocation(
locations.add(
new StorageLocation(
locationConfig.getPath(),
locationConfig.getMaxSize(),
locationConfig.getFreeSpacePercent()
));
)
);
}
locations.sort(COMPARATOR);
}
@Override
@ -90,7 +112,7 @@ public class SegmentLoaderLocalCacheManager implements SegmentLoader
private StorageLocation findStorageLocationIfLoaded(final DataSegment segment)
{
for (StorageLocation location : getSortedList(locations)) {
for (StorageLocation location : locations) {
File localStorageDir = new File(location.getPath(), DataSegmentPusher.getDefaultStorageDir(segment, false));
if (localStorageDir.exists()) {
return location;
@ -102,7 +124,16 @@ public class SegmentLoaderLocalCacheManager implements SegmentLoader
@Override
public Segment getSegment(DataSegment segment) throws SegmentLoadingException
{
File segmentFiles = getSegmentFiles(segment);
final ReferenceCountingLock lock = createOrGetLock(segment);
final File segmentFiles;
synchronized (lock) {
try {
segmentFiles = getSegmentFiles(segment);
}
finally {
unlock(segment, lock);
}
}
File factoryJson = new File(segmentFiles, "factory.json");
final SegmentizerFactory factory;
@ -123,6 +154,9 @@ public class SegmentLoaderLocalCacheManager implements SegmentLoader
@Override
public File getSegmentFiles(DataSegment segment) throws SegmentLoadingException
{
final ReferenceCountingLock lock = createOrGetLock(segment);
synchronized (lock) {
try {
StorageLocation loc = findStorageLocationIfLoaded(segment);
String storageDir = DataSegmentPusher.getDefaultStorageDir(segment, false);
@ -132,6 +166,11 @@ public class SegmentLoaderLocalCacheManager implements SegmentLoader
loc.addSegment(segment);
return new File(loc.getPath(), storageDir);
}
finally {
unlock(segment, lock);
}
}
}
/**
* location may fail because of IO failure, most likely in two cases:<p>
@ -140,7 +179,7 @@ public class SegmentLoaderLocalCacheManager implements SegmentLoader
*/
private StorageLocation loadSegmentWithRetry(DataSegment segment, String storageDirStr) throws SegmentLoadingException
{
for (StorageLocation loc : getSortedList(locations)) {
for (StorageLocation loc : locations) {
if (loc.canHandle(segment)) {
File storageDir = new File(loc.getPath(), storageDirStr);
@ -169,7 +208,7 @@ public class SegmentLoaderLocalCacheManager implements SegmentLoader
// We use a marker to prevent the case where a segment is downloaded, but before the download completes,
// the parent directories of the segment are removed
final File downloadStartMarker = new File(storageDir, "downloadStartMarker");
synchronized (lock) {
synchronized (directoryWriteRemoveLock) {
if (!storageDir.mkdirs()) {
log.debug("Unable to make parent file[%s]", storageDir);
}
@ -212,17 +251,20 @@ public class SegmentLoaderLocalCacheManager implements SegmentLoader
return;
}
final ReferenceCountingLock lock = createOrGetLock(segment);
synchronized (lock) {
try {
StorageLocation loc = findStorageLocationIfLoaded(segment);
if (loc == null) {
log.info("Asked to cleanup something[%s] that didn't exist. Skipping.", segment);
log.warn("Asked to cleanup something[%s] that didn't exist. Skipping.", segment);
return;
}
// If storageDir.mkdirs() success, but downloadStartMarker.createNewFile() failed,
// in this case, findStorageLocationIfLoaded() will think segment is located in the failed storageDir which is actually not.
// So we should always clean all possible locations here
for (StorageLocation location : getSortedList(locations)) {
for (StorageLocation location : locations) {
File localStorageDir = new File(location.getPath(), DataSegmentPusher.getDefaultStorageDir(segment, false));
if (localStorageDir.exists()) {
// Druid creates folders of the form dataSource/interval/version/partitionNum.
@ -232,6 +274,11 @@ public class SegmentLoaderLocalCacheManager implements SegmentLoader
}
}
}
finally {
unlock(segment, lock);
}
}
}
private void cleanupCacheFiles(File baseFile, File cacheFile)
{
@ -239,13 +286,13 @@ public class SegmentLoaderLocalCacheManager implements SegmentLoader
return;
}
synchronized (lock) {
synchronized (directoryWriteRemoveLock) {
log.info("Deleting directory[%s]", cacheFile);
try {
FileUtils.deleteDirectory(cacheFile);
}
catch (Exception e) {
log.error("Unable to remove file[%s]", cacheFile);
log.error(e, "Unable to remove directory[%s]", cacheFile);
}
}
@ -258,11 +305,62 @@ public class SegmentLoaderLocalCacheManager implements SegmentLoader
}
}
private List<StorageLocation> getSortedList(List<StorageLocation> locs)
private ReferenceCountingLock createOrGetLock(DataSegment dataSegment)
{
List<StorageLocation> locations = new ArrayList<>(locs);
Collections.sort(locations, COMPARATOR);
return segmentLocks.compute(
dataSegment,
(segment, lock) -> {
final ReferenceCountingLock nonNullLock;
if (lock == null) {
nonNullLock = new ReferenceCountingLock();
} else {
nonNullLock = lock;
}
nonNullLock.increment();
return nonNullLock;
}
);
}
return locations;
private void unlock(DataSegment dataSegment, ReferenceCountingLock lock)
{
segmentLocks.compute(
dataSegment,
(segment, existingLock) -> {
//noinspection ObjectEquality
if (existingLock == null || existingLock != lock) {
throw new ISE("WTH? Different createOrGetLock instance");
} else {
if (existingLock.numReferences == 1) {
return null;
} else {
existingLock.decrement();
return existingLock;
}
}
}
);
}
@VisibleForTesting
private static class ReferenceCountingLock
{
private int numReferences;
private void increment()
{
++numReferences;
}
private void decrement()
{
--numReferences;
}
}
@VisibleForTesting
public ConcurrentHashMap<DataSegment, ReferenceCountingLock> getSegmentLocks()
{
return segmentLocks;
}
}

View File

@ -171,7 +171,7 @@ public class SegmentManager
);
if ((entry != null) && (entry.getChunk(segment.getShardSpec().getPartitionNum()) != null)) {
log.warn("Told to load a adapter for a segment[%s] that already exists", segment.getId());
log.warn("Told to load an adapter for segment[%s] that already exists", segment.getId());
resultSupplier.set(false);
} else {
loadedIntervals.add(
@ -223,6 +223,8 @@ public class SegmentManager
final PartitionChunk<ReferenceCountingSegment> removed = loadedIntervals.remove(
segment.getInterval(),
segment.getVersion(),
// remove() internally searches for a partitionChunk to remove which is *equal* to the given
// partitionChunk. Note that partitionChunk.equals() checks only the partitionNum, but not the object.
segment.getShardSpec().createChunk(null)
);
final ReferenceCountingSegment oldQueryable = (removed == null) ? null : removed.getObject();
@ -234,7 +236,7 @@ public class SegmentManager
oldQueryable.close();
} else {
log.info(
"Told to delete a queryable on dataSource[%s] for interval[%s] and version [%s] that I don't have.",
"Told to delete a queryable on dataSource[%s] for interval[%s] and version[%s] that I don't have.",
dataSourceName,
segment.getInterval(),
segment.getVersion()

View File

@ -119,9 +119,14 @@ public class BatchDataSegmentAnnouncer implements DataSegmentAnnouncer
return;
}
synchronized (lock) {
if (segmentLookup.containsKey(segment)) {
log.info("Skipping announcement of segment [%s]. Announcement exists already.", segment.getId());
return;
}
DataSegment toAnnounce = segmentTransformer.apply(segment);
synchronized (lock) {
changes.addChangeRequest(new SegmentChangeRequestLoad(toAnnounce));
if (config.isSkipSegmentAnnouncementOnZk()) {

View File

@ -106,7 +106,12 @@ public class SegmentLoadDropHandler implements DataSegmentChangeHandler
SegmentManager segmentManager
)
{
this(jsonMapper, config, announcer, serverAnnouncer, segmentManager,
this(
jsonMapper,
config,
announcer,
serverAnnouncer,
segmentManager,
Executors.newScheduledThreadPool(
config.getNumLoadingThreads(),
Execs.makeThreadFactory("SimpleDataSegmentChangeHandler-%s")
@ -250,7 +255,7 @@ public class SegmentLoadDropHandler implements DataSegmentChangeHandler
* Load a single segment. If the segment is loaded successfully, this function simply returns. Otherwise it will
* throw a SegmentLoadingException
*
* @throws SegmentLoadingException
* @throws SegmentLoadingException if it fails to load the given segment
*/
private void loadSegment(DataSegment segment, DataSegmentChangeCallback callback) throws SegmentLoadingException
{
@ -305,6 +310,7 @@ public class SegmentLoadDropHandler implements DataSegmentChangeHandler
}
}
loadSegment(segment, DataSegmentChangeCallback.NOOP);
// announce segment even if the segment file already exists.
try {
announcer.announceSegment(segment);
}
@ -727,16 +733,10 @@ public class SegmentLoadDropHandler implements DataSegmentChangeHandler
(request, statusRef) -> result.add(new DataSegmentChangeRequestAndStatus(request, statusRef.get()))
);
super.set(result);
set(result);
}
}
@Override
public boolean setException(Throwable throwable)
{
return super.setException(throwable);
}
@Override
public boolean cancel(boolean interruptIfRunning)
{

View File

@ -147,9 +147,9 @@ public class HttpLoadQueuePeon extends LoadQueuePeon
return;
}
int batchSize = config.getHttpLoadQueuePeonBatchSize();
final int batchSize = config.getHttpLoadQueuePeonBatchSize();
List<DataSegmentChangeRequest> newRequests = new ArrayList<>(batchSize);
final List<DataSegmentChangeRequest> newRequests = new ArrayList<>(batchSize);
synchronized (lock) {
Iterator<Map.Entry<DataSegment, SegmentHolder>> iter = Iterators.concat(
@ -157,8 +157,7 @@ public class HttpLoadQueuePeon extends LoadQueuePeon
segmentsToLoad.entrySet().iterator()
);
while (batchSize > 0 && iter.hasNext()) {
batchSize--;
while (newRequests.size() < batchSize && iter.hasNext()) {
Map.Entry<DataSegment, SegmentHolder> entry = iter.next();
if (entry.getValue().hasTimedOut()) {
entry.getValue().requestFailed("timed out");
@ -304,8 +303,7 @@ public class HttpLoadQueuePeon extends LoadQueuePeon
return;
}
if (status.getState()
== SegmentLoadDropHandler.Status.STATE.FAILED) {
if (status.getState() == SegmentLoadDropHandler.Status.STATE.FAILED) {
holder.requestFailed(status.getFailureCause());
} else {
holder.requestSucceeded();

View File

@ -37,7 +37,6 @@ import java.util.Set;
*/
public class CacheTestSegmentLoader implements SegmentLoader
{
private final Set<DataSegment> segmentsInTrash = new HashSet<>();
@Override

View File

@ -309,7 +309,8 @@ public class SegmentManagerTest
public void testLoadDuplicatedSegmentsInParallel()
throws ExecutionException, InterruptedException, SegmentLoadingException
{
final List<Future<Boolean>> futures = ImmutableList.of(segments.get(0), segments.get(0), segments.get(0)).stream()
final List<Future<Boolean>> futures = ImmutableList.of(segments.get(0), segments.get(0), segments.get(0))
.stream()
.map(
segment -> executor.submit(
() -> segmentManager.loadSegment(segment)
@ -347,7 +348,8 @@ public class SegmentManagerTest
throws SegmentLoadingException, ExecutionException, InterruptedException
{
segmentManager.loadSegment(segments.get(0));
final List<Future<Void>> futures = ImmutableList.of(segments.get(1), segments.get(2)).stream()
final List<Future<Void>> futures = ImmutableList.of(segments.get(1), segments.get(2))
.stream()
.map(
segment -> executor.submit(
() -> {

View File

@ -0,0 +1,268 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.server;
import com.fasterxml.jackson.databind.InjectableValues.Std;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.jsontype.NamedType;
import com.fasterxml.jackson.databind.module.SimpleModule;
import com.google.common.collect.ImmutableMap;
import org.apache.commons.io.FileUtils;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.FileUtils.FileCopyResult;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.segment.IndexIO;
import org.apache.druid.segment.QueryableIndex;
import org.apache.druid.segment.Segment;
import org.apache.druid.segment.StorageAdapter;
import org.apache.druid.segment.loading.DataSegmentPusher;
import org.apache.druid.segment.loading.LocalDataSegmentPuller;
import org.apache.druid.segment.loading.LocalLoadSpec;
import org.apache.druid.segment.loading.SegmentLoaderConfig;
import org.apache.druid.segment.loading.SegmentLoaderLocalCacheManager;
import org.apache.druid.segment.loading.SegmentLoadingException;
import org.apache.druid.segment.loading.SegmentizerFactory;
import org.apache.druid.segment.loading.StorageLocationConfig;
import org.apache.druid.server.metrics.NoopServiceEmitter;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.SegmentId;
import org.apache.druid.timeline.partition.NumberedShardSpec;
import org.joda.time.Interval;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import javax.annotation.Nullable;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
public class SegmentManagerThreadSafetyTest
{
private static final int NUM_THREAD = 4;
@Rule
public TemporaryFolder temporaryFolder = new TemporaryFolder();
private TestSegmentPuller segmentPuller;
private ObjectMapper objectMapper;
private IndexIO indexIO;
private File segmentCacheDir;
private File segmentDeepStorageDir;
private SegmentLoaderLocalCacheManager segmentLoader;
private SegmentManager segmentManager;
private ExecutorService exec;
@Before
public void setup() throws IOException
{
segmentPuller = new TestSegmentPuller();
objectMapper = new DefaultObjectMapper()
.registerModule(
new SimpleModule().registerSubtypes(new NamedType(LocalLoadSpec.class, "local"), new NamedType(TestSegmentizerFactory.class, "test"))
)
.setInjectableValues(new Std().addValue(LocalDataSegmentPuller.class, segmentPuller));
indexIO = new IndexIO(objectMapper, () -> 0);
segmentCacheDir = temporaryFolder.newFolder();
segmentDeepStorageDir = temporaryFolder.newFolder();
segmentLoader = new SegmentLoaderLocalCacheManager(
indexIO,
new SegmentLoaderConfig()
{
@Override
public List<StorageLocationConfig> getLocations()
{
return Collections.singletonList(
new StorageLocationConfig().setPath(segmentCacheDir)
);
}
},
objectMapper
);
segmentManager = new SegmentManager(segmentLoader);
exec = Execs.multiThreaded(NUM_THREAD, "SegmentManagerThreadSafetyTest-%d");
EmittingLogger.registerEmitter(new NoopServiceEmitter());
}
@After
public void teardown() throws IOException
{
exec.shutdownNow();
FileUtils.deleteDirectory(segmentCacheDir);
}
@Test(timeout = 5000L)
public void testLoadSameSegment() throws IOException, ExecutionException, InterruptedException
{
final DataSegment segment = createSegment("2019-01-01/2019-01-02");
final List<Future> futures = IntStream
.range(0, 16)
.mapToObj(i -> exec.submit(() -> segmentManager.loadSegment(segment)))
.collect(Collectors.toList());
for (Future future : futures) {
future.get();
}
Assert.assertEquals(1, segmentPuller.numFileLoaded.size());
Assert.assertEquals(1, segmentPuller.numFileLoaded.values().iterator().next().intValue());
Assert.assertEquals(0, segmentLoader.getSegmentLocks().size());
}
@Test(timeout = 5000L)
public void testLoadMultipleSegments() throws IOException, ExecutionException, InterruptedException
{
final List<DataSegment> segments = new ArrayList<>(88);
for (int i = 0; i < 11; i++) {
for (int j = 0; j < 8; j++) {
segments.add(createSegment(StringUtils.format("2019-%02d-01/2019-%02d-01", i + 1, i + 2)));
}
}
final List<Future> futures = IntStream
.range(0, 16)
.mapToObj(i -> exec.submit(() -> {
for (DataSegment segment : segments) {
try {
segmentManager.loadSegment(segment);
}
catch (SegmentLoadingException e) {
throw new RuntimeException(e);
}
}
}))
.collect(Collectors.toList());
for (Future future : futures) {
future.get();
}
Assert.assertEquals(11, segmentPuller.numFileLoaded.size());
Assert.assertEquals(1, segmentPuller.numFileLoaded.values().iterator().next().intValue());
Assert.assertEquals(0, segmentLoader.getSegmentLocks().size());
}
private DataSegment createSegment(String interval) throws IOException
{
final DataSegment tmpSegment = new DataSegment(
"dataSource",
Intervals.of(interval),
"version",
Collections.emptyMap(),
Collections.emptyList(),
Collections.emptyList(),
new NumberedShardSpec(0, 0),
9,
100
);
final String storageDir = DataSegmentPusher.getDefaultStorageDir(tmpSegment, false);
final File segmentDir = new File(segmentDeepStorageDir, storageDir);
FileUtils.forceMkdir(segmentDir);
final File factoryJson = new File(segmentDir, "factory.json");
objectMapper.writeValue(factoryJson, new TestSegmentizerFactory());
return tmpSegment.withLoadSpec(
ImmutableMap.of("type", "local", "path", segmentDir.getAbsolutePath())
);
}
private static class TestSegmentPuller extends LocalDataSegmentPuller
{
private final Map<File, Integer> numFileLoaded = new HashMap<>();
@Override
public FileCopyResult getSegmentFiles(final File sourceFile, final File dir)
{
numFileLoaded.compute(sourceFile, (f, numLoaded) -> numLoaded == null ? 1 : numLoaded + 1);
try {
FileUtils.copyDirectory(sourceFile, dir);
}
catch (IOException e) {
throw new RuntimeException(e);
}
return new FileCopyResult()
{
@Override
public long size()
{
return 100L;
}
};
}
}
private static class TestSegmentizerFactory implements SegmentizerFactory
{
@Override
public Segment factorize(DataSegment segment, File parentDir)
{
return new Segment()
{
@Override
public SegmentId getId()
{
return segment.getId();
}
@Override
public Interval getDataInterval()
{
return segment.getInterval();
}
@Nullable
@Override
public QueryableIndex asQueryableIndex()
{
throw new UnsupportedOperationException();
}
@Override
public StorageAdapter asStorageAdapter()
{
throw new UnsupportedOperationException();
}
@Override
public <T> T as(Class<T> clazz)
{
throw new UnsupportedOperationException();
}
@Override
public void close()
{
}
};
}
}
}

View File

@ -65,15 +65,6 @@ public class SegmentLoadDropHandlerTest
private static final Logger log = new Logger(ZkCoordinatorTest.class);
private final ObjectMapper jsonMapper = TestHelper.makeJsonMapper();
private final DruidServerMetadata me = new DruidServerMetadata(
"dummyServer",
"dummyHost",
null,
0,
ServerType.HISTORICAL,
"normal",
0
);
private SegmentLoadDropHandler segmentLoadDropHandler;
private DataSegmentAnnouncer announcer;

View File

@ -51,10 +51,17 @@ import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
/**
@ -65,11 +72,12 @@ public class BatchDataSegmentAnnouncerTest
private static final String testBasePath = "/test";
private static final String testSegmentsPath = "/test/segments/id";
private static final Joiner joiner = Joiner.on("/");
private static final int NUM_THREADS = 4;
private TestingCluster testingCluster;
private CuratorFramework cf;
private ObjectMapper jsonMapper;
private Announcer announcer;
private TestAnnouncer announcer;
private SegmentReader segmentReader;
private BatchDataSegmentAnnouncer segmentAnnouncer;
private Set<DataSegment> testSegments;
@ -78,6 +86,7 @@ public class BatchDataSegmentAnnouncerTest
private Boolean skipDimensionsAndMetrics;
private Boolean skipLoadSpec;
private ExecutorService exec;
@Before
public void setUp() throws Exception
@ -96,7 +105,7 @@ public class BatchDataSegmentAnnouncerTest
jsonMapper = TestHelper.makeJsonMapper();
announcer = new Announcer(
announcer = new TestAnnouncer(
cf,
Execs.directExecutor()
);
@ -157,6 +166,8 @@ public class BatchDataSegmentAnnouncerTest
for (int i = 0; i < 100; i++) {
testSegments.add(makeSegment(i));
}
exec = Execs.multiThreaded(NUM_THREADS, "BatchDataSegmentAnnouncerTest-%d");
}
@After
@ -165,6 +176,7 @@ public class BatchDataSegmentAnnouncerTest
announcer.stop();
cf.close();
testingCluster.stop();
exec.shutdownNow();
}
@Test
@ -299,6 +311,14 @@ public class BatchDataSegmentAnnouncerTest
testBatchAnnounce(true);
}
@Test
public void testMultipleBatchAnnounce() throws Exception
{
for (int i = 0; i < 10; i++) {
testBatchAnnounce(false);
}
}
private void testBatchAnnounce(boolean testHistory) throws Exception
{
segmentAnnouncer.announceSegments(testSegments);
@ -342,11 +362,72 @@ public class BatchDataSegmentAnnouncerTest
}
}
@Test
public void testMultipleBatchAnnounce() throws Exception
@Test(timeout = 5000L)
public void testAnnounceSegmentsWithSameSegmentConcurrently() throws ExecutionException, InterruptedException
{
for (int i = 0; i < 10; i++) {
testBatchAnnounce(false);
final List<Future> futures = new ArrayList<>(NUM_THREADS);
for (int i = 0; i < NUM_THREADS; i++) {
futures.add(
exec.submit(() -> {
try {
segmentAnnouncer.announceSegments(testSegments);
}
catch (IOException e) {
throw new RuntimeException(e);
}
})
);
}
for (Future future : futures) {
future.get();
}
// Announcing 100 segments requires 2 nodes because of maxBytesPerNode configuration.
Assert.assertEquals(2, announcer.numPathAnnounced.size());
for (ConcurrentHashMap<byte[], AtomicInteger> eachMap : announcer.numPathAnnounced.values()) {
for (Entry<byte[], AtomicInteger> entry : eachMap.entrySet()) {
Assert.assertEquals(1, entry.getValue().get());
}
}
}
@Test(timeout = 5000L)
public void testAnnounceSegmentWithSameSegmentConcurrently() throws ExecutionException, InterruptedException
{
final List<Future> futures = new ArrayList<>(NUM_THREADS);
final DataSegment segment1 = makeSegment(0);
final DataSegment segment2 = makeSegment(1);
final DataSegment segment3 = makeSegment(2);
final DataSegment segment4 = makeSegment(3);
for (int i = 0; i < NUM_THREADS; i++) {
futures.add(
exec.submit(() -> {
try {
segmentAnnouncer.announceSegment(segment1);
segmentAnnouncer.announceSegment(segment2);
segmentAnnouncer.announceSegment(segment3);
segmentAnnouncer.announceSegment(segment4);
}
catch (IOException e) {
throw new RuntimeException(e);
}
})
);
}
for (Future future : futures) {
future.get();
}
Assert.assertEquals(1, announcer.numPathAnnounced.size());
for (ConcurrentHashMap<byte[], AtomicInteger> eachMap : announcer.numPathAnnounced.values()) {
for (Entry<byte[], AtomicInteger> entry : eachMap.entrySet()) {
Assert.assertEquals(1, entry.getValue().get());
}
}
}
@ -396,4 +477,21 @@ public class BatchDataSegmentAnnouncerTest
return new HashSet<>();
}
}
private static class TestAnnouncer extends Announcer
{
private final ConcurrentHashMap<String, ConcurrentHashMap<byte[], AtomicInteger>> numPathAnnounced = new ConcurrentHashMap<>();
private TestAnnouncer(CuratorFramework curator, ExecutorService exec)
{
super(curator, exec);
}
@Override
public void announce(String path, byte[] bytes, boolean removeParentIfCreated)
{
numPathAnnounced.computeIfAbsent(path, k -> new ConcurrentHashMap<>()).computeIfAbsent(bytes, k -> new AtomicInteger(0)).incrementAndGet();
super.announce(path, bytes, removeParentIfCreated);
}
}
}

View File

@ -20,12 +20,9 @@
package org.apache.druid.server.coordinator;
import com.fasterxml.jackson.core.type.TypeReference;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import org.apache.druid.discovery.DiscoveryDruidNode;
import org.apache.druid.discovery.DruidNodeDiscovery;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.RE;
import org.apache.druid.java.util.common.concurrent.Execs;
@ -47,12 +44,10 @@ import org.junit.Test;
import java.io.ByteArrayInputStream;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
/**
*/
@ -172,29 +167,8 @@ public class HttpLoadQueuePeonTest
}
private static class TestDruidNodeDiscovery implements DruidNodeDiscovery
{
Listener listener;
@Override
public Collection<DiscoveryDruidNode> getAllNodes()
{
throw new UnsupportedOperationException("Not Implemented.");
}
@Override
public void registerListener(Listener listener)
{
listener.nodesAdded(ImmutableList.of());
listener.nodeViewInitialized();
this.listener = listener;
}
}
private static class TestHttpClient implements HttpClient
{
AtomicInteger requestNum = new AtomicInteger(0);
@Override
public <Intermediate, Final> ListenableFuture<Final> go(
Request request,