Avoid memory mapping hydrants after they are persisted & after they are merged for native batch ingestion (#11123)

* Avoid mapping hydrants in create segments phase for native ingestion

* Drop queryable indices after a given sink is fully merged

* Do not drop memory mappings for realtime ingestion

* Style fixes

* Renamed to match use case better

* Roll back memoization code and use the real-time flag instead

* Null ptr fix in FireHydrant toString plus adjustments to memory pressure tracking calculations

* Style

* Log some count stats

* Make sure the sinks size is obtained at the right time

* BatchAppenderator unit test

* Fix comment typos

* Renamed methods to make them more readable

* Moved persisted metadata from the FireHydrant class to AppenderatorImpl. Removed superfluous differences and fixed a comment typo. Removed custom comparator

* Missing dependency

* Make persisted hydrant metadata map concurrent and better reflect the fact that keys are Java references. Maintain persisted metadata when dropping/closing segments.

* Replaced concurrent variables with normal ones

* Added batchMemoryMappedIndex "fallback" flag with default "false". Setting this to "true" makes the code fall back to the previous code path.

* Style fix.

* Added a note about the new setting to the docs, used Iterables.size (removing a dependency), and fixed a typo in a comment.

* Forgot to commit this edited documentation message
Agustin Gonzalez 2021-05-11 14:34:26 -07:00 committed by GitHub
parent 4326e699bd
commit 8e5048e643
36 changed files with 737 additions and 35 deletions

View File

@ -1332,6 +1332,7 @@ Additional peon configs include:
|`druid.peon.mode`|Choices are "local" and "remote". Setting this to local means you intend to run the peon as a standalone process (Not recommended).|remote|
|`druid.indexer.task.baseDir`|Base temporary working directory.|`System.getProperty("java.io.tmpdir")`|
|`druid.indexer.task.baseTaskDir`|Base temporary working directory for tasks.|`${druid.indexer.task.baseDir}/persistent/task`|
|`druid.indexer.task.batchMemoryMappedIndex`|If `false`, native batch ingestion will not memory map indexes, saving heap space. This applies only to batch ingestion, not to streaming ingestion. Set this flag to `true` only if a bug is suspected or found in the new batch ingestion code that avoids memory mapping indices; doing so falls back to the previous, working but more memory-intensive, code path.|`false`|
|`druid.indexer.task.defaultHadoopCoordinates`|Hadoop version to use with HadoopIndexTasks that do not request a particular version.|org.apache.hadoop:hadoop-client:2.8.5|
|`druid.indexer.task.defaultRowFlushBoundary`|Highest row count before persisting to disk. Used for indexing generating tasks.|75000|
|`druid.indexer.task.directoryLockTimeout`|Wait this long for zombie peons to exit before giving up on their replacements.|PT10M|
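As a usage note for the batchMemoryMappedIndex fallback documented above: druid.indexer.task.* settings are service-level task configs, so the flag would typically be enabled in the runtime.properties of the service that launches the peons (for example the MiddleManager). The exact file depends on the deployment, so treat the line below as an illustrative sketch only:

druid.indexer.task.batchMemoryMappedIndex=true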

View File

@ -2782,6 +2782,7 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase
null,
null,
null,
false,
false
);
final TestDerbyConnector derbyConnector = derby.getConnector();

View File

@ -2868,6 +2868,7 @@ public class KinesisIndexTaskTest extends SeekableStreamIndexTaskTestBase
null,
null,
null,
false,
false
);
final TestDerbyConnector derbyConnector = derby.getConnector();

View File

@ -76,6 +76,9 @@ public class TaskConfig
@JsonProperty
private final boolean ignoreTimestampSpecForDruidInputSource;
@JsonProperty
private final boolean batchMemoryMappedIndex;
@JsonCreator
public TaskConfig(
@JsonProperty("baseDir") String baseDir,
@ -87,7 +90,8 @@ public class TaskConfig
@JsonProperty("gracefulShutdownTimeout") Period gracefulShutdownTimeout,
@JsonProperty("directoryLockTimeout") Period directoryLockTimeout,
@JsonProperty("shuffleDataLocations") List<StorageLocationConfig> shuffleDataLocations,
@JsonProperty("ignoreTimestampSpecForDruidInputSource") boolean ignoreTimestampSpecForDruidInputSource
@JsonProperty("ignoreTimestampSpecForDruidInputSource") boolean ignoreTimestampSpecForDruidInputSource,
@JsonProperty("batchMemoryMappedIndex") boolean batchMemoryMapIndex // only set to true to fall back to older behavior
)
{
this.baseDir = baseDir == null ? System.getProperty("java.io.tmpdir") : baseDir;
@ -113,6 +117,7 @@ public class TaskConfig
this.shuffleDataLocations = shuffleDataLocations;
}
this.ignoreTimestampSpecForDruidInputSource = ignoreTimestampSpecForDruidInputSource;
this.batchMemoryMappedIndex = batchMemoryMapIndex;
}
@JsonProperty
@ -195,6 +200,13 @@ public class TaskConfig
return ignoreTimestampSpecForDruidInputSource;
}
@JsonProperty
public boolean getBatchMemoryMappedIndex()
{
return batchMemoryMappedIndex;
}
private String defaultDir(@Nullable String configParameter, final String defaultVal)
{
if (configParameter == null) {

View File

@ -80,7 +80,8 @@ public final class BatchAppenderators
toolbox.getIndexIO(),
toolbox.getIndexMergerV9(),
rowIngestionMeters,
parseExceptionHandler
parseExceptionHandler,
toolbox.getConfig().getBatchMemoryMappedIndex()
);
}

View File

@ -0,0 +1,233 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.indexing.appenderator;
import com.google.common.base.Function;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.MapBasedInputRow;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.segment.realtime.appenderator.Appenderator;
import org.apache.druid.segment.realtime.appenderator.AppenderatorTester;
import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
import org.apache.druid.segment.realtime.appenderator.SegmentsAndCommitMetadata;
import org.apache.druid.testing.InitializedNullHandlingTest;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.partition.LinearShardSpec;
import org.junit.Assert;
import org.junit.Test;
import java.util.List;
import java.util.stream.Collectors;
public class BatchAppenderatorTest extends InitializedNullHandlingTest
{
private static final List<SegmentIdWithShardSpec> IDENTIFIERS = ImmutableList.of(
createSegmentId("2000/2001", "A", 0),
createSegmentId("2000/2001", "A", 1),
createSegmentId("2001/2002", "A", 0)
);
@Test
public void testSimpleIngestionWithIndexesNotMapped() throws Exception
{
try (final BatchAppenderatorTester tester = new BatchAppenderatorTester(2,
false,
false)) {
final Appenderator appenderator = tester.getAppenderator();
boolean thrown;
// startJob
Assert.assertEquals(null, appenderator.startJob());
// getDataSource
Assert.assertEquals(AppenderatorTester.DATASOURCE, appenderator.getDataSource());
// add
Assert.assertEquals(
1,
appenderator.add(IDENTIFIERS.get(0), createInputRow("2000", "foo", 1), null)
.getNumRowsInSegment()
);
Assert.assertEquals(
2,
appenderator.add(IDENTIFIERS.get(0), createInputRow("2000", "bar", 2), null)
.getNumRowsInSegment()
);
Assert.assertEquals(
1,
appenderator.add(IDENTIFIERS.get(1), createInputRow("2000", "qux", 4), null)
.getNumRowsInSegment()
);
// getSegments
Assert.assertEquals(IDENTIFIERS.subList(0, 2),
appenderator.getSegments().stream().sorted().collect(Collectors.toList()));
// getRowCount
Assert.assertEquals(2, appenderator.getRowCount(IDENTIFIERS.get(0)));
Assert.assertEquals(1, appenderator.getRowCount(IDENTIFIERS.get(1)));
thrown = false;
try {
appenderator.getRowCount(IDENTIFIERS.get(2));
}
catch (IllegalStateException e) {
thrown = true;
}
Assert.assertTrue(thrown);
// push all
final SegmentsAndCommitMetadata segmentsAndCommitMetadata = appenderator.push(
appenderator.getSegments(),
null,
false
).get();
Assert.assertEquals(
IDENTIFIERS.subList(0, 2),
Lists.transform(
segmentsAndCommitMetadata.getSegments(),
new Function<DataSegment, SegmentIdWithShardSpec>()
{
@Override
public SegmentIdWithShardSpec apply(DataSegment input)
{
return SegmentIdWithShardSpec.fromDataSegment(input);
}
}
).stream().sorted().collect(Collectors.toList())
);
Assert.assertEquals(tester.getPushedSegments().stream().sorted().collect(Collectors.toList()),
segmentsAndCommitMetadata.getSegments().stream().sorted().collect(Collectors.toList()));
appenderator.clear();
Assert.assertTrue(appenderator.getSegments().isEmpty());
}
}
@Test
public void testSimpleIngestionWithIndexesMapped() throws Exception
{
try (final BatchAppenderatorTester tester = new BatchAppenderatorTester(2,
false,
true)) {
final Appenderator appenderator = tester.getAppenderator();
boolean thrown;
// startJob
Assert.assertEquals(null, appenderator.startJob());
// getDataSource
Assert.assertEquals(AppenderatorTester.DATASOURCE, appenderator.getDataSource());
// add
Assert.assertEquals(
1,
appenderator.add(IDENTIFIERS.get(0), createInputRow("2000", "foo", 1), null)
.getNumRowsInSegment()
);
Assert.assertEquals(
2,
appenderator.add(IDENTIFIERS.get(0), createInputRow("2000", "bar", 2), null)
.getNumRowsInSegment()
);
Assert.assertEquals(
1,
appenderator.add(IDENTIFIERS.get(1), createInputRow("2000", "qux", 4), null)
.getNumRowsInSegment()
);
// getSegments
Assert.assertEquals(IDENTIFIERS.subList(0, 2),
appenderator.getSegments().stream().sorted().collect(Collectors.toList()));
// getRowCount
Assert.assertEquals(2, appenderator.getRowCount(IDENTIFIERS.get(0)));
Assert.assertEquals(1, appenderator.getRowCount(IDENTIFIERS.get(1)));
thrown = false;
try {
appenderator.getRowCount(IDENTIFIERS.get(2));
}
catch (IllegalStateException e) {
thrown = true;
}
Assert.assertTrue(thrown);
// push all
final SegmentsAndCommitMetadata segmentsAndCommitMetadata = appenderator.push(
appenderator.getSegments(),
null,
false
).get();
Assert.assertEquals(
IDENTIFIERS.subList(0, 2),
Lists.transform(
segmentsAndCommitMetadata.getSegments(),
new Function<DataSegment, SegmentIdWithShardSpec>()
{
@Override
public SegmentIdWithShardSpec apply(DataSegment input)
{
return SegmentIdWithShardSpec.fromDataSegment(input);
}
}
).stream().sorted().collect(Collectors.toList())
);
Assert.assertEquals(tester.getPushedSegments().stream().sorted().collect(Collectors.toList()),
segmentsAndCommitMetadata.getSegments().stream().sorted().collect(Collectors.toList()));
appenderator.clear();
Assert.assertTrue(appenderator.getSegments().isEmpty());
}
}
private static SegmentIdWithShardSpec createSegmentId(String interval, String version, int partitionNum)
{
return new SegmentIdWithShardSpec(
AppenderatorTester.DATASOURCE,
Intervals.of(interval),
version,
new LinearShardSpec(partitionNum)
);
}
static InputRow createInputRow(String ts, String dim, Object met)
{
return new MapBasedInputRow(
DateTimes.of(ts).getMillis(),
ImmutableList.of("dim"),
ImmutableMap.of(
"dim",
dim,
"met",
met
)
);
}
}

View File

@ -0,0 +1,293 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.indexing.appenderator;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.JSONParseSpec;
import org.apache.druid.data.input.impl.MapInputRowParser;
import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.indexing.common.task.IndexTask;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.FileUtils;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.java.util.emitter.core.NoopEmitter;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.CountAggregatorFactory;
import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
import org.apache.druid.segment.IndexIO;
import org.apache.druid.segment.IndexMerger;
import org.apache.druid.segment.IndexMergerV9;
import org.apache.druid.segment.column.ColumnConfig;
import org.apache.druid.segment.incremental.ParseExceptionHandler;
import org.apache.druid.segment.incremental.RowIngestionMeters;
import org.apache.druid.segment.incremental.SimpleRowIngestionMeters;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
import org.apache.druid.segment.loading.DataSegmentPusher;
import org.apache.druid.segment.realtime.FireDepartmentMetrics;
import org.apache.druid.segment.realtime.appenderator.Appenderator;
import org.apache.druid.segment.realtime.appenderator.Appenderators;
import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.partition.LinearShardSpec;
import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
public class BatchAppenderatorTester implements AutoCloseable
{
public static final String DATASOURCE = "foo";
private final DataSchema schema;
private final IndexTask.IndexTuningConfig tuningConfig;
private final FireDepartmentMetrics metrics;
private final DataSegmentPusher dataSegmentPusher;
private final ObjectMapper objectMapper;
private final Appenderator appenderator;
private final IndexIO indexIO;
private final IndexMerger indexMerger;
private final ServiceEmitter emitter;
private final List<DataSegment> pushedSegments = new CopyOnWriteArrayList<>();
public BatchAppenderatorTester(
final int maxRowsInMemory,
final boolean enablePushFailure,
boolean batchMemoryMappedIndex
)
{
this(maxRowsInMemory, -1, null, enablePushFailure, batchMemoryMappedIndex);
}
public BatchAppenderatorTester(
final int maxRowsInMemory,
final long maxSizeInBytes,
final File basePersistDirectory,
final boolean enablePushFailure,
boolean batchMemoryMappedIndex
)
{
this(
maxRowsInMemory,
maxSizeInBytes,
basePersistDirectory,
enablePushFailure,
new SimpleRowIngestionMeters(),
false,
batchMemoryMappedIndex
);
}
public BatchAppenderatorTester(
final int maxRowsInMemory,
final long maxSizeInBytes,
final File basePersistDirectory,
final boolean enablePushFailure,
final RowIngestionMeters rowIngestionMeters,
final boolean skipBytesInMemoryOverheadCheck,
boolean batchMemoryMappedIndex
)
{
objectMapper = new DefaultObjectMapper();
objectMapper.registerSubtypes(LinearShardSpec.class);
final Map<String, Object> parserMap = objectMapper.convertValue(
new MapInputRowParser(
new JSONParseSpec(
new TimestampSpec("ts", "auto", null),
new DimensionsSpec(null, null, null),
null,
null,
null
)
),
Map.class
);
schema = new DataSchema(
DATASOURCE,
parserMap,
new AggregatorFactory[]{
new CountAggregatorFactory("count"),
new LongSumAggregatorFactory("met", "met")
},
new UniformGranularitySpec(Granularities.MINUTE, Granularities.NONE, null),
null,
objectMapper
);
tuningConfig = new IndexTask.IndexTuningConfig(
null,
2,
null,
maxRowsInMemory,
maxSizeInBytes == 0L ? getDefaultMaxBytesInMemory() : maxSizeInBytes,
false,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
OffHeapMemorySegmentWriteOutMediumFactory.instance(),
true,
null,
null,
null,
null
).withBasePersistDirectory(createNewBasePersistDirectory());
metrics = new FireDepartmentMetrics();
indexIO = new IndexIO(
objectMapper,
new ColumnConfig()
{
@Override
public int columnCacheSizeBytes()
{
return 0;
}
}
);
indexMerger = new IndexMergerV9(objectMapper, indexIO, OffHeapMemorySegmentWriteOutMediumFactory.instance());
emitter = new ServiceEmitter(
"test",
"test",
new NoopEmitter()
);
emitter.start();
EmittingLogger.registerEmitter(emitter);
dataSegmentPusher = new DataSegmentPusher()
{
private boolean mustFail = true;
@Deprecated
@Override
public String getPathForHadoop(String dataSource)
{
return getPathForHadoop();
}
@Override
public String getPathForHadoop()
{
throw new UnsupportedOperationException();
}
@Override
public DataSegment push(File file, DataSegment segment, boolean useUniquePath) throws IOException
{
if (enablePushFailure && mustFail) {
mustFail = false;
throw new IOException("Push failure test");
} else if (enablePushFailure) {
mustFail = true;
}
pushedSegments.add(segment);
return segment;
}
@Override
public Map<String, Object> makeLoadSpec(URI uri)
{
throw new UnsupportedOperationException();
}
};
appenderator = Appenderators.createOffline(
schema.getDataSource(),
schema,
tuningConfig,
metrics,
dataSegmentPusher,
objectMapper,
indexIO,
indexMerger,
rowIngestionMeters,
new ParseExceptionHandler(rowIngestionMeters, false, Integer.MAX_VALUE, 0),
batchMemoryMappedIndex
);
}
private long getDefaultMaxBytesInMemory()
{
return (Runtime.getRuntime().totalMemory()) / 3;
}
public DataSchema getSchema()
{
return schema;
}
public IndexTask.IndexTuningConfig getTuningConfig()
{
return tuningConfig;
}
public FireDepartmentMetrics getMetrics()
{
return metrics;
}
public DataSegmentPusher getDataSegmentPusher()
{
return dataSegmentPusher;
}
public ObjectMapper getObjectMapper()
{
return objectMapper;
}
public Appenderator getAppenderator()
{
return appenderator;
}
public List<DataSegment> getPushedSegments()
{
return pushedSegments;
}
@Override
public void close() throws Exception
{
appenderator.close();
emitter.close();
FileUtils.deleteDirectory(tuningConfig.getBasePersistDirectory());
}
private static File createNewBasePersistDirectory()
{
return FileUtils.createTempDir("druid-batch-persist");
}
}

View File

@ -112,6 +112,7 @@ public class TaskToolboxTest
null,
null,
null,
false,
false
),
new DruidNode("druid/middlemanager", "localhost", false, 8091, null, true, false),

View File

@ -1516,6 +1516,7 @@ public class AppenderatorDriverRealtimeIndexTaskTest extends InitializedNullHand
null,
null,
null,
false,
false
);

View File

@ -1298,7 +1298,7 @@ public class CompactionTaskRunTest extends IngestionTestBase
);
return new TaskToolbox(
new TaskConfig(null, null, null, null, null, false, null, null, null, false),
new TaskConfig(null, null, null, null, null, false, null, null, null, false, false),
null,
createActionClient(task),
null,

View File

@ -1747,7 +1747,7 @@ public class CompactionTaskTest
)
{
super(
new TaskConfig(null, null, null, null, null, false, null, null, null, false),
new TaskConfig(null, null, null, null, null, false, null, null, null, false, false),
null,
taskActionClient,
null,

View File

@ -117,6 +117,7 @@ public class HadoopTaskTest
null,
null,
null,
false,
false
)).once();
EasyMock.replay(toolbox);

View File

@ -314,7 +314,7 @@ public abstract class IngestionTestBase extends InitializedNullHandlingTest
);
final TaskToolbox box = new TaskToolbox(
new TaskConfig(null, null, null, null, null, false, null, null, null, false),
new TaskConfig(null, null, null, null, null, false, null, null, null, false, false),
new DruidNode("druid/middlemanager", "localhost", false, 8091, null, true, false),
taskActionClient,
null,

View File

@ -898,6 +898,7 @@ public class RealtimeIndexTaskTest extends InitializedNullHandlingTest
null,
null,
null,
false,
false
);
final TaskLockbox taskLockbox = new TaskLockbox(taskStorage, mdc);

View File

@ -105,7 +105,8 @@ public class TestAppenderatorsManager implements AppenderatorsManager
IndexIO indexIO,
IndexMerger indexMerger,
RowIngestionMeters rowIngestionMeters,
ParseExceptionHandler parseExceptionHandler
ParseExceptionHandler parseExceptionHandler,
boolean batchMemoryMappedIndex
)
{
return Appenderators.createOffline(
@ -118,7 +119,8 @@ public class TestAppenderatorsManager implements AppenderatorsManager
indexIO,
indexMerger,
rowIngestionMeters,
parseExceptionHandler
parseExceptionHandler,
batchMemoryMappedIndex
);
}

View File

@ -241,6 +241,7 @@ public class AbstractParallelIndexSupervisorTaskTest extends IngestionTestBase
null,
null,
ImmutableList.of(new StorageLocationConfig(temporaryFolder.newFolder(), null, null)),
false,
false
),
null
@ -597,7 +598,7 @@ public class AbstractParallelIndexSupervisorTaskTest extends IngestionTestBase
public void prepareObjectMapper(ObjectMapper objectMapper, IndexIO indexIO)
{
final TaskConfig taskConfig = new TaskConfig(null, null, null, null, null, false, null, null, null, false);
final TaskConfig taskConfig = new TaskConfig(null, null, null, null, null, false, null, null, null, false, false);
objectMapper.setInjectableValues(
new InjectableValues.Std()
@ -632,7 +633,7 @@ public class AbstractParallelIndexSupervisorTaskTest extends IngestionTestBase
protected TaskToolbox createTaskToolbox(Task task, TaskActionClient actionClient) throws IOException
{
return new TaskToolbox(
new TaskConfig(null, null, null, null, null, false, null, null, null, false),
new TaskConfig(null, null, null, null, null, false, null, null, null, false, false),
new DruidNode("druid/middlemanager", "localhost", false, 8091, null, true, false),
actionClient,
null,

View File

@ -90,6 +90,7 @@ public class SingleTaskBackgroundRunnerTest
null,
null,
null,
false,
false
);
final ServiceEmitter emitter = new NoopServiceEmitter();

View File

@ -600,7 +600,7 @@ public class TaskLifecycleTest extends InitializedNullHandlingTest
new TaskAuditLogConfig(true)
);
File tmpDir = temporaryFolder.newFolder();
taskConfig = new TaskConfig(tmpDir.toString(), null, null, 50000, null, false, null, null, null, false);
taskConfig = new TaskConfig(tmpDir.toString(), null, null, 50000, null, false, null, null, null, false, false);
return new TaskToolboxFactory(
taskConfig,

View File

@ -89,6 +89,7 @@ public class WorkerTaskManagerTest
null,
null,
null,
false,
false
);
TaskActionClientFactory taskActionClientFactory = EasyMock.createNiceMock(TaskActionClientFactory.class);

View File

@ -163,6 +163,7 @@ public class WorkerTaskMonitorTest
null,
null,
null,
false,
false
);
TaskActionClientFactory taskActionClientFactory = EasyMock.createNiceMock(TaskActionClientFactory.class);

View File

@ -92,6 +92,7 @@ public class IntermediaryDataManagerAutoCleanupTest
null,
null,
ImmutableList.of(new StorageLocationConfig(tempDir.newFolder(), null, null)),
false,
false
);
final IndexingServiceClient indexingServiceClient = new NoopIndexingServiceClient()

View File

@ -75,6 +75,7 @@ public class IntermediaryDataManagerManualAddAndDeleteTest
null,
null,
ImmutableList.of(new StorageLocationConfig(intermediarySegmentsLocation, 600L, null)),
false,
false
);
final IndexingServiceClient indexingServiceClient = new NoopIndexingServiceClient();

View File

@ -71,6 +71,7 @@ public class ShuffleDataSegmentPusherTest
null,
null,
ImmutableList.of(new StorageLocationConfig(temporaryFolder.newFolder(), null, null)),
false,
false
);
final IndexingServiceClient indexingServiceClient = new NoopIndexingServiceClient();

View File

@ -97,6 +97,7 @@ public class ShuffleResourceTest
null,
null,
ImmutableList.of(new StorageLocationConfig(tempDir.newFolder(), null, null)),
false,
false
);
final IndexingServiceClient indexingServiceClient = new NoopIndexingServiceClient()

View File

@ -213,7 +213,7 @@ public class FireHydrant
// Do not include IncrementalIndex in toString as AbstractIndex.toString() actually prints
// all the rows in the index
return "FireHydrant{" +
"queryable=" + adapter.get().getId() +
"queryable=" + (adapter.get() == null ? "null" : adapter.get().getId()) +
", count=" + count +
'}';
}

View File

@ -214,6 +214,15 @@ public interface Appenderator extends QuerySegmentWalker
*/
void closeNow();
/**
* Flag to tell internals whether this appenderator is working on behalf of a real time task.
* This is used to manage certain aspects as needed. For example, for batch (non-real time) tasks,
* physical segments (i.e. hydrants) do not need to memory map their persisted
* files. In that case, the code avoids memory mapping them, thus reducing the occurrence
* of OOMs.
*/
boolean isRealTime();
/**
* Result of {@link Appenderator#add} containing following information
* - {@link SegmentIdWithShardSpec} - identifier of segment to which rows are being added
@ -242,7 +251,8 @@ public interface Appenderator extends QuerySegmentWalker
return segmentIdentifier;
}
int getNumRowsInSegment()
@VisibleForTesting
public int getNumRowsInSegment()
{
return numRowsInSegment;
}

View File

@ -36,6 +36,7 @@ import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import org.apache.commons.lang.mutable.MutableLong;
import org.apache.druid.client.cache.Cache;
import org.apache.druid.data.input.Committer;
import org.apache.druid.data.input.InputRow;
@ -59,6 +60,7 @@ import org.apache.druid.segment.IndexMerger;
import org.apache.druid.segment.QueryableIndex;
import org.apache.druid.segment.QueryableIndexSegment;
import org.apache.druid.segment.ReferenceCountingSegment;
import org.apache.druid.segment.Segment;
import org.apache.druid.segment.incremental.IncrementalIndexAddResult;
import org.apache.druid.segment.incremental.IndexSizeExceededException;
import org.apache.druid.segment.incremental.ParseExceptionHandler;
@ -70,6 +72,7 @@ import org.apache.druid.segment.realtime.FireHydrant;
import org.apache.druid.segment.realtime.plumber.Sink;
import org.apache.druid.server.coordination.DataSegmentAnnouncer;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.SegmentId;
import org.apache.druid.timeline.VersionedIntervalTimeline;
import org.joda.time.Interval;
@ -83,7 +86,9 @@ import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
@ -161,6 +166,19 @@ public class AppenderatorImpl implements Appenderator
private volatile Throwable persistError;
private final boolean isRealTime;
/**
* Use the following Map to store metadata (File, SegmentId) for a hydrant in the batch appenderator,
* in order to facilitate mapping the QueryableIndex associated with a given hydrant
* at merge time. This is necessary since the batch appenderator does not map the QueryableIndex
* at persist time, in order to minimize its memory footprint. The map has to be synchronized since it
* may be accessed from multiple threads.
* Use {@link IdentityHashMap} to better reflect the fact that the keys are interpreted
* with reference semantics.
*/
private final Map<FireHydrant, Pair<File, SegmentId>> persistedHydrantMetadata =
Collections.synchronizedMap(new IdentityHashMap<>());
/**
* This constructor allows the caller to provide its own SinkQuerySegmentWalker.
*
@ -183,7 +201,8 @@ public class AppenderatorImpl implements Appenderator
IndexMerger indexMerger,
Cache cache,
RowIngestionMeters rowIngestionMeters,
ParseExceptionHandler parseExceptionHandler
ParseExceptionHandler parseExceptionHandler,
boolean isRealTime
)
{
this.myId = id;
@ -199,6 +218,7 @@ public class AppenderatorImpl implements Appenderator
this.texasRanger = sinkQuerySegmentWalker;
this.rowIngestionMeters = Preconditions.checkNotNull(rowIngestionMeters, "rowIngestionMeters");
this.parseExceptionHandler = Preconditions.checkNotNull(parseExceptionHandler, "parseExceptionHandler");
this.isRealTime = isRealTime;
if (sinkQuerySegmentWalker == null) {
this.sinkTimeline = new VersionedIntervalTimeline<>(
@ -339,7 +359,8 @@ public class AppenderatorImpl implements Appenderator
if (sinkEntry != null) {
bytesToBePersisted += sinkEntry.getBytesInMemory();
if (sinkEntry.swappable()) {
// After swapping the sink, we use memory mapped segment instead. However, the memory mapped segment still consumes memory.
// After swapping the sink, we use memory mapped segment instead (but only for real time appenderators!).
// However, the memory mapped segment still consumes memory.
// These memory mapped segments are held in memory throughout the ingestion phase and permanently add to the bytesCurrentlyInMemory
int memoryStillInUse = calculateMMappedHydrantMemoryInUsed(sink.getCurrHydrant());
bytesCurrentlyInMemory.addAndGet(memoryStillInUse);
@ -352,10 +373,14 @@ public class AppenderatorImpl implements Appenderator
// This means that we ran out of all available memory to ingest (due to overheads created as part of ingestion)
final String alertMessage = StringUtils.format(
"Task has exceeded safe estimated heap usage limits, failing "
+ "(numSinks: [%d] numHydrantsAcrossAllSinks: [%d] totalRows: [%d])",
+ "(numSinks: [%d] numHydrantsAcrossAllSinks: [%d] totalRows: [%d])"
+ "(bytesCurrentlyInMemory: [%d] - bytesToBePersisted: [%d] > maxBytesTuningConfig: [%d])",
sinks.size(),
sinks.values().stream().mapToInt(Iterables::size).sum(),
getTotalRowCount()
getTotalRowCount(),
bytesCurrentlyInMemory.get(),
bytesToBePersisted,
maxBytesTuningConfig
);
final String errorMessage = StringUtils.format(
"%s.\nThis can occur when the overhead from too many intermediary segment persists becomes to "
@ -530,6 +555,9 @@ public class AppenderatorImpl implements Appenderator
futures.add(abandonSegment(entry.getKey(), entry.getValue(), true));
}
// Re-initialize hydrant map:
persistedHydrantMetadata.clear();
// Await dropping.
Futures.allAsList(futures).get();
}
@ -558,6 +586,9 @@ public class AppenderatorImpl implements Appenderator
final List<Pair<FireHydrant, SegmentIdWithShardSpec>> indexesToPersist = new ArrayList<>();
int numPersistedRows = 0;
long bytesPersisted = 0L;
MutableLong totalHydrantsCount = new MutableLong();
MutableLong totalHydrantsPersisted = new MutableLong();
final long totalSinks = sinks.size();
for (Map.Entry<SegmentIdWithShardSpec, Sink> entry : sinks.entrySet()) {
final SegmentIdWithShardSpec identifier = entry.getKey();
final Sink sink = entry.getValue();
@ -565,21 +596,26 @@ public class AppenderatorImpl implements Appenderator
throw new ISE("No sink for identifier: %s", identifier);
}
final List<FireHydrant> hydrants = Lists.newArrayList(sink);
totalHydrantsCount.add(hydrants.size());
currentHydrants.put(identifier.toString(), hydrants.size());
numPersistedRows += sink.getNumRowsInMemory();
bytesPersisted += sink.getBytesInMemory();
final int limit = sink.isWritable() ? hydrants.size() - 1 : hydrants.size();
// gather hydrants that have not been persisted:
for (FireHydrant hydrant : hydrants.subList(0, limit)) {
if (!hydrant.hasSwapped()) {
log.debug("Hydrant[%s] hasn't persisted yet, persisting. Segment[%s]", hydrant, identifier);
indexesToPersist.add(Pair.of(hydrant, identifier));
totalHydrantsPersisted.add(1);
}
}
if (sink.swappable()) {
// It is swappable. Get the old one to persist it and create a new one:
indexesToPersist.add(Pair.of(sink.swap(), identifier));
totalHydrantsPersisted.add(1);
}
}
log.debug("Submitting persist runnable for dataSource[%s]", schema.getDataSource());
@ -587,6 +623,7 @@ public class AppenderatorImpl implements Appenderator
final Object commitMetadata = committer == null ? null : committer.getMetadata();
final Stopwatch runExecStopwatch = Stopwatch.createStarted();
final Stopwatch persistStopwatch = Stopwatch.createStarted();
AtomicLong totalPersistedRows = new AtomicLong(numPersistedRows);
final ListenableFuture<Object> future = persistExecutor.submit(
new Callable<Object>()
{
@ -640,6 +677,14 @@ public class AppenderatorImpl implements Appenderator
.distinct()
.collect(Collectors.joining(", "))
);
log.info(
"Persisted stats: processed rows: [%d], persisted rows[%d], sinks: [%d], total fireHydrants (across sinks): [%d], persisted fireHydrants (across sinks): [%d]",
rowIngestionMeters.getProcessed(),
totalPersistedRows.get(),
totalSinks,
totalHydrantsCount.longValue(),
totalHydrantsPersisted.longValue()
);
// return null if committer is null
return commitMetadata;
@ -682,6 +727,7 @@ public class AppenderatorImpl implements Appenderator
)
{
final Map<SegmentIdWithShardSpec, Sink> theSinks = new HashMap<>();
AtomicLong pushedHydrantsCount = new AtomicLong();
for (final SegmentIdWithShardSpec identifier : identifiers) {
final Sink sink = sinks.get(identifier);
if (sink == null) {
@ -691,6 +737,8 @@ public class AppenderatorImpl implements Appenderator
if (sink.finishWriting()) {
totalRows.addAndGet(-sink.getNumRows());
}
// count hydrants for stats:
pushedHydrantsCount.addAndGet(Iterables.size(sink));
}
return Futures.transform(
@ -700,6 +748,10 @@ public class AppenderatorImpl implements Appenderator
(Function<Object, SegmentsAndCommitMetadata>) commitMetadata -> {
final List<DataSegment> dataSegments = new ArrayList<>();
log.info("Preparing to push (stats): processed rows: [%d], sinks: [%d], fireHydrants (across sinks): [%d]",
rowIngestionMeters.getProcessed(), theSinks.size(), pushedHydrantsCount.get()
);
log.debug(
"Building and pushing segments: %s",
theSinks.keySet().stream().map(SegmentIdWithShardSpec::toString).collect(Collectors.joining(", "))
@ -723,6 +775,8 @@ public class AppenderatorImpl implements Appenderator
}
}
log.info("Push complete...");
return new SegmentsAndCommitMetadata(dataSegments, commitMetadata);
},
pushExecutor
@ -813,6 +867,34 @@ public class AppenderatorImpl implements Appenderator
Closer closer = Closer.create();
try {
for (FireHydrant fireHydrant : sink) {
// If this is a batch appenderator, swap/persist did not memory map the incremental index, so we need it mapped now:
if (!isRealTime()) {
// sanity
Pair<File, SegmentId> persistedMetadata = persistedHydrantMetadata.get(fireHydrant);
if (persistedMetadata == null) {
throw new ISE("Persisted metadata for batch hydrant [%s] is null!", fireHydrant);
}
File persistedFile = persistedMetadata.lhs;
SegmentId persistedSegmentId = persistedMetadata.rhs;
// sanity:
if (persistedFile == null) {
throw new ISE("Persisted file for batch hydrant [%s] is null!", fireHydrant);
} else if (persistedSegmentId == null) {
throw new ISE(
"Persisted segmentId for batch hydrant in file [%s] is null!",
persistedFile.getPath()
);
}
fireHydrant.swapSegment(new QueryableIndexSegment(
indexIO.loadIndex(persistedFile),
persistedSegmentId
));
}
Pair<ReferenceCountingSegment, Closeable> segmentAndCloseable = fireHydrant.getAndIncrementSegment();
final QueryableIndex queryableIndex = segmentAndCloseable.lhs.asQueryableIndex();
log.debug("Segment[%s] adding hydrant[%s]", identifier, fireHydrant);
@ -860,6 +942,15 @@ public class AppenderatorImpl implements Appenderator
5
);
if (!isRealTime()) {
// Drop the queryable indexes behind the hydrants... they are not needed anymore and their
// mapped file references can generate OOMs during merge if enough of them are held back...
for (FireHydrant fireHydrant : sink) {
fireHydrant.swapSegment(null);
}
}
final long pushFinishTime = System.nanoTime();
objectMapper.writeValue(descriptorFile, segment);
@ -986,6 +1077,13 @@ public class AppenderatorImpl implements Appenderator
}
}
@Override
public boolean isRealTime()
{
return isRealTime;
}
private void lockBasePersistDirectory()
{
if (basePersistDirLock == null) {
@ -1303,6 +1401,8 @@ public class AppenderatorImpl implements Appenderator
cache.close(SinkQuerySegmentWalker.makeHydrantCacheIdentifier(hydrant));
}
hydrant.swapSegment(null);
// remove hydrant from persisted metadata:
persistedHydrantMetadata.remove(hydrant);
}
if (removeOnDiskData) {
@ -1417,9 +1517,15 @@ public class AppenderatorImpl implements Appenderator
numRows
);
indexToPersist.swapSegment(
new QueryableIndexSegment(indexIO.loadIndex(persistedFile), indexToPersist.getSegmentId())
);
// Map only when this appenderator is being driven by a real time task:
Segment segmentToSwap = null;
if (isRealTime()) {
segmentToSwap = new QueryableIndexSegment(indexIO.loadIndex(persistedFile), indexToPersist.getSegmentId());
} else {
// remember file path & segment id to rebuild the queryable index for merge:
persistedHydrantMetadata.put(indexToPersist, new Pair<>(persistedFile, indexToPersist.getSegmentId()));
}
indexToPersist.swapSegment(segmentToSwap);
return numRows;
}
@ -1457,10 +1563,15 @@ public class AppenderatorImpl implements Appenderator
// These calculations are approximated from actual heap dumps.
// Memory footprint includes count integer in FireHydrant, shorts in ReferenceCountingSegment,
// Objects in SimpleQueryableIndex (such as SmooshedFileMapper, each ColumnHolder in column map, etc.)
return Integer.BYTES + (4 * Short.BYTES) + ROUGH_OVERHEAD_PER_HYDRANT +
(hydrant.getSegmentNumDimensionColumns() * ROUGH_OVERHEAD_PER_DIMENSION_COLUMN_HOLDER) +
(hydrant.getSegmentNumMetricColumns() * ROUGH_OVERHEAD_PER_METRIC_COLUMN_HOLDER) +
ROUGH_OVERHEAD_PER_TIME_COLUMN_HOLDER;
int total;
total = Integer.BYTES + (4 * Short.BYTES) + ROUGH_OVERHEAD_PER_HYDRANT;
if (isRealTime()) {
// For real time, add the overhead of the memory mapped segment references.
total += (hydrant.getSegmentNumDimensionColumns() * ROUGH_OVERHEAD_PER_DIMENSION_COLUMN_HOLDER) +
(hydrant.getSegmentNumMetricColumns() * ROUGH_OVERHEAD_PER_METRIC_COLUMN_HOLDER) +
ROUGH_OVERHEAD_PER_TIME_COLUMN_HOLDER;
}
return total;
}
private int calculateSinkMemoryInUsed(Sink sink)
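
To make the batch bookkeeping in this file easier to follow, here is a minimal, self-contained sketch of the pattern: persist records where each hydrant was written instead of memory mapping it, merge looks the location back up to reload the index, and dropping a hydrant forgets its entry. Hydrant, PersistedLocation, and BatchPersistBookkeeping are hypothetical stand-ins for illustration, not the real Druid classes.

import java.io.File;
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.Map;

// Hypothetical stand-ins for FireHydrant and its persisted location; not the real Druid classes.
final class Hydrant
{
}

final class PersistedLocation
{
  final File directory;
  final String segmentId;

  PersistedLocation(File directory, String segmentId)
  {
    this.directory = directory;
    this.segmentId = segmentId;
  }
}

final class BatchPersistBookkeeping
{
  // IdentityHashMap: hydrants are tracked by reference, not by equals()/hashCode().
  // synchronizedMap: persist, merge, and drop may run on different threads.
  private final Map<Hydrant, PersistedLocation> persisted =
      Collections.synchronizedMap(new IdentityHashMap<>());

  // Persist (batch only): remember where the hydrant was written instead of memory mapping it.
  void onPersist(Hydrant hydrant, File directory, String segmentId)
  {
    persisted.put(hydrant, new PersistedLocation(directory, segmentId));
  }

  // Merge: only now is the persisted index needed, so look its location back up.
  PersistedLocation locationForMerge(Hydrant hydrant)
  {
    PersistedLocation location = persisted.get(hydrant);
    if (location == null) {
      throw new IllegalStateException("Persisted metadata for batch hydrant is missing");
    }
    return location;
  }

  // Dropping a hydrant (or clearing all sinks) must also forget its metadata.
  void onDrop(Hydrant hydrant)
  {
    persisted.remove(hydrant);
  }
}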

View File

@ -89,7 +89,8 @@ public class Appenderators
indexMerger,
cache,
rowIngestionMeters,
parseExceptionHandler
parseExceptionHandler,
true
);
}
@ -103,7 +104,8 @@ public class Appenderators
IndexIO indexIO,
IndexMerger indexMerger,
RowIngestionMeters rowIngestionMeters,
ParseExceptionHandler parseExceptionHandler
ParseExceptionHandler parseExceptionHandler,
boolean batchMemoryMappedIndex
)
{
return new AppenderatorImpl(
@ -119,7 +121,8 @@ public class Appenderators
indexMerger,
null,
rowIngestionMeters,
parseExceptionHandler
parseExceptionHandler,
batchMemoryMappedIndex // This is a task config (default false) used to fall back to the "old" code path in case of a bug in the new memory optimization code
);
}
}

View File

@ -98,7 +98,8 @@ public interface AppenderatorsManager
IndexIO indexIO,
IndexMerger indexMerger,
RowIngestionMeters rowIngestionMeters,
ParseExceptionHandler parseExceptionHandler
ParseExceptionHandler parseExceptionHandler,
boolean batchMemoryMappedIndex
);
/**

View File

@ -73,7 +73,8 @@ public class DefaultOfflineAppenderatorFactory implements AppenderatorFactory
false,
config.isReportParseExceptions() ? 0 : Integer.MAX_VALUE,
0
)
),
false
);
}
}

View File

@ -90,7 +90,8 @@ public class DummyForInjectionAppenderatorsManager implements AppenderatorsManag
IndexIO indexIO,
IndexMerger indexMerger,
RowIngestionMeters rowIngestionMeters,
ParseExceptionHandler parseExceptionHandler
ParseExceptionHandler parseExceptionHandler,
boolean batchMemoryMappedIndex
)
{
throw new UOE(ERROR_MSG);

View File

@ -122,7 +122,8 @@ public class PeonAppenderatorsManager implements AppenderatorsManager
IndexIO indexIO,
IndexMerger indexMerger,
RowIngestionMeters rowIngestionMeters,
ParseExceptionHandler parseExceptionHandler
ParseExceptionHandler parseExceptionHandler,
boolean batchMemoryMappedIndex
)
{
// CompactionTask does run multiple sub-IndexTasks, so we allow multiple batch appenderators
@ -139,7 +140,8 @@ public class PeonAppenderatorsManager implements AppenderatorsManager
indexIO,
indexMerger,
rowIngestionMeters,
parseExceptionHandler
parseExceptionHandler,
batchMemoryMappedIndex
);
return batchAppenderator;
}

View File

@ -189,7 +189,8 @@ public class UnifiedIndexerAppenderatorsManager implements AppenderatorsManager
wrapIndexMerger(indexMerger),
cache,
rowIngestionMeters,
parseExceptionHandler
parseExceptionHandler,
true
);
datasourceBundle.addAppenderator(taskId, appenderator);
@ -208,7 +209,8 @@ public class UnifiedIndexerAppenderatorsManager implements AppenderatorsManager
IndexIO indexIO,
IndexMerger indexMerger,
RowIngestionMeters rowIngestionMeters,
ParseExceptionHandler parseExceptionHandler
ParseExceptionHandler parseExceptionHandler,
boolean batchMemoryMappedIndex
)
{
synchronized (this) {
@ -227,7 +229,8 @@ public class UnifiedIndexerAppenderatorsManager implements AppenderatorsManager
indexIO,
wrapIndexMerger(indexMerger),
rowIngestionMeters,
parseExceptionHandler
parseExceptionHandler,
batchMemoryMappedIndex
);
datasourceBundle.addAppenderator(taskId, appenderator);
return appenderator;

View File

@ -210,4 +210,11 @@ public class FireHydrantTest extends InitializedNullHandlingTest
Function.identity()
);
}
@Test
public void testToStringWhenSwappedWithNull()
{
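// toString() must not throw a NullPointerException once the segment has been swapped out to null
// (exercises the null check added to FireHydrant.toString() in this commit).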
hydrant.swapSegment(null);
hydrant.toString();
}
}

View File

@ -545,5 +545,11 @@ public class StreamAppenderatorDriverFailTest extends EasyMockSupport
{
throw new UnsupportedOperationException();
}
@Override
public boolean isRealTime()
{
return true;
}
}
}

View File

@ -84,7 +84,8 @@ public class UnifiedIndexerAppenderatorsManagerTest
TestHelper.getTestIndexIO(),
TestHelper.getTestIndexMergerV9(OnHeapMemorySegmentWriteOutMediumFactory.instance()),
new NoopRowIngestionMeters(),
new ParseExceptionHandler(new NoopRowIngestionMeters(), false, 0, 0)
new ParseExceptionHandler(new NoopRowIngestionMeters(), false, 0, 0),
false
);
@Test