mirror of https://github.com/apache/druid.git
[Improvement] historical fast restart by lazy load columns metadata (20X faster) (#6988)
* historical fast restart by lazy load columns metadata
* delete repeated code
* add documentation for druid.segmentCache.lazyLoadOnStart
* fix unit test fail
* fix spellcheck
* update docs
* update docs mentioning a catch
parent b4efaa698b
commit 187cf0dd3f
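The diff below threads a `lazy` flag from `SegmentLoaderConfig` down through `SegmentManager`, `SegmentLoader`, `SegmentizerFactory`, and `IndexIO`, replacing each `Map<String, ColumnHolder>` with a `Map<String, Supplier<ColumnHolder>>` so column deserialization can be deferred to first use. As a rough, self-contained sketch of the pattern (hypothetical `Column` class standing in for Druid's `ColumnHolder`; only Guava is assumed on the classpath):

```java
import com.google.common.base.Supplier;
import com.google.common.base.Suppliers;

public class LazyColumnSketch
{
  static class Column
  {
    Column()
    {
      // Stands in for the expensive metadata deserialization done per column.
      System.out.println("expensive column metadata load");
    }
  }

  static Supplier<Column> columnSupplier(boolean lazy)
  {
    if (lazy) {
      // Deferred and cached: the Column is built once, on the first get().
      return Suppliers.memoize(Column::new);
    } else {
      // Eager: build now (at startup), then hand out the prebuilt instance.
      Column column = new Column();
      return () -> column;
    }
  }

  public static void main(String[] args)
  {
    Supplier<Column> lazyCol = columnSupplier(true);  // nothing loaded yet
    lazyCol.get();                                    // first access pays the cost
    lazyCol.get();                                    // memoized thereafter
  }
}
```

Because the supplier type is the same in both modes, the rest of the query path is unchanged; only where the construction cost is paid moves.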
@@ -1359,6 +1359,7 @@ These Historical configurations can be defined in the `historical/runtime.properties` file.
 |`druid.segmentCache.announceIntervalMillis`|How frequently to announce segments while segments are loading from cache. Set this value to zero to wait for all segments to be loaded before announcing.|5000 (5 seconds)|
 |`druid.segmentCache.numLoadingThreads`|How many segments to drop or load concurrently from deep storage. Note that loading a segment involves downloading it from deep storage, decompressing it, and loading it into a memory-mapped location, so the work is not all I/O-bound. Depending on CPU and network load, one could increase this config to a higher value.|Number of cores|
 |`druid.segmentCache.numBootstrapThreads`|How many segments to load concurrently during historical startup.|`druid.segmentCache.numLoadingThreads`|
+|`druid.segmentCache.lazyLoadOnStart`|Whether or not to load segment columns metadata lazily during Historical startup. When set to true, Historical startup time is dramatically improved by deferring segment loading until the first time the segment takes part in a query, which incurs the cost then instead. One catch: if the Historical crashes while downloading and creating segment files, a corrupted segment can be left on disk, and deleting the corrupted files requires manual intervention. With this flag set to true, Historical startup still completes successfully, but queries that use the corrupted segment fail at runtime.|false|
 |`druid.coordinator.loadqueuepeon.curator.numCallbackThreads`|Number of threads for executing callback actions associated with loading or dropping of segments. One might want to increase this number when noticing clusters are lagging behind w.r.t. balancing segments across historical nodes.|2|

 In `druid.segmentCache.locations`, *freeSpacePercent* was added because the *maxSize* setting is only a theoretical limit and assumes that much space will always be available for storing segments. If a Druid bug leaves unaccounted segment files on disk, or some other process writes to the same disk, this check can fail segment loading early, before the disk fills up completely, leaving the host otherwise usable.
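To opt in, a Historical sets `druid.segmentCache.lazyLoadOnStart=true` in `historical/runtime.properties`; the default above preserves eager loading. For the `freeSpacePercent` check described in the paragraph above, a location entry might look like `druid.segmentCache.locations=[{"path":"/mnt/druid/segments","maxSize":300000000000,"freeSpacePercent":5.0}]` (values illustrative, not taken from this commit).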
@@ -24,6 +24,7 @@ import com.fasterxml.jackson.databind.introspect.AnnotationIntrospectorPair;
 import com.fasterxml.jackson.databind.jsontype.NamedType;
 import com.fasterxml.jackson.databind.module.SimpleModule;
 import com.google.common.base.Preconditions;
+import com.google.common.base.Supplier;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
@@ -1210,20 +1211,23 @@ public class CompactionTaskTest
     columnNames.add(ColumnHolder.TIME_COLUMN_NAME);
     columnNames.addAll(segment.getDimensions());
     columnNames.addAll(segment.getMetrics());
-    final Map<String, ColumnHolder> columnMap = new HashMap<>(columnNames.size());
+    final Map<String, Supplier<ColumnHolder>> columnMap = new HashMap<>(columnNames.size());
     final List<AggregatorFactory> aggregatorFactories = new ArrayList<>(segment.getMetrics().size());

     for (String columnName : columnNames) {
       if (MIXED_TYPE_COLUMN.equals(columnName)) {
-        columnMap.put(columnName, createColumn(MIXED_TYPE_COLUMN_MAP.get(segment.getInterval())));
+        ColumnHolder columnHolder = createColumn(MIXED_TYPE_COLUMN_MAP.get(segment.getInterval()));
+        columnMap.put(columnName, () -> columnHolder);
       } else if (DIMENSIONS.containsKey(columnName)) {
-        columnMap.put(columnName, createColumn(DIMENSIONS.get(columnName)));
+        ColumnHolder columnHolder = createColumn(DIMENSIONS.get(columnName));
+        columnMap.put(columnName, () -> columnHolder);
       } else {
         final Optional<AggregatorFactory> maybeMetric = AGGREGATORS.stream()
                                                                    .filter(agg -> agg.getName().equals(columnName))
                                                                    .findAny();
         if (maybeMetric.isPresent()) {
-          columnMap.put(columnName, createColumn(maybeMetric.get()));
+          ColumnHolder columnHolder = createColumn(maybeMetric.get());
+          columnMap.put(columnName, () -> columnHolder);
           aggregatorFactories.add(maybeMetric.get());
         }
       }
@@ -1245,7 +1249,8 @@ public class CompactionTaskTest
             null,
             columnMap,
             null,
-            metadata
+            metadata,
+            false
         )
     );
   }
@@ -1271,7 +1276,7 @@ public class CompactionTaskTest
             index.getColumns(),
             index.getFileMapper(),
             null,
-            index.getDimensionHandlers()
+            () -> index.getDimensionHandlers()
         )
     );
   }
@@ -275,7 +275,7 @@ abstract class AbstractMultiPhaseParallelIndexingTest extends AbstractParallelIndexSupervisorTaskTest
     final SegmentLoader loader = new SegmentLoaderFactory(getIndexIO(), getObjectMapper())
         .manufacturate(tempSegmentDir);
     try {
-      return loader.getSegment(dataSegment);
+      return loader.getSegment(dataSegment, false);
     }
     catch (SegmentLoadingException e) {
       throw new RuntimeException(e);
@@ -24,7 +24,9 @@ import com.fasterxml.jackson.databind.JsonMappingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Strings;
+import com.google.common.base.Supplier;
+import com.google.common.base.Suppliers;
 import com.google.common.base.Throwables;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
@@ -179,13 +181,17 @@ public class IndexIO
   }

   public QueryableIndex loadIndex(File inDir) throws IOException
   {
+    return loadIndex(inDir, false);
+  }
+
+  public QueryableIndex loadIndex(File inDir, boolean lazy) throws IOException
+  {
     final int version = SegmentUtils.getVersionFromDir(inDir);

     final IndexLoader loader = indexLoaders.get(version);

     if (loader != null) {
-      return loader.load(inDir, mapper);
+      return loader.load(inDir, mapper, lazy);
     } else {
       throw new ISE("Unknown index version[%s]", version);
     }
@@ -406,7 +412,7 @@ public class IndexIO

   interface IndexLoader
   {
-    QueryableIndex load(File inDir, ObjectMapper mapper) throws IOException;
+    QueryableIndex load(File inDir, ObjectMapper mapper, boolean lazy) throws IOException;
   }

   static class LegacyIndexLoader implements IndexLoader
@@ -421,11 +427,11 @@ public class IndexIO
     }

     @Override
-    public QueryableIndex load(File inDir, ObjectMapper mapper) throws IOException
+    public QueryableIndex load(File inDir, ObjectMapper mapper, boolean lazy) throws IOException
     {
       MMappedIndex index = legacyHandler.mapDir(inDir);

-      Map<String, ColumnHolder> columns = new HashMap<>();
+      Map<String, Supplier<ColumnHolder>> columns = new HashMap<>();

       for (String dimension : index.getAvailableDimensions()) {
         ColumnBuilder builder = new ColumnBuilder()
@@ -449,61 +455,61 @@ public class IndexIO
         if (index.getSpatialIndexes().get(dimension) != null) {
           builder.setSpatialIndex(new SpatialIndexColumnPartSupplier(index.getSpatialIndexes().get(dimension)));
         }
-        columns.put(
-            dimension,
-            builder.build()
-        );
+        columns.put(dimension, getColumnHolderSupplier(builder, lazy));
       }

       for (String metric : index.getAvailableMetrics()) {
         final MetricHolder metricHolder = index.getMetricHolder(metric);
         if (metricHolder.getType() == MetricHolder.MetricType.FLOAT) {
-          columns.put(
-              metric,
-              new ColumnBuilder()
+          ColumnBuilder builder = new ColumnBuilder()
               .setType(ValueType.FLOAT)
               .setNumericColumnSupplier(
                   new FloatNumericColumnSupplier(
                       metricHolder.floatType,
                       LEGACY_FACTORY.getBitmapFactory().makeEmptyImmutableBitmap()
                   )
-              )
-              .build()
-          );
+              );
+          columns.put(metric, getColumnHolderSupplier(builder, lazy));
         } else if (metricHolder.getType() == MetricHolder.MetricType.COMPLEX) {
-          columns.put(
-              metric,
-              new ColumnBuilder()
+          ColumnBuilder builder = new ColumnBuilder()
               .setType(ValueType.COMPLEX)
               .setComplexColumnSupplier(
                   new ComplexColumnPartSupplier(metricHolder.getTypeName(), metricHolder.complexType)
-              )
-              .build()
-          );
+              );
+          columns.put(metric, getColumnHolderSupplier(builder, lazy));
         }
       }

-      columns.put(
-          ColumnHolder.TIME_COLUMN_NAME,
-          new ColumnBuilder()
+      ColumnBuilder builder = new ColumnBuilder()
           .setType(ValueType.LONG)
           .setNumericColumnSupplier(
               new LongNumericColumnSupplier(
                   index.timestamps,
                   LEGACY_FACTORY.getBitmapFactory().makeEmptyImmutableBitmap()
               )
-          )
-          .build()
-      );
+          );
+      columns.put(ColumnHolder.TIME_COLUMN_NAME, getColumnHolderSupplier(builder, lazy));

       return new SimpleQueryableIndex(
           index.getDataInterval(),
           index.getAvailableDimensions(),
           new ConciseBitmapFactory(),
           columns,
           index.getFileMapper(),
-          null
+          null,
+          lazy
       );
     }

+    private Supplier<ColumnHolder> getColumnHolderSupplier(ColumnBuilder builder, boolean lazy)
+    {
+      if (lazy) {
+        return Suppliers.memoize(() -> builder.build());
+      } else {
+        ColumnHolder columnHolder = builder.build();
+        return () -> columnHolder;
+      }
+    }
   }

   static class V9IndexLoader implements IndexLoader
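A note on the helper above: `Suppliers.memoize` returns a thread-safe supplier that invokes `builder.build()` at most once, so concurrent first accesses to a lazily loaded column cannot build it twice. On the non-lazy path the column is built immediately and wrapped in a constant supplier, so eager callers pay only one extra indirection per access.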
@@ -516,7 +522,7 @@ public class IndexIO
     }

     @Override
-    public QueryableIndex load(File inDir, ObjectMapper mapper) throws IOException
+    public QueryableIndex load(File inDir, ObjectMapper mapper, boolean lazy) throws IOException
     {
       log.debug("Mapping v9 index[%s]", inDir);
       long startTime = System.currentTimeMillis();
@@ -576,17 +582,51 @@ public class IndexIO
         }
       }

-      Map<String, ColumnHolder> columns = new HashMap<>();
+      Map<String, Supplier<ColumnHolder>> columns = new HashMap<>();

       for (String columnName : cols) {
         if (Strings.isNullOrEmpty(columnName)) {
           log.warn("Null or Empty Dimension found in the file : " + inDir);
           continue;
         }
-        columns.put(columnName, deserializeColumn(mapper, smooshedFiles.mapFile(columnName), smooshedFiles));
+
+        ByteBuffer colBuffer = smooshedFiles.mapFile(columnName);
+
+        if (lazy) {
+          columns.put(columnName, Suppliers.memoize(
+              () -> {
+                try {
+                  return deserializeColumn(mapper, colBuffer, smooshedFiles);
+                }
+                catch (IOException e) {
+                  throw Throwables.propagate(e);
+                }
+              }
+          ));
+        } else {
+          ColumnHolder columnHolder = deserializeColumn(mapper, colBuffer, smooshedFiles);
+          columns.put(columnName, () -> columnHolder);
+        }
       }

-      columns.put(ColumnHolder.TIME_COLUMN_NAME, deserializeColumn(mapper, smooshedFiles.mapFile("__time"), smooshedFiles));
+      ByteBuffer timeBuffer = smooshedFiles.mapFile("__time");
+
+      if (lazy) {
+        columns.put(ColumnHolder.TIME_COLUMN_NAME, Suppliers.memoize(
+            () -> {
+              try {
+                return deserializeColumn(mapper, timeBuffer, smooshedFiles);
+              }
+              catch (IOException e) {
+                throw Throwables.propagate(e);
+              }
+            }
+        ));
+      } else {
+        ColumnHolder columnHolder = deserializeColumn(mapper, timeBuffer, smooshedFiles);
+        columns.put(ColumnHolder.TIME_COLUMN_NAME, () -> columnHolder);
+      }

       final QueryableIndex index = new SimpleQueryableIndex(
           dataInterval,
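The try/catch inside the memoized lambdas is forced by the types: Guava's `Supplier.get()` declares no checked exceptions, so the `IOException` from `deserializeColumn` must be rethrown unchecked. `Throwables.propagate(e)` wraps it in a `RuntimeException` (the method is deprecated in later Guava versions; the JDK's `UncheckedIOException` would be the usual substitute today). It also means an I/O failure during lazy deserialization surfaces as a runtime error in the first query that touches the column, which is exactly the trade-off the new documentation calls out.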
@@ -594,7 +634,8 @@ public class IndexIO
           segmentBitmapSerdeFactory.getBitmapFactory(),
           columns,
           smooshedFiles,
-          metadata
+          metadata,
+          lazy
       );

       log.debug("Mapped v9 index[%s] in %,d millis", inDir, System.currentTimeMillis() - startTime);
@@ -21,6 +21,8 @@ package org.apache.druid.segment;

 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
+import com.google.common.base.Supplier;
+import com.google.common.base.Suppliers;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Maps;
 import org.apache.druid.collections.bitmap.BitmapFactory;
@@ -42,19 +44,20 @@ public class SimpleQueryableIndex extends AbstractIndex implements QueryableIndex
   private final List<String> columnNames;
   private final Indexed<String> availableDimensions;
   private final BitmapFactory bitmapFactory;
-  private final Map<String, ColumnHolder> columns;
+  private final Map<String, Supplier<ColumnHolder>> columns;
   private final SmooshedFileMapper fileMapper;
   @Nullable
   private final Metadata metadata;
-  private final Map<String, DimensionHandler> dimensionHandlers;
+  private final Supplier<Map<String, DimensionHandler>> dimensionHandlers;

   public SimpleQueryableIndex(
       Interval dataInterval,
       Indexed<String> dimNames,
       BitmapFactory bitmapFactory,
-      Map<String, ColumnHolder> columns,
+      Map<String, Supplier<ColumnHolder>> columns,
       SmooshedFileMapper fileMapper,
-      @Nullable Metadata metadata
+      @Nullable Metadata metadata,
+      boolean lazy
   )
   {
     Preconditions.checkNotNull(columns.get(ColumnHolder.TIME_COLUMN_NAME));
@@ -71,8 +74,27 @@ public class SimpleQueryableIndex extends AbstractIndex implements QueryableIndex
     this.columns = columns;
     this.fileMapper = fileMapper;
     this.metadata = metadata;
-    this.dimensionHandlers = Maps.newLinkedHashMap();
-    initDimensionHandlers();
+
+    if (lazy) {
+      this.dimensionHandlers = Suppliers.memoize(() -> {
+        Map<String, DimensionHandler> dimensionHandlerMap = Maps.newLinkedHashMap();
+        for (String dim : availableDimensions) {
+          ColumnCapabilities capabilities = getColumnHolder(dim).getCapabilities();
+          DimensionHandler handler = DimensionHandlerUtils.getHandlerFromCapabilities(dim, capabilities, null);
+          dimensionHandlerMap.put(dim, handler);
+        }
+        return dimensionHandlerMap;
+      });
+    } else {
+      Map<String, DimensionHandler> dimensionHandlerMap = Maps.newLinkedHashMap();
+      for (String dim : availableDimensions) {
+        ColumnCapabilities capabilities = getColumnHolder(dim).getCapabilities();
+        DimensionHandler handler = DimensionHandlerUtils.getHandlerFromCapabilities(dim, capabilities, null);
+        dimensionHandlerMap.put(dim, handler);
+      }
+      this.dimensionHandlers = () -> dimensionHandlerMap;
+    }
   }

   @VisibleForTesting
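The duplicated loop in the two branches above is the crux of this class's change: building the dimension-handler map calls `getColumnHolder(dim).getCapabilities()` for every dimension, which under lazy mode would deserialize every column. Wrapping the whole map construction in `Suppliers.memoize` keeps the constructor cheap and defers that work to the first `getDimensionHandlers()` call; the eager branch does the same work up front and exposes it through a constant supplier.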
@@ -81,10 +103,10 @@ public class SimpleQueryableIndex extends AbstractIndex implements QueryableIndex
       List<String> columnNames,
       Indexed<String> availableDimensions,
       BitmapFactory bitmapFactory,
-      Map<String, ColumnHolder> columns,
+      Map<String, Supplier<ColumnHolder>> columns,
       SmooshedFileMapper fileMapper,
       @Nullable Metadata metadata,
-      Map<String, DimensionHandler> dimensionHandlers
+      Supplier<Map<String, DimensionHandler>> dimensionHandlers
   )
   {
     this.dataInterval = interval;
@@ -106,7 +128,7 @@ public class SimpleQueryableIndex extends AbstractIndex implements QueryableIndex
   @Override
   public int getNumRows()
   {
-    return columns.get(ColumnHolder.TIME_COLUMN_NAME).getLength();
+    return columns.get(ColumnHolder.TIME_COLUMN_NAME).get().getLength();
   }

   @Override
@@ -137,11 +159,12 @@ public class SimpleQueryableIndex extends AbstractIndex implements QueryableIndex
   @Override
   public ColumnHolder getColumnHolder(String columnName)
   {
-    return columns.get(columnName);
+    Supplier<ColumnHolder> columnHolderSupplier = columns.get(columnName);
+    return columnHolderSupplier == null ? null : columnHolderSupplier.get();
   }

   @VisibleForTesting
-  public Map<String, ColumnHolder> getColumns()
+  public Map<String, Supplier<ColumnHolder>> getColumns()
   {
     return columns;
   }
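Since every column read funnels through `getColumnHolder`, this is the single point where lazy deserialization is triggered (and, thanks to memoization, cached). The explicit null check preserves the old behavior of returning `null` for unknown column names instead of throwing a `NullPointerException` from `get()`.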
@@ -167,15 +190,7 @@ public class SimpleQueryableIndex extends AbstractIndex implements QueryableIndex
   @Override
   public Map<String, DimensionHandler> getDimensionHandlers()
   {
-    return dimensionHandlers;
+    return dimensionHandlers.get();
   }

-  private void initDimensionHandlers()
-  {
-    for (String dim : availableDimensions) {
-      ColumnCapabilities capabilities = getColumnHolder(dim).getCapabilities();
-      DimensionHandler handler = DimensionHandlerUtils.getHandlerFromCapabilities(dim, capabilities, null);
-      dimensionHandlers.put(dim, handler);
-    }
-  }
 }
@@ -42,10 +42,10 @@ public class MMappedQueryableSegmentizerFactory implements SegmentizerFactory
   }

   @Override
-  public Segment factorize(DataSegment dataSegment, File parentDir) throws SegmentLoadingException
+  public Segment factorize(DataSegment dataSegment, File parentDir, boolean lazy) throws SegmentLoadingException
   {
     try {
-      return new QueryableIndexSegment(indexIO.loadIndex(parentDir), dataSegment.getId());
+      return new QueryableIndexSegment(indexIO.loadIndex(parentDir, lazy), dataSegment.getId());
     }
     catch (IOException e) {
       throw new SegmentLoadingException(e, "%s", e.getMessage());
@@ -31,5 +31,5 @@ import java.io.File;
 @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", defaultImpl = MMappedQueryableSegmentizerFactory.class)
 public interface SegmentizerFactory
 {
-  Segment factorize(DataSegment segment, File parentDir) throws SegmentLoadingException;
+  Segment factorize(DataSegment segment, File parentDir, boolean lazy) throws SegmentLoadingException;
 }
@@ -31,7 +31,7 @@ import java.io.File;
 public interface SegmentLoader
 {
   boolean isSegmentLoaded(DataSegment segment);
-  Segment getSegment(DataSegment segment) throws SegmentLoadingException;
+  Segment getSegment(DataSegment segment, boolean lazy) throws SegmentLoadingException;
   File getSegmentFiles(DataSegment segment) throws SegmentLoadingException;
   void cleanup(DataSegment segment);
 }
@@ -37,6 +37,9 @@ public class SegmentLoaderConfig
   @NotEmpty
   private List<StorageLocationConfig> locations = null;

+  @JsonProperty("lazyLoadOnStart")
+  private boolean lazyLoadOnStart = false;
+
   @JsonProperty("deleteOnRemove")
   private boolean deleteOnRemove = true;

@@ -66,6 +69,11 @@ public class SegmentLoaderConfig
     return locations;
   }

+  public boolean isLazyLoadOnStart()
+  {
+    return lazyLoadOnStart;
+  }
+
   public boolean isDeleteOnRemove()
   {
     return deleteOnRemove;
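The new flag rides the existing `druid.segmentCache` property prefix via Jackson binding. A minimal sketch of that binding, using a hypothetical standalone class (the real `SegmentLoaderConfig` carries more fields plus `@NotEmpty` validation on `locations`, and in Druid the binding happens through the injected configuration machinery rather than a bare `ObjectMapper`):

```java
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;

public class LazyLoadConfigSketch
{
  // Hypothetical stand-in for SegmentLoaderConfig, reduced to the new flag.
  public static class SegmentCacheConfig
  {
    @JsonProperty("lazyLoadOnStart")
    private boolean lazyLoadOnStart = false;  // same default as the commit

    public boolean isLazyLoadOnStart()
    {
      return lazyLoadOnStart;
    }
  }

  public static void main(String[] args) throws Exception
  {
    ObjectMapper mapper = new ObjectMapper();
    SegmentCacheConfig config = mapper.readValue("{\"lazyLoadOnStart\": true}", SegmentCacheConfig.class);
    System.out.println(config.isLazyLoadOnStart());  // prints: true
  }
}
```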
@@ -121,7 +121,7 @@ public class SegmentLoaderLocalCacheManager implements SegmentLoader
   }

   @Override
-  public Segment getSegment(DataSegment segment) throws SegmentLoadingException
+  public Segment getSegment(DataSegment segment, boolean lazy) throws SegmentLoadingException
   {
     final ReferenceCountingLock lock = createOrGetLock(segment);
     final File segmentFiles;
@@ -147,7 +147,7 @@ public class SegmentLoaderLocalCacheManager implements SegmentLoader
       factory = new MMappedQueryableSegmentizerFactory(indexIO);
     }

-    return factory.factorize(segment, segmentFiles);
+    return factory.factorize(segment, segmentFiles, lazy);
   }

   @Override
@@ -146,13 +146,15 @@ public class SegmentManager
    *
    * @param segment segment to load
    *
+   * @param lazy whether to lazy load columns metadata
+   *
    * @return true if the segment was newly loaded, false if it was already loaded
    *
    * @throws SegmentLoadingException if the segment cannot be loaded
    */
-  public boolean loadSegment(final DataSegment segment) throws SegmentLoadingException
+  public boolean loadSegment(final DataSegment segment, boolean lazy) throws SegmentLoadingException
   {
-    final Segment adapter = getAdapter(segment);
+    final Segment adapter = getAdapter(segment, lazy);

     final SettableSupplier<Boolean> resultSupplier = new SettableSupplier<>();

@@ -189,11 +191,11 @@ public class SegmentManager
     return resultSupplier.get();
   }

-  private Segment getAdapter(final DataSegment segment) throws SegmentLoadingException
+  private Segment getAdapter(final DataSegment segment, boolean lazy) throws SegmentLoadingException
   {
     final Segment adapter;
     try {
-      adapter = segmentLoader.getSegment(segment);
+      adapter = segmentLoader.getSegment(segment, lazy);
     }
     catch (SegmentLoadingException e) {
       segmentLoader.cleanup(segment);
@@ -252,11 +252,11 @@ public class SegmentLoadDropHandler implements DataSegmentChangeHandler
    *
    * @throws SegmentLoadingException if it fails to load the given segment
    */
-  private void loadSegment(DataSegment segment, DataSegmentChangeCallback callback) throws SegmentLoadingException
+  private void loadSegment(DataSegment segment, DataSegmentChangeCallback callback, boolean lazy) throws SegmentLoadingException
   {
     final boolean loaded;
     try {
-      loaded = segmentManager.loadSegment(segment);
+      loaded = segmentManager.loadSegment(segment, lazy);
     }
     catch (Exception e) {
       removeSegment(segment, callback, false);
@@ -304,7 +304,7 @@ public class SegmentLoadDropHandler implements DataSegmentChangeHandler
         segmentsToDelete.remove(segment);
       }
     }
-    loadSegment(segment, DataSegmentChangeCallback.NOOP);
+    loadSegment(segment, DataSegmentChangeCallback.NOOP, false);
     // announce segment even if the segment file already exists.
     try {
       announcer.announceSegment(segment);
@@ -351,7 +351,7 @@ public class SegmentLoadDropHandler implements DataSegmentChangeHandler
               numSegments,
               segment.getId()
           );
-          loadSegment(segment, callback);
+          loadSegment(segment, callback, config.isLazyLoadOnStart());
           try {
             backgroundSegmentAnnouncer.announceSegment(segment);
           }
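Note the asymmetry between the two call sites above: the path that announces through `announcer.announceSegment` always passes `false`, while only the bulk path that uses `backgroundSegmentAnnouncer`, which appears to be the startup reload of already-cached segments, honors `config.isLazyLoadOnStart()`. That matches the feature's intent: the 20X win is in restart time, not in steady-state segment loading.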
@@ -47,7 +47,7 @@ public class CacheTestSegmentLoader implements SegmentLoader
   }

   @Override
-  public Segment getSegment(final DataSegment segment)
+  public Segment getSegment(final DataSegment segment, boolean lazy)
   {
     return new AbstractSegment()
     {
@@ -66,7 +66,7 @@ public class SegmentManagerTest
   }

   @Override
-  public Segment getSegment(final DataSegment segment)
+  public Segment getSegment(final DataSegment segment, boolean lazy)
   {
     return new SegmentForTesting(
         MapUtils.getString(segment.getLoadSpec(), "version"),
@@ -219,7 +219,7 @@ public class SegmentManagerTest
     final List<Future<Boolean>> futures = SEGMENTS.stream()
                                                   .map(
                                                       segment -> executor.submit(
-                                                          () -> segmentManager.loadSegment(segment)
+                                                          () -> segmentManager.loadSegment(segment, false)
                                                       )
                                                   )
                                                   .collect(Collectors.toList());
@@ -235,7 +235,7 @@ public class SegmentManagerTest
   public void testDropSegment() throws SegmentLoadingException, ExecutionException, InterruptedException
   {
     for (DataSegment eachSegment : SEGMENTS) {
-      Assert.assertTrue(segmentManager.loadSegment(eachSegment));
+      Assert.assertTrue(segmentManager.loadSegment(eachSegment, false));
     }

     final List<Future<Void>> futures = ImmutableList.of(SEGMENTS.get(0), SEGMENTS.get(2)).stream()
@@ -261,14 +261,14 @@ public class SegmentManagerTest
   @Test
   public void testLoadDropSegment() throws SegmentLoadingException, ExecutionException, InterruptedException
   {
-    Assert.assertTrue(segmentManager.loadSegment(SEGMENTS.get(0)));
-    Assert.assertTrue(segmentManager.loadSegment(SEGMENTS.get(2)));
+    Assert.assertTrue(segmentManager.loadSegment(SEGMENTS.get(0), false));
+    Assert.assertTrue(segmentManager.loadSegment(SEGMENTS.get(2), false));

     final List<Future<Boolean>> loadFutures = ImmutableList.of(SEGMENTS.get(1), SEGMENTS.get(3), SEGMENTS.get(4))
                                                             .stream()
                                                             .map(
                                                                 segment -> executor.submit(
-                                                                    () -> segmentManager.loadSegment(segment)
+                                                                    () -> segmentManager.loadSegment(segment, false)
                                                                 )
                                                             )
                                                             .collect(Collectors.toList());
@@ -299,10 +299,10 @@ public class SegmentManagerTest
   public void testLoadDuplicatedSegmentsSequentially() throws SegmentLoadingException
   {
     for (DataSegment segment : SEGMENTS) {
-      Assert.assertTrue(segmentManager.loadSegment(segment));
+      Assert.assertTrue(segmentManager.loadSegment(segment, false));
     }
     // try to load an existing segment
-    Assert.assertFalse(segmentManager.loadSegment(SEGMENTS.get(0)));
+    Assert.assertFalse(segmentManager.loadSegment(SEGMENTS.get(0), false));

     assertResult(SEGMENTS);
   }
@@ -315,7 +315,7 @@ public class SegmentManagerTest
         .stream()
         .map(
             segment -> executor.submit(
-                () -> segmentManager.loadSegment(segment)
+                () -> segmentManager.loadSegment(segment, false)
            )
        )
        .collect(Collectors.toList());
@@ -336,7 +336,7 @@ public class SegmentManagerTest
   @Test
   public void testNonExistingSegmentsSequentially() throws SegmentLoadingException
   {
-    Assert.assertTrue(segmentManager.loadSegment(SEGMENTS.get(0)));
+    Assert.assertTrue(segmentManager.loadSegment(SEGMENTS.get(0), false));

     // try to drop a non-existing segment of different data source
     segmentManager.dropSegment(SEGMENTS.get(2));
@@ -349,7 +349,7 @@ public class SegmentManagerTest
   public void testNonExistingSegmentsInParallel()
       throws SegmentLoadingException, ExecutionException, InterruptedException
   {
-    segmentManager.loadSegment(SEGMENTS.get(0));
+    segmentManager.loadSegment(SEGMENTS.get(0), false);
     final List<Future<Void>> futures = ImmutableList.of(SEGMENTS.get(1), SEGMENTS.get(2))
                                                     .stream()
                                                     .map(
@@ -372,7 +372,7 @@ public class SegmentManagerTest
   @Test
   public void testRemoveEmptyTimeline() throws SegmentLoadingException
   {
-    segmentManager.loadSegment(SEGMENTS.get(0));
+    segmentManager.loadSegment(SEGMENTS.get(0), false);
     assertResult(ImmutableList.of(SEGMENTS.get(0)));
     Assert.assertEquals(1, segmentManager.getDataSources().size());
     segmentManager.dropSegment(SEGMENTS.get(0));
@@ -406,7 +406,7 @@ public class SegmentManagerTest
         10
     );

-    segmentManager.loadSegment(segment);
+    segmentManager.loadSegment(segment, false);
     assertResult(ImmutableList.of(segment));

     segmentManager.dropSegment(segment);
@@ -434,7 +434,7 @@ public class SegmentManagerTest
         segment.getInterval(),
         segment.getVersion(),
         segment.getShardSpec().createChunk(
-            ReferenceCountingSegment.wrapSegment(SEGMENT_LOADER.getSegment(segment), segment.getShardSpec())
+            ReferenceCountingSegment.wrapSegment(SEGMENT_LOADER.getSegment(segment, false), segment.getShardSpec())
        )
    );
  }
@@ -129,7 +129,7 @@ public class SegmentManagerThreadSafetyTest
     final DataSegment segment = createSegment("2019-01-01/2019-01-02");
     final List<Future> futures = IntStream
         .range(0, 16)
-        .mapToObj(i -> exec.submit(() -> segmentManager.loadSegment(segment)))
+        .mapToObj(i -> exec.submit(() -> segmentManager.loadSegment(segment, false)))
         .collect(Collectors.toList());
     for (Future future : futures) {
       future.get();
@@ -154,7 +154,7 @@ public class SegmentManagerThreadSafetyTest
         .mapToObj(i -> exec.submit(() -> {
           for (DataSegment segment : segments) {
             try {
-              segmentManager.loadSegment(segment);
+              segmentManager.loadSegment(segment, false);
             }
             catch (SegmentLoadingException e) {
               throw new RuntimeException(e);
@@ -222,7 +222,7 @@ public class SegmentManagerThreadSafetyTest
   private static class TestSegmentizerFactory implements SegmentizerFactory
   {
     @Override
-    public Segment factorize(DataSegment segment, File parentDir)
+    public Segment factorize(DataSegment segment, File parentDir, boolean lazy)
     {
       return new Segment()
       {
@@ -121,7 +121,7 @@ public class ServerManagerTest
   }

   @Override
-  public Segment getSegment(final DataSegment segment)
+  public Segment getSegment(final DataSegment segment, boolean lazy)
   {
     return new SegmentForTesting(
         MapUtils.getString(segment.getLoadSpec(), "version"),
@@ -478,7 +478,8 @@ public class ServerManagerTest
                 NoneShardSpec.instance(),
                 IndexIO.CURRENT_VERSION_ID,
                 123L
-            )
+            ),
+            false
         );
       }
       catch (SegmentLoadingException e) {