RandomLocationSelectorStrategy to Choose an available disk(location) to store a segment. With unit tests. (#8461)

This commit is contained in:
SeKing 2019-11-22 19:46:54 +08:00 committed by Clint Wylie
parent 934547a215
commit 9955107e8e
5 changed files with 186 additions and 2 deletions

View File

@ -1352,7 +1352,7 @@ These Historical configurations can be defined in the `historical/runtime.proper
|Property|Description|Default|
|--------|-----------|-------|
|`druid.segmentCache.locations`|Segments assigned to a Historical process are first stored on the local file system (in a disk cache) and then served by the Historical process. These locations define where that local cache resides. This value cannot be NULL or EMPTY. Here is an example `druid.segmentCache.locations=[{"path": "/mnt/druidSegments", "maxSize": 10000, "freeSpacePercent": 1.0}]`. "freeSpacePercent" is optional, if provided then enforces that much of free disk partition space while storing segments. But, it depends on File.getTotalSpace() and File.getFreeSpace() methods, so enable if only if they work for your File System.| none |
|`druid.segmentCache.locationSelectorStrategy`|The strategy used to select a location from the configured `druid.segmentCache.locations` for segment distribution. Possible values are `leastBytesUsed` or `roundRobin`. |leastBytesUsed|
|`druid.segmentCache.locationSelectorStrategy`|The strategy used to select a location from the configured `druid.segmentCache.locations` for segment distribution. Possible values are `leastBytesUsed` or `roundRobin` or `random`. |leastBytesUsed|
|`druid.segmentCache.deleteOnRemove`|Delete segment files from cache once a process is no longer serving a segment.|true|
|`druid.segmentCache.dropSegmentDelayMillis`|How long a process delays before completely dropping segment.|30000 (30 seconds)|
|`druid.segmentCache.infoDir`|Historical processes keep track of the segments they are serving so that when the process is restarted they can reload the same segments without waiting for the Coordinator to reassign. This path defines where this metadata is kept. Directory will be created if needed.|${first_location}/info_dir|

View File

@ -0,0 +1,49 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.segment.loading;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
/**
* A {@link StorageLocation} selector strategy that selects a segment cache location randomly each time
* among the available storage locations.
*/
public class RandomStorageLocationSelectorStrategy implements StorageLocationSelectorStrategy
{
private final List<StorageLocation> storageLocations;
public RandomStorageLocationSelectorStrategy(List<StorageLocation> storageLocations)
{
this.storageLocations = storageLocations;
}
@Override
public Iterator<StorageLocation> getLocations()
{
List<StorageLocation> copyLocation = new ArrayList<>(storageLocations);
Collections.shuffle(copyLocation);
return copyLocation.iterator();
}
}

View File

@ -38,7 +38,8 @@ import java.util.Iterator;
LeastBytesUsedStorageLocationSelectorStrategy.class)
@JsonSubTypes(value = {
@JsonSubTypes.Type(name = "leastBytesUsed", value = LeastBytesUsedStorageLocationSelectorStrategy.class),
@JsonSubTypes.Type(name = "roundRobin", value = RoundRobinStorageLocationSelectorStrategy.class)
@JsonSubTypes.Type(name = "roundRobin", value = RoundRobinStorageLocationSelectorStrategy.class),
@JsonSubTypes.Type(name = "random", value = RandomStorageLocationSelectorStrategy.class)
})
public interface StorageLocationSelectorStrategy
{

View File

@ -655,4 +655,109 @@ public class SegmentLoaderLocalCacheManagerTest
}
@Test
public void testSegmentDistributionUsingRandomStrategy() throws Exception
{
final List<StorageLocationConfig> locationConfigs = new ArrayList<>();
final StorageLocationConfig locationConfig = createStorageLocationConfig("local_storage_folder", 10L,
true);
final StorageLocationConfig locationConfig2 = createStorageLocationConfig("local_storage_folder2", 100L,
false);
final StorageLocationConfig locationConfig3 = createStorageLocationConfig("local_storage_folder3", 9L,
true);
locationConfigs.add(locationConfig);
locationConfigs.add(locationConfig2);
locationConfigs.add(locationConfig3);
List<StorageLocation> locations = new ArrayList<>();
for (StorageLocationConfig locConfig : locationConfigs) {
locations.add(
new StorageLocation(
locConfig.getPath(),
locConfig.getMaxSize(),
null
)
);
}
manager = new SegmentLoaderLocalCacheManager(
TestHelper.getTestIndexIO(),
new SegmentLoaderConfig().withLocations(locationConfigs).withStorageLocationSelectorStrategy(
new RandomStorageLocationSelectorStrategy(locations)
),
jsonMapper
);
final File segmentSrcFolder = tmpFolder.newFolder("segmentSrcFolder");
// Segment 1 should be downloaded in local_storage_folder, segment1 size 10L
final DataSegment segmentToDownload = dataSegmentWithInterval("2014-10-20T00:00:00Z/P1D", 10L).withLoadSpec(
ImmutableMap.of(
"type",
"local",
"path",
segmentSrcFolder.getCanonicalPath()
+ "/test_segment_loader"
+ "/2014-10-20T00:00:00.000Z_2014-10-21T00:00:00.000Z/2015-05-27T03:38:35.683Z"
+ "/0/index.zip"
)
);
// manually create a local segment under segmentSrcFolder
createLocalSegmentFile(segmentSrcFolder,
"test_segment_loader/2014-10-20T00:00:00.000Z_2014-10-21T00:00:00.000Z/2015-05-27T03:38:35.683Z/0");
Assert.assertFalse("Expect cache miss before downloading segment", manager.isSegmentLoaded(segmentToDownload));
File segmentFile = manager.getSegmentFiles(segmentToDownload);
Assert.assertTrue(segmentFile.getAbsolutePath().contains("/local_storage_folder/"));
Assert.assertTrue("Expect cache hit after downloading segment", manager.isSegmentLoaded(segmentToDownload));
// Segment 2 should be downloaded in local_storage_folder3, segment2 size 9L
final DataSegment segmentToDownload2 = dataSegmentWithInterval("2014-11-20T00:00:00Z/P1D", 9L).withLoadSpec(
ImmutableMap.of(
"type",
"local",
"path",
segmentSrcFolder.getCanonicalPath()
+ "/test_segment_loader"
+ "/2014-11-20T00:00:00.000Z_2014-11-21T00:00:00.000Z/2015-05-27T03:38:35.683Z"
+ "/0/index.zip"
)
);
// manually create a local segment under segmentSrcFolder
createLocalSegmentFile(segmentSrcFolder,
"test_segment_loader/2014-11-20T00:00:00.000Z_2014-11-21T00:00:00.000Z/2015-05-27T03:38:35.683Z/0");
Assert.assertFalse("Expect cache miss before downloading segment", manager.isSegmentLoaded(segmentToDownload2));
File segmentFile2 = manager.getSegmentFiles(segmentToDownload2);
Assert.assertTrue(segmentFile2.getAbsolutePath().contains("/local_storage_folder3/"));
Assert.assertTrue("Expect cache hit after downloading segment", manager.isSegmentLoaded(segmentToDownload2));
// Segment 3 should not be downloaded, segment3 size 20L
final DataSegment segmentToDownload3 = dataSegmentWithInterval("2014-12-20T00:00:00Z/P1D", 20L).withLoadSpec(
ImmutableMap.of(
"type",
"local",
"path",
segmentSrcFolder.getCanonicalPath()
+ "/test_segment_loader"
+ "/2014-12-20T00:00:00.000Z_2014-12-21T00:00:00.000Z/2015-05-27T03:38:35.683Z"
+ "/0/index.zip"
)
);
// manually create a local segment under segmentSrcFolder
createLocalSegmentFile(segmentSrcFolder,
"test_segment_loader/2014-12-20T00:00:00.000Z_2014-12-21T00:00:00.000Z/2015-05-27T03:38:35.683Z/0");
try {
// expect failure
manager.getSegmentFiles(segmentToDownload3);
Assert.fail();
}
catch (SegmentLoadingException e) {
}
Assert.assertFalse("Expect cache miss after dropping segment", manager.isSegmentLoaded(segmentToDownload3));
}
}

View File

@ -26,6 +26,7 @@ import org.junit.rules.TemporaryFolder;
import java.io.File;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
@ -180,4 +181,32 @@ public class StorageLocationSelectorStrategyTest
localStorageFolder1, loc1.getPath());
}
@Test
public void testRandomLocationSelectorStrategy() throws Exception
{
List<StorageLocation> storageLocations = new ArrayList<>();
final File localStorageFolder1 = tmpFolder.newFolder("local_storage_folder_1");
final File localStorageFolder2 = tmpFolder.newFolder("local_storage_folder_2");
final File localStorageFolder3 = tmpFolder.newFolder("local_storage_folder_3");
storageLocations.add(new StorageLocation(localStorageFolder1, 3000L, null));
storageLocations.add(new StorageLocation(localStorageFolder2, 2000L, null));
storageLocations.add(new StorageLocation(localStorageFolder3, 1000L, null));
StorageLocationSelectorStrategy leastBytesUsedStrategy =
new RandomStorageLocationSelectorStrategy(storageLocations);
Iterator<StorageLocation> locations = leastBytesUsedStrategy.getLocations();
StorageLocation loc1 = locations.next();
StorageLocation loc2 = locations.next();
StorageLocation loc3 = locations.next();
File[] result = new File[]{loc1.getPath(), loc2.getPath(), loc3.getPath()};
Arrays.sort(result);
Assert.assertArrayEquals(new File[]{localStorageFolder1, localStorageFolder2, localStorageFolder3}, result);
}
}