diff --git a/docs/configuration/index.md b/docs/configuration/index.md index fdb9365ef1d..35d6ad688d7 100644 --- a/docs/configuration/index.md +++ b/docs/configuration/index.md @@ -1332,6 +1332,7 @@ Additional peon configs include: |`druid.peon.mode`|Choices are "local" and "remote". Setting this to local means you intend to run the peon as a standalone process (Not recommended).|remote| |`druid.indexer.task.baseDir`|Base temporary working directory.|`System.getProperty("java.io.tmpdir")`| |`druid.indexer.task.baseTaskDir`|Base temporary working directory for tasks.|`${druid.indexer.task.baseDir}/persistent/task`| +|`druid.indexer.task.batchMemoryMappedIndex`|If false, native batch ingestion does not memory map intermediary indexes, which saves heap space. This applies only to batch ingestion, not to streaming ingestion. Set this flag to `true` only if you suspect or find a bug in the new batch ingestion code path that avoids memory mapping indexes; setting it to `true` falls back to the previous, working but more memory-intensive, code path.|`false`| |`druid.indexer.task.defaultHadoopCoordinates`|Hadoop version to use with HadoopIndexTasks that do not request a particular version.|org.apache.hadoop:hadoop-client:2.8.5| |`druid.indexer.task.defaultRowFlushBoundary`|Highest row count before persisting to disk. Used for indexing generating tasks.|75000| |`druid.indexer.task.directoryLockTimeout`|Wait this long for zombie peons to exit before giving up on their replacements.|PT10M| diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java index 128810463d9..34ef75cc87a 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java @@ -2782,6 +2782,7 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase null, null, null, + false, false ); final TestDerbyConnector derbyConnector = derby.getConnector(); diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java index 6b497598534..19d2445f7ee 100644 --- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java +++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java @@ -2868,6 +2868,7 @@ public class KinesisIndexTaskTest extends SeekableStreamIndexTaskTestBase null, null, null, + false, false ); final TestDerbyConnector derbyConnector = derby.getConnector(); diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/config/TaskConfig.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/config/TaskConfig.java index f0999d55a4c..0285b33cd85 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/config/TaskConfig.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/config/TaskConfig.java @@ -76,6 +76,9 @@ public class TaskConfig @JsonProperty private final boolean ignoreTimestampSpecForDruidInputSource; + @JsonProperty + private final boolean batchMemoryMappedIndex; + @JsonCreator public
TaskConfig( @JsonProperty("baseDir") String baseDir, @@ -87,7 +90,8 @@ public class TaskConfig @JsonProperty("gracefulShutdownTimeout") Period gracefulShutdownTimeout, @JsonProperty("directoryLockTimeout") Period directoryLockTimeout, @JsonProperty("shuffleDataLocations") List shuffleDataLocations, - @JsonProperty("ignoreTimestampSpecForDruidInputSource") boolean ignoreTimestampSpecForDruidInputSource + @JsonProperty("ignoreTimestampSpecForDruidInputSource") boolean ignoreTimestampSpecForDruidInputSource, + @JsonProperty("batchMemoryMappedIndex") boolean batchMemoryMapIndex // only set to true to fall back to older behavior ) { this.baseDir = baseDir == null ? System.getProperty("java.io.tmpdir") : baseDir; @@ -113,6 +117,7 @@ public class TaskConfig this.shuffleDataLocations = shuffleDataLocations; } this.ignoreTimestampSpecForDruidInputSource = ignoreTimestampSpecForDruidInputSource; + this.batchMemoryMappedIndex = batchMemoryMapIndex; } @JsonProperty @@ -195,6 +200,13 @@ public class TaskConfig return ignoreTimestampSpecForDruidInputSource; } + @JsonProperty + public boolean getBatchMemoryMappedIndex() + { + return batchMemoryMappedIndex; + } + + private String defaultDir(@Nullable String configParameter, final String defaultVal) { if (configParameter == null) { diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/BatchAppenderators.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/BatchAppenderators.java index 711f9d478b9..bff138fb717 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/BatchAppenderators.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/BatchAppenderators.java @@ -80,7 +80,8 @@ public final class BatchAppenderators toolbox.getIndexIO(), toolbox.getIndexMergerV9(), rowIngestionMeters, - parseExceptionHandler + parseExceptionHandler, + toolbox.getConfig().getBatchMemoryMappedIndex() ); } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/appenderator/BatchAppenderatorTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/appenderator/BatchAppenderatorTest.java new file mode 100644 index 00000000000..e4417639f01 --- /dev/null +++ b/indexing-service/src/test/java/org/apache/druid/indexing/appenderator/BatchAppenderatorTest.java @@ -0,0 +1,233 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.indexing.appenderator; + +import com.google.common.base.Function; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Lists; +import org.apache.druid.data.input.InputRow; +import org.apache.druid.data.input.MapBasedInputRow; +import org.apache.druid.java.util.common.DateTimes; +import org.apache.druid.java.util.common.Intervals; +import org.apache.druid.segment.realtime.appenderator.Appenderator; +import org.apache.druid.segment.realtime.appenderator.AppenderatorTester; +import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec; +import org.apache.druid.segment.realtime.appenderator.SegmentsAndCommitMetadata; +import org.apache.druid.testing.InitializedNullHandlingTest; +import org.apache.druid.timeline.DataSegment; +import org.apache.druid.timeline.partition.LinearShardSpec; +import org.junit.Assert; +import org.junit.Test; + +import java.util.List; +import java.util.stream.Collectors; + +public class BatchAppenderatorTest extends InitializedNullHandlingTest +{ + private static final List IDENTIFIERS = ImmutableList.of( + createSegmentId("2000/2001", "A", 0), + createSegmentId("2000/2001", "A", 1), + createSegmentId("2001/2002", "A", 0) + ); + + @Test + public void testSimpleIngestionWithIndexesNotMapped() throws Exception + { + try (final BatchAppenderatorTester tester = new BatchAppenderatorTester(2, + false, + false)) { + final Appenderator appenderator = tester.getAppenderator(); + boolean thrown; + + // startJob + Assert.assertEquals(null, appenderator.startJob()); + + // getDataSource + Assert.assertEquals(AppenderatorTester.DATASOURCE, appenderator.getDataSource()); + + // add + Assert.assertEquals( + 1, + appenderator.add(IDENTIFIERS.get(0), createInputRow("2000", "foo", 1), null) + .getNumRowsInSegment() + ); + + Assert.assertEquals( + 2, + appenderator.add(IDENTIFIERS.get(0), createInputRow("2000", "bar", 2), null) + .getNumRowsInSegment() + ); + + Assert.assertEquals( + 1, + appenderator.add(IDENTIFIERS.get(1), createInputRow("2000", "qux", 4), null) + .getNumRowsInSegment() + ); + + // getSegments + Assert.assertEquals(IDENTIFIERS.subList(0, 2), + appenderator.getSegments().stream().sorted().collect(Collectors.toList())); + + // getRowCount + Assert.assertEquals(2, appenderator.getRowCount(IDENTIFIERS.get(0))); + Assert.assertEquals(1, appenderator.getRowCount(IDENTIFIERS.get(1))); + thrown = false; + try { + appenderator.getRowCount(IDENTIFIERS.get(2)); + } + catch (IllegalStateException e) { + thrown = true; + } + Assert.assertTrue(thrown); + + // push all + final SegmentsAndCommitMetadata segmentsAndCommitMetadata = appenderator.push( + appenderator.getSegments(), + null, + false + ).get(); + Assert.assertEquals( + IDENTIFIERS.subList(0, 2), + Lists.transform( + segmentsAndCommitMetadata.getSegments(), + new Function() + { + @Override + public SegmentIdWithShardSpec apply(DataSegment input) + { + return SegmentIdWithShardSpec.fromDataSegment(input); + } + } + ).stream().sorted().collect(Collectors.toList()) + ); + Assert.assertEquals(tester.getPushedSegments().stream().sorted().collect(Collectors.toList()), + segmentsAndCommitMetadata.getSegments().stream().sorted().collect(Collectors.toList())); + + appenderator.clear(); + Assert.assertTrue(appenderator.getSegments().isEmpty()); + } + } + + @Test + public void testSimpleIngestionWithIndexesMapped() throws Exception + { + try (final BatchAppenderatorTester tester = new BatchAppenderatorTester(2, 
+ false, + true)) { + final Appenderator appenderator = tester.getAppenderator(); + boolean thrown; + + // startJob + Assert.assertEquals(null, appenderator.startJob()); + + // getDataSource + Assert.assertEquals(AppenderatorTester.DATASOURCE, appenderator.getDataSource()); + + // add + Assert.assertEquals( + 1, + appenderator.add(IDENTIFIERS.get(0), createInputRow("2000", "foo", 1), null) + .getNumRowsInSegment() + ); + + Assert.assertEquals( + 2, + appenderator.add(IDENTIFIERS.get(0), createInputRow("2000", "bar", 2), null) + .getNumRowsInSegment() + ); + + Assert.assertEquals( + 1, + appenderator.add(IDENTIFIERS.get(1), createInputRow("2000", "qux", 4), null) + .getNumRowsInSegment() + ); + + // getSegments + Assert.assertEquals(IDENTIFIERS.subList(0, 2), + appenderator.getSegments().stream().sorted().collect(Collectors.toList())); + + // getRowCount + Assert.assertEquals(2, appenderator.getRowCount(IDENTIFIERS.get(0))); + Assert.assertEquals(1, appenderator.getRowCount(IDENTIFIERS.get(1))); + thrown = false; + try { + appenderator.getRowCount(IDENTIFIERS.get(2)); + } + catch (IllegalStateException e) { + thrown = true; + } + Assert.assertTrue(thrown); + + // push all + final SegmentsAndCommitMetadata segmentsAndCommitMetadata = appenderator.push( + appenderator.getSegments(), + null, + false + ).get(); + Assert.assertEquals( + IDENTIFIERS.subList(0, 2), + Lists.transform( + segmentsAndCommitMetadata.getSegments(), + new Function() + { + @Override + public SegmentIdWithShardSpec apply(DataSegment input) + { + return SegmentIdWithShardSpec.fromDataSegment(input); + } + } + ).stream().sorted().collect(Collectors.toList()) + ); + Assert.assertEquals(tester.getPushedSegments().stream().sorted().collect(Collectors.toList()), + segmentsAndCommitMetadata.getSegments().stream().sorted().collect(Collectors.toList())); + + appenderator.clear(); + Assert.assertTrue(appenderator.getSegments().isEmpty()); + } + } + private static SegmentIdWithShardSpec createSegmentId(String interval, String version, int partitionNum) + { + return new SegmentIdWithShardSpec( + AppenderatorTester.DATASOURCE, + Intervals.of(interval), + version, + new LinearShardSpec(partitionNum) + + ); + } + + static InputRow createInputRow(String ts, String dim, Object met) + { + return new MapBasedInputRow( + DateTimes.of(ts).getMillis(), + ImmutableList.of("dim"), + ImmutableMap.of( + "dim", + dim, + "met", + met + ) + ); + } + + +} + diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/appenderator/BatchAppenderatorTester.java b/indexing-service/src/test/java/org/apache/druid/indexing/appenderator/BatchAppenderatorTester.java new file mode 100644 index 00000000000..4058aff938e --- /dev/null +++ b/indexing-service/src/test/java/org/apache/druid/indexing/appenderator/BatchAppenderatorTester.java @@ -0,0 +1,293 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.indexing.appenderator; + +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.druid.data.input.impl.DimensionsSpec; +import org.apache.druid.data.input.impl.JSONParseSpec; +import org.apache.druid.data.input.impl.MapInputRowParser; +import org.apache.druid.data.input.impl.TimestampSpec; +import org.apache.druid.indexing.common.task.IndexTask; +import org.apache.druid.jackson.DefaultObjectMapper; +import org.apache.druid.java.util.common.FileUtils; +import org.apache.druid.java.util.common.granularity.Granularities; +import org.apache.druid.java.util.emitter.EmittingLogger; +import org.apache.druid.java.util.emitter.core.NoopEmitter; +import org.apache.druid.java.util.emitter.service.ServiceEmitter; +import org.apache.druid.query.aggregation.AggregatorFactory; +import org.apache.druid.query.aggregation.CountAggregatorFactory; +import org.apache.druid.query.aggregation.LongSumAggregatorFactory; +import org.apache.druid.segment.IndexIO; +import org.apache.druid.segment.IndexMerger; +import org.apache.druid.segment.IndexMergerV9; +import org.apache.druid.segment.column.ColumnConfig; +import org.apache.druid.segment.incremental.ParseExceptionHandler; +import org.apache.druid.segment.incremental.RowIngestionMeters; +import org.apache.druid.segment.incremental.SimpleRowIngestionMeters; +import org.apache.druid.segment.indexing.DataSchema; +import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec; +import org.apache.druid.segment.loading.DataSegmentPusher; +import org.apache.druid.segment.realtime.FireDepartmentMetrics; +import org.apache.druid.segment.realtime.appenderator.Appenderator; +import org.apache.druid.segment.realtime.appenderator.Appenderators; +import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory; +import org.apache.druid.timeline.DataSegment; +import org.apache.druid.timeline.partition.LinearShardSpec; + +import java.io.File; +import java.io.IOException; +import java.net.URI; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CopyOnWriteArrayList; + +public class BatchAppenderatorTester implements AutoCloseable +{ + public static final String DATASOURCE = "foo"; + + private final DataSchema schema; + private final IndexTask.IndexTuningConfig tuningConfig; + private final FireDepartmentMetrics metrics; + private final DataSegmentPusher dataSegmentPusher; + private final ObjectMapper objectMapper; + private final Appenderator appenderator; + private final IndexIO indexIO; + private final IndexMerger indexMerger; + private final ServiceEmitter emitter; + + private final List pushedSegments = new CopyOnWriteArrayList<>(); + + public BatchAppenderatorTester( + final int maxRowsInMemory, + final boolean enablePushFailure, + boolean batchMemoryMappedIndex + ) + { + this(maxRowsInMemory, -1, null, enablePushFailure, batchMemoryMappedIndex); + } + + public BatchAppenderatorTester( + final int maxRowsInMemory, + final long maxSizeInBytes, + final File basePersistDirectory, + final boolean enablePushFailure, + boolean batchMemoryMappedIndex + ) + { + this( + maxRowsInMemory, + maxSizeInBytes, + basePersistDirectory, + enablePushFailure, + new SimpleRowIngestionMeters(), + false, + batchMemoryMappedIndex + ); + } + + public BatchAppenderatorTester( + final int maxRowsInMemory, + final long maxSizeInBytes, + final File basePersistDirectory, + final boolean 
enablePushFailure, + final RowIngestionMeters rowIngestionMeters, + final boolean skipBytesInMemoryOverheadCheck, + boolean batchMemoryMappedIndex + ) + { + objectMapper = new DefaultObjectMapper(); + objectMapper.registerSubtypes(LinearShardSpec.class); + + final Map parserMap = objectMapper.convertValue( + new MapInputRowParser( + new JSONParseSpec( + new TimestampSpec("ts", "auto", null), + new DimensionsSpec(null, null, null), + null, + null, + null + ) + ), + Map.class + ); + schema = new DataSchema( + DATASOURCE, + parserMap, + new AggregatorFactory[]{ + new CountAggregatorFactory("count"), + new LongSumAggregatorFactory("met", "met") + }, + new UniformGranularitySpec(Granularities.MINUTE, Granularities.NONE, null), + null, + objectMapper + ); + tuningConfig = new IndexTask.IndexTuningConfig( + null, + 2, + null, + maxRowsInMemory, + maxSizeInBytes == 0L ? getDefaultMaxBytesInMemory() : maxSizeInBytes, + false, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + OffHeapMemorySegmentWriteOutMediumFactory.instance(), + true, + null, + null, + null, + null + ).withBasePersistDirectory(createNewBasePersistDirectory()); + + metrics = new FireDepartmentMetrics(); + + indexIO = new IndexIO( + objectMapper, + new ColumnConfig() + { + @Override + public int columnCacheSizeBytes() + { + return 0; + } + } + ); + indexMerger = new IndexMergerV9(objectMapper, indexIO, OffHeapMemorySegmentWriteOutMediumFactory.instance()); + + emitter = new ServiceEmitter( + "test", + "test", + new NoopEmitter() + ); + emitter.start(); + EmittingLogger.registerEmitter(emitter); + dataSegmentPusher = new DataSegmentPusher() + { + private boolean mustFail = true; + + @Deprecated + @Override + public String getPathForHadoop(String dataSource) + { + return getPathForHadoop(); + } + + @Override + public String getPathForHadoop() + { + throw new UnsupportedOperationException(); + } + + @Override + public DataSegment push(File file, DataSegment segment, boolean useUniquePath) throws IOException + { + if (enablePushFailure && mustFail) { + mustFail = false; + throw new IOException("Push failure test"); + } else if (enablePushFailure) { + mustFail = true; + } + pushedSegments.add(segment); + return segment; + } + + @Override + public Map makeLoadSpec(URI uri) + { + throw new UnsupportedOperationException(); + } + }; + appenderator = Appenderators.createOffline( + schema.getDataSource(), + schema, + tuningConfig, + metrics, + dataSegmentPusher, + objectMapper, + indexIO, + indexMerger, + rowIngestionMeters, + new ParseExceptionHandler(rowIngestionMeters, false, Integer.MAX_VALUE, 0), + batchMemoryMappedIndex + ); + } + + private long getDefaultMaxBytesInMemory() + { + return (Runtime.getRuntime().totalMemory()) / 3; + } + + public DataSchema getSchema() + { + return schema; + } + + public IndexTask.IndexTuningConfig getTuningConfig() + { + return tuningConfig; + } + + public FireDepartmentMetrics getMetrics() + { + return metrics; + } + + public DataSegmentPusher getDataSegmentPusher() + { + return dataSegmentPusher; + } + + public ObjectMapper getObjectMapper() + { + return objectMapper; + } + + public Appenderator getAppenderator() + { + return appenderator; + } + + public List getPushedSegments() + { + return pushedSegments; + } + + @Override + public void close() throws Exception + { + appenderator.close(); + emitter.close(); + FileUtils.deleteDirectory(tuningConfig.getBasePersistDirectory()); + } + + private static File createNewBasePersistDirectory() + { + return 
FileUtils.createTempDir("druid-batch-persist"); + } +} diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/TaskToolboxTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/TaskToolboxTest.java index b5a8d736301..671556975a9 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/TaskToolboxTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/TaskToolboxTest.java @@ -112,6 +112,7 @@ public class TaskToolboxTest null, null, null, + false, false ), new DruidNode("druid/middlemanager", "localhost", false, 8091, null, true, false), diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java index 46fa4fac6a1..113d41a864a 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java @@ -1516,6 +1516,7 @@ public class AppenderatorDriverRealtimeIndexTaskTest extends InitializedNullHand null, null, null, + false, false ); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java index 03acabd620b..a958ee66203 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java @@ -1298,7 +1298,7 @@ public class CompactionTaskRunTest extends IngestionTestBase ); return new TaskToolbox( - new TaskConfig(null, null, null, null, null, false, null, null, null, false), + new TaskConfig(null, null, null, null, null, false, null, null, null, false, false), null, createActionClient(task), null, diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java index c4faa5b2754..330ccbf6494 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java @@ -1747,7 +1747,7 @@ public class CompactionTaskTest ) { super( - new TaskConfig(null, null, null, null, null, false, null, null, null, false), + new TaskConfig(null, null, null, null, null, false, null, null, null, false, false), null, taskActionClient, null, diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/HadoopTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/HadoopTaskTest.java index caaeea253c7..97d6df15b32 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/HadoopTaskTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/HadoopTaskTest.java @@ -117,6 +117,7 @@ public class HadoopTaskTest null, null, null, + false, false )).once(); EasyMock.replay(toolbox); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IngestionTestBase.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IngestionTestBase.java index 4123688dc47..c42bf20dad2 100644 --- 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IngestionTestBase.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IngestionTestBase.java @@ -314,7 +314,7 @@ public abstract class IngestionTestBase extends InitializedNullHandlingTest ); final TaskToolbox box = new TaskToolbox( - new TaskConfig(null, null, null, null, null, false, null, null, null, false), + new TaskConfig(null, null, null, null, null, false, null, null, null, false, false), new DruidNode("druid/middlemanager", "localhost", false, 8091, null, true, false), taskActionClient, null, diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/RealtimeIndexTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/RealtimeIndexTaskTest.java index 571566223a0..0a2e34d7d6d 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/RealtimeIndexTaskTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/RealtimeIndexTaskTest.java @@ -898,6 +898,7 @@ public class RealtimeIndexTaskTest extends InitializedNullHandlingTest null, null, null, + false, false ); final TaskLockbox taskLockbox = new TaskLockbox(taskStorage, mdc); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TestAppenderatorsManager.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TestAppenderatorsManager.java index 2a342b7ccdd..6e963e6f913 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TestAppenderatorsManager.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TestAppenderatorsManager.java @@ -105,7 +105,8 @@ public class TestAppenderatorsManager implements AppenderatorsManager IndexIO indexIO, IndexMerger indexMerger, RowIngestionMeters rowIngestionMeters, - ParseExceptionHandler parseExceptionHandler + ParseExceptionHandler parseExceptionHandler, + boolean batchMemoryMappedIndex ) { return Appenderators.createOffline( @@ -118,7 +119,8 @@ public class TestAppenderatorsManager implements AppenderatorsManager indexIO, indexMerger, rowIngestionMeters, - parseExceptionHandler + parseExceptionHandler, + batchMemoryMappedIndex ); } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java index 23b5b9d53ef..a0d54496870 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java @@ -241,6 +241,7 @@ public class AbstractParallelIndexSupervisorTaskTest extends IngestionTestBase null, null, ImmutableList.of(new StorageLocationConfig(temporaryFolder.newFolder(), null, null)), + false, false ), null @@ -597,7 +598,7 @@ public class AbstractParallelIndexSupervisorTaskTest extends IngestionTestBase public void prepareObjectMapper(ObjectMapper objectMapper, IndexIO indexIO) { - final TaskConfig taskConfig = new TaskConfig(null, null, null, null, null, false, null, null, null, false); + final TaskConfig taskConfig = new TaskConfig(null, null, null, null, null, false, null, null, null, false, false); objectMapper.setInjectableValues( new InjectableValues.Std() @@ -632,7 +633,7 @@ public 
class AbstractParallelIndexSupervisorTaskTest extends IngestionTestBase protected TaskToolbox createTaskToolbox(Task task, TaskActionClient actionClient) throws IOException { return new TaskToolbox( - new TaskConfig(null, null, null, null, null, false, null, null, null, false), + new TaskConfig(null, null, null, null, null, false, null, null, null, false, false), new DruidNode("druid/middlemanager", "localhost", false, 8091, null, true, false), actionClient, null, diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunnerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunnerTest.java index 120c7090d93..c05cdb8a90b 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunnerTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunnerTest.java @@ -90,6 +90,7 @@ public class SingleTaskBackgroundRunnerTest null, null, null, + false, false ); final ServiceEmitter emitter = new NoopServiceEmitter(); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java index 6e824721a9e..7c41ba984a7 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java @@ -600,7 +600,7 @@ public class TaskLifecycleTest extends InitializedNullHandlingTest new TaskAuditLogConfig(true) ); File tmpDir = temporaryFolder.newFolder(); - taskConfig = new TaskConfig(tmpDir.toString(), null, null, 50000, null, false, null, null, null, false); + taskConfig = new TaskConfig(tmpDir.toString(), null, null, 50000, null, false, null, null, null, false, false); return new TaskToolboxFactory( taskConfig, diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskManagerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskManagerTest.java index 1bb33f0b457..2dd94e8c287 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskManagerTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskManagerTest.java @@ -89,6 +89,7 @@ public class WorkerTaskManagerTest null, null, null, + false, false ); TaskActionClientFactory taskActionClientFactory = EasyMock.createNiceMock(TaskActionClientFactory.class); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskMonitorTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskMonitorTest.java index 2a6e7931632..c1845097996 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskMonitorTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskMonitorTest.java @@ -163,6 +163,7 @@ public class WorkerTaskMonitorTest null, null, null, + false, false ); TaskActionClientFactory taskActionClientFactory = EasyMock.createNiceMock(TaskActionClientFactory.class); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/worker/shuffle/IntermediaryDataManagerAutoCleanupTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/worker/shuffle/IntermediaryDataManagerAutoCleanupTest.java index a10776bdf61..9736fc72dba 100644 --- 
a/indexing-service/src/test/java/org/apache/druid/indexing/worker/shuffle/IntermediaryDataManagerAutoCleanupTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/worker/shuffle/IntermediaryDataManagerAutoCleanupTest.java @@ -92,6 +92,7 @@ public class IntermediaryDataManagerAutoCleanupTest null, null, ImmutableList.of(new StorageLocationConfig(tempDir.newFolder(), null, null)), + false, false ); final IndexingServiceClient indexingServiceClient = new NoopIndexingServiceClient() diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/worker/shuffle/IntermediaryDataManagerManualAddAndDeleteTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/worker/shuffle/IntermediaryDataManagerManualAddAndDeleteTest.java index 162993218fe..6e52b30b9eb 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/worker/shuffle/IntermediaryDataManagerManualAddAndDeleteTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/worker/shuffle/IntermediaryDataManagerManualAddAndDeleteTest.java @@ -75,6 +75,7 @@ public class IntermediaryDataManagerManualAddAndDeleteTest null, null, ImmutableList.of(new StorageLocationConfig(intermediarySegmentsLocation, 600L, null)), + false, false ); final IndexingServiceClient indexingServiceClient = new NoopIndexingServiceClient(); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/worker/shuffle/ShuffleDataSegmentPusherTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/worker/shuffle/ShuffleDataSegmentPusherTest.java index ea6fb5fbfe0..f110cc20b1e 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/worker/shuffle/ShuffleDataSegmentPusherTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/worker/shuffle/ShuffleDataSegmentPusherTest.java @@ -71,6 +71,7 @@ public class ShuffleDataSegmentPusherTest null, null, ImmutableList.of(new StorageLocationConfig(temporaryFolder.newFolder(), null, null)), + false, false ); final IndexingServiceClient indexingServiceClient = new NoopIndexingServiceClient(); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/worker/shuffle/ShuffleResourceTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/worker/shuffle/ShuffleResourceTest.java index e4327d3baed..54a6b020d86 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/worker/shuffle/ShuffleResourceTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/worker/shuffle/ShuffleResourceTest.java @@ -97,6 +97,7 @@ public class ShuffleResourceTest null, null, ImmutableList.of(new StorageLocationConfig(tempDir.newFolder(), null, null)), + false, false ); final IndexingServiceClient indexingServiceClient = new NoopIndexingServiceClient() diff --git a/server/src/main/java/org/apache/druid/segment/realtime/FireHydrant.java b/server/src/main/java/org/apache/druid/segment/realtime/FireHydrant.java index 9c59387d102..29a8986a04c 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/FireHydrant.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/FireHydrant.java @@ -213,7 +213,7 @@ public class FireHydrant // Do not include IncrementalIndex in toString as AbstractIndex.toString() actually prints // all the rows in the index return "FireHydrant{" + - "queryable=" + adapter.get().getId() + + "queryable=" + (adapter.get() == null ? 
"null" : adapter.get().getId()) + ", count=" + count + '}'; } diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/Appenderator.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/Appenderator.java index 14796df44a2..1d7da70837a 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/Appenderator.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/Appenderator.java @@ -214,6 +214,15 @@ public interface Appenderator extends QuerySegmentWalker */ void closeNow(); + /** + * Flag to tell internals whether appenderator is working on behalf of a real time task. + * This is to manage certain aspects as needed. For example, for batch, non-real time tasks, + * physical segments (i.e. hydrants) do not need to memory map their persisted + * files. In this case, the code will avoid memory mapping them thus ameliorating the occurance + * of OOMs. + */ + boolean isRealTime(); + /** * Result of {@link Appenderator#add} containing following information * - {@link SegmentIdWithShardSpec} - identifier of segment to which rows are being added @@ -242,7 +251,8 @@ public interface Appenderator extends QuerySegmentWalker return segmentIdentifier; } - int getNumRowsInSegment() + @VisibleForTesting + public int getNumRowsInSegment() { return numRowsInSegment; } diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorImpl.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorImpl.java index 9121e0aae45..e6cd9c52aa9 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorImpl.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorImpl.java @@ -36,6 +36,7 @@ import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; +import org.apache.commons.lang.mutable.MutableLong; import org.apache.druid.client.cache.Cache; import org.apache.druid.data.input.Committer; import org.apache.druid.data.input.InputRow; @@ -59,6 +60,7 @@ import org.apache.druid.segment.IndexMerger; import org.apache.druid.segment.QueryableIndex; import org.apache.druid.segment.QueryableIndexSegment; import org.apache.druid.segment.ReferenceCountingSegment; +import org.apache.druid.segment.Segment; import org.apache.druid.segment.incremental.IncrementalIndexAddResult; import org.apache.druid.segment.incremental.IndexSizeExceededException; import org.apache.druid.segment.incremental.ParseExceptionHandler; @@ -70,6 +72,7 @@ import org.apache.druid.segment.realtime.FireHydrant; import org.apache.druid.segment.realtime.plumber.Sink; import org.apache.druid.server.coordination.DataSegmentAnnouncer; import org.apache.druid.timeline.DataSegment; +import org.apache.druid.timeline.SegmentId; import org.apache.druid.timeline.VersionedIntervalTimeline; import org.joda.time.Interval; @@ -83,7 +86,9 @@ import java.nio.file.StandardOpenOption; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; +import java.util.Collections; import java.util.HashMap; +import java.util.IdentityHashMap; import java.util.List; import java.util.Map; import java.util.Set; @@ -161,6 +166,19 @@ public class AppenderatorImpl implements Appenderator private volatile Throwable persistError; + private final boolean isRealTime; + /** + * Use next Map to 
store metadata (File, SegmentId) for a hydrant for batch appenderator + * in order to facilitate the mapping of the QueryableIndex associated with a given hydrant + * at merge time. This is necessary since batch appenderator will not map the QueryableIndex + * at persist time in order to minimize its memory footprint. This has to be synchronized since the + * map may be accessed from multiple threads. + * Use {@link IdentityHashMap} to better reflect the fact that the key needs to be interpreted + * with reference semantics. + */ + private final Map> persistedHydrantMetadata = + Collections.synchronizedMap(new IdentityHashMap<>()); + /** * This constructor allows the caller to provide its own SinkQuerySegmentWalker. * @@ -183,7 +201,8 @@ public class AppenderatorImpl implements Appenderator IndexMerger indexMerger, Cache cache, RowIngestionMeters rowIngestionMeters, - ParseExceptionHandler parseExceptionHandler + ParseExceptionHandler parseExceptionHandler, + boolean isRealTime ) { this.myId = id; @@ -199,6 +218,7 @@ public class AppenderatorImpl implements Appenderator this.texasRanger = sinkQuerySegmentWalker; this.rowIngestionMeters = Preconditions.checkNotNull(rowIngestionMeters, "rowIngestionMeters"); this.parseExceptionHandler = Preconditions.checkNotNull(parseExceptionHandler, "parseExceptionHandler"); + this.isRealTime = isRealTime; if (sinkQuerySegmentWalker == null) { this.sinkTimeline = new VersionedIntervalTimeline<>( @@ -339,7 +359,8 @@ public class AppenderatorImpl implements Appenderator if (sinkEntry != null) { bytesToBePersisted += sinkEntry.getBytesInMemory(); if (sinkEntry.swappable()) { - // After swapping the sink, we use memory mapped segment instead. However, the memory mapped segment still consumes memory. + // After swapping the sink, we use memory mapped segment instead (but only for real time appenderators!). + // However, the memory mapped segment still consumes memory. // These memory mapped segments are held in memory throughout the ingestion phase and permanently add to the bytesCurrentlyInMemory int memoryStillInUse = calculateMMappedHydrantMemoryInUsed(sink.getCurrHydrant()); bytesCurrentlyInMemory.addAndGet(memoryStillInUse); @@ -352,10 +373,14 @@ public class AppenderatorImpl implements Appenderator // This means that we ran out of all available memory to ingest (due to overheads created as part of ingestion) final String alertMessage = StringUtils.format( "Task has exceeded safe estimated heap usage limits, failing " - + "(numSinks: [%d] numHydrantsAcrossAllSinks: [%d] totalRows: [%d])", + + "(numSinks: [%d] numHydrantsAcrossAllSinks: [%d] totalRows: [%d])" + + "(bytesCurrentlyInMemory: [%d] - bytesToBePersisted: [%d] > maxBytesTuningConfig: [%d])", sinks.size(), sinks.values().stream().mapToInt(Iterables::size).sum(), - getTotalRowCount() + getTotalRowCount(), + bytesCurrentlyInMemory.get(), + bytesToBePersisted, + maxBytesTuningConfig ); final String errorMessage = StringUtils.format( "%s.\nThis can occur when the overhead from too many intermediary segment persists becomes to " @@ -530,6 +555,9 @@ public class AppenderatorImpl implements Appenderator futures.add(abandonSegment(entry.getKey(), entry.getValue(), true)); } + // Re-initialize hydrant map: + persistedHydrantMetadata.clear(); + // Await dropping. 
Futures.allAsList(futures).get(); } @@ -558,6 +586,9 @@ public class AppenderatorImpl implements Appenderator final List> indexesToPersist = new ArrayList<>(); int numPersistedRows = 0; long bytesPersisted = 0L; + MutableLong totalHydrantsCount = new MutableLong(); + MutableLong totalHydrantsPersisted = new MutableLong(); + final long totalSinks = sinks.size(); for (Map.Entry entry : sinks.entrySet()) { final SegmentIdWithShardSpec identifier = entry.getKey(); final Sink sink = entry.getValue(); @@ -565,21 +596,26 @@ public class AppenderatorImpl implements Appenderator throw new ISE("No sink for identifier: %s", identifier); } final List hydrants = Lists.newArrayList(sink); + totalHydrantsCount.add(hydrants.size()); currentHydrants.put(identifier.toString(), hydrants.size()); numPersistedRows += sink.getNumRowsInMemory(); bytesPersisted += sink.getBytesInMemory(); final int limit = sink.isWritable() ? hydrants.size() - 1 : hydrants.size(); + // gather hydrants that have not been persisted: for (FireHydrant hydrant : hydrants.subList(0, limit)) { if (!hydrant.hasSwapped()) { log.debug("Hydrant[%s] hasn't persisted yet, persisting. Segment[%s]", hydrant, identifier); indexesToPersist.add(Pair.of(hydrant, identifier)); + totalHydrantsPersisted.add(1); } } if (sink.swappable()) { + // It is swappable. Get the old one to persist it and create a new one: indexesToPersist.add(Pair.of(sink.swap(), identifier)); + totalHydrantsPersisted.add(1); } } log.debug("Submitting persist runnable for dataSource[%s]", schema.getDataSource()); @@ -587,6 +623,7 @@ public class AppenderatorImpl implements Appenderator final Object commitMetadata = committer == null ? null : committer.getMetadata(); final Stopwatch runExecStopwatch = Stopwatch.createStarted(); final Stopwatch persistStopwatch = Stopwatch.createStarted(); + AtomicLong totalPersistedRows = new AtomicLong(numPersistedRows); final ListenableFuture future = persistExecutor.submit( new Callable() { @@ -640,6 +677,14 @@ public class AppenderatorImpl implements Appenderator .distinct() .collect(Collectors.joining(", ")) ); + log.info( + "Persisted stats: processed rows: [%d], persisted rows[%d], sinks: [%d], total fireHydrants (across sinks): [%d], persisted fireHydrants (across sinks): [%d]", + rowIngestionMeters.getProcessed(), + totalPersistedRows.get(), + totalSinks, + totalHydrantsCount.longValue(), + totalHydrantsPersisted.longValue() + ); // return null if committer is null return commitMetadata; @@ -682,6 +727,7 @@ public class AppenderatorImpl implements Appenderator ) { final Map theSinks = new HashMap<>(); + AtomicLong pushedHydrantsCount = new AtomicLong(); for (final SegmentIdWithShardSpec identifier : identifiers) { final Sink sink = sinks.get(identifier); if (sink == null) { @@ -691,6 +737,8 @@ public class AppenderatorImpl implements Appenderator if (sink.finishWriting()) { totalRows.addAndGet(-sink.getNumRows()); } + // count hydrants for stats: + pushedHydrantsCount.addAndGet(Iterables.size(sink)); } return Futures.transform( @@ -700,6 +748,10 @@ public class AppenderatorImpl implements Appenderator (Function) commitMetadata -> { final List dataSegments = new ArrayList<>(); + log.info("Preparing to push (stats): processed rows: [%d], sinks: [%d], fireHydrants (across sinks): [%d]", + rowIngestionMeters.getProcessed(), theSinks.size(), pushedHydrantsCount.get() + ); + log.debug( "Building and pushing segments: %s", theSinks.keySet().stream().map(SegmentIdWithShardSpec::toString).collect(Collectors.joining(", ")) @@ -723,6 +775,8 @@ 
public class AppenderatorImpl implements Appenderator } } + log.info("Push complete..."); + return new SegmentsAndCommitMetadata(dataSegments, commitMetadata); }, pushExecutor @@ -813,6 +867,34 @@ public class AppenderatorImpl implements Appenderator Closer closer = Closer.create(); try { for (FireHydrant fireHydrant : sink) { + + // if batch, swap/persist did not memory map the incremental index, we need it mapped now: + if (!isRealTime()) { + + // sanity + Pair persistedMetadata = persistedHydrantMetadata.get(fireHydrant); + if (persistedMetadata == null) { + throw new ISE("Persisted metadata for batch hydrant [%s] is null!", fireHydrant); + } + + File persistedFile = persistedMetadata.lhs; + SegmentId persistedSegmentId = persistedMetadata.rhs; + + // sanity: + if (persistedFile == null) { + throw new ISE("Persisted file for batch hydrant [%s] is null!", fireHydrant); + } else if (persistedSegmentId == null) { + throw new ISE( + "Persisted segmentId for batch hydrant in file [%s] is null!", + persistedFile.getPath() + ); + } + fireHydrant.swapSegment(new QueryableIndexSegment( + indexIO.loadIndex(persistedFile), + persistedSegmentId + )); + } + Pair segmentAndCloseable = fireHydrant.getAndIncrementSegment(); final QueryableIndex queryableIndex = segmentAndCloseable.lhs.asQueryableIndex(); log.debug("Segment[%s] adding hydrant[%s]", identifier, fireHydrant); @@ -860,6 +942,15 @@ public class AppenderatorImpl implements Appenderator 5 ); + if (!isRealTime()) { + // Drop the queryable indexes behind the hydrants... they are not needed anymore and their + // mapped file references + // can generate OOMs during merge if enough of them are held back... + for (FireHydrant fireHydrant : sink) { + fireHydrant.swapSegment(null); + } + } + final long pushFinishTime = System.nanoTime(); objectMapper.writeValue(descriptorFile, segment); @@ -986,6 +1077,13 @@ public class AppenderatorImpl implements Appenderator } } + @Override + public boolean isRealTime() + { + return isRealTime; + } + + private void lockBasePersistDirectory() { if (basePersistDirLock == null) { @@ -1303,6 +1401,8 @@ public class AppenderatorImpl implements Appenderator cache.close(SinkQuerySegmentWalker.makeHydrantCacheIdentifier(hydrant)); } hydrant.swapSegment(null); + // remove hydrant from persisted metadata: + persistedHydrantMetadata.remove(hydrant); } if (removeOnDiskData) { @@ -1417,9 +1517,15 @@ public class AppenderatorImpl implements Appenderator numRows ); - indexToPersist.swapSegment( - new QueryableIndexSegment(indexIO.loadIndex(persistedFile), indexToPersist.getSegmentId()) - ); + // Map only when this appenderator is being driven by a real time task: + Segment segmentToSwap = null; + if (isRealTime()) { + segmentToSwap = new QueryableIndexSegment(indexIO.loadIndex(persistedFile), indexToPersist.getSegmentId()); + } else { + // remember file path & segment id to rebuild the queryable index for merge: + persistedHydrantMetadata.put(indexToPersist, new Pair<>(persistedFile, indexToPersist.getSegmentId())); + } + indexToPersist.swapSegment(segmentToSwap); return numRows; } @@ -1457,10 +1563,15 @@ public class AppenderatorImpl implements Appenderator // These calculations are approximated from actual heap dumps. // Memory footprint includes count integer in FireHydrant, shorts in ReferenceCountingSegment, // Objects in SimpleQueryableIndex (such as SmooshedFileMapper, each ColumnHolder in column map, etc.) 
- return Integer.BYTES + (4 * Short.BYTES) + ROUGH_OVERHEAD_PER_HYDRANT + - (hydrant.getSegmentNumDimensionColumns() * ROUGH_OVERHEAD_PER_DIMENSION_COLUMN_HOLDER) + - (hydrant.getSegmentNumMetricColumns() * ROUGH_OVERHEAD_PER_METRIC_COLUMN_HOLDER) + - ROUGH_OVERHEAD_PER_TIME_COLUMN_HOLDER; + int total; + total = Integer.BYTES + (4 * Short.BYTES) + ROUGH_OVERHEAD_PER_HYDRANT; + if (isRealTime()) { + // for real time, also add the overhead of the memory mapped segment's column holders + total += (hydrant.getSegmentNumDimensionColumns() * ROUGH_OVERHEAD_PER_DIMENSION_COLUMN_HOLDER) + + (hydrant.getSegmentNumMetricColumns() * ROUGH_OVERHEAD_PER_METRIC_COLUMN_HOLDER) + + ROUGH_OVERHEAD_PER_TIME_COLUMN_HOLDER; + } + return total; } private int calculateSinkMemoryInUsed(Sink sink) diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/Appenderators.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/Appenderators.java index c59e4e053e5..ff8799daeba 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/Appenderators.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/Appenderators.java @@ -89,7 +89,8 @@ public class Appenderators indexMerger, cache, rowIngestionMeters, - parseExceptionHandler + parseExceptionHandler, + true ); } @@ -103,7 +104,8 @@ public class Appenderators IndexIO indexIO, IndexMerger indexMerger, RowIngestionMeters rowIngestionMeters, - ParseExceptionHandler parseExceptionHandler + ParseExceptionHandler parseExceptionHandler, + boolean batchMemoryMappedIndex ) { return new AppenderatorImpl( @@ -119,7 +121,8 @@ public class Appenderators indexMerger, null, rowIngestionMeters, - parseExceptionHandler + parseExceptionHandler, + batchMemoryMappedIndex // This is a task config (default false) to fall back to the "old" code path in case of a bug in the new memory optimization code ); } } diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorsManager.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorsManager.java index 76c64d26a41..c5b22cfb1b5 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorsManager.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorsManager.java @@ -98,7 +98,8 @@ public interface AppenderatorsManager IndexIO indexIO, IndexMerger indexMerger, RowIngestionMeters rowIngestionMeters, - ParseExceptionHandler parseExceptionHandler + ParseExceptionHandler parseExceptionHandler, + boolean batchMemoryMappedIndex ); /** diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/DefaultOfflineAppenderatorFactory.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/DefaultOfflineAppenderatorFactory.java index 7a0f1dc6fcd..8abd0c29767 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/DefaultOfflineAppenderatorFactory.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/DefaultOfflineAppenderatorFactory.java @@ -73,7 +73,8 @@ public class DefaultOfflineAppenderatorFactory implements AppenderatorFactory false, config.isReportParseExceptions() ?
0 : Integer.MAX_VALUE, 0 - ) + ), + false ); } } diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/DummyForInjectionAppenderatorsManager.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/DummyForInjectionAppenderatorsManager.java index 87de2449a47..f1e2f3c9134 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/DummyForInjectionAppenderatorsManager.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/DummyForInjectionAppenderatorsManager.java @@ -90,7 +90,8 @@ public class DummyForInjectionAppenderatorsManager implements AppenderatorsManag IndexIO indexIO, IndexMerger indexMerger, RowIngestionMeters rowIngestionMeters, - ParseExceptionHandler parseExceptionHandler + ParseExceptionHandler parseExceptionHandler, + boolean batchMemoryMappedIndex ) { throw new UOE(ERROR_MSG); diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/PeonAppenderatorsManager.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/PeonAppenderatorsManager.java index 7fa4f4c7c9f..88a4f5720c1 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/PeonAppenderatorsManager.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/PeonAppenderatorsManager.java @@ -122,7 +122,8 @@ public class PeonAppenderatorsManager implements AppenderatorsManager IndexIO indexIO, IndexMerger indexMerger, RowIngestionMeters rowIngestionMeters, - ParseExceptionHandler parseExceptionHandler + ParseExceptionHandler parseExceptionHandler, + boolean batchMemoryMappedIndex ) { // CompactionTask does run multiple sub-IndexTasks, so we allow multiple batch appenderators @@ -139,7 +140,8 @@ public class PeonAppenderatorsManager implements AppenderatorsManager indexIO, indexMerger, rowIngestionMeters, - parseExceptionHandler + parseExceptionHandler, + batchMemoryMappedIndex ); return batchAppenderator; } diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/UnifiedIndexerAppenderatorsManager.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/UnifiedIndexerAppenderatorsManager.java index 369709882a7..3780c373575 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/UnifiedIndexerAppenderatorsManager.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/UnifiedIndexerAppenderatorsManager.java @@ -189,7 +189,8 @@ public class UnifiedIndexerAppenderatorsManager implements AppenderatorsManager wrapIndexMerger(indexMerger), cache, rowIngestionMeters, - parseExceptionHandler + parseExceptionHandler, + true ); datasourceBundle.addAppenderator(taskId, appenderator); @@ -208,7 +209,8 @@ public class UnifiedIndexerAppenderatorsManager implements AppenderatorsManager IndexIO indexIO, IndexMerger indexMerger, RowIngestionMeters rowIngestionMeters, - ParseExceptionHandler parseExceptionHandler + ParseExceptionHandler parseExceptionHandler, + boolean batchMemoryMappedIndex ) { synchronized (this) { @@ -227,7 +229,8 @@ public class UnifiedIndexerAppenderatorsManager implements AppenderatorsManager indexIO, wrapIndexMerger(indexMerger), rowIngestionMeters, - parseExceptionHandler + parseExceptionHandler, + batchMemoryMappedIndex ); datasourceBundle.addAppenderator(taskId, appenderator); return appenderator; diff --git a/server/src/test/java/org/apache/druid/segment/realtime/FireHydrantTest.java 
b/server/src/test/java/org/apache/druid/segment/realtime/FireHydrantTest.java index 4f288426e50..464141a42a1 100644 --- a/server/src/test/java/org/apache/druid/segment/realtime/FireHydrantTest.java +++ b/server/src/test/java/org/apache/druid/segment/realtime/FireHydrantTest.java @@ -210,4 +210,11 @@ public class FireHydrantTest extends InitializedNullHandlingTest Function.identity() ); } + + @Test + public void testToStringWhenSwappedWithNull() + { + hydrant.swapSegment(null); + hydrant.toString(); + } } diff --git a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriverFailTest.java b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriverFailTest.java index 4f9cd3c3415..408aa97e400 100644 --- a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriverFailTest.java +++ b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriverFailTest.java @@ -545,5 +545,11 @@ public class StreamAppenderatorDriverFailTest extends EasyMockSupport { throw new UnsupportedOperationException(); } + @Override + public boolean isRealTime() + { + return true; + } + } } diff --git a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/UnifiedIndexerAppenderatorsManagerTest.java b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/UnifiedIndexerAppenderatorsManagerTest.java index f7c85b2ccd0..728e98026d8 100644 --- a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/UnifiedIndexerAppenderatorsManagerTest.java +++ b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/UnifiedIndexerAppenderatorsManagerTest.java @@ -84,7 +84,8 @@ public class UnifiedIndexerAppenderatorsManagerTest TestHelper.getTestIndexIO(), TestHelper.getTestIndexMergerV9(OnHeapMemorySegmentWriteOutMediumFactory.instance()), new NoopRowIngestionMeters(), - new ParseExceptionHandler(new NoopRowIngestionMeters(), false, 0, 0) + new ParseExceptionHandler(new NoopRowIngestionMeters(), false, 0, 0), + false ); @Test