diff --git a/docs/configuration/index.md b/docs/configuration/index.md
index a5e9d9aa239..f01079254dc 100644
--- a/docs/configuration/index.md
+++ b/docs/configuration/index.md
@@ -1334,7 +1334,7 @@ Additional peon configs include:
|`druid.peon.mode`|Choices are "local" and "remote". Setting this to local means you intend to run the peon as a standalone process (Not recommended).|remote|
|`druid.indexer.task.baseDir`|Base temporary working directory.|`System.getProperty("java.io.tmpdir")`|
|`druid.indexer.task.baseTaskDir`|Base temporary working directory for tasks.|`${druid.indexer.task.baseDir}/persistent/task`|
-|`druid.indexer.task.batchMemoryMappedIndex`|If false, native batch ingestion will not map indexes thus saving heap space. This does not apply to streaming ingestion, just to batch. This setting should only be used when a bug is suspected or found in the new batch ingestion code that avoids memory mapping indices. If a bug is suspected or found, you can set this flag to `true` to fall back to previous, working but more memory intensive, code path.|`false`|
+|`druid.indexer.task.useLegacyBatchProcessing`|If false (the default), native batch ingestion uses a new, memory-optimized code path for the segment creation phase. If true, it uses the previous segment creation code path. This setting applies only to batch ingestion, not to streaming ingestion. Set this flag to `true` only if a bug is suspected or found in the new optimized batch ingestion code, to fall back to the previous, working but more memory-intensive, code path.|`false`|
|`druid.indexer.task.defaultHadoopCoordinates`|Hadoop version to use with HadoopIndexTasks that do not request a particular version.|org.apache.hadoop:hadoop-client:2.8.5|
|`druid.indexer.task.defaultRowFlushBoundary`|Highest row count before persisting to disk. Used for indexing generating tasks.|75000|
|`druid.indexer.task.directoryLockTimeout`|Wait this long for zombie peons to exit before giving up on their replacements.|PT10M|
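As a quick illustration of how the renamed flag is meant to be consumed, here is a minimal, hypothetical sketch (the wrapper class is not part of Druid; `TaskConfig` and its getter are the ones changed later in this patch):

```java
import org.apache.druid.indexing.common.config.TaskConfig;

// Hypothetical helper, for illustration only.
public class BatchCodePathDescriber
{
  public static String describe(TaskConfig taskConfig)
  {
    // Getter name follows the TaskConfig change in this patch.
    return taskConfig.getUseLegacyBatchProcessing()
           ? "legacy batch segment creation (memory-mapped indexes)"
           : "new memory-optimized batch segment creation";
  }
}
```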
diff --git a/indexing-service/pom.xml b/indexing-service/pom.xml
index 5ef9fa63e30..a711d98e137 100644
--- a/indexing-service/pom.xml
+++ b/indexing-service/pom.xml
@@ -62,11 +62,6 @@
            <artifactId>druid-hll</artifactId>
            <version>${project.parent.version}</version>
        </dependency>
-        <dependency>
-            <groupId>org.apache.commons</groupId>
-            <artifactId>commons-collections4</artifactId>
-            <version>4.2</version>
-        </dependency>
        <dependency>
            <groupId>io.dropwizard.metrics</groupId>
            <artifactId>metrics-core</artifactId>
@@ -232,7 +227,11 @@
            <artifactId>jackson-core-asl</artifactId>
            <scope>provided</scope>
        </dependency>
-
+        <dependency>
+            <groupId>org.apache.commons</groupId>
+            <artifactId>commons-collections4</artifactId>
+            <scope>provided</scope>
+        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/config/TaskConfig.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/config/TaskConfig.java
index 0285b33cd85..27e2c552a3b 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/config/TaskConfig.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/config/TaskConfig.java
@@ -77,7 +77,7 @@ public class TaskConfig
private final boolean ignoreTimestampSpecForDruidInputSource;
@JsonProperty
- private final boolean batchMemoryMappedIndex;
+ private final boolean useLegacyBatchProcessing;
@JsonCreator
public TaskConfig(
@@ -91,7 +91,7 @@ public class TaskConfig
@JsonProperty("directoryLockTimeout") Period directoryLockTimeout,
@JsonProperty("shuffleDataLocations") List shuffleDataLocations,
@JsonProperty("ignoreTimestampSpecForDruidInputSource") boolean ignoreTimestampSpecForDruidInputSource,
- @JsonProperty("batchMemoryMappedIndex") boolean batchMemoryMapIndex // only set to true to fall back to older behavior
+ @JsonProperty("useLegacyBatchProcessing") boolean useLegacyBatchProcessing // only set to true to fall back to older behavior
)
{
this.baseDir = baseDir == null ? System.getProperty("java.io.tmpdir") : baseDir;
@@ -117,7 +117,7 @@ public class TaskConfig
this.shuffleDataLocations = shuffleDataLocations;
}
this.ignoreTimestampSpecForDruidInputSource = ignoreTimestampSpecForDruidInputSource;
- this.batchMemoryMappedIndex = batchMemoryMapIndex;
+ this.useLegacyBatchProcessing = useLegacyBatchProcessing;
}
@JsonProperty
@@ -201,9 +201,9 @@ public class TaskConfig
}
@JsonProperty
- public boolean getBatchMemoryMappedIndex()
+ public boolean getUseLegacyBatchProcessing()
{
- return batchMemoryMappedIndex;
+ return useLegacyBatchProcessing;
}
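Because the JSON property name changes along with the field, a hedged serde sketch (not taken from the Druid test suite) of reading the new key is shown below; it assumes, as the null handling in the constructor suggests, that the remaining `TaskConfig` arguments tolerate being absent:

```java
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.jackson.DefaultObjectMapper;

public class TaskConfigSerdeSketch
{
  public static void main(String[] args) throws Exception
  {
    final ObjectMapper mapper = new DefaultObjectMapper();
    // "useLegacyBatchProcessing" is the new key; the old "batchMemoryMappedIndex" key is gone.
    final TaskConfig config = mapper.readValue("{\"useLegacyBatchProcessing\": true}", TaskConfig.class);
    System.out.println(config.getUseLegacyBatchProcessing()); // expected: true
  }
}
```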
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/BatchAppenderators.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/BatchAppenderators.java
index bff138fb717..1a437063060 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/BatchAppenderators.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/BatchAppenderators.java
@@ -81,7 +81,7 @@ public final class BatchAppenderators
toolbox.getIndexMergerV9(),
rowIngestionMeters,
parseExceptionHandler,
- toolbox.getConfig().getBatchMemoryMappedIndex()
+ toolbox.getConfig().getUseLegacyBatchProcessing()
);
}
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/appenderator/BatchAppenderatorTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/appenderator/BatchAppenderatorTest.java
deleted file mode 100644
index e4417639f01..00000000000
--- a/indexing-service/src/test/java/org/apache/druid/indexing/appenderator/BatchAppenderatorTest.java
+++ /dev/null
@@ -1,233 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.indexing.appenderator;
-
-import com.google.common.base.Function;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Lists;
-import org.apache.druid.data.input.InputRow;
-import org.apache.druid.data.input.MapBasedInputRow;
-import org.apache.druid.java.util.common.DateTimes;
-import org.apache.druid.java.util.common.Intervals;
-import org.apache.druid.segment.realtime.appenderator.Appenderator;
-import org.apache.druid.segment.realtime.appenderator.AppenderatorTester;
-import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
-import org.apache.druid.segment.realtime.appenderator.SegmentsAndCommitMetadata;
-import org.apache.druid.testing.InitializedNullHandlingTest;
-import org.apache.druid.timeline.DataSegment;
-import org.apache.druid.timeline.partition.LinearShardSpec;
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.util.List;
-import java.util.stream.Collectors;
-
-public class BatchAppenderatorTest extends InitializedNullHandlingTest
-{
- private static final List<SegmentIdWithShardSpec> IDENTIFIERS = ImmutableList.of(
- createSegmentId("2000/2001", "A", 0),
- createSegmentId("2000/2001", "A", 1),
- createSegmentId("2001/2002", "A", 0)
- );
-
- @Test
- public void testSimpleIngestionWithIndexesNotMapped() throws Exception
- {
- try (final BatchAppenderatorTester tester = new BatchAppenderatorTester(2,
- false,
- false)) {
- final Appenderator appenderator = tester.getAppenderator();
- boolean thrown;
-
- // startJob
- Assert.assertEquals(null, appenderator.startJob());
-
- // getDataSource
- Assert.assertEquals(AppenderatorTester.DATASOURCE, appenderator.getDataSource());
-
- // add
- Assert.assertEquals(
- 1,
- appenderator.add(IDENTIFIERS.get(0), createInputRow("2000", "foo", 1), null)
- .getNumRowsInSegment()
- );
-
- Assert.assertEquals(
- 2,
- appenderator.add(IDENTIFIERS.get(0), createInputRow("2000", "bar", 2), null)
- .getNumRowsInSegment()
- );
-
- Assert.assertEquals(
- 1,
- appenderator.add(IDENTIFIERS.get(1), createInputRow("2000", "qux", 4), null)
- .getNumRowsInSegment()
- );
-
- // getSegments
- Assert.assertEquals(IDENTIFIERS.subList(0, 2),
- appenderator.getSegments().stream().sorted().collect(Collectors.toList()));
-
- // getRowCount
- Assert.assertEquals(2, appenderator.getRowCount(IDENTIFIERS.get(0)));
- Assert.assertEquals(1, appenderator.getRowCount(IDENTIFIERS.get(1)));
- thrown = false;
- try {
- appenderator.getRowCount(IDENTIFIERS.get(2));
- }
- catch (IllegalStateException e) {
- thrown = true;
- }
- Assert.assertTrue(thrown);
-
- // push all
- final SegmentsAndCommitMetadata segmentsAndCommitMetadata = appenderator.push(
- appenderator.getSegments(),
- null,
- false
- ).get();
- Assert.assertEquals(
- IDENTIFIERS.subList(0, 2),
- Lists.transform(
- segmentsAndCommitMetadata.getSegments(),
- new Function<DataSegment, SegmentIdWithShardSpec>()
- {
- @Override
- public SegmentIdWithShardSpec apply(DataSegment input)
- {
- return SegmentIdWithShardSpec.fromDataSegment(input);
- }
- }
- ).stream().sorted().collect(Collectors.toList())
- );
- Assert.assertEquals(tester.getPushedSegments().stream().sorted().collect(Collectors.toList()),
- segmentsAndCommitMetadata.getSegments().stream().sorted().collect(Collectors.toList()));
-
- appenderator.clear();
- Assert.assertTrue(appenderator.getSegments().isEmpty());
- }
- }
-
- @Test
- public void testSimpleIngestionWithIndexesMapped() throws Exception
- {
- try (final BatchAppenderatorTester tester = new BatchAppenderatorTester(2,
- false,
- true)) {
- final Appenderator appenderator = tester.getAppenderator();
- boolean thrown;
-
- // startJob
- Assert.assertEquals(null, appenderator.startJob());
-
- // getDataSource
- Assert.assertEquals(AppenderatorTester.DATASOURCE, appenderator.getDataSource());
-
- // add
- Assert.assertEquals(
- 1,
- appenderator.add(IDENTIFIERS.get(0), createInputRow("2000", "foo", 1), null)
- .getNumRowsInSegment()
- );
-
- Assert.assertEquals(
- 2,
- appenderator.add(IDENTIFIERS.get(0), createInputRow("2000", "bar", 2), null)
- .getNumRowsInSegment()
- );
-
- Assert.assertEquals(
- 1,
- appenderator.add(IDENTIFIERS.get(1), createInputRow("2000", "qux", 4), null)
- .getNumRowsInSegment()
- );
-
- // getSegments
- Assert.assertEquals(IDENTIFIERS.subList(0, 2),
- appenderator.getSegments().stream().sorted().collect(Collectors.toList()));
-
- // getRowCount
- Assert.assertEquals(2, appenderator.getRowCount(IDENTIFIERS.get(0)));
- Assert.assertEquals(1, appenderator.getRowCount(IDENTIFIERS.get(1)));
- thrown = false;
- try {
- appenderator.getRowCount(IDENTIFIERS.get(2));
- }
- catch (IllegalStateException e) {
- thrown = true;
- }
- Assert.assertTrue(thrown);
-
- // push all
- final SegmentsAndCommitMetadata segmentsAndCommitMetadata = appenderator.push(
- appenderator.getSegments(),
- null,
- false
- ).get();
- Assert.assertEquals(
- IDENTIFIERS.subList(0, 2),
- Lists.transform(
- segmentsAndCommitMetadata.getSegments(),
- new Function<DataSegment, SegmentIdWithShardSpec>()
- {
- @Override
- public SegmentIdWithShardSpec apply(DataSegment input)
- {
- return SegmentIdWithShardSpec.fromDataSegment(input);
- }
- }
- ).stream().sorted().collect(Collectors.toList())
- );
- Assert.assertEquals(tester.getPushedSegments().stream().sorted().collect(Collectors.toList()),
- segmentsAndCommitMetadata.getSegments().stream().sorted().collect(Collectors.toList()));
-
- appenderator.clear();
- Assert.assertTrue(appenderator.getSegments().isEmpty());
- }
- }
- private static SegmentIdWithShardSpec createSegmentId(String interval, String version, int partitionNum)
- {
- return new SegmentIdWithShardSpec(
- AppenderatorTester.DATASOURCE,
- Intervals.of(interval),
- version,
- new LinearShardSpec(partitionNum)
-
- );
- }
-
- static InputRow createInputRow(String ts, String dim, Object met)
- {
- return new MapBasedInputRow(
- DateTimes.of(ts).getMillis(),
- ImmutableList.of("dim"),
- ImmutableMap.of(
- "dim",
- dim,
- "met",
- met
- )
- );
- }
-
-
-}
-
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/appenderator/BatchAppenderatorTester.java b/indexing-service/src/test/java/org/apache/druid/indexing/appenderator/BatchAppenderatorTester.java
deleted file mode 100644
index 4058aff938e..00000000000
--- a/indexing-service/src/test/java/org/apache/druid/indexing/appenderator/BatchAppenderatorTester.java
+++ /dev/null
@@ -1,293 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.indexing.appenderator;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.druid.data.input.impl.DimensionsSpec;
-import org.apache.druid.data.input.impl.JSONParseSpec;
-import org.apache.druid.data.input.impl.MapInputRowParser;
-import org.apache.druid.data.input.impl.TimestampSpec;
-import org.apache.druid.indexing.common.task.IndexTask;
-import org.apache.druid.jackson.DefaultObjectMapper;
-import org.apache.druid.java.util.common.FileUtils;
-import org.apache.druid.java.util.common.granularity.Granularities;
-import org.apache.druid.java.util.emitter.EmittingLogger;
-import org.apache.druid.java.util.emitter.core.NoopEmitter;
-import org.apache.druid.java.util.emitter.service.ServiceEmitter;
-import org.apache.druid.query.aggregation.AggregatorFactory;
-import org.apache.druid.query.aggregation.CountAggregatorFactory;
-import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
-import org.apache.druid.segment.IndexIO;
-import org.apache.druid.segment.IndexMerger;
-import org.apache.druid.segment.IndexMergerV9;
-import org.apache.druid.segment.column.ColumnConfig;
-import org.apache.druid.segment.incremental.ParseExceptionHandler;
-import org.apache.druid.segment.incremental.RowIngestionMeters;
-import org.apache.druid.segment.incremental.SimpleRowIngestionMeters;
-import org.apache.druid.segment.indexing.DataSchema;
-import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
-import org.apache.druid.segment.loading.DataSegmentPusher;
-import org.apache.druid.segment.realtime.FireDepartmentMetrics;
-import org.apache.druid.segment.realtime.appenderator.Appenderator;
-import org.apache.druid.segment.realtime.appenderator.Appenderators;
-import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
-import org.apache.druid.timeline.DataSegment;
-import org.apache.druid.timeline.partition.LinearShardSpec;
-
-import java.io.File;
-import java.io.IOException;
-import java.net.URI;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.CopyOnWriteArrayList;
-
-public class BatchAppenderatorTester implements AutoCloseable
-{
- public static final String DATASOURCE = "foo";
-
- private final DataSchema schema;
- private final IndexTask.IndexTuningConfig tuningConfig;
- private final FireDepartmentMetrics metrics;
- private final DataSegmentPusher dataSegmentPusher;
- private final ObjectMapper objectMapper;
- private final Appenderator appenderator;
- private final IndexIO indexIO;
- private final IndexMerger indexMerger;
- private final ServiceEmitter emitter;
-
- private final List<DataSegment> pushedSegments = new CopyOnWriteArrayList<>();
-
- public BatchAppenderatorTester(
- final int maxRowsInMemory,
- final boolean enablePushFailure,
- boolean batchMemoryMappedIndex
- )
- {
- this(maxRowsInMemory, -1, null, enablePushFailure, batchMemoryMappedIndex);
- }
-
- public BatchAppenderatorTester(
- final int maxRowsInMemory,
- final long maxSizeInBytes,
- final File basePersistDirectory,
- final boolean enablePushFailure,
- boolean batchMemoryMappedIndex
- )
- {
- this(
- maxRowsInMemory,
- maxSizeInBytes,
- basePersistDirectory,
- enablePushFailure,
- new SimpleRowIngestionMeters(),
- false,
- batchMemoryMappedIndex
- );
- }
-
- public BatchAppenderatorTester(
- final int maxRowsInMemory,
- final long maxSizeInBytes,
- final File basePersistDirectory,
- final boolean enablePushFailure,
- final RowIngestionMeters rowIngestionMeters,
- final boolean skipBytesInMemoryOverheadCheck,
- boolean batchMemoryMappedIndex
- )
- {
- objectMapper = new DefaultObjectMapper();
- objectMapper.registerSubtypes(LinearShardSpec.class);
-
- final Map<String, Object> parserMap = objectMapper.convertValue(
- new MapInputRowParser(
- new JSONParseSpec(
- new TimestampSpec("ts", "auto", null),
- new DimensionsSpec(null, null, null),
- null,
- null,
- null
- )
- ),
- Map.class
- );
- schema = new DataSchema(
- DATASOURCE,
- parserMap,
- new AggregatorFactory[]{
- new CountAggregatorFactory("count"),
- new LongSumAggregatorFactory("met", "met")
- },
- new UniformGranularitySpec(Granularities.MINUTE, Granularities.NONE, null),
- null,
- objectMapper
- );
- tuningConfig = new IndexTask.IndexTuningConfig(
- null,
- 2,
- null,
- maxRowsInMemory,
- maxSizeInBytes == 0L ? getDefaultMaxBytesInMemory() : maxSizeInBytes,
- false,
- null,
- null,
- null,
- null,
- null,
- null,
- null,
- null,
- null,
- null,
- null,
- null,
- OffHeapMemorySegmentWriteOutMediumFactory.instance(),
- true,
- null,
- null,
- null,
- null
- ).withBasePersistDirectory(createNewBasePersistDirectory());
-
- metrics = new FireDepartmentMetrics();
-
- indexIO = new IndexIO(
- objectMapper,
- new ColumnConfig()
- {
- @Override
- public int columnCacheSizeBytes()
- {
- return 0;
- }
- }
- );
- indexMerger = new IndexMergerV9(objectMapper, indexIO, OffHeapMemorySegmentWriteOutMediumFactory.instance());
-
- emitter = new ServiceEmitter(
- "test",
- "test",
- new NoopEmitter()
- );
- emitter.start();
- EmittingLogger.registerEmitter(emitter);
- dataSegmentPusher = new DataSegmentPusher()
- {
- private boolean mustFail = true;
-
- @Deprecated
- @Override
- public String getPathForHadoop(String dataSource)
- {
- return getPathForHadoop();
- }
-
- @Override
- public String getPathForHadoop()
- {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public DataSegment push(File file, DataSegment segment, boolean useUniquePath) throws IOException
- {
- if (enablePushFailure && mustFail) {
- mustFail = false;
- throw new IOException("Push failure test");
- } else if (enablePushFailure) {
- mustFail = true;
- }
- pushedSegments.add(segment);
- return segment;
- }
-
- @Override
- public Map<String, Object> makeLoadSpec(URI uri)
- {
- throw new UnsupportedOperationException();
- }
- };
- appenderator = Appenderators.createOffline(
- schema.getDataSource(),
- schema,
- tuningConfig,
- metrics,
- dataSegmentPusher,
- objectMapper,
- indexIO,
- indexMerger,
- rowIngestionMeters,
- new ParseExceptionHandler(rowIngestionMeters, false, Integer.MAX_VALUE, 0),
- batchMemoryMappedIndex
- );
- }
-
- private long getDefaultMaxBytesInMemory()
- {
- return (Runtime.getRuntime().totalMemory()) / 3;
- }
-
- public DataSchema getSchema()
- {
- return schema;
- }
-
- public IndexTask.IndexTuningConfig getTuningConfig()
- {
- return tuningConfig;
- }
-
- public FireDepartmentMetrics getMetrics()
- {
- return metrics;
- }
-
- public DataSegmentPusher getDataSegmentPusher()
- {
- return dataSegmentPusher;
- }
-
- public ObjectMapper getObjectMapper()
- {
- return objectMapper;
- }
-
- public Appenderator getAppenderator()
- {
- return appenderator;
- }
-
- public List<DataSegment> getPushedSegments()
- {
- return pushedSegments;
- }
-
- @Override
- public void close() throws Exception
- {
- appenderator.close();
- emitter.close();
- FileUtils.deleteDirectory(tuningConfig.getBasePersistDirectory());
- }
-
- private static File createNewBasePersistDirectory()
- {
- return FileUtils.createTempDir("druid-batch-persist");
- }
-}
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TestAppenderatorsManager.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TestAppenderatorsManager.java
index a20575140c5..225769e23be 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TestAppenderatorsManager.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TestAppenderatorsManager.java
@@ -105,7 +105,7 @@ public class TestAppenderatorsManager implements AppenderatorsManager
IndexMerger indexMerger,
RowIngestionMeters rowIngestionMeters,
ParseExceptionHandler parseExceptionHandler,
- boolean batchMemoryMappedIndex
+ boolean useLegacyBatchProcessing
)
{
return Appenderators.createOffline(
@@ -119,7 +119,7 @@ public class TestAppenderatorsManager implements AppenderatorsManager
indexMerger,
rowIngestionMeters,
parseExceptionHandler,
- batchMemoryMappedIndex
+ useLegacyBatchProcessing
);
}
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTestBase.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTestBase.java
index e2f226a4276..60700132687 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTestBase.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTestBase.java
@@ -79,7 +79,7 @@ import org.apache.druid.segment.QueryableIndex;
import org.apache.druid.segment.column.DictionaryEncodedColumn;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
-import org.apache.druid.segment.realtime.appenderator.AppenderatorImpl;
+import org.apache.druid.segment.realtime.appenderator.StreamAppenderator;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.utils.CompressionUtils;
import org.assertj.core.api.Assertions;
@@ -461,7 +461,7 @@ public class SeekableStreamIndexTaskTestBase extends EasyMockSupport
protected void unlockAppenderatorBasePersistDirForTask(SeekableStreamIndexTask task)
throws NoSuchMethodException, InvocationTargetException, IllegalAccessException
{
- Method unlockBasePersistDir = ((AppenderatorImpl) task.getAppenderator())
+ Method unlockBasePersistDir = ((StreamAppenderator) task.getAppenderator())
.getClass()
.getDeclaredMethod("unlockBasePersistDirectory");
unlockBasePersistDir.setAccessible(true);
diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/Appenderator.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/Appenderator.java
index 1d7da70837a..26d6f5804fa 100644
--- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/Appenderator.java
+++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/Appenderator.java
@@ -43,6 +43,12 @@ import java.util.List;
* Concurrency: all methods defined in this class directly, including {@link #close()} and {@link #closeNow()}, i. e.
* all methods of the data appending and indexing lifecycle except {@link #drop} must be called from a single thread.
* Methods inherited from {@link QuerySegmentWalker} can be called concurrently from multiple threads.
+ *
+ * Important note: for historical reasons there was a single implementation of this interface
+ * ({@code AppenderatorImpl}), which has since been split into two classes: {@link StreamAppenderator} and
+ * {@link BatchAppenderator}. As part of that split, all query support and concurrency were removed from
+ * {@code BatchAppenderator}, so it no longer makes much sense for it to be an {@code Appenderator}. In the
+ * future we may want to refactor the {@code Appenderator} interface away from {@code BatchAppenderator}.
*/
public interface Appenderator extends QuerySegmentWalker
{
@@ -214,15 +220,6 @@ public interface Appenderator extends QuerySegmentWalker
*/
void closeNow();
- /**
- * Flag to tell internals whether appenderator is working on behalf of a real time task.
- * This is to manage certain aspects as needed. For example, for batch, non-real time tasks,
- * physical segments (i.e. hydrants) do not need to memory map their persisted
- * files. In this case, the code will avoid memory mapping them thus ameliorating the occurance
- * of OOMs.
- */
- boolean isRealTime();
-
/**
* Result of {@link Appenderator#add} containing following information
* - {@link SegmentIdWithShardSpec} - identifier of segment to which rows are being added
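To make the note about the split concrete, here is a small illustrative helper (not part of this patch): only the stream implementation still serves queries, while the batch implementation added later in this patch throws from the `QuerySegmentWalker` methods.

```java
import com.google.common.collect.ImmutableList;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryRunner;
import org.apache.druid.segment.realtime.appenderator.Appenderator;

public class BatchQuerySupportSketch
{
  // Returns a query runner if the appenderator supports querying, otherwise null.
  public static <T> QueryRunner<T> queryRunnerOrNull(Appenderator appenderator, Query<T> query)
  {
    try {
      return appenderator.getQueryRunnerForIntervals(query, ImmutableList.of(Intervals.ETERNITY));
    }
    catch (UnsupportedOperationException e) {
      // BatchAppenderator throws "No query runner for batch appenderator".
      return null;
    }
  }
}
```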
diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/Appenderators.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/Appenderators.java
index 6a86f3eaa0d..743df6bb4eb 100644
--- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/Appenderators.java
+++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/Appenderators.java
@@ -62,7 +62,7 @@ public class Appenderators
ParseExceptionHandler parseExceptionHandler
)
{
- return new AppenderatorImpl(
+ return new StreamAppenderator(
id,
schema,
config,
@@ -88,8 +88,7 @@ public class Appenderators
indexMerger,
cache,
rowIngestionMeters,
- parseExceptionHandler,
- true
+ parseExceptionHandler
);
}
@@ -104,24 +103,40 @@ public class Appenderators
IndexMerger indexMerger,
RowIngestionMeters rowIngestionMeters,
ParseExceptionHandler parseExceptionHandler,
- boolean batchMemoryMappedIndex
+ boolean useLegacyBatchProcessing
)
{
- return new AppenderatorImpl(
+ if (useLegacyBatchProcessing) {
+ // Fall back to the older code path, which is known to work. This is only a safety net in case the new
+ // batch appenderator turns out to have early bugs; the fallback will be removed once the new batch
+ // appenderator code is determined to be stable.
+ return new StreamAppenderator(
+ id,
+ schema,
+ config,
+ metrics,
+ dataSegmentPusher,
+ objectMapper,
+ new NoopDataSegmentAnnouncer(),
+ null,
+ indexIO,
+ indexMerger,
+ null,
+ rowIngestionMeters,
+ parseExceptionHandler
+ );
+ }
+ return new BatchAppenderator(
id,
schema,
config,
metrics,
dataSegmentPusher,
objectMapper,
- new NoopDataSegmentAnnouncer(),
- null,
indexIO,
indexMerger,
- null,
rowIngestionMeters,
- parseExceptionHandler,
- batchMemoryMappedIndex // This is a task config (default false) to fallback to "old" code in case of bug with the new memory optimization code
+ parseExceptionHandler
);
}
}
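A short illustrative check of the branch above (class names are the ones introduced or renamed in this patch; the helper itself is hypothetical): `createOffline` now returns a `BatchAppenderator` unless the legacy flag is set, in which case it falls back to a `StreamAppenderator` wired with a no-op announcer.

```java
import org.apache.druid.segment.realtime.appenderator.Appenderator;
import org.apache.druid.segment.realtime.appenderator.BatchAppenderator;
import org.apache.druid.segment.realtime.appenderator.StreamAppenderator;

public class OfflineAppenderatorCheck
{
  // Hypothetical helper: verifies an offline appenderator matches the requested code path.
  public static boolean matchesRequestedPath(Appenderator offline, boolean useLegacyBatchProcessing)
  {
    return useLegacyBatchProcessing
           ? offline instanceof StreamAppenderator
           : offline instanceof BatchAppenderator;
  }
}
```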
diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorsManager.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorsManager.java
index 0bbdd402e98..3e25bc5b097 100644
--- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorsManager.java
+++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorsManager.java
@@ -98,7 +98,7 @@ public interface AppenderatorsManager
IndexMerger indexMerger,
RowIngestionMeters rowIngestionMeters,
ParseExceptionHandler parseExceptionHandler,
- boolean batchMemoryMappedIndex
+ boolean useLegacyBatchProcessing
);
/**
diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BaseAppenderatorDriver.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BaseAppenderatorDriver.java
index 1a96f4e13ad..bdd572cc1af 100644
--- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BaseAppenderatorDriver.java
+++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BaseAppenderatorDriver.java
@@ -172,7 +172,7 @@ public abstract class BaseAppenderatorDriver implements Closeable
/**
* Allocated segments for a sequence
*/
- static class SegmentsForSequence
+ public static class SegmentsForSequence
{
// Interval Start millis -> List of Segments for this interval
// there might be multiple segments for a start interval, for example one segment
@@ -215,7 +215,7 @@ public abstract class BaseAppenderatorDriver implements Closeable
return intervalToSegmentStates.get(timestamp);
}
- Stream<SegmentWithState> allSegmentStateStream()
+ public Stream<SegmentWithState> allSegmentStateStream()
{
return intervalToSegmentStates
.values()
@@ -261,7 +261,7 @@ public abstract class BaseAppenderatorDriver implements Closeable
}
@VisibleForTesting
- Map<String, SegmentsForSequence> getSegments()
+ public Map<String, SegmentsForSequence> getSegments()
{
return segments;
}
diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BatchAppenderator.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BatchAppenderator.java
new file mode 100644
index 00000000000..be5cadd0d5b
--- /dev/null
+++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BatchAppenderator.java
@@ -0,0 +1,1147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.realtime.appenderator;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Stopwatch;
+import com.google.common.base.Supplier;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.google.common.primitives.Ints;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.druid.data.input.Committer;
+import org.apache.druid.data.input.InputRow;
+import org.apache.druid.java.util.common.FileUtils;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.Pair;
+import org.apache.druid.java.util.common.RetryUtils;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.io.Closer;
+import org.apache.druid.java.util.emitter.EmittingLogger;
+import org.apache.druid.query.Query;
+import org.apache.druid.query.QueryRunner;
+import org.apache.druid.query.SegmentDescriptor;
+import org.apache.druid.segment.IndexIO;
+import org.apache.druid.segment.IndexMerger;
+import org.apache.druid.segment.QueryableIndex;
+import org.apache.druid.segment.QueryableIndexSegment;
+import org.apache.druid.segment.ReferenceCountingSegment;
+import org.apache.druid.segment.incremental.IncrementalIndexAddResult;
+import org.apache.druid.segment.incremental.IndexSizeExceededException;
+import org.apache.druid.segment.incremental.ParseExceptionHandler;
+import org.apache.druid.segment.incremental.RowIngestionMeters;
+import org.apache.druid.segment.indexing.DataSchema;
+import org.apache.druid.segment.loading.DataSegmentPusher;
+import org.apache.druid.segment.realtime.FireDepartmentMetrics;
+import org.apache.druid.segment.realtime.FireHydrant;
+import org.apache.druid.segment.realtime.plumber.Sink;
+import org.apache.druid.timeline.DataSegment;
+import org.joda.time.Interval;
+
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.NotThreadSafe;
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOException;
+import java.nio.channels.FileChannel;
+import java.nio.channels.FileLock;
+import java.nio.file.StandardOpenOption;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
+
+/**
+ * This is a new class produced when the old {@code AppenderatorImpl} was split. For historical reasons,
+ * segment creation for both streaming and batch ingestion was handled by the same code path in that class.
+ * The code was correct but, for batch ingestion, inefficient from a memory perspective: if the input
+ * processed by a batch task produced enough sinks and hydrants, the task could run out of memory either in
+ * the hydrant creation (append) phase of this class or in the hydrant merge phase. Therefore this new class,
+ * {@code BatchAppenderator}, was created to specialize in batch ingestion, and the old class, now used only
+ * for stream ingestion, was renamed to {@link StreamAppenderator}.
+ *
+ * This class is not thread safe. It is important to realize that it is completely synchronous despite the
+ * {@link Appenderator} interface suggesting otherwise; the concurrency was not required, so it has been
+ * removed entirely.
+ */
+@NotThreadSafe
+public class BatchAppenderator implements Appenderator
+{
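+ // Rough estimate of the memory overhead of an empty Sink (analogous to the FireHydrant estimate below)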
+ public static final int ROUGH_OVERHEAD_PER_SINK = 5000;
+ // Rough estimate of memory footprint of empty FireHydrant based on actual heap dumps
+ public static final int ROUGH_OVERHEAD_PER_HYDRANT = 1000;
+
+ private static final EmittingLogger log = new EmittingLogger(BatchAppenderator.class);
+ private static final String IDENTIFIER_FILE_NAME = "identifier.json";
+
+ private final String myId;
+ private final DataSchema schema;
+ private final AppenderatorConfig tuningConfig;
+ private final FireDepartmentMetrics metrics;
+ private final DataSegmentPusher dataSegmentPusher;
+ private final ObjectMapper objectMapper;
+ private final IndexIO indexIO;
+ private final IndexMerger indexMerger;
+ private final Map<SegmentIdWithShardSpec, Sink> sinks = new HashMap<>();
+ private final long maxBytesTuningConfig;
+ private final boolean skipBytesInMemoryOverheadCheck;
+
+ /**
+ * The following sink metadata map (and the associated {@code SinkMetadata} class) is how metadata is
+ * retained now that sinks are completely removed from memory after each incremental persist.
+ */
+ private final Map<SegmentIdWithShardSpec, SinkMetadata> sinksMetadata = new HashMap<>();
+
+ // This variable updated in add(), persist(), and drop()
+ private int rowsCurrentlyInMemory = 0;
+ private int totalRows = 0;
+ private long bytesCurrentlyInMemory = 0;
+ private final RowIngestionMeters rowIngestionMeters;
+ private final ParseExceptionHandler parseExceptionHandler;
+
+ private final AtomicBoolean closed = new AtomicBoolean(false);
+
+ private volatile FileLock basePersistDirLock = null;
+ private volatile FileChannel basePersistDirLockChannel = null;
+
+ BatchAppenderator(
+ String id,
+ DataSchema schema,
+ AppenderatorConfig tuningConfig,
+ FireDepartmentMetrics metrics,
+ DataSegmentPusher dataSegmentPusher,
+ ObjectMapper objectMapper,
+ IndexIO indexIO,
+ IndexMerger indexMerger,
+ RowIngestionMeters rowIngestionMeters,
+ ParseExceptionHandler parseExceptionHandler
+ )
+ {
+ this.myId = id;
+ this.schema = Preconditions.checkNotNull(schema, "schema");
+ this.tuningConfig = Preconditions.checkNotNull(tuningConfig, "tuningConfig");
+ this.metrics = Preconditions.checkNotNull(metrics, "metrics");
+ this.dataSegmentPusher = Preconditions.checkNotNull(dataSegmentPusher, "dataSegmentPusher");
+ this.objectMapper = Preconditions.checkNotNull(objectMapper, "objectMapper");
+ this.indexIO = Preconditions.checkNotNull(indexIO, "indexIO");
+ this.indexMerger = Preconditions.checkNotNull(indexMerger, "indexMerger");
+ this.rowIngestionMeters = Preconditions.checkNotNull(rowIngestionMeters, "rowIngestionMeters");
+ this.parseExceptionHandler = Preconditions.checkNotNull(parseExceptionHandler, "parseExceptionHandler");
+
+ maxBytesTuningConfig = tuningConfig.getMaxBytesInMemoryOrDefault();
+ skipBytesInMemoryOverheadCheck = tuningConfig.isSkipBytesInMemoryOverheadCheck();
+ }
+
+ @Override
+ public String getId()
+ {
+ return myId;
+ }
+
+ @Override
+ public String getDataSource()
+ {
+ return schema.getDataSource();
+ }
+
+ @Override
+ public Object startJob()
+ {
+ tuningConfig.getBasePersistDirectory().mkdirs();
+ lockBasePersistDirectory();
+ return null;
+ }
+
+ @Override
+ public AppenderatorAddResult add(
+ final SegmentIdWithShardSpec identifier,
+ final InputRow row,
+ @Nullable final Supplier<Committer> committerSupplier,
+ final boolean allowIncrementalPersists
+ ) throws IndexSizeExceededException, SegmentNotWritableException
+ {
+
+ Preconditions.checkArgument(
+ committerSupplier == null,
+ "Batch appenderator does not need a committer!"
+ );
+
+ Preconditions.checkArgument(
+ allowIncrementalPersists,
+ "Batch appenderator should always allow incremental persists!"
+ );
+
+ if (!identifier.getDataSource().equals(schema.getDataSource())) {
+ throw new IAE(
+ "Expected dataSource[%s] but was asked to insert row for dataSource[%s]?!",
+ schema.getDataSource(),
+ identifier.getDataSource()
+ );
+ }
+
+ final Sink sink = getOrCreateSink(identifier);
+ metrics.reportMessageMaxTimestamp(row.getTimestampFromEpoch());
+ final int sinkRowsInMemoryBeforeAdd = sink.getNumRowsInMemory();
+ final int sinkRowsInMemoryAfterAdd;
+ final long bytesInMemoryBeforeAdd = sink.getBytesInMemory();
+ final long bytesInMemoryAfterAdd;
+ final IncrementalIndexAddResult addResult;
+
+ try {
+ addResult = sink.add(row, false); // allowIncrementalPersists is always true for batch
+ sinkRowsInMemoryAfterAdd = addResult.getRowCount();
+ bytesInMemoryAfterAdd = addResult.getBytesInMemory();
+ }
+ catch (IndexSizeExceededException e) {
+ // Uh oh, we can't do anything about this! We can't persist (commit metadata would be out of sync) and we
+ // can't add the row (it just failed). This should never actually happen, though, because we check
+ // sink.canAddRow after returning from add.
+ log.error(e, "Sink for segment[%s] was unexpectedly full!", identifier);
+ throw e;
+ }
+
+ if (sinkRowsInMemoryAfterAdd < 0) {
+ throw new SegmentNotWritableException("Attempt to add row to swapped-out sink for segment[%s].", identifier);
+ }
+
+ if (addResult.isRowAdded()) {
+ rowIngestionMeters.incrementProcessed();
+ } else if (addResult.hasParseException()) {
+ parseExceptionHandler.handle(addResult.getParseException());
+ }
+
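+ // Update this appenderator's row/byte bookkeeping; per-sink row counts are also recorded in sinksMetadata so they survive persists, which remove sinks from memory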
+ final int numAddedRows = sinkRowsInMemoryAfterAdd - sinkRowsInMemoryBeforeAdd;
+ rowsCurrentlyInMemory += numAddedRows;
+ bytesCurrentlyInMemory += bytesInMemoryAfterAdd - bytesInMemoryBeforeAdd;
+ totalRows += numAddedRows;
+ sinksMetadata.computeIfAbsent(identifier, unused -> new SinkMetadata()).addRows(numAddedRows);
+
+ boolean persist = false;
+ List<String> persistReasons = new ArrayList<>();
+
+ if (!sink.canAppendRow()) {
+ persist = true;
+ persistReasons.add("No more rows can be appended to sink");
+ }
+ if (rowsCurrentlyInMemory >= tuningConfig.getMaxRowsInMemory()) {
+ persist = true;
+ persistReasons.add(StringUtils.format(
+ "rowsCurrentlyInMemory[%d] is greater than maxRowsInMemory[%d]",
+ rowsCurrentlyInMemory,
+ tuningConfig.getMaxRowsInMemory()
+ ));
+ }
+ if (bytesCurrentlyInMemory >= maxBytesTuningConfig) {
+ persist = true;
+ persistReasons.add(StringUtils.format(
+ "bytesCurrentlyInMemory[%d] is greater than maxBytesInMemory[%d]",
+ bytesCurrentlyInMemory,
+ maxBytesTuningConfig
+ ));
+ }
+ if (persist) {
+ // persistAll clears rowsCurrentlyInMemory, no need to update it.
+ log.info("Incremental persist to disk because %s.", String.join(",", persistReasons));
+
+ long bytesToBePersisted = 0L;
+ for (Map.Entry<SegmentIdWithShardSpec, Sink> entry : sinks.entrySet()) {
+ final Sink sinkEntry = entry.getValue();
+ if (sinkEntry != null) {
+ bytesToBePersisted += sinkEntry.getBytesInMemory();
+ if (sinkEntry.swappable()) {
+ // Code for batch no longer memory maps hydrants but they still take memory...
+ int memoryStillInUse = calculateMemoryUsedByHydrant();
+ bytesCurrentlyInMemory += memoryStillInUse;
+ }
+ }
+ }
+
+ if (!skipBytesInMemoryOverheadCheck
+ && bytesCurrentlyInMemory - bytesToBePersisted > maxBytesTuningConfig) {
+ // We are still over maxBytesTuningConfig even after persisting.
+ // This means that we ran out of all available memory to ingest (due to overheads created as part of ingestion)
+ final String alertMessage = StringUtils.format(
+ "Task has exceeded safe estimated heap usage limits, failing "
+ + "(numSinks: [%d] numHydrantsAcrossAllSinks: [%d] totalRows: [%d])"
+ + "(bytesCurrentlyInMemory: [%d] - bytesToBePersisted: [%d] > maxBytesTuningConfig: [%d])",
+ sinks.size(),
+ sinks.values().stream().mapToInt(Iterables::size).sum(),
+ getTotalRowCount(),
+ bytesCurrentlyInMemory,
+ bytesToBePersisted,
+ maxBytesTuningConfig
+ );
+ final String errorMessage = StringUtils.format(
+ "%s.\nThis can occur when the overhead from too many intermediary segment persists becomes to "
+ + "great to have enough space to process additional input rows. This check, along with metering the overhead "
+ + "of these objects to factor into the 'maxBytesInMemory' computation, can be disabled by setting "
+ + "'skipBytesInMemoryOverheadCheck' to 'true' (note that doing so might allow the task to naturally encounter "
+ + "a 'java.lang.OutOfMemoryError'). Alternatively, 'maxBytesInMemory' can be increased which will cause an "
+ + "increase in heap footprint, but will allow for more intermediary segment persists to occur before "
+ + "reaching this condition.",
+ alertMessage
+ );
+ log.makeAlert(alertMessage)
+ .addData("dataSource", schema.getDataSource())
+ .emit();
+ throw new RuntimeException(errorMessage);
+ }
+
+ persistAllAndRemoveSinks();
+
+ }
+ return new AppenderatorAddResult(identifier, sinksMetadata.get(identifier).numRowsInSegment, false);
+ }
+
+ /**
+ * Returns all active segments, regardless of whether they are in memory or persisted.
+ */
+ @Override
+ public List<SegmentIdWithShardSpec> getSegments()
+ {
+ return ImmutableList.copyOf(sinksMetadata.keySet());
+ }
+
+ @VisibleForTesting
+ public List<SegmentIdWithShardSpec> getInMemorySegments()
+ {
+ return ImmutableList.copyOf(sinks.keySet());
+ }
+
+ @Override
+ public int getRowCount(final SegmentIdWithShardSpec identifier)
+ {
+ return sinksMetadata.get(identifier).getNumRowsInSegment();
+ }
+
+ @Override
+ public int getTotalRowCount()
+ {
+ return totalRows;
+ }
+
+ @VisibleForTesting
+ public int getRowsInMemory()
+ {
+ return rowsCurrentlyInMemory;
+ }
+
+ @VisibleForTesting
+ public long getBytesCurrentlyInMemory()
+ {
+ return bytesCurrentlyInMemory;
+ }
+
+ @VisibleForTesting
+ public long getBytesInMemory(SegmentIdWithShardSpec identifier)
+ {
+ final Sink sink = sinks.get(identifier);
+
+ if (sink == null) {
+ return 0L; // sinks are removed after a persist
+ } else {
+ return sink.getBytesInMemory();
+ }
+ }
+
+ private Sink getOrCreateSink(final SegmentIdWithShardSpec identifier)
+ {
+ Sink retVal = sinks.get(identifier);
+
+ if (retVal == null) {
+ retVal = new Sink(
+ identifier.getInterval(),
+ schema,
+ identifier.getShardSpec(),
+ identifier.getVersion(),
+ tuningConfig.getAppendableIndexSpec(),
+ tuningConfig.getMaxRowsInMemory(),
+ maxBytesTuningConfig,
+ null
+ );
+ bytesCurrentlyInMemory += calculateSinkMemoryInUsed();
+
+ sinks.put(identifier, retVal);
+ metrics.setSinkCount(sinks.size());
+ }
+
+ return retVal;
+ }
+
+ @Override
+ public <T> QueryRunner<T> getQueryRunnerForIntervals(final Query<T> query, final Iterable<Interval> intervals)
+ {
+ throw new UnsupportedOperationException("No query runner for batch appenderator");
+ }
+
+ @Override
+ public <T> QueryRunner<T> getQueryRunnerForSegments(final Query<T> query, final Iterable<SegmentDescriptor> specs)
+ {
+ throw new UnsupportedOperationException("No query runner for batch appenderator");
+ }
+
+ @Override
+ public void clear()
+ {
+ clear(true);
+ }
+
+ private void clear(boolean removeOnDiskData)
+ {
+ // Drop commit metadata, then abandon all segments.
+ log.info("Clearing all[%d] sinks & their hydrants, removing data on disk: [%s]", sinks.size(), removeOnDiskData);
+ // Drop everything.
+ Iterator<Map.Entry<SegmentIdWithShardSpec, Sink>> sinksIterator = sinks.entrySet().iterator();
+ sinksIterator.forEachRemaining(entry -> {
+ clearSinkMetadata(entry.getKey(), entry.getValue(), removeOnDiskData);
+ sinksIterator.remove();
+ });
+ metrics.setSinkCount(sinks.size());
+ }
+
+ @Override
+ public ListenableFuture<?> drop(final SegmentIdWithShardSpec identifier)
+ {
+ final Sink sink = sinks.get(identifier);
+ SinkMetadata sm = sinksMetadata.remove(identifier);
+ if (sm != null) {
+ int originalTotalRows = getTotalRowCount();
+ int rowsToDrop = sm.getNumRowsInSegment();
+ int totalRowsAfter = originalTotalRows - rowsToDrop;
+ if (totalRowsAfter < 0) {
+ log.warn("Total rows[%d] after dropping segment[%s] rows [%d]", totalRowsAfter, identifier, rowsToDrop);
+ }
+ totalRows = Math.max(totalRowsAfter, 0);
+ }
+ if (sink != null) {
+ clearSinkMetadata(identifier, sink, true);
+ if (sinks.remove(identifier) == null) {
+ log.warn("Sink for identifier[%s] not found, skipping", identifier);
+ }
+ }
+ return Futures.immediateFuture(null);
+ }
+
+ @Override
+ public ListenableFuture