diff --git a/processing/src/main/java/org/apache/druid/segment/ReferenceCountingSegment.java b/processing/src/main/java/org/apache/druid/segment/ReferenceCountingSegment.java index 4f27a4f3394..ef98b7515b2 100644 --- a/processing/src/main/java/org/apache/druid/segment/ReferenceCountingSegment.java +++ b/processing/src/main/java/org/apache/druid/segment/ReferenceCountingSegment.java @@ -19,7 +19,6 @@ package org.apache.druid.segment; -import com.google.common.base.Preconditions; import org.apache.druid.java.util.emitter.EmittingLogger; import org.apache.druid.timeline.SegmentId; import org.joda.time.Interval; @@ -45,8 +44,10 @@ public class ReferenceCountingSegment extends AbstractSegment @Override protected boolean onAdvance(int phase, int registeredParties) { - Preconditions.checkState(registeredParties == 0); // Ensure that onAdvance() doesn't throw exception, otherwise termination won't happen + if (registeredParties != 0) { + log.error("registeredParties[%s] is not 0", registeredParties); + } try { baseSegment.close(); } diff --git a/server/src/main/java/org/apache/druid/segment/loading/SegmentLoader.java b/server/src/main/java/org/apache/druid/segment/loading/SegmentLoader.java index 3db4672f3fb..301e7370229 100644 --- a/server/src/main/java/org/apache/druid/segment/loading/SegmentLoader.java +++ b/server/src/main/java/org/apache/druid/segment/loading/SegmentLoader.java @@ -25,6 +25,8 @@ import org.apache.druid.timeline.DataSegment; import java.io.File; /** + * Loading segments from deep storage to local storage. + * Implementations must be thread-safe. */ public interface SegmentLoader { diff --git a/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManager.java b/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManager.java index 92987354a74..48049b0611e 100644 --- a/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManager.java +++ b/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManager.java @@ -20,10 +20,12 @@ package org.apache.druid.segment.loading; import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.annotations.VisibleForTesting; import com.google.common.primitives.Longs; import com.google.inject.Inject; import org.apache.commons.io.FileUtils; import org.apache.druid.guice.annotations.Json; +import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.emitter.EmittingLogger; import org.apache.druid.segment.IndexIO; import org.apache.druid.segment.Segment; @@ -32,15 +34,17 @@ import org.apache.druid.timeline.DataSegment; import java.io.File; import java.io.IOException; import java.util.ArrayList; -import java.util.Collections; import java.util.Comparator; import java.util.List; +import java.util.concurrent.ConcurrentHashMap; /** */ public class SegmentLoaderLocalCacheManager implements SegmentLoader { private static final EmittingLogger log = new EmittingLogger(SegmentLoaderLocalCacheManager.class); + private static final Comparator COMPARATOR = (left, right) -> + Longs.compare(right.available(), left.available()); private final IndexIO indexIO; private final SegmentLoaderConfig config; @@ -48,15 +52,30 @@ public class SegmentLoaderLocalCacheManager implements SegmentLoader private final List locations; - private final Object lock = new Object(); + // This directoryWriteRemoveLock is used when creating or removing a directory + private final Object directoryWriteRemoveLock = new Object(); - private static final Comparator 
COMPARATOR = new Comparator() - { - @Override public int compare(StorageLocation left, StorageLocation right) - { - return Longs.compare(right.available(), left.available()); - } - }; + /** + * A map between segment and referenceCountingLocks. + * + * These locks should be acquired whenever getting or deleting files for a segment. + * If different threads try to get or delete files simultaneously, one of them creates a lock first using + * {@link #createOrGetLock}. And then, all threads compete with each other to get the lock. + * Finally, the lock should be released using {@link #unlock}. + * + * An example usage is: + * + * final ReferenceCountingLock lock = createOrGetLock(segment); + * synchronized (lock) { + * try { + * doSomething(); + * } + * finally { + * unlock(lock); + * } + * } + */ + private final ConcurrentHashMap segmentLocks = new ConcurrentHashMap<>(); // Note that we only create this via injection in historical and realtime nodes. Peons create these // objects via SegmentLoaderFactory objects, so that they can store segments in task-specific @@ -74,12 +93,15 @@ public class SegmentLoaderLocalCacheManager implements SegmentLoader this.locations = new ArrayList<>(); for (StorageLocationConfig locationConfig : config.getLocations()) { - locations.add(new StorageLocation( - locationConfig.getPath(), - locationConfig.getMaxSize(), - locationConfig.getFreeSpacePercent() - )); + locations.add( + new StorageLocation( + locationConfig.getPath(), + locationConfig.getMaxSize(), + locationConfig.getFreeSpacePercent() + ) + ); } + locations.sort(COMPARATOR); } @Override @@ -90,7 +112,7 @@ public class SegmentLoaderLocalCacheManager implements SegmentLoader private StorageLocation findStorageLocationIfLoaded(final DataSegment segment) { - for (StorageLocation location : getSortedList(locations)) { + for (StorageLocation location : locations) { File localStorageDir = new File(location.getPath(), DataSegmentPusher.getDefaultStorageDir(segment, false)); if (localStorageDir.exists()) { return location; @@ -102,7 +124,16 @@ public class SegmentLoaderLocalCacheManager implements SegmentLoader @Override public Segment getSegment(DataSegment segment) throws SegmentLoadingException { - File segmentFiles = getSegmentFiles(segment); + final ReferenceCountingLock lock = createOrGetLock(segment); + final File segmentFiles; + synchronized (lock) { + try { + segmentFiles = getSegmentFiles(segment); + } + finally { + unlock(segment, lock); + } + } File factoryJson = new File(segmentFiles, "factory.json"); final SegmentizerFactory factory; @@ -123,14 +154,22 @@ public class SegmentLoaderLocalCacheManager implements SegmentLoader @Override public File getSegmentFiles(DataSegment segment) throws SegmentLoadingException { - StorageLocation loc = findStorageLocationIfLoaded(segment); - String storageDir = DataSegmentPusher.getDefaultStorageDir(segment, false); + final ReferenceCountingLock lock = createOrGetLock(segment); + synchronized (lock) { + try { + StorageLocation loc = findStorageLocationIfLoaded(segment); + String storageDir = DataSegmentPusher.getDefaultStorageDir(segment, false); - if (loc == null) { - loc = loadSegmentWithRetry(segment, storageDir); + if (loc == null) { + loc = loadSegmentWithRetry(segment, storageDir); + } + loc.addSegment(segment); + return new File(loc.getPath(), storageDir); + } + finally { + unlock(segment, lock); + } } - loc.addSegment(segment); - return new File(loc.getPath(), storageDir); } /** @@ -140,7 +179,7 @@ public class SegmentLoaderLocalCacheManager implements 
SegmentLoader */ private StorageLocation loadSegmentWithRetry(DataSegment segment, String storageDirStr) throws SegmentLoadingException { - for (StorageLocation loc : getSortedList(locations)) { + for (StorageLocation loc : locations) { if (loc.canHandle(segment)) { File storageDir = new File(loc.getPath(), storageDirStr); @@ -169,7 +208,7 @@ public class SegmentLoaderLocalCacheManager implements SegmentLoader // We use a marker to prevent the case where a segment is downloaded, but before the download completes, // the parent directories of the segment are removed final File downloadStartMarker = new File(storageDir, "downloadStartMarker"); - synchronized (lock) { + synchronized (directoryWriteRemoveLock) { if (!storageDir.mkdirs()) { log.debug("Unable to make parent file[%s]", storageDir); } @@ -212,23 +251,31 @@ public class SegmentLoaderLocalCacheManager implements SegmentLoader return; } - StorageLocation loc = findStorageLocationIfLoaded(segment); + final ReferenceCountingLock lock = createOrGetLock(segment); + synchronized (lock) { + try { + StorageLocation loc = findStorageLocationIfLoaded(segment); - if (loc == null) { - log.info("Asked to cleanup something[%s] that didn't exist. Skipping.", segment); - return; - } + if (loc == null) { + log.warn("Asked to cleanup something[%s] that didn't exist. Skipping.", segment); + return; + } - // If storageDir.mkdirs() success, but downloadStartMarker.createNewFile() failed, - // in this case, findStorageLocationIfLoaded() will think segment is located in the failed storageDir which is actually not. - // So we should always clean all possible locations here - for (StorageLocation location : getSortedList(locations)) { - File localStorageDir = new File(location.getPath(), DataSegmentPusher.getDefaultStorageDir(segment, false)); - if (localStorageDir.exists()) { - // Druid creates folders of the form dataSource/interval/version/partitionNum. - // We need to clean up all these directories if they are all empty. - cleanupCacheFiles(location.getPath(), localStorageDir); - location.removeSegment(segment); + // If storageDir.mkdirs() success, but downloadStartMarker.createNewFile() failed, + // in this case, findStorageLocationIfLoaded() will think segment is located in the failed storageDir which is actually not. + // So we should always clean all possible locations here + for (StorageLocation location : locations) { + File localStorageDir = new File(location.getPath(), DataSegmentPusher.getDefaultStorageDir(segment, false)); + if (localStorageDir.exists()) { + // Druid creates folders of the form dataSource/interval/version/partitionNum. + // We need to clean up all these directories if they are all empty. 
+ cleanupCacheFiles(location.getPath(), localStorageDir); + location.removeSegment(segment); + } + } + } + finally { + unlock(segment, lock); } } } @@ -239,13 +286,13 @@ public class SegmentLoaderLocalCacheManager implements SegmentLoader return; } - synchronized (lock) { + synchronized (directoryWriteRemoveLock) { log.info("Deleting directory[%s]", cacheFile); try { FileUtils.deleteDirectory(cacheFile); } catch (Exception e) { - log.error("Unable to remove file[%s]", cacheFile); + log.error(e, "Unable to remove directory[%s]", cacheFile); } } @@ -258,11 +305,62 @@ public class SegmentLoaderLocalCacheManager implements SegmentLoader } } - private List getSortedList(List locs) + private ReferenceCountingLock createOrGetLock(DataSegment dataSegment) { - List locations = new ArrayList<>(locs); - Collections.sort(locations, COMPARATOR); + return segmentLocks.compute( + dataSegment, + (segment, lock) -> { + final ReferenceCountingLock nonNullLock; + if (lock == null) { + nonNullLock = new ReferenceCountingLock(); + } else { + nonNullLock = lock; + } + nonNullLock.increment(); + return nonNullLock; + } + ); + } - return locations; + private void unlock(DataSegment dataSegment, ReferenceCountingLock lock) + { + segmentLocks.compute( + dataSegment, + (segment, existingLock) -> { + //noinspection ObjectEquality + if (existingLock == null || existingLock != lock) { + throw new ISE("WTH? Different createOrGetLock instance"); + } else { + if (existingLock.numReferences == 1) { + return null; + } else { + existingLock.decrement(); + return existingLock; + } + } + } + ); + } + + @VisibleForTesting + private static class ReferenceCountingLock + { + private int numReferences; + + private void increment() + { + ++numReferences; + } + + private void decrement() + { + --numReferences; + } + } + + @VisibleForTesting + public ConcurrentHashMap getSegmentLocks() + { + return segmentLocks; } } diff --git a/server/src/main/java/org/apache/druid/server/SegmentManager.java b/server/src/main/java/org/apache/druid/server/SegmentManager.java index a1975ffb7a8..ecbb0d49686 100644 --- a/server/src/main/java/org/apache/druid/server/SegmentManager.java +++ b/server/src/main/java/org/apache/druid/server/SegmentManager.java @@ -171,7 +171,7 @@ public class SegmentManager ); if ((entry != null) && (entry.getChunk(segment.getShardSpec().getPartitionNum()) != null)) { - log.warn("Told to load a adapter for a segment[%s] that already exists", segment.getId()); + log.warn("Told to load an adapter for segment[%s] that already exists", segment.getId()); resultSupplier.set(false); } else { loadedIntervals.add( @@ -223,6 +223,8 @@ public class SegmentManager final PartitionChunk removed = loadedIntervals.remove( segment.getInterval(), segment.getVersion(), + // remove() internally searches for a partitionChunk to remove which is *equal* to the given + // partitionChunk. Note that partitionChunk.equals() checks only the partitionNum, but not the object. segment.getShardSpec().createChunk(null) ); final ReferenceCountingSegment oldQueryable = (removed == null) ? 
null : removed.getObject(); @@ -234,7 +236,7 @@ public class SegmentManager oldQueryable.close(); } else { log.info( - "Told to delete a queryable on dataSource[%s] for interval[%s] and version [%s] that I don't have.", + "Told to delete a queryable on dataSource[%s] for interval[%s] and version[%s] that I don't have.", dataSourceName, segment.getInterval(), segment.getVersion() diff --git a/server/src/main/java/org/apache/druid/server/coordination/BatchDataSegmentAnnouncer.java b/server/src/main/java/org/apache/druid/server/coordination/BatchDataSegmentAnnouncer.java index d45c51cced6..205bbac1d0b 100644 --- a/server/src/main/java/org/apache/druid/server/coordination/BatchDataSegmentAnnouncer.java +++ b/server/src/main/java/org/apache/druid/server/coordination/BatchDataSegmentAnnouncer.java @@ -119,9 +119,14 @@ public class BatchDataSegmentAnnouncer implements DataSegmentAnnouncer return; } - DataSegment toAnnounce = segmentTransformer.apply(segment); - synchronized (lock) { + if (segmentLookup.containsKey(segment)) { + log.info("Skipping announcement of segment [%s]. Announcement exists already.", segment.getId()); + return; + } + + DataSegment toAnnounce = segmentTransformer.apply(segment); + changes.addChangeRequest(new SegmentChangeRequestLoad(toAnnounce)); if (config.isSkipSegmentAnnouncementOnZk()) { diff --git a/server/src/main/java/org/apache/druid/server/coordination/SegmentLoadDropHandler.java b/server/src/main/java/org/apache/druid/server/coordination/SegmentLoadDropHandler.java index 53b12c9aee5..36fbe5638e7 100644 --- a/server/src/main/java/org/apache/druid/server/coordination/SegmentLoadDropHandler.java +++ b/server/src/main/java/org/apache/druid/server/coordination/SegmentLoadDropHandler.java @@ -106,11 +106,16 @@ public class SegmentLoadDropHandler implements DataSegmentChangeHandler SegmentManager segmentManager ) { - this(jsonMapper, config, announcer, serverAnnouncer, segmentManager, - Executors.newScheduledThreadPool( - config.getNumLoadingThreads(), - Execs.makeThreadFactory("SimpleDataSegmentChangeHandler-%s") - ) + this( + jsonMapper, + config, + announcer, + serverAnnouncer, + segmentManager, + Executors.newScheduledThreadPool( + config.getNumLoadingThreads(), + Execs.makeThreadFactory("SimpleDataSegmentChangeHandler-%s") + ) ); } @@ -250,7 +255,7 @@ public class SegmentLoadDropHandler implements DataSegmentChangeHandler * Load a single segment. If the segment is loaded successfully, this function simply returns. Otherwise it will * throw a SegmentLoadingException * - * @throws SegmentLoadingException + * @throws SegmentLoadingException if it fails to load the given segment */ private void loadSegment(DataSegment segment, DataSegmentChangeCallback callback) throws SegmentLoadingException { @@ -305,6 +310,7 @@ public class SegmentLoadDropHandler implements DataSegmentChangeHandler } } loadSegment(segment, DataSegmentChangeCallback.NOOP); + // announce segment even if the segment file already exists. 
try { announcer.announceSegment(segment); } @@ -727,16 +733,10 @@ public class SegmentLoadDropHandler implements DataSegmentChangeHandler (request, statusRef) -> result.add(new DataSegmentChangeRequestAndStatus(request, statusRef.get())) ); - super.set(result); + set(result); } } - @Override - public boolean setException(Throwable throwable) - { - return super.setException(throwable); - } - @Override public boolean cancel(boolean interruptIfRunning) { diff --git a/server/src/main/java/org/apache/druid/server/coordinator/HttpLoadQueuePeon.java b/server/src/main/java/org/apache/druid/server/coordinator/HttpLoadQueuePeon.java index e3ab60412f4..0c0ee56e072 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/HttpLoadQueuePeon.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/HttpLoadQueuePeon.java @@ -147,9 +147,9 @@ public class HttpLoadQueuePeon extends LoadQueuePeon return; } - int batchSize = config.getHttpLoadQueuePeonBatchSize(); + final int batchSize = config.getHttpLoadQueuePeonBatchSize(); - List newRequests = new ArrayList<>(batchSize); + final List newRequests = new ArrayList<>(batchSize); synchronized (lock) { Iterator> iter = Iterators.concat( @@ -157,8 +157,7 @@ public class HttpLoadQueuePeon extends LoadQueuePeon segmentsToLoad.entrySet().iterator() ); - while (batchSize > 0 && iter.hasNext()) { - batchSize--; + while (newRequests.size() < batchSize && iter.hasNext()) { Map.Entry entry = iter.next(); if (entry.getValue().hasTimedOut()) { entry.getValue().requestFailed("timed out"); @@ -304,8 +303,7 @@ public class HttpLoadQueuePeon extends LoadQueuePeon return; } - if (status.getState() - == SegmentLoadDropHandler.Status.STATE.FAILED) { + if (status.getState() == SegmentLoadDropHandler.Status.STATE.FAILED) { holder.requestFailed(status.getFailureCause()); } else { holder.requestSucceeded(); diff --git a/server/src/test/java/org/apache/druid/segment/loading/CacheTestSegmentLoader.java b/server/src/test/java/org/apache/druid/segment/loading/CacheTestSegmentLoader.java index 5723359b77a..0b291d40210 100644 --- a/server/src/test/java/org/apache/druid/segment/loading/CacheTestSegmentLoader.java +++ b/server/src/test/java/org/apache/druid/segment/loading/CacheTestSegmentLoader.java @@ -37,7 +37,6 @@ import java.util.Set; */ public class CacheTestSegmentLoader implements SegmentLoader { - private final Set segmentsInTrash = new HashSet<>(); @Override diff --git a/server/src/test/java/org/apache/druid/server/SegmentManagerTest.java b/server/src/test/java/org/apache/druid/server/SegmentManagerTest.java index 826f249a7b0..14cbdbd90a4 100644 --- a/server/src/test/java/org/apache/druid/server/SegmentManagerTest.java +++ b/server/src/test/java/org/apache/druid/server/SegmentManagerTest.java @@ -309,7 +309,8 @@ public class SegmentManagerTest public void testLoadDuplicatedSegmentsInParallel() throws ExecutionException, InterruptedException, SegmentLoadingException { - final List> futures = ImmutableList.of(segments.get(0), segments.get(0), segments.get(0)).stream() + final List> futures = ImmutableList.of(segments.get(0), segments.get(0), segments.get(0)) + .stream() .map( segment -> executor.submit( () -> segmentManager.loadSegment(segment) @@ -347,16 +348,17 @@ public class SegmentManagerTest throws SegmentLoadingException, ExecutionException, InterruptedException { segmentManager.loadSegment(segments.get(0)); - final List> futures = ImmutableList.of(segments.get(1), segments.get(2)).stream() - .map( - segment -> executor.submit( - () -> { - 
segmentManager.dropSegment(segment); - return (Void) null; - } - ) - ) - .collect(Collectors.toList()); + final List> futures = ImmutableList.of(segments.get(1), segments.get(2)) + .stream() + .map( + segment -> executor.submit( + () -> { + segmentManager.dropSegment(segment); + return (Void) null; + } + ) + ) + .collect(Collectors.toList()); for (Future future : futures) { future.get(); diff --git a/server/src/test/java/org/apache/druid/server/SegmentManagerThreadSafetyTest.java b/server/src/test/java/org/apache/druid/server/SegmentManagerThreadSafetyTest.java new file mode 100644 index 00000000000..ab78689751e --- /dev/null +++ b/server/src/test/java/org/apache/druid/server/SegmentManagerThreadSafetyTest.java @@ -0,0 +1,268 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.server; + +import com.fasterxml.jackson.databind.InjectableValues.Std; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.jsontype.NamedType; +import com.fasterxml.jackson.databind.module.SimpleModule; +import com.google.common.collect.ImmutableMap; +import org.apache.commons.io.FileUtils; +import org.apache.druid.jackson.DefaultObjectMapper; +import org.apache.druid.java.util.common.FileUtils.FileCopyResult; +import org.apache.druid.java.util.common.Intervals; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.java.util.common.concurrent.Execs; +import org.apache.druid.java.util.emitter.EmittingLogger; +import org.apache.druid.segment.IndexIO; +import org.apache.druid.segment.QueryableIndex; +import org.apache.druid.segment.Segment; +import org.apache.druid.segment.StorageAdapter; +import org.apache.druid.segment.loading.DataSegmentPusher; +import org.apache.druid.segment.loading.LocalDataSegmentPuller; +import org.apache.druid.segment.loading.LocalLoadSpec; +import org.apache.druid.segment.loading.SegmentLoaderConfig; +import org.apache.druid.segment.loading.SegmentLoaderLocalCacheManager; +import org.apache.druid.segment.loading.SegmentLoadingException; +import org.apache.druid.segment.loading.SegmentizerFactory; +import org.apache.druid.segment.loading.StorageLocationConfig; +import org.apache.druid.server.metrics.NoopServiceEmitter; +import org.apache.druid.timeline.DataSegment; +import org.apache.druid.timeline.SegmentId; +import org.apache.druid.timeline.partition.NumberedShardSpec; +import org.joda.time.Interval; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import javax.annotation.Nullable; +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import 
java.util.List; +import java.util.Map; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +public class SegmentManagerThreadSafetyTest +{ + private static final int NUM_THREAD = 4; + + @Rule + public TemporaryFolder temporaryFolder = new TemporaryFolder(); + + private TestSegmentPuller segmentPuller; + private ObjectMapper objectMapper; + private IndexIO indexIO; + private File segmentCacheDir; + private File segmentDeepStorageDir; + private SegmentLoaderLocalCacheManager segmentLoader; + private SegmentManager segmentManager; + private ExecutorService exec; + + @Before + public void setup() throws IOException + { + segmentPuller = new TestSegmentPuller(); + objectMapper = new DefaultObjectMapper() + .registerModule( + new SimpleModule().registerSubtypes(new NamedType(LocalLoadSpec.class, "local"), new NamedType(TestSegmentizerFactory.class, "test")) + ) + .setInjectableValues(new Std().addValue(LocalDataSegmentPuller.class, segmentPuller)); + indexIO = new IndexIO(objectMapper, () -> 0); + segmentCacheDir = temporaryFolder.newFolder(); + segmentDeepStorageDir = temporaryFolder.newFolder(); + segmentLoader = new SegmentLoaderLocalCacheManager( + indexIO, + new SegmentLoaderConfig() + { + @Override + public List getLocations() + { + return Collections.singletonList( + new StorageLocationConfig().setPath(segmentCacheDir) + ); + } + }, + objectMapper + ); + segmentManager = new SegmentManager(segmentLoader); + exec = Execs.multiThreaded(NUM_THREAD, "SegmentManagerThreadSafetyTest-%d"); + EmittingLogger.registerEmitter(new NoopServiceEmitter()); + } + + @After + public void teardown() throws IOException + { + exec.shutdownNow(); + FileUtils.deleteDirectory(segmentCacheDir); + } + + @Test(timeout = 5000L) + public void testLoadSameSegment() throws IOException, ExecutionException, InterruptedException + { + final DataSegment segment = createSegment("2019-01-01/2019-01-02"); + final List futures = IntStream + .range(0, 16) + .mapToObj(i -> exec.submit(() -> segmentManager.loadSegment(segment))) + .collect(Collectors.toList()); + for (Future future : futures) { + future.get(); + } + Assert.assertEquals(1, segmentPuller.numFileLoaded.size()); + Assert.assertEquals(1, segmentPuller.numFileLoaded.values().iterator().next().intValue()); + Assert.assertEquals(0, segmentLoader.getSegmentLocks().size()); + } + + @Test(timeout = 5000L) + public void testLoadMultipleSegments() throws IOException, ExecutionException, InterruptedException + { + final List segments = new ArrayList<>(88); + for (int i = 0; i < 11; i++) { + for (int j = 0; j < 8; j++) { + segments.add(createSegment(StringUtils.format("2019-%02d-01/2019-%02d-01", i + 1, i + 2))); + } + } + + final List futures = IntStream + .range(0, 16) + .mapToObj(i -> exec.submit(() -> { + for (DataSegment segment : segments) { + try { + segmentManager.loadSegment(segment); + } + catch (SegmentLoadingException e) { + throw new RuntimeException(e); + } + } + })) + .collect(Collectors.toList()); + for (Future future : futures) { + future.get(); + } + Assert.assertEquals(11, segmentPuller.numFileLoaded.size()); + Assert.assertEquals(1, segmentPuller.numFileLoaded.values().iterator().next().intValue()); + Assert.assertEquals(0, segmentLoader.getSegmentLocks().size()); + } + + private DataSegment createSegment(String interval) throws IOException + { + final DataSegment tmpSegment = new DataSegment( + 
"dataSource", + Intervals.of(interval), + "version", + Collections.emptyMap(), + Collections.emptyList(), + Collections.emptyList(), + new NumberedShardSpec(0, 0), + 9, + 100 + ); + final String storageDir = DataSegmentPusher.getDefaultStorageDir(tmpSegment, false); + final File segmentDir = new File(segmentDeepStorageDir, storageDir); + FileUtils.forceMkdir(segmentDir); + + final File factoryJson = new File(segmentDir, "factory.json"); + objectMapper.writeValue(factoryJson, new TestSegmentizerFactory()); + return tmpSegment.withLoadSpec( + ImmutableMap.of("type", "local", "path", segmentDir.getAbsolutePath()) + ); + } + + private static class TestSegmentPuller extends LocalDataSegmentPuller + { + private final Map numFileLoaded = new HashMap<>(); + + @Override + public FileCopyResult getSegmentFiles(final File sourceFile, final File dir) + { + numFileLoaded.compute(sourceFile, (f, numLoaded) -> numLoaded == null ? 1 : numLoaded + 1); + try { + FileUtils.copyDirectory(sourceFile, dir); + } + catch (IOException e) { + throw new RuntimeException(e); + } + return new FileCopyResult() + { + @Override + public long size() + { + return 100L; + } + }; + } + } + + private static class TestSegmentizerFactory implements SegmentizerFactory + { + @Override + public Segment factorize(DataSegment segment, File parentDir) + { + return new Segment() + { + @Override + public SegmentId getId() + { + return segment.getId(); + } + + @Override + public Interval getDataInterval() + { + return segment.getInterval(); + } + + @Nullable + @Override + public QueryableIndex asQueryableIndex() + { + throw new UnsupportedOperationException(); + } + + @Override + public StorageAdapter asStorageAdapter() + { + throw new UnsupportedOperationException(); + } + + @Override + public T as(Class clazz) + { + throw new UnsupportedOperationException(); + } + + @Override + public void close() + { + + } + }; + } + } +} diff --git a/server/src/test/java/org/apache/druid/server/coordination/SegmentLoadDropHandlerTest.java b/server/src/test/java/org/apache/druid/server/coordination/SegmentLoadDropHandlerTest.java index 6f227e48699..f5455fbbfb2 100644 --- a/server/src/test/java/org/apache/druid/server/coordination/SegmentLoadDropHandlerTest.java +++ b/server/src/test/java/org/apache/druid/server/coordination/SegmentLoadDropHandlerTest.java @@ -65,15 +65,6 @@ public class SegmentLoadDropHandlerTest private static final Logger log = new Logger(ZkCoordinatorTest.class); private final ObjectMapper jsonMapper = TestHelper.makeJsonMapper(); - private final DruidServerMetadata me = new DruidServerMetadata( - "dummyServer", - "dummyHost", - null, - 0, - ServerType.HISTORICAL, - "normal", - 0 - ); private SegmentLoadDropHandler segmentLoadDropHandler; private DataSegmentAnnouncer announcer; diff --git a/server/src/test/java/org/apache/druid/server/coordination/coordination/BatchDataSegmentAnnouncerTest.java b/server/src/test/java/org/apache/druid/server/coordination/coordination/BatchDataSegmentAnnouncerTest.java index 52006f5880d..8debba13180 100644 --- a/server/src/test/java/org/apache/druid/server/coordination/coordination/BatchDataSegmentAnnouncerTest.java +++ b/server/src/test/java/org/apache/druid/server/coordination/coordination/BatchDataSegmentAnnouncerTest.java @@ -51,10 +51,17 @@ import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import java.io.IOException; +import java.util.ArrayList; import java.util.HashSet; import java.util.Iterator; import java.util.List; +import java.util.Map.Entry; import java.util.Set; 
+import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicInteger; /** @@ -65,11 +72,12 @@ public class BatchDataSegmentAnnouncerTest private static final String testBasePath = "/test"; private static final String testSegmentsPath = "/test/segments/id"; private static final Joiner joiner = Joiner.on("/"); + private static final int NUM_THREADS = 4; private TestingCluster testingCluster; private CuratorFramework cf; private ObjectMapper jsonMapper; - private Announcer announcer; + private TestAnnouncer announcer; private SegmentReader segmentReader; private BatchDataSegmentAnnouncer segmentAnnouncer; private Set testSegments; @@ -78,6 +86,7 @@ public class BatchDataSegmentAnnouncerTest private Boolean skipDimensionsAndMetrics; private Boolean skipLoadSpec; + private ExecutorService exec; @Before public void setUp() throws Exception @@ -96,7 +105,7 @@ public class BatchDataSegmentAnnouncerTest jsonMapper = TestHelper.makeJsonMapper(); - announcer = new Announcer( + announcer = new TestAnnouncer( cf, Execs.directExecutor() ); @@ -157,6 +166,8 @@ public class BatchDataSegmentAnnouncerTest for (int i = 0; i < 100; i++) { testSegments.add(makeSegment(i)); } + + exec = Execs.multiThreaded(NUM_THREADS, "BatchDataSegmentAnnouncerTest-%d"); } @After @@ -165,6 +176,7 @@ public class BatchDataSegmentAnnouncerTest announcer.stop(); cf.close(); testingCluster.stop(); + exec.shutdownNow(); } @Test @@ -299,6 +311,14 @@ public class BatchDataSegmentAnnouncerTest testBatchAnnounce(true); } + @Test + public void testMultipleBatchAnnounce() throws Exception + { + for (int i = 0; i < 10; i++) { + testBatchAnnounce(false); + } + } + private void testBatchAnnounce(boolean testHistory) throws Exception { segmentAnnouncer.announceSegments(testSegments); @@ -342,11 +362,72 @@ public class BatchDataSegmentAnnouncerTest } } - @Test - public void testMultipleBatchAnnounce() throws Exception + @Test(timeout = 5000L) + public void testAnnounceSegmentsWithSameSegmentConcurrently() throws ExecutionException, InterruptedException { - for (int i = 0; i < 10; i++) { - testBatchAnnounce(false); + final List futures = new ArrayList<>(NUM_THREADS); + + for (int i = 0; i < NUM_THREADS; i++) { + futures.add( + exec.submit(() -> { + try { + segmentAnnouncer.announceSegments(testSegments); + } + catch (IOException e) { + throw new RuntimeException(e); + } + }) + ); + } + + for (Future future : futures) { + future.get(); + } + + // Announcing 100 segments requires 2 nodes because of maxBytesPerNode configuration. 
+ Assert.assertEquals(2, announcer.numPathAnnounced.size()); + for (ConcurrentHashMap eachMap : announcer.numPathAnnounced.values()) { + for (Entry entry : eachMap.entrySet()) { + Assert.assertEquals(1, entry.getValue().get()); + } + } + } + + @Test(timeout = 5000L) + public void testAnnounceSegmentWithSameSegmentConcurrently() throws ExecutionException, InterruptedException + { + final List futures = new ArrayList<>(NUM_THREADS); + + final DataSegment segment1 = makeSegment(0); + final DataSegment segment2 = makeSegment(1); + final DataSegment segment3 = makeSegment(2); + final DataSegment segment4 = makeSegment(3); + + for (int i = 0; i < NUM_THREADS; i++) { + futures.add( + exec.submit(() -> { + try { + segmentAnnouncer.announceSegment(segment1); + segmentAnnouncer.announceSegment(segment2); + segmentAnnouncer.announceSegment(segment3); + segmentAnnouncer.announceSegment(segment4); + } + catch (IOException e) { + throw new RuntimeException(e); + } + }) + ); + } + + for (Future future : futures) { + future.get(); + } + + Assert.assertEquals(1, announcer.numPathAnnounced.size()); + for (ConcurrentHashMap eachMap : announcer.numPathAnnounced.values()) { + for (Entry entry : eachMap.entrySet()) { + Assert.assertEquals(1, entry.getValue().get()); + } } } @@ -396,4 +477,21 @@ public class BatchDataSegmentAnnouncerTest return new HashSet<>(); } } + + private static class TestAnnouncer extends Announcer + { + private final ConcurrentHashMap> numPathAnnounced = new ConcurrentHashMap<>(); + + private TestAnnouncer(CuratorFramework curator, ExecutorService exec) + { + super(curator, exec); + } + + @Override + public void announce(String path, byte[] bytes, boolean removeParentIfCreated) + { + numPathAnnounced.computeIfAbsent(path, k -> new ConcurrentHashMap<>()).computeIfAbsent(bytes, k -> new AtomicInteger(0)).incrementAndGet(); + super.announce(path, bytes, removeParentIfCreated); + } + } } diff --git a/server/src/test/java/org/apache/druid/server/coordinator/HttpLoadQueuePeonTest.java b/server/src/test/java/org/apache/druid/server/coordinator/HttpLoadQueuePeonTest.java index 1b6ff3df57c..0f73695c167 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/HttpLoadQueuePeonTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/HttpLoadQueuePeonTest.java @@ -20,12 +20,9 @@ package org.apache.druid.server.coordinator; import com.fasterxml.jackson.core.type.TypeReference; -import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; -import org.apache.druid.discovery.DiscoveryDruidNode; -import org.apache.druid.discovery.DruidNodeDiscovery; import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.RE; import org.apache.druid.java.util.common.concurrent.Execs; @@ -47,12 +44,10 @@ import org.junit.Test; import java.io.ByteArrayInputStream; import java.util.ArrayList; -import java.util.Collection; import java.util.List; import java.util.Map; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executors; -import java.util.concurrent.atomic.AtomicInteger; /** */ @@ -172,29 +167,8 @@ public class HttpLoadQueuePeonTest } - private static class TestDruidNodeDiscovery implements DruidNodeDiscovery - { - Listener listener; - - @Override - public Collection getAllNodes() - { - throw new UnsupportedOperationException("Not Implemented."); - } - - @Override - public void 
registerListener(Listener listener) - { - listener.nodesAdded(ImmutableList.of()); - listener.nodeViewInitialized(); - this.listener = listener; - } - } - private static class TestHttpClient implements HttpClient { - AtomicInteger requestNum = new AtomicInteger(0); - @Override public ListenableFuture go( Request request,
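The patch above leans on a few concurrency idioms that are easy to miss when reading a diff, so the notes below give minimal, self-contained sketches of them. Every class, method, and field name in these sketches is invented for illustration and is not part of the Druid codebase.

First, the ReferenceCountingSegment change replaces Preconditions.checkState() inside Phaser.onAdvance() with a logged error because, as the patch's own comment says, an exception thrown from onAdvance() prevents the Phaser from terminating, and then the wrapped segment is never closed. A sketch of the underlying pattern, assuming nothing beyond java.util.concurrent, that closes a resource when the last reference is released:

import java.io.Closeable;
import java.io.IOException;
import java.util.concurrent.Phaser;

class RefCountedCloseable<T extends Closeable> implements Closeable
{
  private final T delegate;

  // The Phaser doubles as the reference counter: the constructor registers the "owner" party,
  // every acquire() registers one more party, and every release() (or close()) deregisters one.
  private final Phaser refCount = new Phaser(1)
  {
    @Override
    protected boolean onAdvance(int phase, int registeredParties)
    {
      // Log instead of throwing: an exception here would leave the Phaser un-terminated,
      // so the delegate would never be closed (the reason for the change in the patch).
      if (registeredParties != 0) {
        System.err.println("registeredParties[" + registeredParties + "] is not 0");
      }
      try {
        delegate.close();
      }
      catch (IOException e) {
        System.err.println("Failed to close delegate: " + e);
      }
      // Returning true terminates the Phaser, so later register() calls are rejected.
      return true;
    }
  };

  RefCountedCloseable(T delegate)
  {
    this.delegate = delegate;
  }

  /** Returns true if a reference was acquired, false if the resource is already closed. */
  boolean acquire()
  {
    return refCount.register() >= 0;
  }

  void release()
  {
    refCount.arriveAndDeregister();
  }

  @Override
  public void close()
  {
    // Drops the owner's party; the delegate is closed once all acquired references are released.
    refCount.arriveAndDeregister();
  }
}

Real code would also guard against double-close and against acquiring after close; the sketch only shows why onAdvance() must swallow problems rather than throw them.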
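The heart of the SegmentLoaderLocalCacheManager change is the segmentLocks map: a per-segment, reference-counted lock that is created on demand with ConcurrentHashMap.compute() and removed again when the last holder releases it, so the map does not grow with every segment ever touched. A generic sketch of that registry and of the usage pattern documented in the patch's javadoc (the names KeyedLockRegistry, acquire and release are invented here, and IllegalStateException stands in for Druid's ISE):

import java.util.concurrent.ConcurrentHashMap;

class KeyedLockRegistry<K>
{
  private static class RefCountedLock
  {
    private int numReferences;
  }

  private final ConcurrentHashMap<K, RefCountedLock> locks = new ConcurrentHashMap<>();

  RefCountedLock acquire(K key)
  {
    return locks.compute(key, (k, existing) -> {
      final RefCountedLock lock = existing == null ? new RefCountedLock() : existing;
      lock.numReferences++;        // safe: compute() runs atomically per key
      return lock;
    });
  }

  void release(K key, RefCountedLock lock)
  {
    locks.compute(key, (k, existing) -> {
      if (existing != lock) {      // reference equality on purpose, as in the patch
        throw new IllegalStateException("Unknown lock instance for key " + k);
      }
      // Drop the entry entirely when the last reference goes away.
      return --existing.numReferences == 0 ? null : existing;
    });
  }

  // Intended usage, mirroring the javadoc in the patch:
  //
  //   RefCountedLock lock = registry.acquire(segmentId);
  //   synchronized (lock) {
  //     try {
  //       downloadOrDeleteSegmentFiles();
  //     }
  //     finally {
  //       registry.release(segmentId, lock);
  //     }
  //   }
}

Because compute() applies its remapping function atomically per key, the reference count can be mutated inside it without extra synchronization, and release() can remove the map entry exactly when the count reaches zero without racing against a concurrent acquire().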
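loadSegmentWithRetry() and the cleanup comment above rely on a marker-file protocol for detecting half-finished downloads: the marker is written before any segment data and deleted only after the copy completes, so a directory that still contains the marker was left behind by a failed or interrupted download. A sketch of that protocol; the class and method names are invented, and only the file name "downloadStartMarker" comes from the patch:

import org.apache.commons.io.FileUtils;

import java.io.File;
import java.io.IOException;

class MarkerProtectedDownload
{
  private static final String MARKER_FILE = "downloadStartMarker";

  /** Copies segment files into storageDir, guarding the copy with a start marker. */
  void download(File storageDir, Runnable pullFilesFromDeepStorage) throws IOException
  {
    if (!storageDir.mkdirs() && !storageDir.isDirectory()) {
      throw new IOException("Could not create directory " + storageDir);
    }
    final File marker = new File(storageDir, MARKER_FILE);
    if (!marker.createNewFile()) {
      throw new IOException("Marker already present in " + storageDir + ", another download owns it");
    }
    pullFilesFromDeepStorage.run();   // the actual copy from deep storage
    if (!marker.delete()) {
      throw new IOException("Could not remove marker " + marker);
    }
  }

  /** A directory that exists but still carries the marker holds an incomplete download. */
  boolean isComplete(File storageDir)
  {
    return storageDir.isDirectory() && !new File(storageDir, MARKER_FILE).exists();
  }

  /** Wipes a directory whose download never finished, e.g. during startup cleanup. */
  void cleanupIfIncomplete(File storageDir) throws IOException
  {
    if (new File(storageDir, MARKER_FILE).exists()) {
      FileUtils.deleteDirectory(storageDir);
    }
  }
}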
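The BatchDataSegmentAnnouncer change adds an "already announced?" check inside the lock that guards segmentLookup, which makes announceSegment() an idempotent check-then-act: two threads announcing the same segment can no longer both pass the check and enqueue a duplicate change request, which is exactly what the new concurrent announcer tests exercise. The essence of that fix, with invented names:

import java.util.HashMap;
import java.util.Map;

class OnceOnlyAnnouncer<K, V>
{
  private final Object lock = new Object();
  private final Map<K, V> announced = new HashMap<>();

  /** Returns true if this call performed the announcement, false if it was a duplicate. */
  boolean announce(K key, V payload)
  {
    synchronized (lock) {
      if (announced.containsKey(key)) {
        return false;               // somebody else already announced this key
      }
      publish(key, payload);        // stand-in for the real side effect (znode write, change request)
      announced.put(key, payload);
      return true;
    }
  }

  private void publish(K key, V payload)
  {
    // intentionally empty in this sketch
  }
}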
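Finally, the HttpLoadQueuePeon loop is now bounded by the number of requests actually collected rather than by decrementing a counter for every queue entry it inspects, so entries that are dropped (for example because they timed out) no longer shrink the effective batch. A sketch of the difference, with invented names:

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Predicate;

class BatchCollector<T>
{
  /** Drains up to batchSize usable entries from the queue, skipping entries that should be dropped. */
  List<T> collectBatch(Iterator<T> pending, int batchSize, Predicate<T> shouldDrop)
  {
    final List<T> batch = new ArrayList<>(batchSize);
    while (batch.size() < batchSize && pending.hasNext()) {
      final T entry = pending.next();
      if (shouldDrop.test(entry)) {
        continue;                   // a skipped entry (e.g. timed out) does not count against the batch
      }
      batch.add(entry);
    }
    return batch;
  }
}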