From ce1faa56352766522610e3ae306c15b55df19fb0 Mon Sep 17 00:00:00 2001 From: Abhishek Agarwal <1477457+abhishekagarwal87@users.noreply.github.com> Date: Thu, 22 Jul 2021 18:00:49 +0530 Subject: [PATCH] Make SegmentLoader extensible and customizable (#11398) This PR refactors the code related to segment loading, specifically SegmentLoader and SegmentLoaderLocalCacheManager. SegmentLoader is marked UnstableApi, which means it can be extended outside core Druid in custom extensions. Here is a summary of changes: SegmentLoader returns an instance of ReferenceCountingSegment instead of Segment. Earlier, SegmentManager was wrapping Segment objects inside ReferenceCountingSegment; that is now moved to SegmentLoader. With this, a custom implementation can track the references of segments. It also allows them to create custom ReferenceCountingSegment implementations. For this reason, the constructor visibility in ReferenceCountingSegment is changed from private to protected. SegmentCacheManager has two additional methods: reserve(DataSegment) and release(DataSegment). These methods let the caller reserve or release space without calling SegmentLoader#getSegment. We already had similar methods in StorageLocation, and now they are also available in SegmentCacheManager, which wraps multiple locations. Refactoring to simplify the code in SegmentCacheManager wherever possible. There is no change in the functionality. --- .../input/DruidSegmentReaderTest.java | 12 ++ .../segment/ReferenceCountingSegment.java | 14 +- .../segment/ReferenceCountingSegmentTest.java | 25 +++ .../segment/loading/SegmentCacheManager.java | 46 +++- .../druid/segment/loading/SegmentLoader.java | 16 +- .../loading/SegmentLocalCacheLoader.java | 6 +- .../loading/SegmentLocalCacheManager.java | 200 ++++++++++++++---- .../segment/loading/StorageLocation.java | 11 + .../apache/druid/server/SegmentManager.java | 21 +- .../loading/CacheTestSegmentCacheManager.java | 12 ++ .../loading/CacheTestSegmentLoader.java | 6 +- .../loading/SegmentLocalCacheManagerTest.java | 148 +++++++++++++ .../druid/server/SegmentManagerTest.java | 6 +- .../coordination/ServerManagerTest.java | 6 +- 14 files changed, 462 insertions(+), 67 deletions(-) diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/input/DruidSegmentReaderTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/input/DruidSegmentReaderTest.java index 3886f3dd145..4e3e8b2bd37 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/input/DruidSegmentReaderTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/input/DruidSegmentReaderTest.java @@ -606,6 +606,18 @@ public class DruidSegmentReaderTest extends NullHandlingTest { throw new UnsupportedOperationException("unused"); } + + @Override + public boolean reserve(DataSegment segment) + { + throw new UnsupportedOperationException(); + } + + @Override + public boolean release(DataSegment segment) + { + throw new UnsupportedOperationException(); + } }, DataSegment.builder() .dataSource("ds") diff --git a/processing/src/main/java/org/apache/druid/segment/ReferenceCountingSegment.java b/processing/src/main/java/org/apache/druid/segment/ReferenceCountingSegment.java index 5b531316cd5..55e390463b5 100644 --- a/processing/src/main/java/org/apache/druid/segment/ReferenceCountingSegment.java +++ b/processing/src/main/java/org/apache/druid/segment/ReferenceCountingSegment.java @@ -33,6 +33,9 @@ import java.util.Optional; * {@link Segment} that is also a {@link ReferenceCountingSegment}, 
allowing query engines that operate directly on * segments to track references so that dropping a {@link Segment} can be done safely to ensure there are no in-flight * queries. + * + * Extensions can extend this class for populating {@link org.apache.druid.timeline.VersionedIntervalTimeline} with + * a custom implementation through SegmentLoader. */ public class ReferenceCountingSegment extends ReferenceCountingCloseableObject implements SegmentReference, Overshadowable @@ -67,7 +70,7 @@ public class ReferenceCountingSegment extends ReferenceCountingCloseableObject T as(Class clazz) + { + if (isClosed()) { + return null; + } + return baseObject.as(clazz); + } } diff --git a/processing/src/test/java/org/apache/druid/segment/ReferenceCountingSegmentTest.java b/processing/src/test/java/org/apache/druid/segment/ReferenceCountingSegmentTest.java index 6566592e9d5..b1f65cfc8d1 100644 --- a/processing/src/test/java/org/apache/druid/segment/ReferenceCountingSegmentTest.java +++ b/processing/src/test/java/org/apache/druid/segment/ReferenceCountingSegmentTest.java @@ -20,6 +20,7 @@ package org.apache.druid.segment; import org.apache.druid.java.util.common.DateTimes; +import org.apache.druid.segment.join.table.IndexedTable; import org.apache.druid.timeline.SegmentId; import org.easymock.EasyMock; import org.joda.time.Days; @@ -43,6 +44,7 @@ public class ReferenceCountingSegmentTest private final Interval dataInterval = new Interval(DateTimes.nowUtc().minus(Days.days(1)), DateTimes.nowUtc()); private QueryableIndex index; private StorageAdapter adapter; + private IndexedTable indexedTable; private int underlyingSegmentClosedCount; @Before @@ -51,6 +53,7 @@ public class ReferenceCountingSegmentTest underlyingSegmentClosedCount = 0; index = EasyMock.createNiceMock(QueryableIndex.class); adapter = EasyMock.createNiceMock(StorageAdapter.class); + indexedTable = EasyMock.createNiceMock(IndexedTable.class); segment = ReferenceCountingSegment.wrapRootGenerationSegment( new Segment() @@ -79,6 +82,19 @@ public class ReferenceCountingSegmentTest return adapter; } + @Override + public T as(Class clazz) + { + if (clazz.equals(QueryableIndex.class)) { + return (T) asQueryableIndex(); + } else if (clazz.equals(StorageAdapter.class)) { + return (T) asStorageAdapter(); + } else if (clazz.equals(IndexedTable.class)) { + return (T) indexedTable; + } + return null; + } + @Override public void close() { @@ -159,4 +175,13 @@ public class ReferenceCountingSegmentTest Assert.assertEquals(adapter, segment.asStorageAdapter()); } + @Test + public void testSegmentAs() + { + Assert.assertSame(index, segment.as(QueryableIndex.class)); + Assert.assertSame(adapter, segment.as(StorageAdapter.class)); + Assert.assertSame(indexedTable, segment.as(IndexedTable.class)); + Assert.assertNull(segment.as(String.class)); + } + } diff --git a/server/src/main/java/org/apache/druid/segment/loading/SegmentCacheManager.java b/server/src/main/java/org/apache/druid/segment/loading/SegmentCacheManager.java index 94585747031..39ea7858406 100644 --- a/server/src/main/java/org/apache/druid/segment/loading/SegmentCacheManager.java +++ b/server/src/main/java/org/apache/druid/segment/loading/SegmentCacheManager.java @@ -30,18 +30,58 @@ import java.io.File; public interface SegmentCacheManager { /** - * Checks whether a segment is already cached. + * Checks whether a segment is already cached. It can return false even if {@link #reserve(DataSegment)} + * has been successful for a segment but is not downloaded yet. 
*/ boolean isSegmentCached(DataSegment segment); /** - * This method fetches the files for the given segment if the segment is not downloaded already. + * This method fetches the files for the given segment if the segment is not downloaded already. It + * is not required to {@link #reserve(DataSegment)} before calling this method. If the caller has not reserved + * the space explicitly via {@link #reserve(DataSegment)}, the implementation should reserve space on the caller's + * behalf. + * If the space has been explicitly reserved already: + * - the implementation should use only the reserved space to store segment files. + * - the implementation should not release the location in case of download errors, and should leave that to the caller. * @throws SegmentLoadingException if there is an error in downloading files */ File getSegmentFiles(DataSegment segment) throws SegmentLoadingException; /** - * Cleanup the cache space used by the segment + * Tries to reserve the space for a segment in any location. When the space has been reserved, + * {@link #getSegmentFiles(DataSegment)} should download the segment in the reserved location or + * fail otherwise. + * + * This function is useful for custom extensions. Extensions can try to reserve the space first and, + * if not successful, make some space by cleaning up other segments, etc. This function also improves + * concurrency for extensions: since reserve is a relatively cheap operation, it can be invoked while holding a lock + * if required by the extension, whereas getSegment can't be called under a lock because downloading the files makes it time-consuming. + * + * @param segment - Segment to reserve + * @return True if enough space was found to store the segment, false otherwise + */ + /* + * We only return a boolean result instead of a pointer to + * {@link StorageLocation} since we don't want callers to operate on {@code StorageLocation} directly outside {@code SegmentLoader}. + * {@link SegmentLoader} operates on the {@code StorageLocation} objects in a thread-safe manner. + */ + boolean reserve(DataSegment segment); + + /** + * Reverts the effects of {@link #reserve(DataSegment)} by releasing the location reserved for this segment. + * Callers that explicitly reserve the space via {@link #reserve(DataSegment)} should use this method to release the space. + * + * Implementations can throw an error if the space is being released while data is still present. Callers + * are supposed to ensure that any data is removed first via {@link #cleanup(DataSegment)}. + * @param segment - Segment to release the location for. + * @return - True if any location was reserved and released, false otherwise. + */ + boolean release(DataSegment segment); + + /** + * Cleanup the cache space used by the segment. It will not release the space if the space has been + * explicitly reserved via {@link #reserve(DataSegment)} */ void cleanup(DataSegment segment); }
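For illustration, here is a minimal sketch (not part of the patch) of how a caller or a custom extension might use the new reserve/release contract: reserve is cheap, so it can run under a lock, while the slow download via getSegmentFiles happens outside the lock, and release undoes the reservation if the download fails. The class name CustomSegmentDownloader and its lock field are hypothetical, used only for this example.

import java.io.File;
import org.apache.druid.segment.loading.SegmentCacheManager;
import org.apache.druid.segment.loading.SegmentLoadingException;
import org.apache.druid.timeline.DataSegment;

public class CustomSegmentDownloader
{
  private final SegmentCacheManager cacheManager;
  private final Object lock = new Object();

  public CustomSegmentDownloader(SegmentCacheManager cacheManager)
  {
    this.cacheManager = cacheManager;
  }

  public File download(DataSegment segment) throws SegmentLoadingException
  {
    // reserve() is cheap, so holding a lock here is acceptable; a real extension could
    // evict other segments and retry if this first attempt returns false.
    synchronized (lock) {
      if (!cacheManager.reserve(segment)) {
        throw new SegmentLoadingException("No space left for segment[%s]", segment.getId());
      }
    }
    try {
      // The download itself is slow, so it runs outside the lock. Because space was
      // already reserved, the implementation must use the reserved location.
      return cacheManager.getSegmentFiles(segment);
    }
    catch (SegmentLoadingException e) {
      // Per the javadoc, any downloaded data must be cleaned up before the reservation is released.
      cacheManager.cleanup(segment);
      cacheManager.release(segment);
      throw e;
    }
  }
}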
diff --git a/server/src/main/java/org/apache/druid/segment/loading/SegmentLoader.java b/server/src/main/java/org/apache/druid/segment/loading/SegmentLoader.java index 8fe38a31029..03bc0506f7d 100644 --- a/server/src/main/java/org/apache/druid/segment/loading/SegmentLoader.java +++ b/server/src/main/java/org/apache/druid/segment/loading/SegmentLoader.java @@ -19,7 +19,8 @@ package org.apache.druid.segment.loading; -import org.apache.druid.segment.Segment; +import org.apache.druid.guice.annotations.UnstableApi; +import org.apache.druid.segment.ReferenceCountingSegment; import org.apache.druid.segment.SegmentLazyLoadFailCallback; import org.apache.druid.timeline.DataSegment; @@ -27,16 +28,25 @@ import org.apache.druid.timeline.DataSegment; * Loading segments from deep storage to local storage. Internally, this class can delegate the download to * {@link SegmentCacheManager}. Implementations must be thread-safe. */ +@UnstableApi public interface SegmentLoader { + /** - * Builds a {@link Segment} by downloading if necessary + * Returns a {@link ReferenceCountingSegment} that will be added by the {@link org.apache.druid.server.SegmentManager} + * to the {@link org.apache.druid.timeline.VersionedIntervalTimeline}. This method can be called multiple times + * by the {@link org.apache.druid.server.SegmentManager}, and the implementation can return either the same + * {@link ReferenceCountingSegment} or a different one. Callers should not assume any particular behavior. + * + * Returning a {@code ReferenceCountingSegment} lets custom implementations keep track of the reference count for + * the segments they create, so they know whether a segment is currently in use. * @param segment - Segment to load * @param lazy - Whether column metadata de-serialization is to be deferred to access time. Setting this flag to true can speed up segment loading * @param loadFailed - Callback to invoke if lazy loading fails during column access. 
* @throws SegmentLoadingException - If there is an error in loading the segment */ - Segment getSegment(DataSegment segment, boolean lazy, SegmentLazyLoadFailCallback loadFailed) throws SegmentLoadingException; + ReferenceCountingSegment getSegment(DataSegment segment, boolean lazy, SegmentLazyLoadFailCallback loadFailed) throws SegmentLoadingException; /** * cleanup any state used by this segment diff --git a/server/src/main/java/org/apache/druid/segment/loading/SegmentLocalCacheLoader.java b/server/src/main/java/org/apache/druid/segment/loading/SegmentLocalCacheLoader.java index 6970f7b0caf..7fbf425bb2a 100644 --- a/server/src/main/java/org/apache/druid/segment/loading/SegmentLocalCacheLoader.java +++ b/server/src/main/java/org/apache/druid/segment/loading/SegmentLocalCacheLoader.java @@ -22,6 +22,7 @@ package org.apache.druid.segment.loading; import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.druid.guice.annotations.Json; import org.apache.druid.segment.IndexIO; +import org.apache.druid.segment.ReferenceCountingSegment; import org.apache.druid.segment.Segment; import org.apache.druid.segment.SegmentLazyLoadFailCallback; import org.apache.druid.timeline.DataSegment; @@ -46,7 +47,7 @@ public class SegmentLocalCacheLoader implements SegmentLoader } @Override - public Segment getSegment(DataSegment segment, boolean lazy, SegmentLazyLoadFailCallback loadFailed) throws SegmentLoadingException + public ReferenceCountingSegment getSegment(DataSegment segment, boolean lazy, SegmentLazyLoadFailCallback loadFailed) throws SegmentLoadingException { final File segmentFiles = cacheManager.getSegmentFiles(segment); File factoryJson = new File(segmentFiles, "factory.json"); @@ -63,7 +64,8 @@ public class SegmentLocalCacheLoader implements SegmentLoader factory = new MMappedQueryableSegmentizerFactory(indexIO); } - return factory.factorize(segment, segmentFiles, lazy, loadFailed); + Segment segmentObject = factory.factorize(segment, segmentFiles, lazy, loadFailed); + return ReferenceCountingSegment.wrapSegment(segmentObject, segment.getShardSpec()); } @Override diff --git a/server/src/main/java/org/apache/druid/segment/loading/SegmentLocalCacheManager.java b/server/src/main/java/org/apache/druid/segment/loading/SegmentLocalCacheManager.java index 412cfe9728b..25c9cae4554 100644 --- a/server/src/main/java/org/apache/druid/segment/loading/SegmentLocalCacheManager.java +++ b/server/src/main/java/org/apache/druid/segment/loading/SegmentLocalCacheManager.java @@ -30,6 +30,7 @@ import org.apache.druid.timeline.DataSegment; import javax.annotation.Nonnull; import javax.annotation.Nullable; + import java.io.File; import java.io.IOException; import java.util.Iterator; @@ -37,6 +38,7 @@ import java.util.List; import java.util.concurrent.ConcurrentHashMap; /** + * */ public class SegmentLocalCacheManager implements SegmentCacheManager { @@ -110,6 +112,7 @@ public class SegmentLocalCacheManager implements SegmentCacheManager * * This ctor is mainly for test cases, including test cases in other modules */ + @VisibleForTesting public SegmentLocalCacheManager( SegmentLoaderConfig config, @Json ObjectMapper mapper @@ -122,25 +125,45 @@ public class SegmentLocalCacheManager implements SegmentCacheManager log.info("Using storage location strategy: [%s]", this.strategy.getClass().getSimpleName()); } + + static String getSegmentDir(DataSegment segment) + { + return DataSegmentPusher.getDefaultStorageDir(segment, false); + } + @Override public boolean isSegmentCached(final DataSegment segment) { - return 
findStorageLocationIfLoaded(segment) != null; + return findStoragePathIfCached(segment) != null; } + /** + * This method will try to find if the segment is already downloaded on any location. If so, the segment path + * is returned. Along with that, location state is also updated with the segment location. Refer to + * {@link StorageLocation#maybeReserve(String, DataSegment)} for more details. + * If the segment files are damaged in any location, they are removed from the location. + * @param segment - Segment to check + * @return - Path corresponding to segment directory if found, null otherwise. + */ @Nullable - private StorageLocation findStorageLocationIfLoaded(final DataSegment segment) + private File findStoragePathIfCached(final DataSegment segment) { for (StorageLocation location : locations) { - File localStorageDir = new File(location.getPath(), DataSegmentPusher.getDefaultStorageDir(segment, false)); + String storageDir = getSegmentDir(segment); + File localStorageDir = location.segmentDirectoryAsFile(storageDir); if (localStorageDir.exists()) { if (checkSegmentFilesIntact(localStorageDir)) { - log.warn("[%s] may be damaged. Delete all the segment files and pull from DeepStorage again.", localStorageDir.getAbsolutePath()); + log.warn( + "[%s] may be damaged. Delete all the segment files and pull from DeepStorage again.", + localStorageDir.getAbsolutePath() + ); cleanupCacheFiles(location.getPath(), localStorageDir); location.removeSegmentDir(localStorageDir, segment); break; } else { - return location; + // Before returning, we also reserve the space. Refer to the StorageLocation#maybeReserve documentation for details. + location.maybeReserve(storageDir, segment); + return localStorageDir; } } } @@ -180,16 +203,12 @@ public class SegmentLocalCacheManager implements SegmentCacheManager final ReferenceCountingLock lock = createOrGetLock(segment); synchronized (lock) { try { - StorageLocation loc = findStorageLocationIfLoaded(segment); - String storageDir = DataSegmentPusher.getDefaultStorageDir(segment, false); - - if (loc == null) { - loc = loadSegmentWithRetry(segment, storageDir); - } else { - // If the segment is already downloaded on disk, we just update the current usage - loc.maybeReserve(storageDir, segment); + File segmentDir = findStoragePathIfCached(segment); + if (segmentDir != null) { + return segmentDir; } - return new File(loc.getPath(), storageDir); + + return loadSegmentWithRetry(segment); } finally { unlock(segment, lock); @@ -198,44 +217,83 @@ public class SegmentLocalCacheManager implements SegmentCacheManager } /** - * location may fail because of IO failure, most likely in two cases:

+ * If we have already reserved a location before, probably via {@link #reserve(DataSegment)}, then only that location + * should be tried. Otherwise, we fetch locations using {@link StorageLocationSelectorStrategy} and try them + * one by one until one succeeds. + * A location may fail because of IO failure, most likely in two cases:

* 1. Druid doesn't have write access to this location, most likely because the administrator hasn't configured it correctly

* 2. disk failure, so Druid can't read from or write to this disk anymore - * +

* Locations are fetched using {@link StorageLocationSelectorStrategy}. */ - private StorageLocation loadSegmentWithRetry(DataSegment segment, String storageDirStr) throws SegmentLoadingException + private File loadSegmentWithRetry(DataSegment segment) throws SegmentLoadingException { - Iterator locationsIterator = strategy.getLocations(); + String segmentDir = getSegmentDir(segment); + // Try the already reserved location. If location has been reserved outside, then we do not release the location + // here and simply delete any downloaded files. That is, we revert anything we do in this function and nothing else. + for (StorageLocation loc : locations) { + if (loc.isReserved(segmentDir)) { + File storageDir = loc.segmentDirectoryAsFile(segmentDir); + boolean success = loadInLocationWithStartMarkerQuietly(loc, segment, storageDir, false); + if (!success) { + throw new SegmentLoadingException("Failed to load segment %s in reserved location [%s]", segment.getId(), loc.getPath().getAbsolutePath()); + } + return storageDir; + } + } + + // No location was reserved so we try all the locations + Iterator locationsIterator = strategy.getLocations(); while (locationsIterator.hasNext()) { StorageLocation loc = locationsIterator.next(); - File storageDir = loc.reserve(storageDirStr, segment); + // storageDir is the file path corresponding to segment dir + File storageDir = loc.reserve(segmentDir, segment); if (storageDir != null) { - try { - loadInLocationWithStartMarker(segment, storageDir); - return loc; - } - catch (SegmentLoadingException e) { - try { - log.makeAlert( - e, - "Failed to load segment in current location [%s], try next location if any", - loc.getPath().getAbsolutePath() - ).addData("location", loc.getPath().getAbsolutePath()).emit(); - } - finally { - loc.removeSegmentDir(storageDir, segment); - cleanupCacheFiles(loc.getPath(), storageDir); - } + boolean success = loadInLocationWithStartMarkerQuietly(loc, segment, storageDir, true); + if (success) { + return storageDir; } } } throw new SegmentLoadingException("Failed to load segment %s in all locations.", segment.getId()); } + /** + * A helper method over {@link #loadInLocationWithStartMarker(DataSegment, File)} that catches the {@link SegmentLoadingException} + * and emits alerts. + * @param loc - {@link StorageLocation} where segment is to be downloaded in. + * @param segment - {@link DataSegment} to download + * @param storageDir - {@link File} pointing to segment directory + * @param releaseLocation - Whether to release the location in case of failures + * @return - True if segment was downloaded successfully, false otherwise. 
+ */ + private boolean loadInLocationWithStartMarkerQuietly(StorageLocation loc, DataSegment segment, File storageDir, boolean releaseLocation) + { + try { + loadInLocationWithStartMarker(segment, storageDir); + return true; + } + catch (SegmentLoadingException e) { + try { + log.makeAlert( + e, + "Failed to load segment in current location [%s], try next location if any", + loc.getPath().getAbsolutePath() + ).addData("location", loc.getPath().getAbsolutePath()).emit(); + } + finally { + if (releaseLocation) { + loc.removeSegmentDir(storageDir, segment); + } + cleanupCacheFiles(loc.getPath(), storageDir); + } + } + return false; + } + private void loadInLocationWithStartMarker(DataSegment segment, File storageDir) throws SegmentLoadingException { // We use a marker to prevent the case where a segment is downloaded, but before the download completes, @@ -277,6 +335,73 @@ public class SegmentLocalCacheManager implements SegmentCacheManager } } + @Override + public boolean reserve(final DataSegment segment) + { + final ReferenceCountingLock lock = createOrGetLock(segment); + synchronized (lock) { + try { + // May be the segment was already loaded [This check is required to account for restart scenarios] + if (null != findStoragePathIfCached(segment)) { + return true; + } + + String storageDirStr = getSegmentDir(segment); + + // check if we already reserved the segment + for (StorageLocation location : locations) { + if (location.isReserved(storageDirStr)) { + return true; + } + } + + // Not found in any location, reserve now + for (Iterator it = strategy.getLocations(); it.hasNext(); ) { + StorageLocation location = it.next(); + if (null != location.reserve(storageDirStr, segment)) { + return true; + } + } + } + finally { + unlock(segment, lock); + } + } + + return false; + } + + @Override + public boolean release(final DataSegment segment) + { + final ReferenceCountingLock lock = createOrGetLock(segment); + synchronized (lock) { + try { + String storageDir = getSegmentDir(segment); + + // Release the first location encountered + for (StorageLocation location : locations) { + if (location.isReserved(storageDir)) { + File localStorageDir = location.segmentDirectoryAsFile(storageDir); + if (localStorageDir.exists()) { + throw new ISE( + "Asking to release a location '%s' while the segment directory '%s' is present on disk. Any state on disk must be deleted before releasing", + location.getPath().getAbsolutePath(), + localStorageDir.getAbsolutePath() + ); + } + return location.release(storageDir, segment.getSize()); + } + } + } + finally { + unlock(segment, lock); + } + } + + return false; + } + @Override public void cleanup(DataSegment segment) { @@ -287,18 +412,17 @@ public class SegmentLocalCacheManager implements SegmentCacheManager final ReferenceCountingLock lock = createOrGetLock(segment); synchronized (lock) { try { - StorageLocation loc = findStorageLocationIfLoaded(segment); + File loc = findStoragePathIfCached(segment); if (loc == null) { log.warn("Asked to cleanup something[%s] that didn't exist. Skipping.", segment.getId()); return; } - // If storageDir.mkdirs() success, but downloadStartMarker.createNewFile() failed, // in this case, findStorageLocationIfLoaded() will think segment is located in the failed storageDir which is actually not. 
// So we should always clean all possible locations here for (StorageLocation location : locations) { - File localStorageDir = new File(location.getPath(), DataSegmentPusher.getDefaultStorageDir(segment, false)); + File localStorageDir = new File(location.getPath(), getSegmentDir(segment)); if (localStorageDir.exists()) { // Druid creates folders of the form dataSource/interval/version/partitionNum. // We need to clean up all these directories if they are all empty. diff --git a/server/src/main/java/org/apache/druid/segment/loading/StorageLocation.java b/server/src/main/java/org/apache/druid/segment/loading/StorageLocation.java index efaefe96d21..60f1831856b 100644 --- a/server/src/main/java/org/apache/druid/segment/loading/StorageLocation.java +++ b/server/src/main/java/org/apache/druid/segment/loading/StorageLocation.java @@ -26,6 +26,7 @@ import org.apache.druid.java.util.emitter.EmittingLogger; import org.apache.druid.timeline.DataSegment; import javax.annotation.Nullable; + import java.io.File; import java.util.HashSet; import java.util.Set; @@ -117,6 +118,16 @@ public class StorageLocation return reserve(segmentDir, segment.getId().toString(), segment.getSize()); } + public synchronized boolean isReserved(String segmentDir) + { + return files.contains(segmentDirectoryAsFile(segmentDir)); + } + + public File segmentDirectoryAsFile(String segmentDir) + { + return new File(path, segmentDir); //lgtm [java/path-injection] + } + /** * Reserves space to store the given segment, only if it has not been done already. This can be used * when segment is already downloaded on the disk. Unlike {@link #reserve(String, DataSegment)}, this function diff --git a/server/src/main/java/org/apache/druid/server/SegmentManager.java b/server/src/main/java/org/apache/druid/server/SegmentManager.java index 486ad46920b..dcd5a1d151e 100644 --- a/server/src/main/java/org/apache/druid/server/SegmentManager.java +++ b/server/src/main/java/org/apache/druid/server/SegmentManager.java @@ -30,7 +30,6 @@ import org.apache.druid.java.util.emitter.EmittingLogger; import org.apache.druid.query.TableDataSource; import org.apache.druid.query.planning.DataSourceAnalysis; import org.apache.druid.segment.ReferenceCountingSegment; -import org.apache.druid.segment.Segment; import org.apache.druid.segment.SegmentLazyLoadFailCallback; import org.apache.druid.segment.join.table.IndexedTable; import org.apache.druid.segment.join.table.ReferenceCountingIndexedTable; @@ -217,7 +216,7 @@ public class SegmentManager */ public boolean loadSegment(final DataSegment segment, boolean lazy, SegmentLazyLoadFailCallback loadFailed) throws SegmentLoadingException { - final Segment adapter = getAdapter(segment, lazy, loadFailed); + final ReferenceCountingSegment adapter = getSegmentReference(segment, lazy, loadFailed); final SettableSupplier resultSupplier = new SettableSupplier<>(); @@ -252,9 +251,7 @@ public class SegmentManager loadedIntervals.add( segment.getInterval(), segment.getVersion(), - segment.getShardSpec().createChunk( - ReferenceCountingSegment.wrapSegment(adapter, segment.getShardSpec()) - ) + segment.getShardSpec().createChunk(adapter) ); dataSourceState.addSegment(segment); resultSupplier.set(true); @@ -268,21 +265,21 @@ public class SegmentManager return resultSupplier.get(); } - private Segment getAdapter(final DataSegment segment, boolean lazy, SegmentLazyLoadFailCallback loadFailed) throws SegmentLoadingException + private ReferenceCountingSegment getSegmentReference(final DataSegment dataSegment, boolean lazy, 
SegmentLazyLoadFailCallback loadFailed) throws SegmentLoadingException { - final Segment adapter; + final ReferenceCountingSegment segment; try { - adapter = segmentLoader.getSegment(segment, lazy, loadFailed); + segment = segmentLoader.getSegment(dataSegment, lazy, loadFailed); } catch (SegmentLoadingException e) { - segmentLoader.cleanup(segment); + segmentLoader.cleanup(dataSegment); throw e; } - if (adapter == null) { - throw new SegmentLoadingException("Null adapter from loadSpec[%s]", segment.getLoadSpec()); + if (segment == null) { + throw new SegmentLoadingException("Null adapter from loadSpec[%s]", dataSegment.getLoadSpec()); } - return adapter; + return segment; } public void dropSegment(final DataSegment segment) diff --git a/server/src/test/java/org/apache/druid/segment/loading/CacheTestSegmentCacheManager.java b/server/src/test/java/org/apache/druid/segment/loading/CacheTestSegmentCacheManager.java index a2686816325..ca314b97b9c 100644 --- a/server/src/test/java/org/apache/druid/segment/loading/CacheTestSegmentCacheManager.java +++ b/server/src/test/java/org/apache/druid/segment/loading/CacheTestSegmentCacheManager.java @@ -47,6 +47,18 @@ public class CacheTestSegmentCacheManager implements SegmentCacheManager throw new UnsupportedOperationException(); } + @Override + public boolean reserve(DataSegment segment) + { + throw new UnsupportedOperationException(); + } + + @Override + public boolean release(DataSegment segment) + { + throw new UnsupportedOperationException(); + } + @Override public void cleanup(DataSegment segment) { diff --git a/server/src/test/java/org/apache/druid/segment/loading/CacheTestSegmentLoader.java b/server/src/test/java/org/apache/druid/segment/loading/CacheTestSegmentLoader.java index cf47755ba94..831a1a434b1 100644 --- a/server/src/test/java/org/apache/druid/segment/loading/CacheTestSegmentLoader.java +++ b/server/src/test/java/org/apache/druid/segment/loading/CacheTestSegmentLoader.java @@ -20,6 +20,7 @@ package org.apache.druid.segment.loading; import org.apache.druid.segment.QueryableIndex; +import org.apache.druid.segment.ReferenceCountingSegment; import org.apache.druid.segment.Segment; import org.apache.druid.segment.SegmentLazyLoadFailCallback; import org.apache.druid.segment.StorageAdapter; @@ -33,9 +34,9 @@ public class CacheTestSegmentLoader implements SegmentLoader { @Override - public Segment getSegment(final DataSegment segment, boolean lazy, SegmentLazyLoadFailCallback SegmentLazyLoadFailCallback) + public ReferenceCountingSegment getSegment(final DataSegment segment, boolean lazy, SegmentLazyLoadFailCallback SegmentLazyLoadFailCallback) { - return new Segment() + Segment baseSegment = new Segment() { @Override public SegmentId getId() @@ -66,6 +67,7 @@ public class CacheTestSegmentLoader implements SegmentLoader { } }; + return ReferenceCountingSegment.wrapSegment(baseSegment, segment.getShardSpec()); } @Override diff --git a/server/src/test/java/org/apache/druid/segment/loading/SegmentLocalCacheManagerTest.java b/server/src/test/java/org/apache/druid/segment/loading/SegmentLocalCacheManagerTest.java index 26c9cbdabf4..5d116d12d96 100644 --- a/server/src/test/java/org/apache/druid/segment/loading/SegmentLocalCacheManagerTest.java +++ b/server/src/test/java/org/apache/druid/segment/loading/SegmentLocalCacheManagerTest.java @@ -38,6 +38,7 @@ import org.junit.rules.TemporaryFolder; import java.io.File; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; public class SegmentLocalCacheManagerTest @@ -762,4 +763,151 
@@ public class SegmentLocalCacheManagerTest Assert.assertFalse("Expect cache miss for corrupted segment file", manager.isSegmentCached(segmentToDownload)); Assert.assertFalse(cachedSegmentDir.exists()); } + + @Test + public void testReserveSegment() + { + final DataSegment dataSegment = dataSegmentWithInterval("2014-10-20T00:00:00Z/P1D").withSize(100L); + final StorageLocation firstLocation = new StorageLocation(localSegmentCacheFolder, 200L, 0.0d); + final StorageLocation secondLocation = new StorageLocation(localSegmentCacheFolder, 150L, 0.0d); + + manager = new SegmentLocalCacheManager( + Arrays.asList(secondLocation, firstLocation), + new SegmentLoaderConfig(), + new RoundRobinStorageLocationSelectorStrategy(Arrays.asList(firstLocation, secondLocation)), + jsonMapper + ); + Assert.assertTrue(manager.reserve(dataSegment)); + Assert.assertTrue(firstLocation.isReserved(DataSegmentPusher.getDefaultStorageDir(dataSegment, false))); + Assert.assertEquals(100L, firstLocation.availableSizeBytes()); + Assert.assertEquals(150L, secondLocation.availableSizeBytes()); + + // Reserving again should be no-op + Assert.assertTrue(manager.reserve(dataSegment)); + Assert.assertTrue(firstLocation.isReserved(DataSegmentPusher.getDefaultStorageDir(dataSegment, false))); + Assert.assertEquals(100L, firstLocation.availableSizeBytes()); + Assert.assertEquals(150L, secondLocation.availableSizeBytes()); + + // Reserving a second segment should now go to a different location + final DataSegment otherSegment = dataSegmentWithInterval("2014-10-21T00:00:00Z/P1D").withSize(100L); + Assert.assertTrue(manager.reserve(otherSegment)); + Assert.assertTrue(firstLocation.isReserved(DataSegmentPusher.getDefaultStorageDir(dataSegment, false))); + Assert.assertFalse(firstLocation.isReserved(DataSegmentPusher.getDefaultStorageDir(otherSegment, false))); + Assert.assertTrue(secondLocation.isReserved(DataSegmentPusher.getDefaultStorageDir(otherSegment, false))); + Assert.assertEquals(100L, firstLocation.availableSizeBytes()); + Assert.assertEquals(50L, secondLocation.availableSizeBytes()); + } + + @Test + public void testReserveNotEnoughSpace() + { + final DataSegment dataSegment = dataSegmentWithInterval("2014-10-20T00:00:00Z/P1D").withSize(100L); + final StorageLocation firstLocation = new StorageLocation(localSegmentCacheFolder, 50L, 0.0d); + final StorageLocation secondLocation = new StorageLocation(localSegmentCacheFolder, 150L, 0.0d); + + manager = new SegmentLocalCacheManager( + Arrays.asList(secondLocation, firstLocation), + new SegmentLoaderConfig(), + new RoundRobinStorageLocationSelectorStrategy(Arrays.asList(firstLocation, secondLocation)), + jsonMapper + ); + + // should go to second location if first one doesn't have enough space + Assert.assertTrue(manager.reserve(dataSegment)); + Assert.assertTrue(secondLocation.isReserved(DataSegmentPusher.getDefaultStorageDir(dataSegment, false))); + Assert.assertEquals(50L, firstLocation.availableSizeBytes()); + Assert.assertEquals(50L, secondLocation.availableSizeBytes()); + + final DataSegment otherSegment = dataSegmentWithInterval("2014-10-21T00:00:00Z/P1D").withSize(100L); + Assert.assertFalse(manager.reserve(otherSegment)); + Assert.assertEquals(50L, firstLocation.availableSizeBytes()); + Assert.assertEquals(50L, secondLocation.availableSizeBytes()); + } + + @Test + public void testSegmentDownloadWhenLocationReserved() throws Exception + { + final List locationConfigs = new ArrayList<>(); + final StorageLocationConfig locationConfig = 
createStorageLocationConfig("local_storage_folder", 10000000000L, true); + final StorageLocationConfig locationConfig2 = createStorageLocationConfig("local_storage_folder2", 1000000000L, true); + final StorageLocationConfig locationConfig3 = createStorageLocationConfig("local_storage_folder3", 1000000000L, true); + locationConfigs.add(locationConfig); + locationConfigs.add(locationConfig2); + locationConfigs.add(locationConfig3); + + List locations = new ArrayList<>(); + for (StorageLocationConfig locConfig : locationConfigs) { + locations.add( + new StorageLocation( + locConfig.getPath(), + locConfig.getMaxSize(), + locConfig.getFreeSpacePercent() + ) + ); + } + + manager = new SegmentLocalCacheManager( + new SegmentLoaderConfig().withLocations(locationConfigs), + new RoundRobinStorageLocationSelectorStrategy(locations), + jsonMapper + ); + + StorageLocation location3 = manager.getLocations().get(2); + Assert.assertEquals(locationConfig3.getPath(), location3.getPath()); + final File segmentSrcFolder = tmpFolder.newFolder("segmentSrcFolder"); + + // Segment should be downloaded in local_storage_folder3 even if that is the third location + final DataSegment segmentToDownload = dataSegmentWithInterval("2014-10-20T00:00:00Z/P1D").withLoadSpec( + ImmutableMap.of( + "type", + "local", + "path", + segmentSrcFolder.getCanonicalPath() + + "/test_segment_loader" + + "/2014-10-20T00:00:00.000Z_2014-10-21T00:00:00.000Z/2015-05-27T03:38:35.683Z" + + "/0/index.zip" + ) + ); + String segmentDir = DataSegmentPusher.getDefaultStorageDir(segmentToDownload, false); + location3.reserve(segmentDir, segmentToDownload); + // manually create a local segment under segmentSrcFolder + createLocalSegmentFile(segmentSrcFolder, "test_segment_loader/2014-10-20T00:00:00.000Z_2014-10-21T00:00:00.000Z/2015-05-27T03:38:35.683Z/0"); + + Assert.assertFalse("Expect cache miss before downloading segment", manager.isSegmentCached(segmentToDownload)); + + File segmentFile = manager.getSegmentFiles(segmentToDownload); + Assert.assertTrue(segmentFile.getAbsolutePath().contains("/local_storage_folder3/")); + Assert.assertTrue("Expect cache hit after downloading segment", manager.isSegmentCached(segmentToDownload)); + + manager.cleanup(segmentToDownload); + Assert.assertFalse("Expect cache miss after dropping segment", manager.isSegmentCached(segmentToDownload)); + Assert.assertFalse(location3.isReserved(segmentDir)); + } + + @Test + public void testRelease() + { + final DataSegment dataSegment = dataSegmentWithInterval("2014-10-20T00:00:00Z/P1D").withSize(100L); + final StorageLocation firstLocation = new StorageLocation(localSegmentCacheFolder, 50L, 0.0d); + final StorageLocation secondLocation = new StorageLocation(localSegmentCacheFolder, 150L, 0.0d); + + manager = new SegmentLocalCacheManager( + Arrays.asList(secondLocation, firstLocation), + new SegmentLoaderConfig(), + new RoundRobinStorageLocationSelectorStrategy(Arrays.asList(firstLocation, secondLocation)), + jsonMapper + ); + + manager.reserve(dataSegment); + manager.release(dataSegment); + Assert.assertEquals(50L, firstLocation.availableSizeBytes()); + Assert.assertEquals(150L, secondLocation.availableSizeBytes()); + Assert.assertFalse(firstLocation.isReserved(DataSegmentPusher.getDefaultStorageDir(dataSegment, false))); + Assert.assertFalse(secondLocation.isReserved(DataSegmentPusher.getDefaultStorageDir(dataSegment, false))); + + // calling release again should have no effect + manager.release(dataSegment); + Assert.assertEquals(50L, 
firstLocation.availableSizeBytes()); + Assert.assertEquals(150L, secondLocation.availableSizeBytes()); + } } diff --git a/server/src/test/java/org/apache/druid/server/SegmentManagerTest.java b/server/src/test/java/org/apache/druid/server/SegmentManagerTest.java index 8698146c9d5..83f0a8ff1e0 100644 --- a/server/src/test/java/org/apache/druid/server/SegmentManagerTest.java +++ b/server/src/test/java/org/apache/druid/server/SegmentManagerTest.java @@ -64,12 +64,12 @@ public class SegmentManagerTest private static final SegmentLoader SEGMENT_LOADER = new SegmentLoader() { @Override - public Segment getSegment(final DataSegment segment, boolean lazy, SegmentLazyLoadFailCallback loadFailed) + public ReferenceCountingSegment getSegment(final DataSegment segment, boolean lazy, SegmentLazyLoadFailCallback loadFailed) { - return new SegmentForTesting( + return ReferenceCountingSegment.wrapSegment(new SegmentForTesting( MapUtils.getString(segment.getLoadSpec(), "version"), (Interval) segment.getLoadSpec().get("interval") - ); + ), segment.getShardSpec()); } @Override diff --git a/server/src/test/java/org/apache/druid/server/coordination/ServerManagerTest.java b/server/src/test/java/org/apache/druid/server/coordination/ServerManagerTest.java index 16832b3dfcb..a76cc5b25be 100644 --- a/server/src/test/java/org/apache/druid/server/coordination/ServerManagerTest.java +++ b/server/src/test/java/org/apache/druid/server/coordination/ServerManagerTest.java @@ -148,12 +148,12 @@ public class ServerManagerTest new SegmentLoader() { @Override - public Segment getSegment(final DataSegment segment, boolean lazy, SegmentLazyLoadFailCallback SegmentLazyLoadFailCallback) + public ReferenceCountingSegment getSegment(final DataSegment segment, boolean lazy, SegmentLazyLoadFailCallback SegmentLazyLoadFailCallback) { - return new SegmentForTesting( + return ReferenceCountingSegment.wrapSegment(new SegmentForTesting( MapUtils.getString(segment.getLoadSpec(), "version"), (Interval) segment.getLoadSpec().get("interval") - ); + ), segment.getShardSpec()); } @Override
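As a rough illustration of what the new extension point allows (this sketch is not part of the patch, and TrackingSegmentLoader is a hypothetical name, assuming only the two SegmentLoader methods visible in the diff), an extension can now supply its own SegmentLoader that decorates an existing one and observes every ReferenceCountingSegment it hands out:

import java.util.concurrent.atomic.AtomicInteger;
import org.apache.druid.segment.ReferenceCountingSegment;
import org.apache.druid.segment.SegmentLazyLoadFailCallback;
import org.apache.druid.segment.loading.SegmentLoader;
import org.apache.druid.segment.loading.SegmentLoadingException;
import org.apache.druid.timeline.DataSegment;

public class TrackingSegmentLoader implements SegmentLoader
{
  private final SegmentLoader delegate;
  private final AtomicInteger loadedCount = new AtomicInteger();

  public TrackingSegmentLoader(SegmentLoader delegate)
  {
    this.delegate = delegate;
  }

  @Override
  public ReferenceCountingSegment getSegment(DataSegment segment, boolean lazy, SegmentLazyLoadFailCallback loadFailed)
      throws SegmentLoadingException
  {
    // The loader now returns ReferenceCountingSegment directly, so an extension can observe
    // (or subclass) the reference-counted wrapper instead of SegmentManager wrapping it later.
    ReferenceCountingSegment loaded = delegate.getSegment(segment, lazy, loadFailed);
    loadedCount.incrementAndGet();
    return loaded;
  }

  @Override
  public void cleanup(DataSegment segment)
  {
    // Track how many segments handed out by this loader are still live on the server.
    loadedCount.decrementAndGet();
    delegate.cleanup(segment);
  }

  public int getLoadedCount()
  {
    return loadedCount.get();
  }
}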