Make SegmentLoader extensible and customizable (#11398)

This PR refactors the code related to segment loading specifically SegmentLoader and SegmentLoaderLocalCacheManager. SegmentLoader is marked UnstableAPI which means, it can be extended outside core druid in custom extensions. Here is a summary of changes

SegmentLoader returns an instance of ReferenceCountingSegment instead of Segment. Earlier, SegmentManager was wrapping Segment objects inside ReferenceCountingSegment. That is now moved to SegmentLoader. With this, a custom implementation can track the references of segments. It also allows them to create custom ReferenceCountingSegment implementations. For this reason, the constructor visibility in ReferenceCountingSegment is changed from private to protected.
SegmentCacheManager has two additional methods called - reserve(DataSegment) and release(DataSegment). These methods let the caller reserve or release space without calling SegmentLoader#getSegment. We already had similar methods in StorageLocation and now they are available in SegmentCacheManager too which wraps multiple locations.
Refactoring to simplify the code in SegmentCacheManager wherever possible. There is no change in the functionality.
This commit is contained in:
Abhishek Agarwal 2021-07-22 18:00:49 +05:30 committed by GitHub
parent 167c45260c
commit ce1faa5635
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 462 additions and 67 deletions

View File

@ -606,6 +606,18 @@ public class DruidSegmentReaderTest extends NullHandlingTest
{
throw new UnsupportedOperationException("unused");
}
@Override
public boolean reserve(DataSegment segment)
{
throw new UnsupportedOperationException();
}
@Override
public boolean release(DataSegment segment)
{
throw new UnsupportedOperationException();
}
},
DataSegment.builder()
.dataSource("ds")

View File

@ -33,6 +33,9 @@ import java.util.Optional;
* {@link Segment} that is also a {@link ReferenceCountingSegment}, allowing query engines that operate directly on
* segments to track references so that dropping a {@link Segment} can be done safely to ensure there are no in-flight
* queries.
*
* Extensions can extend this class for populating {@link org.apache.druid.timeline.VersionedIntervalTimeline} with
* a custom implementation through SegmentLoader.
*/
public class ReferenceCountingSegment extends ReferenceCountingCloseableObject<Segment>
implements SegmentReference, Overshadowable<ReferenceCountingSegment>
@ -67,7 +70,7 @@ public class ReferenceCountingSegment extends ReferenceCountingCloseableObject<S
);
}
private ReferenceCountingSegment(
protected ReferenceCountingSegment(
Segment baseSegment,
int startRootPartitionId,
int endRootPartitionId,
@ -172,4 +175,13 @@ public class ReferenceCountingSegment extends ReferenceCountingCloseableObject<S
{
return incrementReferenceAndDecrementOnceCloseable();
}
@Override
public <T> T as(Class<T> clazz)
{
if (isClosed()) {
return null;
}
return baseObject.as(clazz);
}
}

View File

@ -20,6 +20,7 @@
package org.apache.druid.segment;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.segment.join.table.IndexedTable;
import org.apache.druid.timeline.SegmentId;
import org.easymock.EasyMock;
import org.joda.time.Days;
@ -43,6 +44,7 @@ public class ReferenceCountingSegmentTest
private final Interval dataInterval = new Interval(DateTimes.nowUtc().minus(Days.days(1)), DateTimes.nowUtc());
private QueryableIndex index;
private StorageAdapter adapter;
private IndexedTable indexedTable;
private int underlyingSegmentClosedCount;
@Before
@ -51,6 +53,7 @@ public class ReferenceCountingSegmentTest
underlyingSegmentClosedCount = 0;
index = EasyMock.createNiceMock(QueryableIndex.class);
adapter = EasyMock.createNiceMock(StorageAdapter.class);
indexedTable = EasyMock.createNiceMock(IndexedTable.class);
segment = ReferenceCountingSegment.wrapRootGenerationSegment(
new Segment()
@ -79,6 +82,19 @@ public class ReferenceCountingSegmentTest
return adapter;
}
@Override
public <T> T as(Class<T> clazz)
{
if (clazz.equals(QueryableIndex.class)) {
return (T) asQueryableIndex();
} else if (clazz.equals(StorageAdapter.class)) {
return (T) asStorageAdapter();
} else if (clazz.equals(IndexedTable.class)) {
return (T) indexedTable;
}
return null;
}
@Override
public void close()
{
@ -159,4 +175,13 @@ public class ReferenceCountingSegmentTest
Assert.assertEquals(adapter, segment.asStorageAdapter());
}
@Test
public void testSegmentAs()
{
Assert.assertSame(index, segment.as(QueryableIndex.class));
Assert.assertSame(adapter, segment.as(StorageAdapter.class));
Assert.assertSame(indexedTable, segment.as(IndexedTable.class));
Assert.assertNull(segment.as(String.class));
}
}

View File

@ -30,18 +30,58 @@ import java.io.File;
public interface SegmentCacheManager
{
/**
* Checks whether a segment is already cached.
* Checks whether a segment is already cached. It can return false even if {@link #reserve(DataSegment)}
* has been successful for a segment but is not downloaded yet.
*/
boolean isSegmentCached(DataSegment segment);
/**
* This method fetches the files for the given segment if the segment is not downloaded already.
* This method fetches the files for the given segment if the segment is not downloaded already. It
* is not required to {@link #reserve(DataSegment)} before calling this method. If caller has not reserved
* the space explicitly via {@link #reserve(DataSegment)}, the implementation should reserve space on caller's
* behalf.
* If the space has been explicitly reserved already
* - implementation should use only the reserved space to store segment files.
* - implementation should not release the location in case of download erros and leave it to the caller.
* @throws SegmentLoadingException if there is an error in downloading files
*/
File getSegmentFiles(DataSegment segment) throws SegmentLoadingException;
/**
* Cleanup the cache space used by the segment
* Tries to reserve the space for a segment on any location. When the space has been reserved,
* {@link #getSegmentFiles(DataSegment)} should download the segment on the reserved location or
* fail otherwise.
*
* This function is useful for custom extensions. Extensions can try to reserve the space first and
* if not successful, make some space by cleaning up other segments, etc. There is also improved
* concurrency for extensions with this function. Since reserve is a cheaper operation to invoke
* till the space has been reserved. Hence it can be put inside a lock if required by the extensions. getSegment
* can't be put inside a lock since it is a time-consuming operation, on account of downloading the files.
*
* @param segment - Segment to reserve
* @return True if enough space found to store the segment, false otherwise
*/
/*
* We only return a boolean result instead of a pointer to
* {@link StorageLocation} since we don't want callers to operate on {@code StorageLocation} directly outside {@code SegmentLoader}.
* {@link SegmentLoader} operates on the {@code StorageLocation} objects in a thread-safe manner.
*/
boolean reserve(DataSegment segment);
/**
* Reverts the effects of {@link #reserve(DataSegment)} (DataSegment)} by releasing the location reserved for this segment.
* Callers, that explicitly reserve the space via {@link #reserve(DataSegment)}, should use this method to release the space.
*
* Implementation can throw error if the space is being released but there is data present. Callers
* are supposed to ensure that any data is removed via {@link #cleanup(DataSegment)}
* @param segment - Segment to release the location for.
* @return - True if any location was reserved and released, false otherwise.
*/
boolean release(DataSegment segment);
/**
* Cleanup the cache space used by the segment. It will not release the space if the space has been
* explicitly reserved via {@link #reserve(DataSegment)}
*/
void cleanup(DataSegment segment);
}

View File

@ -19,7 +19,8 @@
package org.apache.druid.segment.loading;
import org.apache.druid.segment.Segment;
import org.apache.druid.guice.annotations.UnstableApi;
import org.apache.druid.segment.ReferenceCountingSegment;
import org.apache.druid.segment.SegmentLazyLoadFailCallback;
import org.apache.druid.timeline.DataSegment;
@ -27,16 +28,25 @@ import org.apache.druid.timeline.DataSegment;
* Loading segments from deep storage to local storage. Internally, this class can delegate the download to
* {@link SegmentCacheManager}. Implementations must be thread-safe.
*/
@UnstableApi
public interface SegmentLoader
{
/**
* Builds a {@link Segment} by downloading if necessary
* Returns a {@link ReferenceCountingSegment} that will be added by the {@link org.apache.druid.server.SegmentManager}
* to the {@link org.apache.druid.timeline.VersionedIntervalTimeline}. This method can be called multiple times
* by the {@link org.apache.druid.server.SegmentManager} and implementation can either return same {@link ReferenceCountingSegment}
* or a different {@link ReferenceCountingSegment}. Caller should not assume any particular behavior.
*
* Returning a {@code ReferenceCountingSegment} will let custom implementations keep track of reference count for
* segments that the custom implementations are creating. That way, custom implementations can know when the segment
* is in use or not.
* @param segment - Segment to load
* @param lazy - Whether column metadata de-serialization is to be deferred to access time. Setting this flag to true can speed up segment loading
* @param loadFailed - Callback to invoke if lazy loading fails during column access.
* @throws SegmentLoadingException - If there is an error in loading the segment
*/
Segment getSegment(DataSegment segment, boolean lazy, SegmentLazyLoadFailCallback loadFailed) throws SegmentLoadingException;
ReferenceCountingSegment getSegment(DataSegment segment, boolean lazy, SegmentLazyLoadFailCallback loadFailed) throws SegmentLoadingException;
/**
* cleanup any state used by this segment

View File

@ -22,6 +22,7 @@ package org.apache.druid.segment.loading;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.druid.guice.annotations.Json;
import org.apache.druid.segment.IndexIO;
import org.apache.druid.segment.ReferenceCountingSegment;
import org.apache.druid.segment.Segment;
import org.apache.druid.segment.SegmentLazyLoadFailCallback;
import org.apache.druid.timeline.DataSegment;
@ -46,7 +47,7 @@ public class SegmentLocalCacheLoader implements SegmentLoader
}
@Override
public Segment getSegment(DataSegment segment, boolean lazy, SegmentLazyLoadFailCallback loadFailed) throws SegmentLoadingException
public ReferenceCountingSegment getSegment(DataSegment segment, boolean lazy, SegmentLazyLoadFailCallback loadFailed) throws SegmentLoadingException
{
final File segmentFiles = cacheManager.getSegmentFiles(segment);
File factoryJson = new File(segmentFiles, "factory.json");
@ -63,7 +64,8 @@ public class SegmentLocalCacheLoader implements SegmentLoader
factory = new MMappedQueryableSegmentizerFactory(indexIO);
}
return factory.factorize(segment, segmentFiles, lazy, loadFailed);
Segment segmentObject = factory.factorize(segment, segmentFiles, lazy, loadFailed);
return ReferenceCountingSegment.wrapSegment(segmentObject, segment.getShardSpec());
}
@Override

View File

@ -30,6 +30,7 @@ import org.apache.druid.timeline.DataSegment;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.io.File;
import java.io.IOException;
import java.util.Iterator;
@ -37,6 +38,7 @@ import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
/**
*
*/
public class SegmentLocalCacheManager implements SegmentCacheManager
{
@ -110,6 +112,7 @@ public class SegmentLocalCacheManager implements SegmentCacheManager
*
* This ctor is mainly for test cases, including test cases in other modules
*/
@VisibleForTesting
public SegmentLocalCacheManager(
SegmentLoaderConfig config,
@Json ObjectMapper mapper
@ -122,25 +125,45 @@ public class SegmentLocalCacheManager implements SegmentCacheManager
log.info("Using storage location strategy: [%s]", this.strategy.getClass().getSimpleName());
}
static String getSegmentDir(DataSegment segment)
{
return DataSegmentPusher.getDefaultStorageDir(segment, false);
}
@Override
public boolean isSegmentCached(final DataSegment segment)
{
return findStorageLocationIfLoaded(segment) != null;
return findStoragePathIfCached(segment) != null;
}
/**
* This method will try to find if the segment is already downloaded on any location. If so, the segment path
* is returned. Along with that, location state is also updated with the segment location. Refer to
* {@link StorageLocation#maybeReserve(String, DataSegment)} for more details.
* If the segment files are damaged in any location, they are removed from the location.
* @param segment - Segment to check
* @return - Path corresponding to segment directory if found, null otherwise.
*/
@Nullable
private StorageLocation findStorageLocationIfLoaded(final DataSegment segment)
private File findStoragePathIfCached(final DataSegment segment)
{
for (StorageLocation location : locations) {
File localStorageDir = new File(location.getPath(), DataSegmentPusher.getDefaultStorageDir(segment, false));
String storageDir = getSegmentDir(segment);
File localStorageDir = location.segmentDirectoryAsFile(storageDir);
if (localStorageDir.exists()) {
if (checkSegmentFilesIntact(localStorageDir)) {
log.warn("[%s] may be damaged. Delete all the segment files and pull from DeepStorage again.", localStorageDir.getAbsolutePath());
log.warn(
"[%s] may be damaged. Delete all the segment files and pull from DeepStorage again.",
localStorageDir.getAbsolutePath()
);
cleanupCacheFiles(location.getPath(), localStorageDir);
location.removeSegmentDir(localStorageDir, segment);
break;
} else {
return location;
// Before returning, we also reserve the space. Refer to the StorageLocation#maybeReserve documentation for details.
location.maybeReserve(storageDir, segment);
return localStorageDir;
}
}
}
@ -180,16 +203,12 @@ public class SegmentLocalCacheManager implements SegmentCacheManager
final ReferenceCountingLock lock = createOrGetLock(segment);
synchronized (lock) {
try {
StorageLocation loc = findStorageLocationIfLoaded(segment);
String storageDir = DataSegmentPusher.getDefaultStorageDir(segment, false);
if (loc == null) {
loc = loadSegmentWithRetry(segment, storageDir);
} else {
// If the segment is already downloaded on disk, we just update the current usage
loc.maybeReserve(storageDir, segment);
File segmentDir = findStoragePathIfCached(segment);
if (segmentDir != null) {
return segmentDir;
}
return new File(loc.getPath(), storageDir);
return loadSegmentWithRetry(segment);
}
finally {
unlock(segment, lock);
@ -198,44 +217,83 @@ public class SegmentLocalCacheManager implements SegmentCacheManager
}
/**
* location may fail because of IO failure, most likely in two cases:<p>
* If we have already reserved a location before, probably via {@link #reserve(DataSegment)}, then only that location
* should be tried. Otherwise, we would fetch locations using {@link StorageLocationSelectorStrategy} and try all
* of them one by one till there is success.
* Location may fail because of IO failure, most likely in two cases:<p>
* 1. druid don't have the write access to this location, most likely the administrator doesn't config it correctly<p>
* 2. disk failure, druid can't read/write to this disk anymore
*
* <p>
* Locations are fetched using {@link StorageLocationSelectorStrategy}.
*/
private StorageLocation loadSegmentWithRetry(DataSegment segment, String storageDirStr) throws SegmentLoadingException
private File loadSegmentWithRetry(DataSegment segment) throws SegmentLoadingException
{
Iterator<StorageLocation> locationsIterator = strategy.getLocations();
String segmentDir = getSegmentDir(segment);
// Try the already reserved location. If location has been reserved outside, then we do not release the location
// here and simply delete any downloaded files. That is, we revert anything we do in this function and nothing else.
for (StorageLocation loc : locations) {
if (loc.isReserved(segmentDir)) {
File storageDir = loc.segmentDirectoryAsFile(segmentDir);
boolean success = loadInLocationWithStartMarkerQuietly(loc, segment, storageDir, false);
if (!success) {
throw new SegmentLoadingException("Failed to load segment %s in reserved location [%s]", segment.getId(), loc.getPath().getAbsolutePath());
}
return storageDir;
}
}
// No location was reserved so we try all the locations
Iterator<StorageLocation> locationsIterator = strategy.getLocations();
while (locationsIterator.hasNext()) {
StorageLocation loc = locationsIterator.next();
File storageDir = loc.reserve(storageDirStr, segment);
// storageDir is the file path corresponding to segment dir
File storageDir = loc.reserve(segmentDir, segment);
if (storageDir != null) {
try {
loadInLocationWithStartMarker(segment, storageDir);
return loc;
}
catch (SegmentLoadingException e) {
try {
log.makeAlert(
e,
"Failed to load segment in current location [%s], try next location if any",
loc.getPath().getAbsolutePath()
).addData("location", loc.getPath().getAbsolutePath()).emit();
}
finally {
loc.removeSegmentDir(storageDir, segment);
cleanupCacheFiles(loc.getPath(), storageDir);
}
boolean success = loadInLocationWithStartMarkerQuietly(loc, segment, storageDir, true);
if (success) {
return storageDir;
}
}
}
throw new SegmentLoadingException("Failed to load segment %s in all locations.", segment.getId());
}
/**
* A helper method over {@link #loadInLocationWithStartMarker(DataSegment, File)} that catches the {@link SegmentLoadingException}
* and emits alerts.
* @param loc - {@link StorageLocation} where segment is to be downloaded in.
* @param segment - {@link DataSegment} to download
* @param storageDir - {@link File} pointing to segment directory
* @param releaseLocation - Whether to release the location in case of failures
* @return - True if segment was downloaded successfully, false otherwise.
*/
private boolean loadInLocationWithStartMarkerQuietly(StorageLocation loc, DataSegment segment, File storageDir, boolean releaseLocation)
{
try {
loadInLocationWithStartMarker(segment, storageDir);
return true;
}
catch (SegmentLoadingException e) {
try {
log.makeAlert(
e,
"Failed to load segment in current location [%s], try next location if any",
loc.getPath().getAbsolutePath()
).addData("location", loc.getPath().getAbsolutePath()).emit();
}
finally {
if (releaseLocation) {
loc.removeSegmentDir(storageDir, segment);
}
cleanupCacheFiles(loc.getPath(), storageDir);
}
}
return false;
}
private void loadInLocationWithStartMarker(DataSegment segment, File storageDir) throws SegmentLoadingException
{
// We use a marker to prevent the case where a segment is downloaded, but before the download completes,
@ -277,6 +335,73 @@ public class SegmentLocalCacheManager implements SegmentCacheManager
}
}
@Override
public boolean reserve(final DataSegment segment)
{
final ReferenceCountingLock lock = createOrGetLock(segment);
synchronized (lock) {
try {
// May be the segment was already loaded [This check is required to account for restart scenarios]
if (null != findStoragePathIfCached(segment)) {
return true;
}
String storageDirStr = getSegmentDir(segment);
// check if we already reserved the segment
for (StorageLocation location : locations) {
if (location.isReserved(storageDirStr)) {
return true;
}
}
// Not found in any location, reserve now
for (Iterator<StorageLocation> it = strategy.getLocations(); it.hasNext(); ) {
StorageLocation location = it.next();
if (null != location.reserve(storageDirStr, segment)) {
return true;
}
}
}
finally {
unlock(segment, lock);
}
}
return false;
}
@Override
public boolean release(final DataSegment segment)
{
final ReferenceCountingLock lock = createOrGetLock(segment);
synchronized (lock) {
try {
String storageDir = getSegmentDir(segment);
// Release the first location encountered
for (StorageLocation location : locations) {
if (location.isReserved(storageDir)) {
File localStorageDir = location.segmentDirectoryAsFile(storageDir);
if (localStorageDir.exists()) {
throw new ISE(
"Asking to release a location '%s' while the segment directory '%s' is present on disk. Any state on disk must be deleted before releasing",
location.getPath().getAbsolutePath(),
localStorageDir.getAbsolutePath()
);
}
return location.release(storageDir, segment.getSize());
}
}
}
finally {
unlock(segment, lock);
}
}
return false;
}
@Override
public void cleanup(DataSegment segment)
{
@ -287,18 +412,17 @@ public class SegmentLocalCacheManager implements SegmentCacheManager
final ReferenceCountingLock lock = createOrGetLock(segment);
synchronized (lock) {
try {
StorageLocation loc = findStorageLocationIfLoaded(segment);
File loc = findStoragePathIfCached(segment);
if (loc == null) {
log.warn("Asked to cleanup something[%s] that didn't exist. Skipping.", segment.getId());
return;
}
// If storageDir.mkdirs() success, but downloadStartMarker.createNewFile() failed,
// in this case, findStorageLocationIfLoaded() will think segment is located in the failed storageDir which is actually not.
// So we should always clean all possible locations here
for (StorageLocation location : locations) {
File localStorageDir = new File(location.getPath(), DataSegmentPusher.getDefaultStorageDir(segment, false));
File localStorageDir = new File(location.getPath(), getSegmentDir(segment));
if (localStorageDir.exists()) {
// Druid creates folders of the form dataSource/interval/version/partitionNum.
// We need to clean up all these directories if they are all empty.

View File

@ -26,6 +26,7 @@ import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.timeline.DataSegment;
import javax.annotation.Nullable;
import java.io.File;
import java.util.HashSet;
import java.util.Set;
@ -117,6 +118,16 @@ public class StorageLocation
return reserve(segmentDir, segment.getId().toString(), segment.getSize());
}
public synchronized boolean isReserved(String segmentDir)
{
return files.contains(segmentDirectoryAsFile(segmentDir));
}
public File segmentDirectoryAsFile(String segmentDir)
{
return new File(path, segmentDir); //lgtm [java/path-injection]
}
/**
* Reserves space to store the given segment, only if it has not been done already. This can be used
* when segment is already downloaded on the disk. Unlike {@link #reserve(String, DataSegment)}, this function

View File

@ -30,7 +30,6 @@ import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.query.TableDataSource;
import org.apache.druid.query.planning.DataSourceAnalysis;
import org.apache.druid.segment.ReferenceCountingSegment;
import org.apache.druid.segment.Segment;
import org.apache.druid.segment.SegmentLazyLoadFailCallback;
import org.apache.druid.segment.join.table.IndexedTable;
import org.apache.druid.segment.join.table.ReferenceCountingIndexedTable;
@ -217,7 +216,7 @@ public class SegmentManager
*/
public boolean loadSegment(final DataSegment segment, boolean lazy, SegmentLazyLoadFailCallback loadFailed) throws SegmentLoadingException
{
final Segment adapter = getAdapter(segment, lazy, loadFailed);
final ReferenceCountingSegment adapter = getSegmentReference(segment, lazy, loadFailed);
final SettableSupplier<Boolean> resultSupplier = new SettableSupplier<>();
@ -252,9 +251,7 @@ public class SegmentManager
loadedIntervals.add(
segment.getInterval(),
segment.getVersion(),
segment.getShardSpec().createChunk(
ReferenceCountingSegment.wrapSegment(adapter, segment.getShardSpec())
)
segment.getShardSpec().createChunk(adapter)
);
dataSourceState.addSegment(segment);
resultSupplier.set(true);
@ -268,21 +265,21 @@ public class SegmentManager
return resultSupplier.get();
}
private Segment getAdapter(final DataSegment segment, boolean lazy, SegmentLazyLoadFailCallback loadFailed) throws SegmentLoadingException
private ReferenceCountingSegment getSegmentReference(final DataSegment dataSegment, boolean lazy, SegmentLazyLoadFailCallback loadFailed) throws SegmentLoadingException
{
final Segment adapter;
final ReferenceCountingSegment segment;
try {
adapter = segmentLoader.getSegment(segment, lazy, loadFailed);
segment = segmentLoader.getSegment(dataSegment, lazy, loadFailed);
}
catch (SegmentLoadingException e) {
segmentLoader.cleanup(segment);
segmentLoader.cleanup(dataSegment);
throw e;
}
if (adapter == null) {
throw new SegmentLoadingException("Null adapter from loadSpec[%s]", segment.getLoadSpec());
if (segment == null) {
throw new SegmentLoadingException("Null adapter from loadSpec[%s]", dataSegment.getLoadSpec());
}
return adapter;
return segment;
}
public void dropSegment(final DataSegment segment)

View File

@ -47,6 +47,18 @@ public class CacheTestSegmentCacheManager implements SegmentCacheManager
throw new UnsupportedOperationException();
}
@Override
public boolean reserve(DataSegment segment)
{
throw new UnsupportedOperationException();
}
@Override
public boolean release(DataSegment segment)
{
throw new UnsupportedOperationException();
}
@Override
public void cleanup(DataSegment segment)
{

View File

@ -20,6 +20,7 @@
package org.apache.druid.segment.loading;
import org.apache.druid.segment.QueryableIndex;
import org.apache.druid.segment.ReferenceCountingSegment;
import org.apache.druid.segment.Segment;
import org.apache.druid.segment.SegmentLazyLoadFailCallback;
import org.apache.druid.segment.StorageAdapter;
@ -33,9 +34,9 @@ public class CacheTestSegmentLoader implements SegmentLoader
{
@Override
public Segment getSegment(final DataSegment segment, boolean lazy, SegmentLazyLoadFailCallback SegmentLazyLoadFailCallback)
public ReferenceCountingSegment getSegment(final DataSegment segment, boolean lazy, SegmentLazyLoadFailCallback SegmentLazyLoadFailCallback)
{
return new Segment()
Segment baseSegment = new Segment()
{
@Override
public SegmentId getId()
@ -66,6 +67,7 @@ public class CacheTestSegmentLoader implements SegmentLoader
{
}
};
return ReferenceCountingSegment.wrapSegment(baseSegment, segment.getShardSpec());
}
@Override

View File

@ -38,6 +38,7 @@ import org.junit.rules.TemporaryFolder;
import java.io.File;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
public class SegmentLocalCacheManagerTest
@ -762,4 +763,151 @@ public class SegmentLocalCacheManagerTest
Assert.assertFalse("Expect cache miss for corrupted segment file", manager.isSegmentCached(segmentToDownload));
Assert.assertFalse(cachedSegmentDir.exists());
}
@Test
public void testReserveSegment()
{
final DataSegment dataSegment = dataSegmentWithInterval("2014-10-20T00:00:00Z/P1D").withSize(100L);
final StorageLocation firstLocation = new StorageLocation(localSegmentCacheFolder, 200L, 0.0d);
final StorageLocation secondLocation = new StorageLocation(localSegmentCacheFolder, 150L, 0.0d);
manager = new SegmentLocalCacheManager(
Arrays.asList(secondLocation, firstLocation),
new SegmentLoaderConfig(),
new RoundRobinStorageLocationSelectorStrategy(Arrays.asList(firstLocation, secondLocation)),
jsonMapper
);
Assert.assertTrue(manager.reserve(dataSegment));
Assert.assertTrue(firstLocation.isReserved(DataSegmentPusher.getDefaultStorageDir(dataSegment, false)));
Assert.assertEquals(100L, firstLocation.availableSizeBytes());
Assert.assertEquals(150L, secondLocation.availableSizeBytes());
// Reserving again should be no-op
Assert.assertTrue(manager.reserve(dataSegment));
Assert.assertTrue(firstLocation.isReserved(DataSegmentPusher.getDefaultStorageDir(dataSegment, false)));
Assert.assertEquals(100L, firstLocation.availableSizeBytes());
Assert.assertEquals(150L, secondLocation.availableSizeBytes());
// Reserving a second segment should now go to a different location
final DataSegment otherSegment = dataSegmentWithInterval("2014-10-21T00:00:00Z/P1D").withSize(100L);
Assert.assertTrue(manager.reserve(otherSegment));
Assert.assertTrue(firstLocation.isReserved(DataSegmentPusher.getDefaultStorageDir(dataSegment, false)));
Assert.assertFalse(firstLocation.isReserved(DataSegmentPusher.getDefaultStorageDir(otherSegment, false)));
Assert.assertTrue(secondLocation.isReserved(DataSegmentPusher.getDefaultStorageDir(otherSegment, false)));
Assert.assertEquals(100L, firstLocation.availableSizeBytes());
Assert.assertEquals(50L, secondLocation.availableSizeBytes());
}
@Test
public void testReserveNotEnoughSpace()
{
final DataSegment dataSegment = dataSegmentWithInterval("2014-10-20T00:00:00Z/P1D").withSize(100L);
final StorageLocation firstLocation = new StorageLocation(localSegmentCacheFolder, 50L, 0.0d);
final StorageLocation secondLocation = new StorageLocation(localSegmentCacheFolder, 150L, 0.0d);
manager = new SegmentLocalCacheManager(
Arrays.asList(secondLocation, firstLocation),
new SegmentLoaderConfig(),
new RoundRobinStorageLocationSelectorStrategy(Arrays.asList(firstLocation, secondLocation)),
jsonMapper
);
// should go to second location if first one doesn't have enough space
Assert.assertTrue(manager.reserve(dataSegment));
Assert.assertTrue(secondLocation.isReserved(DataSegmentPusher.getDefaultStorageDir(dataSegment, false)));
Assert.assertEquals(50L, firstLocation.availableSizeBytes());
Assert.assertEquals(50L, secondLocation.availableSizeBytes());
final DataSegment otherSegment = dataSegmentWithInterval("2014-10-21T00:00:00Z/P1D").withSize(100L);
Assert.assertFalse(manager.reserve(otherSegment));
Assert.assertEquals(50L, firstLocation.availableSizeBytes());
Assert.assertEquals(50L, secondLocation.availableSizeBytes());
}
@Test
public void testSegmentDownloadWhenLocationReserved() throws Exception
{
final List<StorageLocationConfig> locationConfigs = new ArrayList<>();
final StorageLocationConfig locationConfig = createStorageLocationConfig("local_storage_folder", 10000000000L, true);
final StorageLocationConfig locationConfig2 = createStorageLocationConfig("local_storage_folder2", 1000000000L, true);
final StorageLocationConfig locationConfig3 = createStorageLocationConfig("local_storage_folder3", 1000000000L, true);
locationConfigs.add(locationConfig);
locationConfigs.add(locationConfig2);
locationConfigs.add(locationConfig3);
List<StorageLocation> locations = new ArrayList<>();
for (StorageLocationConfig locConfig : locationConfigs) {
locations.add(
new StorageLocation(
locConfig.getPath(),
locConfig.getMaxSize(),
locConfig.getFreeSpacePercent()
)
);
}
manager = new SegmentLocalCacheManager(
new SegmentLoaderConfig().withLocations(locationConfigs),
new RoundRobinStorageLocationSelectorStrategy(locations),
jsonMapper
);
StorageLocation location3 = manager.getLocations().get(2);
Assert.assertEquals(locationConfig3.getPath(), location3.getPath());
final File segmentSrcFolder = tmpFolder.newFolder("segmentSrcFolder");
// Segment should be downloaded in local_storage_folder3 even if that is the third location
final DataSegment segmentToDownload = dataSegmentWithInterval("2014-10-20T00:00:00Z/P1D").withLoadSpec(
ImmutableMap.of(
"type",
"local",
"path",
segmentSrcFolder.getCanonicalPath()
+ "/test_segment_loader"
+ "/2014-10-20T00:00:00.000Z_2014-10-21T00:00:00.000Z/2015-05-27T03:38:35.683Z"
+ "/0/index.zip"
)
);
String segmentDir = DataSegmentPusher.getDefaultStorageDir(segmentToDownload, false);
location3.reserve(segmentDir, segmentToDownload);
// manually create a local segment under segmentSrcFolder
createLocalSegmentFile(segmentSrcFolder, "test_segment_loader/2014-10-20T00:00:00.000Z_2014-10-21T00:00:00.000Z/2015-05-27T03:38:35.683Z/0");
Assert.assertFalse("Expect cache miss before downloading segment", manager.isSegmentCached(segmentToDownload));
File segmentFile = manager.getSegmentFiles(segmentToDownload);
Assert.assertTrue(segmentFile.getAbsolutePath().contains("/local_storage_folder3/"));
Assert.assertTrue("Expect cache hit after downloading segment", manager.isSegmentCached(segmentToDownload));
manager.cleanup(segmentToDownload);
Assert.assertFalse("Expect cache miss after dropping segment", manager.isSegmentCached(segmentToDownload));
Assert.assertFalse(location3.isReserved(segmentDir));
}
@Test
public void testRelease()
{
final DataSegment dataSegment = dataSegmentWithInterval("2014-10-20T00:00:00Z/P1D").withSize(100L);
final StorageLocation firstLocation = new StorageLocation(localSegmentCacheFolder, 50L, 0.0d);
final StorageLocation secondLocation = new StorageLocation(localSegmentCacheFolder, 150L, 0.0d);
manager = new SegmentLocalCacheManager(
Arrays.asList(secondLocation, firstLocation),
new SegmentLoaderConfig(),
new RoundRobinStorageLocationSelectorStrategy(Arrays.asList(firstLocation, secondLocation)),
jsonMapper
);
manager.reserve(dataSegment);
manager.release(dataSegment);
Assert.assertEquals(50L, firstLocation.availableSizeBytes());
Assert.assertEquals(150L, secondLocation.availableSizeBytes());
Assert.assertFalse(firstLocation.isReserved(DataSegmentPusher.getDefaultStorageDir(dataSegment, false)));
Assert.assertFalse(secondLocation.isReserved(DataSegmentPusher.getDefaultStorageDir(dataSegment, false)));
// calling release again should have no effect
manager.release(dataSegment);
Assert.assertEquals(50L, firstLocation.availableSizeBytes());
Assert.assertEquals(150L, secondLocation.availableSizeBytes());
}
}

View File

@ -64,12 +64,12 @@ public class SegmentManagerTest
private static final SegmentLoader SEGMENT_LOADER = new SegmentLoader()
{
@Override
public Segment getSegment(final DataSegment segment, boolean lazy, SegmentLazyLoadFailCallback loadFailed)
public ReferenceCountingSegment getSegment(final DataSegment segment, boolean lazy, SegmentLazyLoadFailCallback loadFailed)
{
return new SegmentForTesting(
return ReferenceCountingSegment.wrapSegment(new SegmentForTesting(
MapUtils.getString(segment.getLoadSpec(), "version"),
(Interval) segment.getLoadSpec().get("interval")
);
), segment.getShardSpec());
}
@Override

View File

@ -148,12 +148,12 @@ public class ServerManagerTest
new SegmentLoader()
{
@Override
public Segment getSegment(final DataSegment segment, boolean lazy, SegmentLazyLoadFailCallback SegmentLazyLoadFailCallback)
public ReferenceCountingSegment getSegment(final DataSegment segment, boolean lazy, SegmentLazyLoadFailCallback SegmentLazyLoadFailCallback)
{
return new SegmentForTesting(
return ReferenceCountingSegment.wrapSegment(new SegmentForTesting(
MapUtils.getString(segment.getLoadSpec(), "version"),
(Interval) segment.getLoadSpec().get("interval")
);
), segment.getShardSpec());
}
@Override