From 187cf0dd3f6079077c07a6c8ef10621c26383d17 Mon Sep 17 00:00:00 2001 From: Fangyuan Deng <982092332@qq.com> Date: Wed, 4 Dec 2019 01:47:01 +0800 Subject: [PATCH] [Improvement] historical fast restart by lazy load columns metadata(20X faster) (#6988) * historical fast restart by lazy load columns metadata * delete repeated code * add documentation for druid.segmentCache.lazyLoadOnStart * fix unit test fail * fix spellcheck * update docs * update docs mentioning a catch --- docs/configuration/index.md | 1 + .../common/task/CompactionTaskTest.java | 17 ++- ...bstractMultiPhaseParallelIndexingTest.java | 2 +- .../org/apache/druid/segment/IndexIO.java | 131 ++++++++++++------ .../druid/segment/SimpleQueryableIndex.java | 55 +++++--- .../MMappedQueryableSegmentizerFactory.java | 4 +- .../segment/loading/SegmentizerFactory.java | 2 +- .../druid/segment/loading/SegmentLoader.java | 2 +- .../segment/loading/SegmentLoaderConfig.java | 8 ++ .../SegmentLoaderLocalCacheManager.java | 4 +- .../apache/druid/server/SegmentManager.java | 10 +- .../coordination/SegmentLoadDropHandler.java | 8 +- .../loading/CacheTestSegmentLoader.java | 2 +- .../druid/server/SegmentManagerTest.java | 28 ++-- .../SegmentManagerThreadSafetyTest.java | 6 +- .../coordination/ServerManagerTest.java | 5 +- 16 files changed, 179 insertions(+), 106 deletions(-) diff --git a/docs/configuration/index.md b/docs/configuration/index.md index 92fbe60fd80..ff20d69ae48 100644 --- a/docs/configuration/index.md +++ b/docs/configuration/index.md @@ -1359,6 +1359,7 @@ These Historical configurations can be defined in the `historical/runtime.proper |`druid.segmentCache.announceIntervalMillis`|How frequently to announce segments while segments are loading from cache. Set this value to zero to wait for all segments to be loaded before announcing.|5000 (5 seconds)| |`druid.segmentCache.numLoadingThreads`|How many segments to drop or load concurrently from deep storage. 
Note that the work of loading segments involves downloading segments from deep storage, decompressing them and loading them to a memory mapped location. So the work is not all I/O Bound. Depending on CPU and network load, one could possibly increase this config to a higher value.|Number of cores| |`druid.segmentCache.numBootstrapThreads`|How many segments to load concurrently during historical startup.|`druid.segmentCache.numLoadingThreads`| +|`druid.segmentCache.lazyLoadOnStart`|Whether or not to load segment columns metadata lazily during historical startup. When set to true, Historical startup time will be dramatically improved by deferring segment loading until the first time that segment takes part in a query, which will incur this cost instead. One catch is that if the Historical process crashes while in the process of downloading and creating segment files, it is possible to end up with a corrupted segment on disk; this requires manual intervention to delete the corrupted files. When the flag is set to true, Historical startup would complete successfully and queries using this segment would fail at runtime.|false| |`druid.coordinator.loadqueuepeon.curator.numCallbackThreads`|Number of threads for executing callback actions associated with loading or dropping of segments. One might want to increase this number when noticing clusters are lagging behind w.r.t. balancing segments across historical nodes.|2| In `druid.segmentCache.locations`, *freeSpacePercent* was added because *maxSize* setting is only a theoretical limit and assumes that much space will always be available for storing segments. In case of any druid bug leading to unaccounted segment files left alone on disk or some other process writing stuff to disk, This check can start failing segment loading early before filling up the disk completely and leaving the host usable otherwise. 
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java index c76bc9c0fbf..45faff2cc40 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java @@ -24,6 +24,7 @@ import com.fasterxml.jackson.databind.introspect.AnnotationIntrospectorPair; import com.fasterxml.jackson.databind.jsontype.NamedType; import com.fasterxml.jackson.databind.module.SimpleModule; import com.google.common.base.Preconditions; +import com.google.common.base.Supplier; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; @@ -1210,20 +1211,23 @@ public class CompactionTaskTest columnNames.add(ColumnHolder.TIME_COLUMN_NAME); columnNames.addAll(segment.getDimensions()); columnNames.addAll(segment.getMetrics()); - final Map columnMap = new HashMap<>(columnNames.size()); + final Map> columnMap = new HashMap<>(columnNames.size()); final List aggregatorFactories = new ArrayList<>(segment.getMetrics().size()); for (String columnName : columnNames) { if (MIXED_TYPE_COLUMN.equals(columnName)) { - columnMap.put(columnName, createColumn(MIXED_TYPE_COLUMN_MAP.get(segment.getInterval()))); + ColumnHolder columnHolder = createColumn(MIXED_TYPE_COLUMN_MAP.get(segment.getInterval())); + columnMap.put(columnName, () -> columnHolder); } else if (DIMENSIONS.containsKey(columnName)) { - columnMap.put(columnName, createColumn(DIMENSIONS.get(columnName))); + ColumnHolder columnHolder = createColumn(DIMENSIONS.get(columnName)); + columnMap.put(columnName, () -> columnHolder); } else { final Optional maybeMetric = AGGREGATORS.stream() .filter(agg -> agg.getName().equals(columnName)) .findAny(); if (maybeMetric.isPresent()) { - 
columnMap.put(columnName, createColumn(maybeMetric.get())); + ColumnHolder columnHolder = createColumn(maybeMetric.get()); + columnMap.put(columnName, () -> columnHolder); aggregatorFactories.add(maybeMetric.get()); } } @@ -1245,7 +1249,8 @@ public class CompactionTaskTest null, columnMap, null, - metadata + metadata, + false ) ); } @@ -1271,7 +1276,7 @@ public class CompactionTaskTest index.getColumns(), index.getFileMapper(), null, - index.getDimensionHandlers() + () -> index.getDimensionHandlers() ) ); } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractMultiPhaseParallelIndexingTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractMultiPhaseParallelIndexingTest.java index c45c5ce0921..880d8065e17 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractMultiPhaseParallelIndexingTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractMultiPhaseParallelIndexingTest.java @@ -275,7 +275,7 @@ abstract class AbstractMultiPhaseParallelIndexingTest extends AbstractParallelIn final SegmentLoader loader = new SegmentLoaderFactory(getIndexIO(), getObjectMapper()) .manufacturate(tempSegmentDir); try { - return loader.getSegment(dataSegment); + return loader.getSegment(dataSegment, false); } catch (SegmentLoadingException e) { throw new RuntimeException(e); diff --git a/processing/src/main/java/org/apache/druid/segment/IndexIO.java b/processing/src/main/java/org/apache/druid/segment/IndexIO.java index f577fe6fd61..65580727573 100644 --- a/processing/src/main/java/org/apache/druid/segment/IndexIO.java +++ b/processing/src/main/java/org/apache/druid/segment/IndexIO.java @@ -24,7 +24,9 @@ import com.fasterxml.jackson.databind.JsonMappingException; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Preconditions; import 
com.google.common.base.Strings; +import com.google.common.base.Supplier; import com.google.common.base.Suppliers; +import com.google.common.base.Throwables; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Maps; import com.google.common.collect.Sets; @@ -179,13 +181,17 @@ public class IndexIO } public QueryableIndex loadIndex(File inDir) throws IOException + { + return loadIndex(inDir, false); + } + public QueryableIndex loadIndex(File inDir, boolean lazy) throws IOException { final int version = SegmentUtils.getVersionFromDir(inDir); final IndexLoader loader = indexLoaders.get(version); if (loader != null) { - return loader.load(inDir, mapper); + return loader.load(inDir, mapper, lazy); } else { throw new ISE("Unknown index version[%s]", version); } @@ -406,7 +412,7 @@ public class IndexIO interface IndexLoader { - QueryableIndex load(File inDir, ObjectMapper mapper) throws IOException; + QueryableIndex load(File inDir, ObjectMapper mapper, boolean lazy) throws IOException; } static class LegacyIndexLoader implements IndexLoader @@ -421,11 +427,11 @@ public class IndexIO } @Override - public QueryableIndex load(File inDir, ObjectMapper mapper) throws IOException + public QueryableIndex load(File inDir, ObjectMapper mapper, boolean lazy) throws IOException { MMappedIndex index = legacyHandler.mapDir(inDir); - Map columns = new HashMap<>(); + Map> columns = new HashMap<>(); for (String dimension : index.getAvailableDimensions()) { ColumnBuilder builder = new ColumnBuilder() @@ -449,61 +455,61 @@ public class IndexIO if (index.getSpatialIndexes().get(dimension) != null) { builder.setSpatialIndex(new SpatialIndexColumnPartSupplier(index.getSpatialIndexes().get(dimension))); } - columns.put( - dimension, - builder.build() - ); + columns.put(dimension, getColumnHolderSupplier(builder, lazy)); } for (String metric : index.getAvailableMetrics()) { final MetricHolder metricHolder = index.getMetricHolder(metric); if (metricHolder.getType() == 
MetricHolder.MetricType.FLOAT) { - columns.put( - metric, - new ColumnBuilder() - .setType(ValueType.FLOAT) - .setNumericColumnSupplier( - new FloatNumericColumnSupplier( - metricHolder.floatType, - LEGACY_FACTORY.getBitmapFactory().makeEmptyImmutableBitmap() - ) + ColumnBuilder builder = new ColumnBuilder() + .setType(ValueType.FLOAT) + .setNumericColumnSupplier( + new FloatNumericColumnSupplier( + metricHolder.floatType, + LEGACY_FACTORY.getBitmapFactory().makeEmptyImmutableBitmap() ) - .build() - ); + ); + columns.put(metric, getColumnHolderSupplier(builder, lazy)); } else if (metricHolder.getType() == MetricHolder.MetricType.COMPLEX) { - columns.put( - metric, - new ColumnBuilder() - .setType(ValueType.COMPLEX) - .setComplexColumnSupplier( - new ComplexColumnPartSupplier(metricHolder.getTypeName(), metricHolder.complexType) - ) - .build() - ); + ColumnBuilder builder = new ColumnBuilder() + .setType(ValueType.COMPLEX) + .setComplexColumnSupplier( + new ComplexColumnPartSupplier(metricHolder.getTypeName(), metricHolder.complexType) + ); + columns.put(metric, getColumnHolderSupplier(builder, lazy)); } } - columns.put( - ColumnHolder.TIME_COLUMN_NAME, - new ColumnBuilder() - .setType(ValueType.LONG) - .setNumericColumnSupplier( - new LongNumericColumnSupplier( - index.timestamps, - LEGACY_FACTORY.getBitmapFactory().makeEmptyImmutableBitmap() - ) + ColumnBuilder builder = new ColumnBuilder() + .setType(ValueType.LONG) + .setNumericColumnSupplier( + new LongNumericColumnSupplier( + index.timestamps, + LEGACY_FACTORY.getBitmapFactory().makeEmptyImmutableBitmap() ) - .build() - ); + ); + columns.put(ColumnHolder.TIME_COLUMN_NAME, getColumnHolderSupplier(builder, lazy)); + return new SimpleQueryableIndex( index.getDataInterval(), index.getAvailableDimensions(), new ConciseBitmapFactory(), columns, index.getFileMapper(), - null + null, + lazy ); } + + private Supplier getColumnHolderSupplier(ColumnBuilder builder, boolean lazy) + { + if (lazy) { + return 
Suppliers.memoize(() -> builder.build()); + } else { + ColumnHolder columnHolder = builder.build(); + return () -> columnHolder; + } + } } static class V9IndexLoader implements IndexLoader @@ -516,7 +522,7 @@ public class IndexIO } @Override - public QueryableIndex load(File inDir, ObjectMapper mapper) throws IOException + public QueryableIndex load(File inDir, ObjectMapper mapper, boolean lazy) throws IOException { log.debug("Mapping v9 index[%s]", inDir); long startTime = System.currentTimeMillis(); @@ -576,17 +582,51 @@ public class IndexIO } } - Map columns = new HashMap<>(); + Map> columns = new HashMap<>(); for (String columnName : cols) { if (Strings.isNullOrEmpty(columnName)) { log.warn("Null or Empty Dimension found in the file : " + inDir); continue; } - columns.put(columnName, deserializeColumn(mapper, smooshedFiles.mapFile(columnName), smooshedFiles)); + + ByteBuffer colBuffer = smooshedFiles.mapFile(columnName); + + if (lazy) { + columns.put(columnName, Suppliers.memoize( + () -> { + try { + return deserializeColumn(mapper, colBuffer, smooshedFiles); + } + catch (IOException e) { + throw Throwables.propagate(e); + } + } + )); + } else { + ColumnHolder columnHolder = deserializeColumn(mapper, colBuffer, smooshedFiles); + columns.put(columnName, () -> columnHolder); + } + } - columns.put(ColumnHolder.TIME_COLUMN_NAME, deserializeColumn(mapper, smooshedFiles.mapFile("__time"), smooshedFiles)); + ByteBuffer timeBuffer = smooshedFiles.mapFile("__time"); + + if (lazy) { + columns.put(ColumnHolder.TIME_COLUMN_NAME, Suppliers.memoize( + () -> { + try { + return deserializeColumn(mapper, timeBuffer, smooshedFiles); + } + catch (IOException e) { + throw Throwables.propagate(e); + } + } + )); + } else { + ColumnHolder columnHolder = deserializeColumn(mapper, timeBuffer, smooshedFiles); + columns.put(ColumnHolder.TIME_COLUMN_NAME, () -> columnHolder); + } final QueryableIndex index = new SimpleQueryableIndex( dataInterval, @@ -594,7 +634,8 @@ public class IndexIO 
segmentBitmapSerdeFactory.getBitmapFactory(), columns, smooshedFiles, - metadata + metadata, + lazy ); log.debug("Mapped v9 index[%s] in %,d millis", inDir, System.currentTimeMillis() - startTime); diff --git a/processing/src/main/java/org/apache/druid/segment/SimpleQueryableIndex.java b/processing/src/main/java/org/apache/druid/segment/SimpleQueryableIndex.java index bd0d5fe5105..6cd70636d7e 100644 --- a/processing/src/main/java/org/apache/druid/segment/SimpleQueryableIndex.java +++ b/processing/src/main/java/org/apache/druid/segment/SimpleQueryableIndex.java @@ -21,6 +21,8 @@ package org.apache.druid.segment; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; +import com.google.common.base.Supplier; +import com.google.common.base.Suppliers; import com.google.common.collect.ImmutableList; import com.google.common.collect.Maps; import org.apache.druid.collections.bitmap.BitmapFactory; @@ -42,19 +44,20 @@ public class SimpleQueryableIndex extends AbstractIndex implements QueryableInde private final List columnNames; private final Indexed availableDimensions; private final BitmapFactory bitmapFactory; - private final Map columns; + private final Map> columns; private final SmooshedFileMapper fileMapper; @Nullable private final Metadata metadata; - private final Map dimensionHandlers; + private final Supplier> dimensionHandlers; public SimpleQueryableIndex( Interval dataInterval, Indexed dimNames, BitmapFactory bitmapFactory, - Map columns, + Map> columns, SmooshedFileMapper fileMapper, - @Nullable Metadata metadata + @Nullable Metadata metadata, + boolean lazy ) { Preconditions.checkNotNull(columns.get(ColumnHolder.TIME_COLUMN_NAME)); @@ -71,8 +74,27 @@ public class SimpleQueryableIndex extends AbstractIndex implements QueryableInde this.columns = columns; this.fileMapper = fileMapper; this.metadata = metadata; - this.dimensionHandlers = Maps.newLinkedHashMap(); - initDimensionHandlers(); + + if (lazy) { + 
this.dimensionHandlers = Suppliers.memoize(() -> { + Map dimensionHandlerMap = Maps.newLinkedHashMap(); + for (String dim : availableDimensions) { + ColumnCapabilities capabilities = getColumnHolder(dim).getCapabilities(); + DimensionHandler handler = DimensionHandlerUtils.getHandlerFromCapabilities(dim, capabilities, null); + dimensionHandlerMap.put(dim, handler); + } + return dimensionHandlerMap; + } + ); + } else { + Map dimensionHandlerMap = Maps.newLinkedHashMap(); + for (String dim : availableDimensions) { + ColumnCapabilities capabilities = getColumnHolder(dim).getCapabilities(); + DimensionHandler handler = DimensionHandlerUtils.getHandlerFromCapabilities(dim, capabilities, null); + dimensionHandlerMap.put(dim, handler); + } + this.dimensionHandlers = () -> dimensionHandlerMap; + } } @VisibleForTesting @@ -81,10 +103,10 @@ public class SimpleQueryableIndex extends AbstractIndex implements QueryableInde List columnNames, Indexed availableDimensions, BitmapFactory bitmapFactory, - Map columns, + Map> columns, SmooshedFileMapper fileMapper, @Nullable Metadata metadata, - Map dimensionHandlers + Supplier> dimensionHandlers ) { this.dataInterval = interval; @@ -106,7 +128,7 @@ public class SimpleQueryableIndex extends AbstractIndex implements QueryableInde @Override public int getNumRows() { - return columns.get(ColumnHolder.TIME_COLUMN_NAME).getLength(); + return columns.get(ColumnHolder.TIME_COLUMN_NAME).get().getLength(); } @Override @@ -137,11 +159,12 @@ public class SimpleQueryableIndex extends AbstractIndex implements QueryableInde @Override public ColumnHolder getColumnHolder(String columnName) { - return columns.get(columnName); + Supplier columnHolderSupplier = columns.get(columnName); + return columnHolderSupplier == null ? 
null : columnHolderSupplier.get(); } @VisibleForTesting - public Map getColumns() + public Map> getColumns() { return columns; } @@ -167,15 +190,7 @@ public class SimpleQueryableIndex extends AbstractIndex implements QueryableInde @Override public Map getDimensionHandlers() { - return dimensionHandlers; + return dimensionHandlers.get(); } - private void initDimensionHandlers() - { - for (String dim : availableDimensions) { - ColumnCapabilities capabilities = getColumnHolder(dim).getCapabilities(); - DimensionHandler handler = DimensionHandlerUtils.getHandlerFromCapabilities(dim, capabilities, null); - dimensionHandlers.put(dim, handler); - } - } } diff --git a/processing/src/main/java/org/apache/druid/segment/loading/MMappedQueryableSegmentizerFactory.java b/processing/src/main/java/org/apache/druid/segment/loading/MMappedQueryableSegmentizerFactory.java index d3cd9db8433..ea5290e204c 100644 --- a/processing/src/main/java/org/apache/druid/segment/loading/MMappedQueryableSegmentizerFactory.java +++ b/processing/src/main/java/org/apache/druid/segment/loading/MMappedQueryableSegmentizerFactory.java @@ -42,10 +42,10 @@ public class MMappedQueryableSegmentizerFactory implements SegmentizerFactory } @Override - public Segment factorize(DataSegment dataSegment, File parentDir) throws SegmentLoadingException + public Segment factorize(DataSegment dataSegment, File parentDir, boolean lazy) throws SegmentLoadingException { try { - return new QueryableIndexSegment(indexIO.loadIndex(parentDir), dataSegment.getId()); + return new QueryableIndexSegment(indexIO.loadIndex(parentDir, lazy), dataSegment.getId()); } catch (IOException e) { throw new SegmentLoadingException(e, "%s", e.getMessage()); diff --git a/processing/src/main/java/org/apache/druid/segment/loading/SegmentizerFactory.java b/processing/src/main/java/org/apache/druid/segment/loading/SegmentizerFactory.java index 5bf17a6ea26..09bc048448b 100644 --- 
a/processing/src/main/java/org/apache/druid/segment/loading/SegmentizerFactory.java +++ b/processing/src/main/java/org/apache/druid/segment/loading/SegmentizerFactory.java @@ -31,5 +31,5 @@ import java.io.File; @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", defaultImpl = MMappedQueryableSegmentizerFactory.class) public interface SegmentizerFactory { - Segment factorize(DataSegment segment, File parentDir) throws SegmentLoadingException; + Segment factorize(DataSegment segment, File parentDir, boolean lazy) throws SegmentLoadingException; } diff --git a/server/src/main/java/org/apache/druid/segment/loading/SegmentLoader.java b/server/src/main/java/org/apache/druid/segment/loading/SegmentLoader.java index 301e7370229..f63024a58cc 100644 --- a/server/src/main/java/org/apache/druid/segment/loading/SegmentLoader.java +++ b/server/src/main/java/org/apache/druid/segment/loading/SegmentLoader.java @@ -31,7 +31,7 @@ import java.io.File; public interface SegmentLoader { boolean isSegmentLoaded(DataSegment segment); - Segment getSegment(DataSegment segment) throws SegmentLoadingException; + Segment getSegment(DataSegment segment, boolean lazy) throws SegmentLoadingException; File getSegmentFiles(DataSegment segment) throws SegmentLoadingException; void cleanup(DataSegment segment); } diff --git a/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderConfig.java b/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderConfig.java index 58eb77317e5..c6c57233738 100644 --- a/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderConfig.java +++ b/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderConfig.java @@ -37,6 +37,9 @@ public class SegmentLoaderConfig @NotEmpty private List locations = null; + @JsonProperty("lazyLoadOnStart") + private boolean lazyLoadOnStart = false; + @JsonProperty("deleteOnRemove") private boolean deleteOnRemove = true; @@ -66,6 +69,11 @@ public class SegmentLoaderConfig return 
locations; } + public boolean isLazyLoadOnStart() + { + return lazyLoadOnStart; + } + public boolean isDeleteOnRemove() { return deleteOnRemove; diff --git a/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManager.java b/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManager.java index 8e77f4399e1..a62577ca178 100644 --- a/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManager.java +++ b/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManager.java @@ -121,7 +121,7 @@ public class SegmentLoaderLocalCacheManager implements SegmentLoader } @Override - public Segment getSegment(DataSegment segment) throws SegmentLoadingException + public Segment getSegment(DataSegment segment, boolean lazy) throws SegmentLoadingException { final ReferenceCountingLock lock = createOrGetLock(segment); final File segmentFiles; @@ -147,7 +147,7 @@ public class SegmentLoaderLocalCacheManager implements SegmentLoader factory = new MMappedQueryableSegmentizerFactory(indexIO); } - return factory.factorize(segment, segmentFiles); + return factory.factorize(segment, segmentFiles, lazy); } @Override diff --git a/server/src/main/java/org/apache/druid/server/SegmentManager.java b/server/src/main/java/org/apache/druid/server/SegmentManager.java index 04c84aa51ab..dbb48474def 100644 --- a/server/src/main/java/org/apache/druid/server/SegmentManager.java +++ b/server/src/main/java/org/apache/druid/server/SegmentManager.java @@ -146,13 +146,15 @@ public class SegmentManager * * @param segment segment to load * + * @param lazy whether to lazy load columns metadata + * * @return true if the segment was newly loaded, false if it was already loaded * * @throws SegmentLoadingException if the segment cannot be loaded */ - public boolean loadSegment(final DataSegment segment) throws SegmentLoadingException + public boolean loadSegment(final DataSegment segment, boolean lazy) throws 
SegmentLoadingException { - final Segment adapter = getAdapter(segment); + final Segment adapter = getAdapter(segment, lazy); final SettableSupplier resultSupplier = new SettableSupplier<>(); @@ -189,11 +191,11 @@ public class SegmentManager return resultSupplier.get(); } - private Segment getAdapter(final DataSegment segment) throws SegmentLoadingException + private Segment getAdapter(final DataSegment segment, boolean lazy) throws SegmentLoadingException { final Segment adapter; try { - adapter = segmentLoader.getSegment(segment); + adapter = segmentLoader.getSegment(segment, lazy); } catch (SegmentLoadingException e) { segmentLoader.cleanup(segment); diff --git a/server/src/main/java/org/apache/druid/server/coordination/SegmentLoadDropHandler.java b/server/src/main/java/org/apache/druid/server/coordination/SegmentLoadDropHandler.java index cb08182e37f..308c0d477d4 100644 --- a/server/src/main/java/org/apache/druid/server/coordination/SegmentLoadDropHandler.java +++ b/server/src/main/java/org/apache/druid/server/coordination/SegmentLoadDropHandler.java @@ -252,11 +252,11 @@ public class SegmentLoadDropHandler implements DataSegmentChangeHandler * * @throws SegmentLoadingException if it fails to load the given segment */ - private void loadSegment(DataSegment segment, DataSegmentChangeCallback callback) throws SegmentLoadingException + private void loadSegment(DataSegment segment, DataSegmentChangeCallback callback, boolean lazy) throws SegmentLoadingException { final boolean loaded; try { - loaded = segmentManager.loadSegment(segment); + loaded = segmentManager.loadSegment(segment, lazy); } catch (Exception e) { removeSegment(segment, callback, false); @@ -304,7 +304,7 @@ public class SegmentLoadDropHandler implements DataSegmentChangeHandler segmentsToDelete.remove(segment); } } - loadSegment(segment, DataSegmentChangeCallback.NOOP); + loadSegment(segment, DataSegmentChangeCallback.NOOP, false); // announce segment even if the segment file already exists. 
try { announcer.announceSegment(segment); @@ -351,7 +351,7 @@ public class SegmentLoadDropHandler implements DataSegmentChangeHandler numSegments, segment.getId() ); - loadSegment(segment, callback); + loadSegment(segment, callback, config.isLazyLoadOnStart()); try { backgroundSegmentAnnouncer.announceSegment(segment); } diff --git a/server/src/test/java/org/apache/druid/segment/loading/CacheTestSegmentLoader.java b/server/src/test/java/org/apache/druid/segment/loading/CacheTestSegmentLoader.java index 0b291d40210..e604b29f763 100644 --- a/server/src/test/java/org/apache/druid/segment/loading/CacheTestSegmentLoader.java +++ b/server/src/test/java/org/apache/druid/segment/loading/CacheTestSegmentLoader.java @@ -47,7 +47,7 @@ public class CacheTestSegmentLoader implements SegmentLoader } @Override - public Segment getSegment(final DataSegment segment) + public Segment getSegment(final DataSegment segment, boolean lazy) { return new AbstractSegment() { diff --git a/server/src/test/java/org/apache/druid/server/SegmentManagerTest.java b/server/src/test/java/org/apache/druid/server/SegmentManagerTest.java index 317e9c7f3ac..b6c9c1671fc 100644 --- a/server/src/test/java/org/apache/druid/server/SegmentManagerTest.java +++ b/server/src/test/java/org/apache/druid/server/SegmentManagerTest.java @@ -66,7 +66,7 @@ public class SegmentManagerTest } @Override - public Segment getSegment(final DataSegment segment) + public Segment getSegment(final DataSegment segment, boolean lazy) { return new SegmentForTesting( MapUtils.getString(segment.getLoadSpec(), "version"), @@ -219,7 +219,7 @@ public class SegmentManagerTest final List> futures = SEGMENTS.stream() .map( segment -> executor.submit( - () -> segmentManager.loadSegment(segment) + () -> segmentManager.loadSegment(segment, false) ) ) .collect(Collectors.toList()); @@ -235,7 +235,7 @@ public class SegmentManagerTest public void testDropSegment() throws SegmentLoadingException, ExecutionException, InterruptedException { for 
(DataSegment eachSegment : SEGMENTS) { - Assert.assertTrue(segmentManager.loadSegment(eachSegment)); + Assert.assertTrue(segmentManager.loadSegment(eachSegment, false)); } final List> futures = ImmutableList.of(SEGMENTS.get(0), SEGMENTS.get(2)).stream() @@ -261,14 +261,14 @@ public class SegmentManagerTest @Test public void testLoadDropSegment() throws SegmentLoadingException, ExecutionException, InterruptedException { - Assert.assertTrue(segmentManager.loadSegment(SEGMENTS.get(0))); - Assert.assertTrue(segmentManager.loadSegment(SEGMENTS.get(2))); + Assert.assertTrue(segmentManager.loadSegment(SEGMENTS.get(0), false)); + Assert.assertTrue(segmentManager.loadSegment(SEGMENTS.get(2), false)); final List> loadFutures = ImmutableList.of(SEGMENTS.get(1), SEGMENTS.get(3), SEGMENTS.get(4)) .stream() .map( segment -> executor.submit( - () -> segmentManager.loadSegment(segment) + () -> segmentManager.loadSegment(segment, false) ) ) .collect(Collectors.toList()); @@ -299,10 +299,10 @@ public class SegmentManagerTest public void testLoadDuplicatedSegmentsSequentially() throws SegmentLoadingException { for (DataSegment segment : SEGMENTS) { - Assert.assertTrue(segmentManager.loadSegment(segment)); + Assert.assertTrue(segmentManager.loadSegment(segment, false)); } // try to load an existing segment - Assert.assertFalse(segmentManager.loadSegment(SEGMENTS.get(0))); + Assert.assertFalse(segmentManager.loadSegment(SEGMENTS.get(0), false)); assertResult(SEGMENTS); } @@ -315,7 +315,7 @@ public class SegmentManagerTest .stream() .map( segment -> executor.submit( - () -> segmentManager.loadSegment(segment) + () -> segmentManager.loadSegment(segment, false) ) ) .collect(Collectors.toList()); @@ -336,7 +336,7 @@ public class SegmentManagerTest @Test public void testNonExistingSegmentsSequentially() throws SegmentLoadingException { - Assert.assertTrue(segmentManager.loadSegment(SEGMENTS.get(0))); + Assert.assertTrue(segmentManager.loadSegment(SEGMENTS.get(0), false)); // try to drop a 
non-existing segment of different data source segmentManager.dropSegment(SEGMENTS.get(2)); @@ -349,7 +349,7 @@ public class SegmentManagerTest public void testNonExistingSegmentsInParallel() throws SegmentLoadingException, ExecutionException, InterruptedException { - segmentManager.loadSegment(SEGMENTS.get(0)); + segmentManager.loadSegment(SEGMENTS.get(0), false); final List> futures = ImmutableList.of(SEGMENTS.get(1), SEGMENTS.get(2)) .stream() .map( @@ -372,7 +372,7 @@ public class SegmentManagerTest @Test public void testRemoveEmptyTimeline() throws SegmentLoadingException { - segmentManager.loadSegment(SEGMENTS.get(0)); + segmentManager.loadSegment(SEGMENTS.get(0), false); assertResult(ImmutableList.of(SEGMENTS.get(0))); Assert.assertEquals(1, segmentManager.getDataSources().size()); segmentManager.dropSegment(SEGMENTS.get(0)); @@ -406,7 +406,7 @@ public class SegmentManagerTest 10 ); - segmentManager.loadSegment(segment); + segmentManager.loadSegment(segment, false); assertResult(ImmutableList.of(segment)); segmentManager.dropSegment(segment); @@ -434,7 +434,7 @@ public class SegmentManagerTest segment.getInterval(), segment.getVersion(), segment.getShardSpec().createChunk( - ReferenceCountingSegment.wrapSegment(SEGMENT_LOADER.getSegment(segment), segment.getShardSpec()) + ReferenceCountingSegment.wrapSegment(SEGMENT_LOADER.getSegment(segment, false), segment.getShardSpec()) ) ); } diff --git a/server/src/test/java/org/apache/druid/server/SegmentManagerThreadSafetyTest.java b/server/src/test/java/org/apache/druid/server/SegmentManagerThreadSafetyTest.java index ce74902d9d8..2e5c2e3d99a 100644 --- a/server/src/test/java/org/apache/druid/server/SegmentManagerThreadSafetyTest.java +++ b/server/src/test/java/org/apache/druid/server/SegmentManagerThreadSafetyTest.java @@ -129,7 +129,7 @@ public class SegmentManagerThreadSafetyTest final DataSegment segment = createSegment("2019-01-01/2019-01-02"); final List futures = IntStream .range(0, 16) - .mapToObj(i -> 
exec.submit(() -> segmentManager.loadSegment(segment))) + .mapToObj(i -> exec.submit(() -> segmentManager.loadSegment(segment, false))) .collect(Collectors.toList()); for (Future future : futures) { future.get(); @@ -154,7 +154,7 @@ public class SegmentManagerThreadSafetyTest .mapToObj(i -> exec.submit(() -> { for (DataSegment segment : segments) { try { - segmentManager.loadSegment(segment); + segmentManager.loadSegment(segment, false); } catch (SegmentLoadingException e) { throw new RuntimeException(e); @@ -222,7 +222,7 @@ public class SegmentManagerThreadSafetyTest private static class TestSegmentizerFactory implements SegmentizerFactory { @Override - public Segment factorize(DataSegment segment, File parentDir) + public Segment factorize(DataSegment segment, File parentDir, boolean lazy) { return new Segment() { diff --git a/server/src/test/java/org/apache/druid/server/coordination/ServerManagerTest.java b/server/src/test/java/org/apache/druid/server/coordination/ServerManagerTest.java index 6be9055be70..24828e2bbaa 100644 --- a/server/src/test/java/org/apache/druid/server/coordination/ServerManagerTest.java +++ b/server/src/test/java/org/apache/druid/server/coordination/ServerManagerTest.java @@ -121,7 +121,7 @@ public class ServerManagerTest } @Override - public Segment getSegment(final DataSegment segment) + public Segment getSegment(final DataSegment segment, boolean lazy) { return new SegmentForTesting( MapUtils.getString(segment.getLoadSpec(), "version"), @@ -478,7 +478,8 @@ public class ServerManagerTest NoneShardSpec.instance(), IndexIO.CURRENT_VERSION_ID, 123L - ) + ), + false ); } catch (SegmentLoadingException e) {