Making optimal usage of multiple segment cache locations (#8038)

* #7641 - Changing segment distribution algorithm to distribute segments to multiple segment cache locations

* Fixing indentation

* WIP

* Adding interface for location strategy selection, least bytes used strategy impl, round-robin strategy impl, locationSelectorStrategy config with least bytes used strategy as the default strategy

* fixing code style

* Fixing test

* Adding a method visible only for testing, fixing tests

* 1. Changing the method contract to return an iterator of locations instead of a single best location. 2. Check style fixes

* fixing the conditional statement

* Added testSegmentDistributionUsingLeastBytesUsedStrategy, fixed testSegmentDistributionUsingRoundRobinStrategy

* to trigger CI build

* Add documentation for the selection strategy configuration

* to re trigger CI build

* updated docs as per review comments, made LeastBytesUsedStorageLocationSelectorStrategy.getLocations a synchronzied method, other minor fixes

* In checkLocationConfigForNull method, using getLocations() to check for null instead of directly referring to the locations variable so that tests overriding getLocations() method do not fail

* Implementing review comments. Added tests for StorageLocationSelectorStrategy

* Checkstyle fixes

* Adding java doc comments for StorageLocationSelectorStrategy interface

* checkstyle

* empty commit to retrigger build

* Empty commit

* Adding suppressions for words leastBytesUsed and roundRobin of ../docs/configuration/index.md file

* Impl review comments including updating docs as suggested

* Removing checkLocationConfigForNull(), @NotEmpty annotation serves the purpose

* Round robin iterator to keep track of the no. of iterations, impl review comments, added tests for round robin strategy

* Fixing the round robin iterator

* Removed numLocationsToTry, updated java docs

* changing property attribute value from tier to type

* Fixing assert messages
This commit is contained in:
Sashidhar Thallam 2019-09-28 11:47:44 +05:30 committed by David Lim
parent 17d9d7daed
commit 51a7235ebc
10 changed files with 631 additions and 13 deletions

View File

@ -1286,6 +1286,7 @@ These Historical configurations can be defined in the `historical/runtime.proper
|Property|Description|Default|
|--------|-----------|-------|
|`druid.segmentCache.locations`|Segments assigned to a Historical process are first stored on the local file system (in a disk cache) and then served by the Historical process. These locations define where that local cache resides. This value cannot be NULL or EMPTY. Here is an example `druid.segmentCache.locations=[{"path": "/mnt/druidSegments", "maxSize": 10000, "freeSpacePercent": 1.0}]`. "freeSpacePercent" is optional, if provided then enforces that much of free disk partition space while storing segments. But, it depends on File.getTotalSpace() and File.getFreeSpace() methods, so enable if only if they work for your File System.| none |
|`druid.segmentCache.locationSelectorStrategy`|The strategy used to select a location from the configured `druid.segmentCache.locations` for segment distribution. Possible values are `leastBytesUsed` or `roundRobin`. |leastBytesUsed|
|`druid.segmentCache.deleteOnRemove`|Delete segment files from cache once a process is no longer serving a segment.|true|
|`druid.segmentCache.dropSegmentDelayMillis`|How long a process delays before completely dropping segment.|30000 (30 seconds)|
|`druid.segmentCache.infoDir`|Historical processes keep track of the segments they are serving so that when the process is restarted they can reload the same segments without waiting for the Coordinator to reassign. This path defines where this metadata is kept. Directory will be created if needed.|${first_location}/info_dir|
@ -1296,6 +1297,8 @@ These Historical configurations can be defined in the `historical/runtime.proper
In `druid.segmentCache.locations`, *freeSpacePercent* was added because *maxSize* setting is only a theoretical limit and assumes that much space will always be available for storing segments. In case of any druid bug leading to unaccounted segment files left alone on disk or some other process writing stuff to disk, This check can start failing segment loading early before filling up the disk completely and leaving the host usable otherwise.
In `druid.segmentCache.locationSelectorStrategy`, one of leastBytesUsed or roundRobin could be specified to represent the strategy to distribute segments across multiple segment cache locations. The leastBytesUsed which is the default strategy always selects a location which has least bytes used in absolute terms. The roundRobin strategy selects a location in a round robin fashion oblivious to the bytes used or the capacity. Note that `if druid.segmentCache.numLoadingThreads` > 1, multiple threads can download different segments at the same time. In this case, with the leastBytesUsed strategy, historicals may select a sub-optimal storage location because each decision is based on a snapshot of the storage location status of when a segment is requested to download.
#### Historical query configs
##### Concurrent Requests

View File

@ -0,0 +1,57 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.segment.loading;
import com.google.common.collect.Ordering;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
/**
* A {@link StorageLocation} selector strategy that selects a segment cache location that is least filled each time
* among the available storage locations.
*/
public class LeastBytesUsedStorageLocationSelectorStrategy implements StorageLocationSelectorStrategy
{
private static final Ordering<StorageLocation> ORDERING = Ordering.from(Comparator
.comparingLong(StorageLocation::currSizeBytes));
private List<StorageLocation> storageLocations;
public LeastBytesUsedStorageLocationSelectorStrategy(List<StorageLocation> storageLocations)
{
this.storageLocations = storageLocations;
}
@Override
public Iterator<StorageLocation> getLocations()
{
return ORDERING.sortedCopy(this.storageLocations).iterator();
}
@Override
public String toString()
{
return "LeastBytesUsedStorageLocationSelectorStrategy{" +
"storageLocations=" + storageLocations +
'}';
}
}

View File

@ -0,0 +1,72 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.segment.loading;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.concurrent.atomic.AtomicInteger;
/**
* A {@link StorageLocation} selector strategy that selects a segment cache location in a round-robin fashion each time
* among the available storage locations. When {@link Iterator#next()} on iterator retuned by
* {@link RoundRobinStorageLocationSelectorStrategy#getLocations()} is called the locations are returned in a round
* robin fashion even when multiple threads are in use.
*/
public class RoundRobinStorageLocationSelectorStrategy implements StorageLocationSelectorStrategy
{
private final List<StorageLocation> storageLocations;
private final AtomicInteger startIndex = new AtomicInteger(0);
public RoundRobinStorageLocationSelectorStrategy(List<StorageLocation> storageLocations)
{
this.storageLocations = storageLocations;
}
@Override
public Iterator<StorageLocation> getLocations()
{
return new Iterator<StorageLocation>() {
private final int numStorageLocations = storageLocations.size();
private int remainingIterations = numStorageLocations;
@Override
public boolean hasNext()
{
return remainingIterations > 0;
}
@Override
public StorageLocation next()
{
if (!hasNext()) {
throw new NoSuchElementException();
}
remainingIterations--;
final StorageLocation nextLocation =
storageLocations.get(startIndex.getAndUpdate(n -> (n + 1) % numStorageLocations));
return nextLocation;
}
};
}
}

View File

@ -20,8 +20,8 @@
package org.apache.druid.segment.loading;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.utils.JvmUtils;
import org.hibernate.validator.constraints.NotEmpty;
@ -52,6 +52,9 @@ public class SegmentLoaderConfig
@JsonProperty("numBootstrapThreads")
private Integer numBootstrapThreads = null;
@JsonProperty("locationSelectorStrategy")
private StorageLocationSelectorStrategy locationSelectorStrategy;
@JsonProperty
private File infoDir = null;
@ -88,16 +91,20 @@ public class SegmentLoaderConfig
return numBootstrapThreads == null ? numLoadingThreads : numBootstrapThreads;
}
public StorageLocationSelectorStrategy getStorageLocationSelectorStrategy(List<StorageLocation> storageLocations)
{
if (locationSelectorStrategy == null) {
// default strategy if no strategy is specified in the config
locationSelectorStrategy = new LeastBytesUsedStorageLocationSelectorStrategy(storageLocations);
}
return locationSelectorStrategy;
}
public File getInfoDir()
{
if (infoDir == null) {
if (locations == null || locations.size() == 0) {
throw new ISE("You have no segment cache locations defined. Please configure druid.segmentCache.locations to use one or more locations.");
}
infoDir = new File(locations.get(0).getPath(), "info_dir");
}
return infoDir;
}
@ -115,6 +122,13 @@ public class SegmentLoaderConfig
return retVal;
}
@VisibleForTesting
SegmentLoaderConfig withStorageLocationSelectorStrategy(StorageLocationSelectorStrategy strategy)
{
this.locationSelectorStrategy = strategy;
return this;
}
@Override
public String toString()
{
@ -122,6 +136,7 @@ public class SegmentLoaderConfig
"locations=" + locations +
", deleteOnRemove=" + deleteOnRemove +
", dropSegmentDelayMillis=" + dropSegmentDelayMillis +
", locationSelectorStrategy=" + locationSelectorStrategy +
", infoDir=" + infoDir +
'}';
}

View File

@ -33,7 +33,7 @@ import org.apache.druid.timeline.DataSegment;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
@ -42,9 +42,6 @@ import java.util.concurrent.ConcurrentHashMap;
public class SegmentLoaderLocalCacheManager implements SegmentLoader
{
private static final EmittingLogger log = new EmittingLogger(SegmentLoaderLocalCacheManager.class);
private static final Comparator<StorageLocation> COMPARATOR = Comparator
.comparingLong(StorageLocation::availableSizeBytes)
.reversed();
private final IndexIO indexIO;
private final SegmentLoaderConfig config;
@ -77,6 +74,8 @@ public class SegmentLoaderLocalCacheManager implements SegmentLoader
*/
private final ConcurrentHashMap<DataSegment, ReferenceCountingLock> segmentLocks = new ConcurrentHashMap<>();
private final StorageLocationSelectorStrategy strategy;
// Note that we only create this via injection in historical and realtime nodes. Peons create these
// objects via SegmentLoaderFactory objects, so that they can store segments in task-specific
// directories rather than statically configured directories.
@ -101,7 +100,7 @@ public class SegmentLoaderLocalCacheManager implements SegmentLoader
)
);
}
locations.sort(COMPARATOR);
this.strategy = config.getStorageLocationSelectorStrategy(locations);
}
@Override
@ -175,10 +174,17 @@ public class SegmentLoaderLocalCacheManager implements SegmentLoader
* location may fail because of IO failure, most likely in two cases:<p>
* 1. druid don't have the write access to this location, most likely the administrator doesn't config it correctly<p>
* 2. disk failure, druid can't read/write to this disk anymore
*
* Locations are fetched using {@link StorageLocationSelectorStrategy}.
*/
private StorageLocation loadSegmentWithRetry(DataSegment segment, String storageDirStr) throws SegmentLoadingException
{
for (StorageLocation loc : locations) {
Iterator<StorageLocation> locationsIterator = strategy.getLocations();
while (locationsIterator.hasNext()) {
StorageLocation loc = locationsIterator.next();
File storageDir = loc.reserve(storageDirStr, segment);
if (storageDir != null) {
try {

View File

@ -177,4 +177,9 @@ public class StorageLocation
{
return maxSizeBytes - currSizeBytes;
}
public synchronized long currSizeBytes()
{
return currSizeBytes;
}
}

View File

@ -0,0 +1,57 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.segment.loading;
import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import org.apache.druid.timeline.DataSegment;
import java.util.Iterator;
/**
* This interface describes the storage location selection strategy which is responsible for ordering the
* available multiple {@link StorageLocation}s for segment distribution.
*
* Only a snapshot of the locations is returned here. The implemntations currently do not handle all kinds of
* concurrency issues and accesses to the underlying storage. Please see
* https://github.com/apache/incubator-druid/pull/8038#discussion_r325520829 of PR https://github
* .com/apache/incubator-druid/pull/8038 for more details.
*/
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", defaultImpl =
LeastBytesUsedStorageLocationSelectorStrategy.class)
@JsonSubTypes(value = {
@JsonSubTypes.Type(name = "leastBytesUsed", value = LeastBytesUsedStorageLocationSelectorStrategy.class),
@JsonSubTypes.Type(name = "roundRobin", value = RoundRobinStorageLocationSelectorStrategy.class)
})
public interface StorageLocationSelectorStrategy
{
/**
* Finds the best ordering of the {@link StorageLocation}s to load a {@link DataSegment} according to
* the location selector strategy. This method returns an iterator instead of a single best location. The
* caller is responsible for iterating over the locations and calling {@link StorageLocation#reserve}
* method. This is because a single location may be problematic like failed disk or might become unwritable for
* whatever reasons.
*
* This method can be called by different threads and so should be thread-safe.
*
* @return An iterator of {@link StorageLocation}s from which the callers can iterate and pick a location.
*/
Iterator<StorageLocation> getLocations();
}

View File

@ -372,6 +372,11 @@ public class SegmentLoaderLocalCacheManagerTest
}
private DataSegment dataSegmentWithInterval(String intervalStr)
{
return dataSegmentWithInterval(intervalStr, 10L);
}
private DataSegment dataSegmentWithInterval(String intervalStr, long size)
{
return DataSegment.builder()
.dataSource("test_segment_loader")
@ -389,7 +394,265 @@ public class SegmentLoaderLocalCacheManagerTest
.metrics(ImmutableList.of())
.shardSpec(NoneShardSpec.instance())
.binaryVersion(9)
.size(10L)
.size(size)
.build();
}
@Test
public void testSegmentDistributionUsingRoundRobinStrategy() throws Exception
{
final List<StorageLocationConfig> locationConfigs = new ArrayList<>();
final StorageLocationConfig locationConfig = createStorageLocationConfig("local_storage_folder", 10000000000L, true);
final StorageLocationConfig locationConfig2 = createStorageLocationConfig("local_storage_folder2", 1000000000L, true);
final StorageLocationConfig locationConfig3 = createStorageLocationConfig("local_storage_folder3", 1000000000L, true);
locationConfigs.add(locationConfig);
locationConfigs.add(locationConfig2);
locationConfigs.add(locationConfig3);
List<StorageLocation> locations = new ArrayList<>();
for (StorageLocationConfig locConfig : locationConfigs) {
locations.add(
new StorageLocation(
locConfig.getPath(),
locConfig.getMaxSize(),
locConfig.getFreeSpacePercent()
)
);
}
manager = new SegmentLoaderLocalCacheManager(
TestHelper.getTestIndexIO(),
new SegmentLoaderConfig().withLocations(locationConfigs).withStorageLocationSelectorStrategy(
new RoundRobinStorageLocationSelectorStrategy(locations)
),
jsonMapper
);
final File segmentSrcFolder = tmpFolder.newFolder("segmentSrcFolder");
// Segment 1 should be downloaded in local_storage_folder
final DataSegment segmentToDownload1 = dataSegmentWithInterval("2014-10-20T00:00:00Z/P1D").withLoadSpec(
ImmutableMap.of(
"type",
"local",
"path",
segmentSrcFolder.getCanonicalPath()
+ "/test_segment_loader"
+ "/2014-10-20T00:00:00.000Z_2014-10-21T00:00:00.000Z/2015-05-27T03:38:35.683Z"
+ "/0/index.zip"
)
);
// manually create a local segment under segmentSrcFolder
createLocalSegmentFile(segmentSrcFolder, "test_segment_loader/2014-10-20T00:00:00.000Z_2014-10-21T00:00:00.000Z/2015-05-27T03:38:35.683Z/0");
Assert.assertFalse("Expect cache miss before downloading segment", manager.isSegmentLoaded(segmentToDownload1));
File segmentFile = manager.getSegmentFiles(segmentToDownload1);
Assert.assertTrue(segmentFile.getAbsolutePath().contains("/local_storage_folder/"));
Assert.assertTrue("Expect cache hit after downloading segment", manager.isSegmentLoaded(segmentToDownload1));
manager.cleanup(segmentToDownload1);
Assert.assertFalse("Expect cache miss after dropping segment", manager.isSegmentLoaded(segmentToDownload1));
// Segment 2 should be downloaded in local_storage_folder2
final DataSegment segmentToDownload2 = dataSegmentWithInterval("2014-11-20T00:00:00Z/P1D").withLoadSpec(
ImmutableMap.of(
"type",
"local",
"path",
segmentSrcFolder.getCanonicalPath()
+ "/test_segment_loader"
+ "/2014-11-20T00:00:00.000Z_2014-11-21T00:00:00.000Z/2015-05-27T03:38:35.683Z"
+ "/0/index.zip"
)
);
// manually create a local segment under segmentSrcFolder
createLocalSegmentFile(segmentSrcFolder, "test_segment_loader/2014-11-20T00:00:00.000Z_2014-11-21T00:00:00.000Z/2015-05-27T03:38:35.683Z/0");
Assert.assertFalse("Expect cache miss before downloading segment", manager.isSegmentLoaded(segmentToDownload2));
File segmentFile2 = manager.getSegmentFiles(segmentToDownload2);
Assert.assertTrue(segmentFile2.getAbsolutePath().contains("/local_storage_folder2/"));
Assert.assertTrue("Expect cache hit after downloading segment", manager.isSegmentLoaded(segmentToDownload2));
manager.cleanup(segmentToDownload2);
Assert.assertFalse("Expect cache miss after dropping segment", manager.isSegmentLoaded(segmentToDownload2));
// Segment 3 should be downloaded in local_storage_folder3
final DataSegment segmentToDownload3 = dataSegmentWithInterval("2014-12-20T00:00:00Z/P1D").withLoadSpec(
ImmutableMap.of(
"type",
"local",
"path",
segmentSrcFolder.getCanonicalPath()
+ "/test_segment_loader"
+ "/2014-12-20T00:00:00.000Z_2014-12-21T00:00:00.000Z/2015-05-27T03:38:35.683Z"
+ "/0/index.zip"
)
);
// manually create a local segment under segmentSrcFolder
createLocalSegmentFile(segmentSrcFolder,
"test_segment_loader/2014-12-20T00:00:00.000Z_2014-12-21T00:00:00.000Z/2015-05-27T03:38:35.683Z/0");
File segmentFile3 = manager.getSegmentFiles(segmentToDownload3);
Assert.assertTrue(segmentFile3.getAbsolutePath().contains("/local_storage_folder3/"));
Assert.assertTrue("Expect cache hit after downloading segment", manager.isSegmentLoaded(segmentToDownload3));
manager.cleanup(segmentToDownload3);
Assert.assertFalse("Expect cache miss after dropping segment", manager.isSegmentLoaded(segmentToDownload3));
// Segment 4 should be downloaded in local_storage_folder again, asserting round robin distribution of segments
final DataSegment segmentToDownload4 = dataSegmentWithInterval("2014-08-20T00:00:00Z/P1D").withLoadSpec(
ImmutableMap.of(
"type",
"local",
"path",
segmentSrcFolder.getCanonicalPath()
+ "/test_segment_loader"
+ "/2014-08-20T00:00:00.000Z_2014-08-21T00:00:00.000Z/2015-05-27T03:38:35.683Z"
+ "/0/index.zip"
)
);
// manually create a local segment under segmentSrcFolder
createLocalSegmentFile(segmentSrcFolder, "test_segment_loader/2014-08-20T00:00:00.000Z_2014-08-21T00:00:00" +
".000Z/2015-05-27T03:38:35.683Z/0");
Assert.assertFalse("Expect cache miss before downloading segment", manager.isSegmentLoaded(segmentToDownload4));
File segmentFile1 = manager.getSegmentFiles(segmentToDownload4);
Assert.assertTrue(segmentFile1.getAbsolutePath().contains("/local_storage_folder/"));
Assert.assertTrue("Expect cache hit after downloading segment", manager.isSegmentLoaded(segmentToDownload4));
manager.cleanup(segmentToDownload4);
Assert.assertFalse("Expect cache miss after dropping segment", manager.isSegmentLoaded(segmentToDownload4));
}
private void createLocalSegmentFile(File segmentSrcFolder, String localSegmentPath) throws Exception
{
// manually create a local segment under segmentSrcFolder
final File localSegmentFile = new File(segmentSrcFolder, localSegmentPath);
localSegmentFile.mkdirs();
final File indexZip = new File(localSegmentFile, "index.zip");
indexZip.createNewFile();
}
private StorageLocationConfig createStorageLocationConfig(String localPath, long maxSize, boolean writable) throws Exception
{
final File localStorageFolder = tmpFolder.newFolder(localPath);
localStorageFolder.setWritable(writable);
final StorageLocationConfig locationConfig = new StorageLocationConfig(localStorageFolder, maxSize, 1.0);
return locationConfig;
}
@Test
public void testSegmentDistributionUsingLeastBytesUsedStrategy() throws Exception
{
final List<StorageLocationConfig> locations = new ArrayList<>();
final StorageLocationConfig locationConfig = createStorageLocationConfig("local_storage_folder", 10000000000L,
true);
final StorageLocationConfig locationConfig2 = createStorageLocationConfig("local_storage_folder2", 1000000000L,
true);
final StorageLocationConfig locationConfig3 = createStorageLocationConfig("local_storage_folder3", 1000000000L,
true);
locations.add(locationConfig);
locations.add(locationConfig2);
locations.add(locationConfig3);
manager = new SegmentLoaderLocalCacheManager(
TestHelper.getTestIndexIO(),
new SegmentLoaderConfig().withLocations(locations),
jsonMapper
);
final File segmentSrcFolder = tmpFolder.newFolder("segmentSrcFolder");
// Segment 1 should be downloaded in local_storage_folder, segment1 size 10L
final DataSegment segmentToDownload = dataSegmentWithInterval("2014-10-20T00:00:00Z/P1D", 10L).withLoadSpec(
ImmutableMap.of(
"type",
"local",
"path",
segmentSrcFolder.getCanonicalPath()
+ "/test_segment_loader"
+ "/2014-10-20T00:00:00.000Z_2014-10-21T00:00:00.000Z/2015-05-27T03:38:35.683Z"
+ "/0/index.zip"
)
);
// manually create a local segment under segmentSrcFolder
createLocalSegmentFile(segmentSrcFolder,
"test_segment_loader/2014-10-20T00:00:00.000Z_2014-10-21T00:00:00.000Z/2015-05-27T03:38:35.683Z/0");
Assert.assertFalse("Expect cache miss before downloading segment", manager.isSegmentLoaded(segmentToDownload));
File segmentFile = manager.getSegmentFiles(segmentToDownload);
Assert.assertTrue(segmentFile.getAbsolutePath().contains("/local_storage_folder/"));
Assert.assertTrue("Expect cache hit after downloading segment", manager.isSegmentLoaded(segmentToDownload));
// Segment 2 should be downloaded in local_storage_folder2, segment2 size 5L
final DataSegment segmentToDownload2 = dataSegmentWithInterval("2014-11-20T00:00:00Z/P1D", 5L).withLoadSpec(
ImmutableMap.of(
"type",
"local",
"path",
segmentSrcFolder.getCanonicalPath()
+ "/test_segment_loader"
+ "/2014-11-20T00:00:00.000Z_2014-11-21T00:00:00.000Z/2015-05-27T03:38:35.683Z"
+ "/0/index.zip"
)
);
// manually create a local segment under segmentSrcFolder
createLocalSegmentFile(segmentSrcFolder,
"test_segment_loader/2014-11-20T00:00:00.000Z_2014-11-21T00:00:00.000Z/2015-05-27T03:38:35.683Z/0");
Assert.assertFalse("Expect cache miss before downloading segment", manager.isSegmentLoaded(segmentToDownload2));
File segmentFile2 = manager.getSegmentFiles(segmentToDownload2);
Assert.assertTrue(segmentFile2.getAbsolutePath().contains("/local_storage_folder2/"));
Assert.assertTrue("Expect cache hit after downloading segment", manager.isSegmentLoaded(segmentToDownload2));
// Segment 3 should be downloaded in local_storage_folder3, segment3 size 20L
final DataSegment segmentToDownload3 = dataSegmentWithInterval("2014-12-20T00:00:00Z/P1D", 20L).withLoadSpec(
ImmutableMap.of(
"type",
"local",
"path",
segmentSrcFolder.getCanonicalPath()
+ "/test_segment_loader"
+ "/2014-12-20T00:00:00.000Z_2014-12-21T00:00:00.000Z/2015-05-27T03:38:35.683Z"
+ "/0/index.zip"
)
);
// manually create a local segment under segmentSrcFolder
createLocalSegmentFile(segmentSrcFolder,
"test_segment_loader/2014-12-20T00:00:00.000Z_2014-12-21T00:00:00.000Z/2015-05-27T03:38:35.683Z/0");
File segmentFile3 = manager.getSegmentFiles(segmentToDownload3);
Assert.assertTrue(segmentFile3.getAbsolutePath().contains("/local_storage_folder3/"));
Assert.assertTrue("Expect cache hit after downloading segment", manager.isSegmentLoaded(segmentToDownload3));
// Now the storage locations local_storage_folder1, local_storage_folder2 and local_storage_folder3 have 10, 5 and
// 20 bytes occupied respectively. The default strategy should pick location2 (as it has least bytes used) for the
// next segment to be downloaded asserting the least bytes used distribution of segments.
final DataSegment segmentToDownload4 = dataSegmentWithInterval("2014-08-20T00:00:00Z/P1D").withLoadSpec(
ImmutableMap.of(
"type",
"local",
"path",
segmentSrcFolder.getCanonicalPath()
+ "/test_segment_loader"
+ "/2014-08-20T00:00:00.000Z_2014-08-21T00:00:00.000Z/2015-05-27T03:38:35.683Z"
+ "/0/index.zip"
)
);
// manually create a local segment under segmentSrcFolder
createLocalSegmentFile(segmentSrcFolder, "test_segment_loader/2014-08-20T00:00:00.000Z_2014-08-21T00:00:00" +
".000Z/2015-05-27T03:38:35.683Z/0");
Assert.assertFalse("Expect cache miss before downloading segment", manager.isSegmentLoaded(segmentToDownload4));
File segmentFile1 = manager.getSegmentFiles(segmentToDownload4);
Assert.assertTrue(segmentFile1.getAbsolutePath().contains("/local_storage_folder2/"));
Assert.assertTrue("Expect cache hit after downloading segment", manager.isSegmentLoaded(segmentToDownload4));
}
}

View File

@ -0,0 +1,138 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.segment.loading;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import java.io.File;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
public class StorageLocationSelectorStrategyTest
{
@Rule
public final TemporaryFolder tmpFolder = new TemporaryFolder();
@Test
public void testLeastBytesUsedLocationSelectorStrategy() throws Exception
{
List<StorageLocation> storageLocations = new ArrayList<>();
final File localStorageFolder1 = tmpFolder.newFolder("local_storage_folder_1");
final File localStorageFolder2 = tmpFolder.newFolder("local_storage_folder_2");
final File localStorageFolder3 = tmpFolder.newFolder("local_storage_folder_3");
StorageLocation storageLocation1 = new StorageLocation(localStorageFolder1, 10000000000L,
null);
storageLocations.add(storageLocation1);
storageLocations.add(new StorageLocation(localStorageFolder2, 10000000000L, null));
storageLocations.add(new StorageLocation(localStorageFolder3, 10000000000L, null));
StorageLocationSelectorStrategy leastBytesUsedStrategy =
new LeastBytesUsedStorageLocationSelectorStrategy(storageLocations);
storageLocation1.reserve("tmp_loc1", "__seg1", 1024L);
Iterator<StorageLocation> locations = leastBytesUsedStrategy.getLocations();
StorageLocation loc1 = locations.next();
Assert.assertEquals("The next element of the iterator should point to path local_storage_folder_2",
localStorageFolder2, loc1.getPath());
StorageLocation loc2 = locations.next();
Assert.assertEquals("The next element of the iterator should point to path local_storage_folder_3",
localStorageFolder3, loc2.getPath());
StorageLocation loc3 = locations.next();
Assert.assertEquals("The next element of the iterator should point to path local_storage_folder_1",
localStorageFolder1, loc3.getPath());
}
@Test
public void testRoundRobinLocationSelectorStrategySingleLocation() throws Exception
{
List<StorageLocation> storageLocations = new ArrayList<>();
final File localStorageFolder1 = tmpFolder.newFolder("local_storage_folder_1");
storageLocations.add(new StorageLocation(localStorageFolder1, 10000000000L, null));
StorageLocationSelectorStrategy roundRobinStrategy =
new RoundRobinStorageLocationSelectorStrategy(storageLocations);
Iterator<StorageLocation> locations = roundRobinStrategy.getLocations();
StorageLocation loc1 = locations.next();
Assert.assertEquals("The next element of the iterator should point to path local_storage_folder_1",
localStorageFolder1, loc1.getPath());
locations = roundRobinStrategy.getLocations();
StorageLocation loc2 = locations.next();
Assert.assertEquals("The next element of the iterator should point to path local_storage_folder_1",
localStorageFolder1, loc2.getPath());
}
@Test
public void testRoundRobinLocationSelectorStrategy() throws Exception
{
List<StorageLocation> storageLocations = new ArrayList<>();
final File localStorageFolder1 = tmpFolder.newFolder("local_storage_folder_1");
final File localStorageFolder2 = tmpFolder.newFolder("local_storage_folder_2");
final File localStorageFolder3 = tmpFolder.newFolder("local_storage_folder_3");
storageLocations.add(new StorageLocation(localStorageFolder1, 10000000000L, null));
storageLocations.add(new StorageLocation(localStorageFolder2, 10000000000L, null));
storageLocations.add(new StorageLocation(localStorageFolder3, 10000000000L, null));
StorageLocationSelectorStrategy roundRobinStrategy = new RoundRobinStorageLocationSelectorStrategy(storageLocations);
iterateLocs(localStorageFolder1, localStorageFolder2, localStorageFolder3, roundRobinStrategy);
iterateLocs(localStorageFolder1, localStorageFolder2, localStorageFolder3, roundRobinStrategy);
iterateLocs(localStorageFolder1, localStorageFolder2, localStorageFolder3, roundRobinStrategy);
iterateLocs(localStorageFolder1, localStorageFolder2, localStorageFolder3, roundRobinStrategy);
iterateLocs(localStorageFolder1, localStorageFolder2, localStorageFolder3, roundRobinStrategy);
}
private void iterateLocs(File localStorageFolder1, File localStorageFolder2, File localStorageFolder3,
StorageLocationSelectorStrategy roundRobinStrategy)
{
Iterator<StorageLocation> locations = roundRobinStrategy.getLocations();
StorageLocation loc1 = locations.next();
Assert.assertEquals("The next element of the iterator should point to path local_storage_folder_1",
localStorageFolder1, loc1.getPath());
StorageLocation loc2 = locations.next();
Assert.assertEquals("The next element of the iterator should point to path local_storage_folder_2",
localStorageFolder2, loc2.getPath());
StorageLocation loc3 = locations.next();
Assert.assertEquals("The next element of the iterator should point to path local_storage_folder_3",
localStorageFolder3, loc3.getPath());
}
}

View File

@ -1549,6 +1549,7 @@ java.class.path
java.io.tmpdir
javaOpts
javaOptsArray
leastBytesUsed
loadList
loadqueuepeon
loadspec
@ -1572,6 +1573,7 @@ queryType
remoteTaskRunnerConfig
rendezvousHash
resultsets
roundRobin
runtime.properties
runtime.properties.
s3