mirror of https://github.com/apache/druid.git
* Optionally load segment index files into page cache on bootstrap and new segment download * Fix unit test failure * Fix test case * fix spelling * fix spelling * fix test and test coverage issues Co-authored-by: Jian Wang <wjhypo@gmail.com>
This commit is contained in:
parent
665c926824
commit
2c79d28bb7
|
@ -1596,6 +1596,8 @@ These Historical configurations can be defined in the `historical/runtime.proper
|
||||||
|`druid.segmentCache.numBootstrapThreads`|How many segments to load concurrently during historical startup.|`druid.segmentCache.numLoadingThreads`|
|
|`druid.segmentCache.numBootstrapThreads`|How many segments to load concurrently during historical startup.|`druid.segmentCache.numLoadingThreads`|
|
||||||
|`druid.segmentCache.lazyLoadOnStart`|Whether or not to load segment columns metadata lazily during historical startup. When set to true, Historical startup time will be dramatically improved by deferring segment loading until the first time that segment takes part in a query, which will incur this cost instead.|false|
|
|`druid.segmentCache.lazyLoadOnStart`|Whether or not to load segment columns metadata lazily during historical startup. When set to true, Historical startup time will be dramatically improved by deferring segment loading until the first time that segment takes part in a query, which will incur this cost instead.|false|
|
||||||
|`druid.coordinator.loadqueuepeon.curator.numCallbackThreads`|Number of threads for executing callback actions associated with loading or dropping of segments. One might want to increase this number when noticing clusters are lagging behind w.r.t. balancing segments across historical nodes.|2|
|
|`druid.coordinator.loadqueuepeon.curator.numCallbackThreads`|Number of threads for executing callback actions associated with loading or dropping of segments. One might want to increase this number when noticing clusters are lagging behind w.r.t. balancing segments across historical nodes.|2|
|
||||||
|
|`druid.segmentCache.numThreadsToLoadSegmentsIntoPageCacheOnDownload`|Number of threads to asynchronously read segment index files into null output stream on each new segment download after the historical process finishes bootstrapping. Recommended to set to 1 or 2 or leave unspecified to disable. See also `druid.segmentCache.numThreadsToLoadSegmentsIntoPageCacheOnBootstrap`|0|
|
||||||
|
|`druid.segmentCache.numThreadsToLoadSegmentsIntoPageCacheOnBootstrap`|Number of threads to asynchronously read segment index files into null output stream during historical process bootstrap. This thread pool is terminated after historical process finishes bootstrapping. Recommended to set to half of available cores. If left unspecified, `druid.segmentCache.numThreadsToLoadSegmentsIntoPageCacheOnDownload` will be used. If both configs are unspecified, this feature is disabled. Preemptively loading segments into page cache helps in the sense that later when a segment is queried, it's already in page cache and only a minor page fault needs to be triggered instead of a more costly major page fault to make the query latency more consistent. Note that loading segment into page cache just does a blind loading of segment index files and will evict any existing segments from page cache at the discretion of operating system when the total segment size on local disk is larger than the page cache usable in the RAM, which roughly equals to total available RAM in the host - druid process memory including both heap and direct memory allocated - memory used by other non druid processes on the host, so it is the user's responsibility to ensure the host has enough RAM to host all the segments to avoid random evictions to fully leverage this feature.|`druid.segmentCache.numThreadsToLoadSegmentsIntoPageCacheOnDownload`|
|
||||||
|
|
||||||
In `druid.segmentCache.locations`, *freeSpacePercent* was added because *maxSize* setting is only a theoretical limit and assumes that much space will always be available for storing segments. In case of any druid bug leading to unaccounted segment files left alone on disk or some other process writing stuff to disk, This check can start failing segment loading early before filling up the disk completely and leaving the host usable otherwise.
|
In `druid.segmentCache.locations`, *freeSpacePercent* was added because *maxSize* setting is only a theoretical limit and assumes that much space will always be available for storing segments. In case of any druid bug leading to unaccounted segment files left alone on disk or some other process writing stuff to disk, This check can start failing segment loading early before filling up the disk completely and leaving the host usable otherwise.
|
||||||
|
|
||||||
|
|
|
@ -66,6 +66,7 @@ import java.util.Collections;
|
||||||
import java.util.Iterator;
|
import java.util.Iterator;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
import java.util.concurrent.ExecutorService;
|
||||||
|
|
||||||
public class DruidSegmentReaderTest extends NullHandlingTest
|
public class DruidSegmentReaderTest extends NullHandlingTest
|
||||||
{
|
{
|
||||||
|
@ -614,6 +615,12 @@ public class DruidSegmentReaderTest extends NullHandlingTest
|
||||||
{
|
{
|
||||||
throw new UnsupportedOperationException();
|
throw new UnsupportedOperationException();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void loadSegmentIntoPageCache(DataSegment segment, ExecutorService exec)
|
||||||
|
{
|
||||||
|
throw new UnsupportedOperationException();
|
||||||
|
}
|
||||||
},
|
},
|
||||||
DataSegment.builder()
|
DataSegment.builder()
|
||||||
.dataSource("ds")
|
.dataSource("ds")
|
||||||
|
|
|
@ -22,6 +22,7 @@ package org.apache.druid.segment.loading;
|
||||||
import org.apache.druid.timeline.DataSegment;
|
import org.apache.druid.timeline.DataSegment;
|
||||||
|
|
||||||
import java.io.File;
|
import java.io.File;
|
||||||
|
import java.util.concurrent.ExecutorService;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A class to fetch segment files to local disk and manage the local cache.
|
* A class to fetch segment files to local disk and manage the local cache.
|
||||||
|
@ -84,4 +85,15 @@ public interface SegmentCacheManager
|
||||||
* explicitly reserved via {@link #reserve(DataSegment)}
|
* explicitly reserved via {@link #reserve(DataSegment)}
|
||||||
*/
|
*/
|
||||||
void cleanup(DataSegment segment);
|
void cleanup(DataSegment segment);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Asyncly load segment into page cache.
|
||||||
|
* Equivalent to `cat segment_files > /dev/null` to force loading the segment index files into page cache so that
|
||||||
|
* later when the segment is queried, they are already in page cache and only a minor page fault needs to be triggered
|
||||||
|
* instead of a major page fault to make the query latency more consistent.
|
||||||
|
*
|
||||||
|
* @param segment The segment to load its index files into page cache
|
||||||
|
* @param exec The thread pool to use
|
||||||
|
*/
|
||||||
|
void loadSegmentIntoPageCache(DataSegment segment, ExecutorService exec);
|
||||||
}
|
}
|
||||||
|
|
|
@ -24,6 +24,8 @@ import org.apache.druid.segment.ReferenceCountingSegment;
|
||||||
import org.apache.druid.segment.SegmentLazyLoadFailCallback;
|
import org.apache.druid.segment.SegmentLazyLoadFailCallback;
|
||||||
import org.apache.druid.timeline.DataSegment;
|
import org.apache.druid.timeline.DataSegment;
|
||||||
|
|
||||||
|
import java.util.concurrent.ExecutorService;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Loading segments from deep storage to local storage. Internally, this class can delegate the download to
|
* Loading segments from deep storage to local storage. Internally, this class can delegate the download to
|
||||||
* {@link SegmentCacheManager}. Implementations must be thread-safe.
|
* {@link SegmentCacheManager}. Implementations must be thread-safe.
|
||||||
|
@ -52,4 +54,15 @@ public interface SegmentLoader
|
||||||
* cleanup any state used by this segment
|
* cleanup any state used by this segment
|
||||||
*/
|
*/
|
||||||
void cleanup(DataSegment segment);
|
void cleanup(DataSegment segment);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Asyncly load segment into page cache.
|
||||||
|
* Equivalent to `cat segment_files > /dev/null` to force loading the segment index files into page cache so that
|
||||||
|
* later when the segment is queried, they are already in page cache and only a minor page fault needs to be triggered
|
||||||
|
* instead of a major page fault to make the query latency more consistent.
|
||||||
|
*
|
||||||
|
* @param segment The segment to load its index files into page cache
|
||||||
|
* @param exec The thread pool to use
|
||||||
|
*/
|
||||||
|
void loadSegmentIntoPageCache(DataSegment segment, ExecutorService exec);
|
||||||
}
|
}
|
||||||
|
|
|
@ -56,6 +56,12 @@ public class SegmentLoaderConfig
|
||||||
@JsonProperty("numBootstrapThreads")
|
@JsonProperty("numBootstrapThreads")
|
||||||
private Integer numBootstrapThreads = null;
|
private Integer numBootstrapThreads = null;
|
||||||
|
|
||||||
|
@JsonProperty("numThreadsToLoadSegmentsIntoPageCacheOnDownload")
|
||||||
|
private int numThreadsToLoadSegmentsIntoPageCacheOnDownload = 0;
|
||||||
|
|
||||||
|
@JsonProperty("numThreadsToLoadSegmentsIntoPageCacheOnBootstrap")
|
||||||
|
private Integer numThreadsToLoadSegmentsIntoPageCacheOnBootstrap = null;
|
||||||
|
|
||||||
@JsonProperty
|
@JsonProperty
|
||||||
private File infoDir = null;
|
private File infoDir = null;
|
||||||
|
|
||||||
|
@ -99,6 +105,18 @@ public class SegmentLoaderConfig
|
||||||
return numBootstrapThreads == null ? numLoadingThreads : numBootstrapThreads;
|
return numBootstrapThreads == null ? numLoadingThreads : numBootstrapThreads;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public int getNumThreadsToLoadSegmentsIntoPageCacheOnDownload()
|
||||||
|
{
|
||||||
|
return numThreadsToLoadSegmentsIntoPageCacheOnDownload;
|
||||||
|
}
|
||||||
|
|
||||||
|
public int getNumThreadsToLoadSegmentsIntoPageCacheOnBootstrap()
|
||||||
|
{
|
||||||
|
return numThreadsToLoadSegmentsIntoPageCacheOnBootstrap == null ?
|
||||||
|
numThreadsToLoadSegmentsIntoPageCacheOnDownload :
|
||||||
|
numThreadsToLoadSegmentsIntoPageCacheOnBootstrap;
|
||||||
|
}
|
||||||
|
|
||||||
public File getInfoDir()
|
public File getInfoDir()
|
||||||
{
|
{
|
||||||
if (infoDir == null) {
|
if (infoDir == null) {
|
||||||
|
|
|
@ -31,6 +31,7 @@ import org.apache.druid.timeline.DataSegment;
|
||||||
import javax.inject.Inject;
|
import javax.inject.Inject;
|
||||||
import java.io.File;
|
import java.io.File;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.util.concurrent.ExecutorService;
|
||||||
|
|
||||||
public class SegmentLocalCacheLoader implements SegmentLoader
|
public class SegmentLocalCacheLoader implements SegmentLoader
|
||||||
{
|
{
|
||||||
|
@ -78,5 +79,9 @@ public class SegmentLocalCacheLoader implements SegmentLoader
|
||||||
cacheManager.cleanup(segment);
|
cacheManager.cleanup(segment);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void loadSegmentIntoPageCache(DataSegment segment, ExecutorService exec)
|
||||||
|
{
|
||||||
|
cacheManager.loadSegmentIntoPageCache(segment, exec);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -22,9 +22,12 @@ package org.apache.druid.segment.loading;
|
||||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||||
import com.google.common.annotations.VisibleForTesting;
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
import com.google.inject.Inject;
|
import com.google.inject.Inject;
|
||||||
|
import org.apache.commons.io.IOUtils;
|
||||||
|
import org.apache.commons.io.output.NullOutputStream;
|
||||||
import org.apache.druid.guice.annotations.Json;
|
import org.apache.druid.guice.annotations.Json;
|
||||||
import org.apache.druid.java.util.common.FileUtils;
|
import org.apache.druid.java.util.common.FileUtils;
|
||||||
import org.apache.druid.java.util.common.ISE;
|
import org.apache.druid.java.util.common.ISE;
|
||||||
|
import org.apache.druid.java.util.common.concurrent.Execs;
|
||||||
import org.apache.druid.java.util.emitter.EmittingLogger;
|
import org.apache.druid.java.util.emitter.EmittingLogger;
|
||||||
import org.apache.druid.timeline.DataSegment;
|
import org.apache.druid.timeline.DataSegment;
|
||||||
|
|
||||||
|
@ -32,10 +35,14 @@ import javax.annotation.Nonnull;
|
||||||
import javax.annotation.Nullable;
|
import javax.annotation.Nullable;
|
||||||
|
|
||||||
import java.io.File;
|
import java.io.File;
|
||||||
|
import java.io.FileInputStream;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.io.InputStream;
|
||||||
import java.util.Iterator;
|
import java.util.Iterator;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
|
import java.util.concurrent.ExecutorService;
|
||||||
|
import java.util.concurrent.Executors;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
*
|
*
|
||||||
|
@ -79,6 +86,8 @@ public class SegmentLocalCacheManager implements SegmentCacheManager
|
||||||
|
|
||||||
private final StorageLocationSelectorStrategy strategy;
|
private final StorageLocationSelectorStrategy strategy;
|
||||||
|
|
||||||
|
private ExecutorService loadSegmentsIntoPageCacheOnDownloadExec = null;
|
||||||
|
|
||||||
// Note that we only create this via injection in historical and realtime nodes. Peons create these
|
// Note that we only create this via injection in historical and realtime nodes. Peons create these
|
||||||
// objects via SegmentCacheManagerFactory objects, so that they can store segments in task-specific
|
// objects via SegmentCacheManagerFactory objects, so that they can store segments in task-specific
|
||||||
// directories rather than statically configured directories.
|
// directories rather than statically configured directories.
|
||||||
|
@ -95,6 +104,14 @@ public class SegmentLocalCacheManager implements SegmentCacheManager
|
||||||
this.locations = locations;
|
this.locations = locations;
|
||||||
this.strategy = strategy;
|
this.strategy = strategy;
|
||||||
log.info("Using storage location strategy: [%s]", this.strategy.getClass().getSimpleName());
|
log.info("Using storage location strategy: [%s]", this.strategy.getClass().getSimpleName());
|
||||||
|
|
||||||
|
if (this.config.getNumThreadsToLoadSegmentsIntoPageCacheOnDownload() != 0) {
|
||||||
|
loadSegmentsIntoPageCacheOnDownloadExec = Executors.newFixedThreadPool(
|
||||||
|
config.getNumThreadsToLoadSegmentsIntoPageCacheOnDownload(),
|
||||||
|
Execs.makeThreadFactory("LoadSegmentsIntoPageCacheOnDownload-%s"));
|
||||||
|
log.info("Size of thread pool to load segments into page cache on download [%d]",
|
||||||
|
config.getNumThreadsToLoadSegmentsIntoPageCacheOnDownload());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
|
@ -436,6 +453,58 @@ public class SegmentLocalCacheManager implements SegmentCacheManager
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void loadSegmentIntoPageCache(DataSegment segment, ExecutorService exec)
|
||||||
|
{
|
||||||
|
ExecutorService execToUse = exec != null ? exec : loadSegmentsIntoPageCacheOnDownloadExec;
|
||||||
|
if (execToUse == null) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
execToUse.submit(
|
||||||
|
() -> {
|
||||||
|
final ReferenceCountingLock lock = createOrGetLock(segment);
|
||||||
|
synchronized (lock) {
|
||||||
|
try {
|
||||||
|
for (StorageLocation location : locations) {
|
||||||
|
File localStorageDir = new File(location.getPath(), DataSegmentPusher.getDefaultStorageDir(segment, false));
|
||||||
|
if (localStorageDir.exists()) {
|
||||||
|
File baseFile = location.getPath();
|
||||||
|
if (localStorageDir.equals(baseFile)) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
log.info("Loading directory[%s] into page cache", localStorageDir);
|
||||||
|
|
||||||
|
File[] children = localStorageDir.listFiles();
|
||||||
|
if (children != null) {
|
||||||
|
for (File child : children) {
|
||||||
|
InputStream in = null;
|
||||||
|
try {
|
||||||
|
in = new FileInputStream(child);
|
||||||
|
IOUtils.copy(in, new NullOutputStream());
|
||||||
|
|
||||||
|
log.info("Loaded [%s] into page cache", child.getAbsolutePath());
|
||||||
|
}
|
||||||
|
catch (Exception e) {
|
||||||
|
log.error("Failed to load [%s] into page cache, [%s]", child.getAbsolutePath(), e.getMessage());
|
||||||
|
}
|
||||||
|
finally {
|
||||||
|
IOUtils.closeQuietly(in);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
finally {
|
||||||
|
unlock(segment, lock);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
private void cleanupCacheFiles(File baseFile, File cacheFile)
|
private void cleanupCacheFiles(File baseFile, File cacheFile)
|
||||||
{
|
{
|
||||||
if (cacheFile.equals(baseFile)) {
|
if (cacheFile.equals(baseFile)) {
|
||||||
|
|
|
@ -48,6 +48,7 @@ import java.util.Objects;
|
||||||
import java.util.Optional;
|
import java.util.Optional;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
|
import java.util.concurrent.ExecutorService;
|
||||||
import java.util.stream.Stream;
|
import java.util.stream.Stream;
|
||||||
import java.util.stream.StreamSupport;
|
import java.util.stream.StreamSupport;
|
||||||
|
|
||||||
|
@ -203,18 +204,29 @@ public class SegmentManager
|
||||||
.orElseThrow(() -> new ISE("Cannot handle datasource: %s", analysis.getDataSource()));
|
.orElseThrow(() -> new ISE("Cannot handle datasource: %s", analysis.getDataSource()));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public boolean loadSegment(final DataSegment segment, boolean lazy, SegmentLazyLoadFailCallback loadFailed)
|
||||||
|
throws SegmentLoadingException
|
||||||
|
{
|
||||||
|
return loadSegment(segment, lazy, loadFailed, null);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Load a single segment.
|
* Load a single segment.
|
||||||
*
|
*
|
||||||
* @param segment segment to load
|
* @param segment segment to load
|
||||||
* @param lazy whether to lazy load columns metadata
|
* @param lazy whether to lazy load columns metadata
|
||||||
* @param loadFailed callBack to execute when segment lazy load failed
|
* @param loadFailed callBack to execute when segment lazy load failed
|
||||||
|
* @param loadSegmentIntoPageCacheExec If null is specified, the default thread pool in segment loader to load
|
||||||
|
* segments into page cache on download will be used. You can specify a dedicated
|
||||||
|
* thread pool of larger capacity when this function is called during historical
|
||||||
|
* process bootstrap to speed up initial loading.
|
||||||
*
|
*
|
||||||
* @return true if the segment was newly loaded, false if it was already loaded
|
* @return true if the segment was newly loaded, false if it was already loaded
|
||||||
*
|
*
|
||||||
* @throws SegmentLoadingException if the segment cannot be loaded
|
* @throws SegmentLoadingException if the segment cannot be loaded
|
||||||
*/
|
*/
|
||||||
public boolean loadSegment(final DataSegment segment, boolean lazy, SegmentLazyLoadFailCallback loadFailed) throws SegmentLoadingException
|
public boolean loadSegment(final DataSegment segment, boolean lazy, SegmentLazyLoadFailCallback loadFailed,
|
||||||
|
ExecutorService loadSegmentIntoPageCacheExec) throws SegmentLoadingException
|
||||||
{
|
{
|
||||||
final ReferenceCountingSegment adapter = getSegmentReference(segment, lazy, loadFailed);
|
final ReferenceCountingSegment adapter = getSegmentReference(segment, lazy, loadFailed);
|
||||||
|
|
||||||
|
@ -254,6 +266,8 @@ public class SegmentManager
|
||||||
segment.getShardSpec().createChunk(adapter)
|
segment.getShardSpec().createChunk(adapter)
|
||||||
);
|
);
|
||||||
dataSourceState.addSegment(segment);
|
dataSourceState.addSegment(segment);
|
||||||
|
// Asyncly load segment index files into page cache in a thread pool
|
||||||
|
segmentLoader.loadSegmentIntoPageCache(segment, loadSegmentIntoPageCacheExec);
|
||||||
resultSupplier.set(true);
|
resultSupplier.set(true);
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -259,20 +259,28 @@ public class SegmentLoadDropHandler implements DataSegmentChangeHandler
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void loadSegment(DataSegment segment, DataSegmentChangeCallback callback, boolean lazy)
|
||||||
|
throws SegmentLoadingException
|
||||||
|
{
|
||||||
|
loadSegment(segment, callback, lazy, null);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Load a single segment. If the segment is loaded successfully, this function simply returns. Otherwise it will
|
* Load a single segment. If the segment is loaded successfully, this function simply returns. Otherwise it will
|
||||||
* throw a SegmentLoadingException
|
* throw a SegmentLoadingException
|
||||||
*
|
*
|
||||||
* @throws SegmentLoadingException if it fails to load the given segment
|
* @throws SegmentLoadingException if it fails to load the given segment
|
||||||
*/
|
*/
|
||||||
private void loadSegment(DataSegment segment, DataSegmentChangeCallback callback, boolean lazy)
|
private void loadSegment(DataSegment segment, DataSegmentChangeCallback callback, boolean lazy, @Nullable
|
||||||
|
ExecutorService loadSegmentIntoPageCacheExec)
|
||||||
throws SegmentLoadingException
|
throws SegmentLoadingException
|
||||||
{
|
{
|
||||||
final boolean loaded;
|
final boolean loaded;
|
||||||
try {
|
try {
|
||||||
loaded = segmentManager.loadSegment(segment,
|
loaded = segmentManager.loadSegment(segment,
|
||||||
lazy,
|
lazy,
|
||||||
() -> this.removeSegment(segment, DataSegmentChangeCallback.NOOP, false)
|
() -> this.removeSegment(segment, DataSegmentChangeCallback.NOOP, false),
|
||||||
|
loadSegmentIntoPageCacheExec
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
catch (Exception e) {
|
catch (Exception e) {
|
||||||
|
@ -346,9 +354,19 @@ public class SegmentLoadDropHandler implements DataSegmentChangeHandler
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Bulk adding segments during bootstrap
|
||||||
|
* @param segments A collection of segments to add
|
||||||
|
* @param callback Segment loading callback
|
||||||
|
*/
|
||||||
private void addSegments(Collection<DataSegment> segments, final DataSegmentChangeCallback callback)
|
private void addSegments(Collection<DataSegment> segments, final DataSegmentChangeCallback callback)
|
||||||
{
|
{
|
||||||
|
// Start a temporary thread pool to load segments into page cache during bootstrap
|
||||||
ExecutorService loadingExecutor = null;
|
ExecutorService loadingExecutor = null;
|
||||||
|
ExecutorService loadSegmentsIntoPageCacheOnBootstrapExec =
|
||||||
|
config.getNumThreadsToLoadSegmentsIntoPageCacheOnBootstrap() != 0 ?
|
||||||
|
Execs.multiThreaded(config.getNumThreadsToLoadSegmentsIntoPageCacheOnBootstrap(),
|
||||||
|
"Load-Segments-Into-Page-Cache-On-Bootstrap-%s") : null;
|
||||||
try (final BackgroundSegmentAnnouncer backgroundSegmentAnnouncer =
|
try (final BackgroundSegmentAnnouncer backgroundSegmentAnnouncer =
|
||||||
new BackgroundSegmentAnnouncer(announcer, exec, config.getAnnounceIntervalMillis())) {
|
new BackgroundSegmentAnnouncer(announcer, exec, config.getAnnounceIntervalMillis())) {
|
||||||
|
|
||||||
|
@ -370,7 +388,7 @@ public class SegmentLoadDropHandler implements DataSegmentChangeHandler
|
||||||
numSegments,
|
numSegments,
|
||||||
segment.getId()
|
segment.getId()
|
||||||
);
|
);
|
||||||
loadSegment(segment, callback, config.isLazyLoadOnStart());
|
loadSegment(segment, callback, config.isLazyLoadOnStart(), loadSegmentsIntoPageCacheOnBootstrapExec);
|
||||||
try {
|
try {
|
||||||
backgroundSegmentAnnouncer.announceSegment(segment);
|
backgroundSegmentAnnouncer.announceSegment(segment);
|
||||||
}
|
}
|
||||||
|
@ -416,6 +434,11 @@ public class SegmentLoadDropHandler implements DataSegmentChangeHandler
|
||||||
if (loadingExecutor != null) {
|
if (loadingExecutor != null) {
|
||||||
loadingExecutor.shutdownNow();
|
loadingExecutor.shutdownNow();
|
||||||
}
|
}
|
||||||
|
if (loadSegmentsIntoPageCacheOnBootstrapExec != null) {
|
||||||
|
// At this stage, all tasks have been submitted, send a shutdown command to the bootstrap
|
||||||
|
// thread pool so threads will exit after finishing the tasks
|
||||||
|
loadSegmentsIntoPageCacheOnBootstrapExec.shutdown();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -26,6 +26,7 @@ import java.io.File;
|
||||||
import java.util.HashSet;
|
import java.util.HashSet;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
|
import java.util.concurrent.ExecutorService;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
*
|
*
|
||||||
|
@ -69,4 +70,10 @@ public class CacheTestSegmentCacheManager implements SegmentCacheManager
|
||||||
{
|
{
|
||||||
return segmentsInTrash;
|
return segmentsInTrash;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void loadSegmentIntoPageCache(DataSegment segment, ExecutorService exec)
|
||||||
|
{
|
||||||
|
throw new UnsupportedOperationException();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -28,6 +28,8 @@ import org.apache.druid.timeline.DataSegment;
|
||||||
import org.apache.druid.timeline.SegmentId;
|
import org.apache.druid.timeline.SegmentId;
|
||||||
import org.joda.time.Interval;
|
import org.joda.time.Interval;
|
||||||
|
|
||||||
|
import java.util.concurrent.ExecutorService;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
*/
|
*/
|
||||||
public class CacheTestSegmentLoader implements SegmentLoader
|
public class CacheTestSegmentLoader implements SegmentLoader
|
||||||
|
@ -70,6 +72,12 @@ public class CacheTestSegmentLoader implements SegmentLoader
|
||||||
return ReferenceCountingSegment.wrapSegment(baseSegment, segment.getShardSpec());
|
return ReferenceCountingSegment.wrapSegment(baseSegment, segment.getShardSpec());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void loadSegmentIntoPageCache(DataSegment segment, ExecutorService exec)
|
||||||
|
{
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void cleanup(DataSegment segment)
|
public void cleanup(DataSegment segment)
|
||||||
{
|
{
|
||||||
|
|
|
@ -44,6 +44,7 @@ import java.util.ArrayList;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
import java.util.concurrent.Executors;
|
||||||
|
|
||||||
public class SegmentLocalCacheManagerTest
|
public class SegmentLocalCacheManagerTest
|
||||||
{
|
{
|
||||||
|
@ -100,6 +101,31 @@ public class SegmentLocalCacheManagerTest
|
||||||
Assert.assertFalse("Expect cache miss", manager.isSegmentCached(uncachedSegment));
|
Assert.assertFalse("Expect cache miss", manager.isSegmentCached(uncachedSegment));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testNoLoadingOfSegmentInPageCache() throws IOException
|
||||||
|
{
|
||||||
|
final DataSegment segment = dataSegmentWithInterval("2014-10-20T00:00:00Z/P1D");
|
||||||
|
final File segmentFile = new File(
|
||||||
|
localSegmentCacheFolder,
|
||||||
|
"test_segment_loader/2014-10-20T00:00:00.000Z_2014-10-21T00:00:00.000Z/2015-05-27T03:38:35.683Z/0"
|
||||||
|
);
|
||||||
|
FileUtils.mkdirp(segmentFile);
|
||||||
|
// should not throw any exception
|
||||||
|
manager.loadSegmentIntoPageCache(segment, null);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testLoadSegmentInPageCache() throws IOException
|
||||||
|
{
|
||||||
|
final DataSegment segment = dataSegmentWithInterval("2014-10-20T00:00:00Z/P1D");
|
||||||
|
final File segmentFile = new File(
|
||||||
|
localSegmentCacheFolder,
|
||||||
|
"test_segment_loader/2014-10-20T00:00:00.000Z_2014-10-21T00:00:00.000Z/2015-05-27T03:38:35.683Z/0"
|
||||||
|
);
|
||||||
|
FileUtils.mkdirp(segmentFile);
|
||||||
|
// should not throw any exception
|
||||||
|
manager.loadSegmentIntoPageCache(segment, Executors.newSingleThreadExecutor());
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testIfTombstoneIsLoaded() throws IOException, SegmentLoadingException
|
public void testIfTombstoneIsLoaded() throws IOException, SegmentLoadingException
|
||||||
|
|
|
@ -77,6 +77,12 @@ public class SegmentManagerTest
|
||||||
{
|
{
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void loadSegmentIntoPageCache(DataSegment segment, ExecutorService exec)
|
||||||
|
{
|
||||||
|
|
||||||
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
private static class SegmentForTesting implements Segment
|
private static class SegmentForTesting implements Segment
|
||||||
|
|
|
@ -519,7 +519,8 @@ public class SegmentLoadDropHandlerTest
|
||||||
public void testProcessBatchDuplicateLoadRequestsWhenFirstRequestFailsSecondRequestShouldSucceed() throws Exception
|
public void testProcessBatchDuplicateLoadRequestsWhenFirstRequestFailsSecondRequestShouldSucceed() throws Exception
|
||||||
{
|
{
|
||||||
final SegmentManager segmentManager = Mockito.mock(SegmentManager.class);
|
final SegmentManager segmentManager = Mockito.mock(SegmentManager.class);
|
||||||
Mockito.when(segmentManager.loadSegment(ArgumentMatchers.any(), ArgumentMatchers.anyBoolean(), ArgumentMatchers.any()))
|
Mockito.when(segmentManager.loadSegment(ArgumentMatchers.any(), ArgumentMatchers.anyBoolean(),
|
||||||
|
ArgumentMatchers.any(), ArgumentMatchers.any()))
|
||||||
.thenThrow(new RuntimeException("segment loading failure test"))
|
.thenThrow(new RuntimeException("segment loading failure test"))
|
||||||
.thenReturn(true);
|
.thenReturn(true);
|
||||||
final SegmentLoadDropHandler segmentLoadDropHandler = new SegmentLoadDropHandler(
|
final SegmentLoadDropHandler segmentLoadDropHandler = new SegmentLoadDropHandler(
|
||||||
|
@ -562,7 +563,7 @@ public class SegmentLoadDropHandlerTest
|
||||||
public void testProcessBatchLoadDropLoadSequenceForSameSegment() throws Exception
|
public void testProcessBatchLoadDropLoadSequenceForSameSegment() throws Exception
|
||||||
{
|
{
|
||||||
final SegmentManager segmentManager = Mockito.mock(SegmentManager.class);
|
final SegmentManager segmentManager = Mockito.mock(SegmentManager.class);
|
||||||
Mockito.when(segmentManager.loadSegment(ArgumentMatchers.any(), ArgumentMatchers.anyBoolean(), ArgumentMatchers.any()))
|
Mockito.when(segmentManager.loadSegment(ArgumentMatchers.any(), ArgumentMatchers.anyBoolean(), ArgumentMatchers.any(), ArgumentMatchers.any()))
|
||||||
.thenReturn(true);
|
.thenReturn(true);
|
||||||
Mockito.doNothing().when(segmentManager).dropSegment(ArgumentMatchers.any());
|
Mockito.doNothing().when(segmentManager).dropSegment(ArgumentMatchers.any());
|
||||||
final SegmentLoadDropHandler segmentLoadDropHandler = new SegmentLoadDropHandler(
|
final SegmentLoadDropHandler segmentLoadDropHandler = new SegmentLoadDropHandler(
|
||||||
|
@ -603,7 +604,7 @@ public class SegmentLoadDropHandlerTest
|
||||||
scheduledRunnable.clear();
|
scheduledRunnable.clear();
|
||||||
|
|
||||||
// check invocations after a load-drop sequence
|
// check invocations after a load-drop sequence
|
||||||
Mockito.verify(segmentManager, Mockito.times(1)).loadSegment(ArgumentMatchers.any(), ArgumentMatchers.anyBoolean(), ArgumentMatchers.any());
|
Mockito.verify(segmentManager, Mockito.times(1)).loadSegment(ArgumentMatchers.any(), ArgumentMatchers.anyBoolean(), ArgumentMatchers.any(), ArgumentMatchers.any());
|
||||||
Mockito.verify(segmentManager, Mockito.times(1)).dropSegment(ArgumentMatchers.any());
|
Mockito.verify(segmentManager, Mockito.times(1)).dropSegment(ArgumentMatchers.any());
|
||||||
|
|
||||||
// try to reload the segment - this should be a no-op since it might be the case that this is the first load client
|
// try to reload the segment - this should be a no-op since it might be the case that this is the first load client
|
||||||
|
@ -615,7 +616,7 @@ public class SegmentLoadDropHandlerTest
|
||||||
Assert.assertEquals(STATE.SUCCESS, result.get(0).getStatus().getState());
|
Assert.assertEquals(STATE.SUCCESS, result.get(0).getStatus().getState());
|
||||||
|
|
||||||
// check invocations - should stay the same
|
// check invocations - should stay the same
|
||||||
Mockito.verify(segmentManager, Mockito.times(1)).loadSegment(ArgumentMatchers.any(), ArgumentMatchers.anyBoolean(), ArgumentMatchers.any());
|
Mockito.verify(segmentManager, Mockito.times(1)).loadSegment(ArgumentMatchers.any(), ArgumentMatchers.anyBoolean(), ArgumentMatchers.any(), ArgumentMatchers.any());
|
||||||
Mockito.verify(segmentManager, Mockito.times(1)).dropSegment(ArgumentMatchers.any());
|
Mockito.verify(segmentManager, Mockito.times(1)).dropSegment(ArgumentMatchers.any());
|
||||||
|
|
||||||
// try to reload the segment - this time the loader will know that is a fresh request to load
|
// try to reload the segment - this time the loader will know that is a fresh request to load
|
||||||
|
@ -630,7 +631,7 @@ public class SegmentLoadDropHandlerTest
|
||||||
scheduledRunnable.clear();
|
scheduledRunnable.clear();
|
||||||
|
|
||||||
// check invocations - the load segment counter should bump up
|
// check invocations - the load segment counter should bump up
|
||||||
Mockito.verify(segmentManager, Mockito.times(2)).loadSegment(ArgumentMatchers.any(), ArgumentMatchers.anyBoolean(), ArgumentMatchers.any());
|
Mockito.verify(segmentManager, Mockito.times(2)).loadSegment(ArgumentMatchers.any(), ArgumentMatchers.anyBoolean(), ArgumentMatchers.any(), ArgumentMatchers.any());
|
||||||
Mockito.verify(segmentManager, Mockito.times(1)).dropSegment(ArgumentMatchers.any());
|
Mockito.verify(segmentManager, Mockito.times(1)).dropSegment(ArgumentMatchers.any());
|
||||||
|
|
||||||
segmentLoadDropHandler.stop();
|
segmentLoadDropHandler.stop();
|
||||||
|
|
|
@ -168,6 +168,12 @@ public class ServerManagerTest
|
||||||
{
|
{
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void loadSegmentIntoPageCache(DataSegment segment, ExecutorService exec)
|
||||||
|
{
|
||||||
|
|
||||||
|
}
|
||||||
}
|
}
|
||||||
);
|
);
|
||||||
serverManager = new ServerManager(
|
serverManager = new ServerManager(
|
||||||
|
|
Loading…
Reference in New Issue