Bound memory utilization for dynamic partitioning (i.e. memory growth is constant) (#11294)

* Bound memory in native batch ingest create segments

* Move BatchAppenderatorDriverTest to indexing service. Note that we had to put the sink back into sinks in mergeAndPush, since the persisted data needs to be dropped and the sink is required for that

* Remove sinks from memory and clean up intermediate persists dirs manually after sink has been merged

* Changed name from RealtimeAppenderator to StreamAppenderator

* Style

* Incorporating tests from StreamAppenderatorTest

* Keep totalRows and cleanup code

* Added missing dep

* Fix unit test

* Checkstyle

* allowIncrementalPersists should always be true for batch

* Added sinks metadata

* clear sinks metadata when closing appenderator

* Style + minor edits to log msgs

* Update sinks metadata & totalRows when dropping a sink (segment)

* Remove max

* IntelliJ check

* Keep a count of hydrants persisted by sink for sanity check before merge

* Move sanity check out

* Add previous hydrant count to sink metadata

* Remove redundant field from SinkMetadata

* Remove unneeded functions

* Cleanup unused code

* Removed unused code

* Remove unused field

* Exclude it from jacoco because it is very hard to get branch coverage

* Remove segment announcement and some other minor cleanup

* Add fallback flag

* Minor code cleanup

* Checkstyle

* Code review changes

* Update batchMemoryMappedIndex name

* Code review comments

* Exclude class from coverage, will include again when packaging gets fixed

* Moved test classes to server module

* More BatchAppenderator cleanup

* Fix bug with incorrect counting of totalHydrants, plus minor cleanup in add

* Removed left over comments

* Have BatchAppenderator follow the Appenderator contract for push & getSegments

* Fix LGTM violations

* Review comments

* Add stats after push is done

* Code review comments (cleanup, remove remaining synchronization constructs in batch appenderator, rename feature flag,
remove real-time flag logic from stream appenderator, etc.)

* Update javadocs

* Add thread safety notice to BatchAppenderator

* Further cleanup config

* More config cleanup
Agustin Gonzalez, 2021-07-09 00:10:29 -07:00, committed by GitHub
commit 7e61042794 (parent 4c90c0c21d)
27 changed files with 2823 additions and 816 deletions

View File

@ -1334,7 +1334,7 @@ Additional peon configs include:
|`druid.peon.mode`|Choices are "local" and "remote". Setting this to local means you intend to run the peon as a standalone process (Not recommended).|remote|
|`druid.indexer.task.baseDir`|Base temporary working directory.|`System.getProperty("java.io.tmpdir")`|
|`druid.indexer.task.baseTaskDir`|Base temporary working directory for tasks.|`${druid.indexer.task.baseDir}/persistent/task`|
-|`druid.indexer.task.batchMemoryMappedIndex`|If false, native batch ingestion will not map indexes thus saving heap space. This does not apply to streaming ingestion, just to batch. This setting should only be used when a bug is suspected or found in the new batch ingestion code that avoids memory mapping indices. If a bug is suspected or found, you can set this flag to `true` to fall back to previous, working but more memory intensive, code path.|`false`|
+|`druid.indexer.task.useLegacyBatchProcessing`|If false, native batch ingestion will use a new, recommended, code path with memory optimized code for the segment creation phase. If true it will use the previous code path for the create segments phase of batch ingestion. This does not apply to streaming ingestion, just to batch. This setting should only be used when a bug is suspected or found in the new optimized batch ingestion code. If a bug is suspected or found, you can set this flag to `true` to fall back to previous, working but more memory intensive, code path.|`false`|
|`druid.indexer.task.defaultHadoopCoordinates`|Hadoop version to use with HadoopIndexTasks that do not request a particular version.|org.apache.hadoop:hadoop-client:2.8.5|
|`druid.indexer.task.defaultRowFlushBoundary`|Highest row count before persisting to disk. Used for indexing generating tasks.|75000|
|`druid.indexer.task.directoryLockTimeout`|Wait this long for zombie peons to exit before giving up on their replacements.|PT10M|
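A note for operators, based only on the row above: setting `druid.indexer.task.useLegacyBatchProcessing=true` in the task/peon properties falls native batch ingestion back to the previous, more memory-intensive code path; leaving it at its default of `false` keeps the new memory-optimized segment-creation path.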

View File

@ -62,11 +62,6 @@
      <artifactId>druid-hll</artifactId>
      <version>${project.parent.version}</version>
    </dependency>
-    <dependency>
-      <groupId>org.apache.commons</groupId>
-      <artifactId>commons-collections4</artifactId>
-      <version>4.2</version>
-    </dependency>
    <dependency>
      <groupId>io.dropwizard.metrics</groupId>
      <artifactId>metrics-core</artifactId>
@ -232,7 +227,11 @@
      <artifactId>jackson-core-asl</artifactId>
      <scope>provided</scope>
    </dependency>
+    <dependency>
+      <groupId>org.apache.commons</groupId>
+      <artifactId>commons-collections4</artifactId>
+      <scope>provided</scope>
+    </dependency>
    <!-- Tests -->
    <dependency>
      <groupId>junit</groupId>

View File

@ -77,7 +77,7 @@ public class TaskConfig
  private final boolean ignoreTimestampSpecForDruidInputSource;
  @JsonProperty
-  private final boolean batchMemoryMappedIndex;
+  private final boolean useLegacyBatchProcessing;
  @JsonCreator
  public TaskConfig(
@ -91,7 +91,7 @@ public class TaskConfig
      @JsonProperty("directoryLockTimeout") Period directoryLockTimeout,
      @JsonProperty("shuffleDataLocations") List<StorageLocationConfig> shuffleDataLocations,
      @JsonProperty("ignoreTimestampSpecForDruidInputSource") boolean ignoreTimestampSpecForDruidInputSource,
-      @JsonProperty("batchMemoryMappedIndex") boolean batchMemoryMapIndex // only set to true to fall back to older behavior
+      @JsonProperty("useLegacyBatchProcessing") boolean useLegacyBatchProcessing // only set to true to fall back to older behavior
  )
  {
    this.baseDir = baseDir == null ? System.getProperty("java.io.tmpdir") : baseDir;
@ -117,7 +117,7 @@ public class TaskConfig
      this.shuffleDataLocations = shuffleDataLocations;
    }
    this.ignoreTimestampSpecForDruidInputSource = ignoreTimestampSpecForDruidInputSource;
-    this.batchMemoryMappedIndex = batchMemoryMapIndex;
+    this.useLegacyBatchProcessing = useLegacyBatchProcessing;
  }
  @JsonProperty
@ -201,9 +201,9 @@ public class TaskConfig
  }
  @JsonProperty
-  public boolean getBatchMemoryMappedIndex()
+  public boolean getuseLegacyBatchProcessing()
  {
-    return batchMemoryMappedIndex;
+    return useLegacyBatchProcessing;
  }

View File

@ -81,7 +81,7 @@ public final class BatchAppenderators
        toolbox.getIndexMergerV9(),
        rowIngestionMeters,
        parseExceptionHandler,
-        toolbox.getConfig().getBatchMemoryMappedIndex()
+        toolbox.getConfig().getuseLegacyBatchProcessing()
    );
  }

View File

@ -1,233 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.indexing.appenderator;
import com.google.common.base.Function;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.MapBasedInputRow;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.segment.realtime.appenderator.Appenderator;
import org.apache.druid.segment.realtime.appenderator.AppenderatorTester;
import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
import org.apache.druid.segment.realtime.appenderator.SegmentsAndCommitMetadata;
import org.apache.druid.testing.InitializedNullHandlingTest;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.partition.LinearShardSpec;
import org.junit.Assert;
import org.junit.Test;
import java.util.List;
import java.util.stream.Collectors;
public class BatchAppenderatorTest extends InitializedNullHandlingTest
{
private static final List<SegmentIdWithShardSpec> IDENTIFIERS = ImmutableList.of(
createSegmentId("2000/2001", "A", 0),
createSegmentId("2000/2001", "A", 1),
createSegmentId("2001/2002", "A", 0)
);
@Test
public void testSimpleIngestionWithIndexesNotMapped() throws Exception
{
try (final BatchAppenderatorTester tester = new BatchAppenderatorTester(2,
false,
false)) {
final Appenderator appenderator = tester.getAppenderator();
boolean thrown;
// startJob
Assert.assertEquals(null, appenderator.startJob());
// getDataSource
Assert.assertEquals(AppenderatorTester.DATASOURCE, appenderator.getDataSource());
// add
Assert.assertEquals(
1,
appenderator.add(IDENTIFIERS.get(0), createInputRow("2000", "foo", 1), null)
.getNumRowsInSegment()
);
Assert.assertEquals(
2,
appenderator.add(IDENTIFIERS.get(0), createInputRow("2000", "bar", 2), null)
.getNumRowsInSegment()
);
Assert.assertEquals(
1,
appenderator.add(IDENTIFIERS.get(1), createInputRow("2000", "qux", 4), null)
.getNumRowsInSegment()
);
// getSegments
Assert.assertEquals(IDENTIFIERS.subList(0, 2),
appenderator.getSegments().stream().sorted().collect(Collectors.toList()));
// getRowCount
Assert.assertEquals(2, appenderator.getRowCount(IDENTIFIERS.get(0)));
Assert.assertEquals(1, appenderator.getRowCount(IDENTIFIERS.get(1)));
thrown = false;
try {
appenderator.getRowCount(IDENTIFIERS.get(2));
}
catch (IllegalStateException e) {
thrown = true;
}
Assert.assertTrue(thrown);
// push all
final SegmentsAndCommitMetadata segmentsAndCommitMetadata = appenderator.push(
appenderator.getSegments(),
null,
false
).get();
Assert.assertEquals(
IDENTIFIERS.subList(0, 2),
Lists.transform(
segmentsAndCommitMetadata.getSegments(),
new Function<DataSegment, SegmentIdWithShardSpec>()
{
@Override
public SegmentIdWithShardSpec apply(DataSegment input)
{
return SegmentIdWithShardSpec.fromDataSegment(input);
}
}
).stream().sorted().collect(Collectors.toList())
);
Assert.assertEquals(tester.getPushedSegments().stream().sorted().collect(Collectors.toList()),
segmentsAndCommitMetadata.getSegments().stream().sorted().collect(Collectors.toList()));
appenderator.clear();
Assert.assertTrue(appenderator.getSegments().isEmpty());
}
}
@Test
public void testSimpleIngestionWithIndexesMapped() throws Exception
{
try (final BatchAppenderatorTester tester = new BatchAppenderatorTester(2,
false,
true)) {
final Appenderator appenderator = tester.getAppenderator();
boolean thrown;
// startJob
Assert.assertEquals(null, appenderator.startJob());
// getDataSource
Assert.assertEquals(AppenderatorTester.DATASOURCE, appenderator.getDataSource());
// add
Assert.assertEquals(
1,
appenderator.add(IDENTIFIERS.get(0), createInputRow("2000", "foo", 1), null)
.getNumRowsInSegment()
);
Assert.assertEquals(
2,
appenderator.add(IDENTIFIERS.get(0), createInputRow("2000", "bar", 2), null)
.getNumRowsInSegment()
);
Assert.assertEquals(
1,
appenderator.add(IDENTIFIERS.get(1), createInputRow("2000", "qux", 4), null)
.getNumRowsInSegment()
);
// getSegments
Assert.assertEquals(IDENTIFIERS.subList(0, 2),
appenderator.getSegments().stream().sorted().collect(Collectors.toList()));
// getRowCount
Assert.assertEquals(2, appenderator.getRowCount(IDENTIFIERS.get(0)));
Assert.assertEquals(1, appenderator.getRowCount(IDENTIFIERS.get(1)));
thrown = false;
try {
appenderator.getRowCount(IDENTIFIERS.get(2));
}
catch (IllegalStateException e) {
thrown = true;
}
Assert.assertTrue(thrown);
// push all
final SegmentsAndCommitMetadata segmentsAndCommitMetadata = appenderator.push(
appenderator.getSegments(),
null,
false
).get();
Assert.assertEquals(
IDENTIFIERS.subList(0, 2),
Lists.transform(
segmentsAndCommitMetadata.getSegments(),
new Function<DataSegment, SegmentIdWithShardSpec>()
{
@Override
public SegmentIdWithShardSpec apply(DataSegment input)
{
return SegmentIdWithShardSpec.fromDataSegment(input);
}
}
).stream().sorted().collect(Collectors.toList())
);
Assert.assertEquals(tester.getPushedSegments().stream().sorted().collect(Collectors.toList()),
segmentsAndCommitMetadata.getSegments().stream().sorted().collect(Collectors.toList()));
appenderator.clear();
Assert.assertTrue(appenderator.getSegments().isEmpty());
}
}
private static SegmentIdWithShardSpec createSegmentId(String interval, String version, int partitionNum)
{
return new SegmentIdWithShardSpec(
AppenderatorTester.DATASOURCE,
Intervals.of(interval),
version,
new LinearShardSpec(partitionNum)
);
}
static InputRow createInputRow(String ts, String dim, Object met)
{
return new MapBasedInputRow(
DateTimes.of(ts).getMillis(),
ImmutableList.of("dim"),
ImmutableMap.of(
"dim",
dim,
"met",
met
)
);
}
}

View File

@ -1,293 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.indexing.appenderator;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.JSONParseSpec;
import org.apache.druid.data.input.impl.MapInputRowParser;
import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.indexing.common.task.IndexTask;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.FileUtils;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.java.util.emitter.core.NoopEmitter;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.CountAggregatorFactory;
import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
import org.apache.druid.segment.IndexIO;
import org.apache.druid.segment.IndexMerger;
import org.apache.druid.segment.IndexMergerV9;
import org.apache.druid.segment.column.ColumnConfig;
import org.apache.druid.segment.incremental.ParseExceptionHandler;
import org.apache.druid.segment.incremental.RowIngestionMeters;
import org.apache.druid.segment.incremental.SimpleRowIngestionMeters;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
import org.apache.druid.segment.loading.DataSegmentPusher;
import org.apache.druid.segment.realtime.FireDepartmentMetrics;
import org.apache.druid.segment.realtime.appenderator.Appenderator;
import org.apache.druid.segment.realtime.appenderator.Appenderators;
import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.partition.LinearShardSpec;
import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
public class BatchAppenderatorTester implements AutoCloseable
{
public static final String DATASOURCE = "foo";
private final DataSchema schema;
private final IndexTask.IndexTuningConfig tuningConfig;
private final FireDepartmentMetrics metrics;
private final DataSegmentPusher dataSegmentPusher;
private final ObjectMapper objectMapper;
private final Appenderator appenderator;
private final IndexIO indexIO;
private final IndexMerger indexMerger;
private final ServiceEmitter emitter;
private final List<DataSegment> pushedSegments = new CopyOnWriteArrayList<>();
public BatchAppenderatorTester(
final int maxRowsInMemory,
final boolean enablePushFailure,
boolean batchMemoryMappedIndex
)
{
this(maxRowsInMemory, -1, null, enablePushFailure, batchMemoryMappedIndex);
}
public BatchAppenderatorTester(
final int maxRowsInMemory,
final long maxSizeInBytes,
final File basePersistDirectory,
final boolean enablePushFailure,
boolean batchMemoryMappedIndex
)
{
this(
maxRowsInMemory,
maxSizeInBytes,
basePersistDirectory,
enablePushFailure,
new SimpleRowIngestionMeters(),
false,
batchMemoryMappedIndex
);
}
public BatchAppenderatorTester(
final int maxRowsInMemory,
final long maxSizeInBytes,
final File basePersistDirectory,
final boolean enablePushFailure,
final RowIngestionMeters rowIngestionMeters,
final boolean skipBytesInMemoryOverheadCheck,
boolean batchMemoryMappedIndex
)
{
objectMapper = new DefaultObjectMapper();
objectMapper.registerSubtypes(LinearShardSpec.class);
final Map<String, Object> parserMap = objectMapper.convertValue(
new MapInputRowParser(
new JSONParseSpec(
new TimestampSpec("ts", "auto", null),
new DimensionsSpec(null, null, null),
null,
null,
null
)
),
Map.class
);
schema = new DataSchema(
DATASOURCE,
parserMap,
new AggregatorFactory[]{
new CountAggregatorFactory("count"),
new LongSumAggregatorFactory("met", "met")
},
new UniformGranularitySpec(Granularities.MINUTE, Granularities.NONE, null),
null,
objectMapper
);
tuningConfig = new IndexTask.IndexTuningConfig(
null,
2,
null,
maxRowsInMemory,
maxSizeInBytes == 0L ? getDefaultMaxBytesInMemory() : maxSizeInBytes,
false,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
OffHeapMemorySegmentWriteOutMediumFactory.instance(),
true,
null,
null,
null,
null
).withBasePersistDirectory(createNewBasePersistDirectory());
metrics = new FireDepartmentMetrics();
indexIO = new IndexIO(
objectMapper,
new ColumnConfig()
{
@Override
public int columnCacheSizeBytes()
{
return 0;
}
}
);
indexMerger = new IndexMergerV9(objectMapper, indexIO, OffHeapMemorySegmentWriteOutMediumFactory.instance());
emitter = new ServiceEmitter(
"test",
"test",
new NoopEmitter()
);
emitter.start();
EmittingLogger.registerEmitter(emitter);
dataSegmentPusher = new DataSegmentPusher()
{
private boolean mustFail = true;
@Deprecated
@Override
public String getPathForHadoop(String dataSource)
{
return getPathForHadoop();
}
@Override
public String getPathForHadoop()
{
throw new UnsupportedOperationException();
}
@Override
public DataSegment push(File file, DataSegment segment, boolean useUniquePath) throws IOException
{
if (enablePushFailure && mustFail) {
mustFail = false;
throw new IOException("Push failure test");
} else if (enablePushFailure) {
mustFail = true;
}
pushedSegments.add(segment);
return segment;
}
@Override
public Map<String, Object> makeLoadSpec(URI uri)
{
throw new UnsupportedOperationException();
}
};
appenderator = Appenderators.createOffline(
schema.getDataSource(),
schema,
tuningConfig,
metrics,
dataSegmentPusher,
objectMapper,
indexIO,
indexMerger,
rowIngestionMeters,
new ParseExceptionHandler(rowIngestionMeters, false, Integer.MAX_VALUE, 0),
batchMemoryMappedIndex
);
}
private long getDefaultMaxBytesInMemory()
{
return (Runtime.getRuntime().totalMemory()) / 3;
}
public DataSchema getSchema()
{
return schema;
}
public IndexTask.IndexTuningConfig getTuningConfig()
{
return tuningConfig;
}
public FireDepartmentMetrics getMetrics()
{
return metrics;
}
public DataSegmentPusher getDataSegmentPusher()
{
return dataSegmentPusher;
}
public ObjectMapper getObjectMapper()
{
return objectMapper;
}
public Appenderator getAppenderator()
{
return appenderator;
}
public List<DataSegment> getPushedSegments()
{
return pushedSegments;
}
@Override
public void close() throws Exception
{
appenderator.close();
emitter.close();
FileUtils.deleteDirectory(tuningConfig.getBasePersistDirectory());
}
private static File createNewBasePersistDirectory()
{
return FileUtils.createTempDir("druid-batch-persist");
}
}

View File

@ -105,7 +105,7 @@ public class TestAppenderatorsManager implements AppenderatorsManager
      IndexMerger indexMerger,
      RowIngestionMeters rowIngestionMeters,
      ParseExceptionHandler parseExceptionHandler,
-      boolean batchMemoryMappedIndex
+      boolean useLegacyBatchProcessing
  )
  {
    return Appenderators.createOffline(
@ -119,7 +119,7 @@ public class TestAppenderatorsManager implements AppenderatorsManager
        indexMerger,
        rowIngestionMeters,
        parseExceptionHandler,
-        batchMemoryMappedIndex
+        useLegacyBatchProcessing
    );
  }

View File

@ -79,7 +79,7 @@ import org.apache.druid.segment.QueryableIndex;
import org.apache.druid.segment.column.DictionaryEncodedColumn;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
-import org.apache.druid.segment.realtime.appenderator.AppenderatorImpl;
+import org.apache.druid.segment.realtime.appenderator.StreamAppenderator;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.utils.CompressionUtils;
import org.assertj.core.api.Assertions;
@ -461,7 +461,7 @@ public class SeekableStreamIndexTaskTestBase extends EasyMockSupport
  protected void unlockAppenderatorBasePersistDirForTask(SeekableStreamIndexTask task)
      throws NoSuchMethodException, InvocationTargetException, IllegalAccessException
  {
-    Method unlockBasePersistDir = ((AppenderatorImpl) task.getAppenderator())
+    Method unlockBasePersistDir = ((StreamAppenderator) task.getAppenderator())
        .getClass()
        .getDeclaredMethod("unlockBasePersistDirectory");
    unlockBasePersistDir.setAccessible(true);

View File

@ -43,6 +43,12 @@ import java.util.List;
 * Concurrency: all methods defined in this class directly, including {@link #close()} and {@link #closeNow()}, i. e.
 * all methods of the data appending and indexing lifecycle except {@link #drop} must be called from a single thread.
 * Methods inherited from {@link QuerySegmentWalker} can be called concurrently from multiple threads.
+ *<p>
+ * Important note: For historical reasons there was a single implementation for this interface ({@code AppenderatorImpl})
+ * but that since has been split into two classes: {@link StreamAppenderator} and {@link BatchAppenderator}. With this change
+ * all the query support & concurrency has been removed from the {@code BatchAppenderator} therefore this class no longer
+ * makes sense to have as an {@code Appenderator}. In the future we may want to refactor away the {@code Appenderator}
+ * interface from {@code BatchAppenderator}.
 */
public interface Appenderator extends QuerySegmentWalker
{
@ -214,15 +220,6 @@ public interface Appenderator extends QuerySegmentWalker
   */
  void closeNow();
-  /**
-   * Flag to tell internals whether appenderator is working on behalf of a real time task.
-   * This is to manage certain aspects as needed. For example, for batch, non-real time tasks,
-   * physical segments (i.e. hydrants) do not need to memory map their persisted
-   * files. In this case, the code will avoid memory mapping them thus ameliorating the occurance
-   * of OOMs.
-   */
-  boolean isRealTime();
  /**
   * Result of {@link Appenderator#add} containing following information
   * - {@link SegmentIdWithShardSpec} - identifier of segment to which rows are being added

View File

@ -62,7 +62,7 @@ public class Appenderators
      ParseExceptionHandler parseExceptionHandler
  )
  {
-    return new AppenderatorImpl(
+    return new StreamAppenderator(
        id,
        schema,
        config,
@ -88,8 +88,7 @@ public class Appenderators
        indexMerger,
        cache,
        rowIngestionMeters,
-        parseExceptionHandler,
-        true
+        parseExceptionHandler
    );
  }
@ -104,24 +103,40 @@ public class Appenderators
      IndexMerger indexMerger,
      RowIngestionMeters rowIngestionMeters,
      ParseExceptionHandler parseExceptionHandler,
-      boolean batchMemoryMappedIndex
+      boolean useLegacyBatchProcessing
  )
  {
-    return new AppenderatorImpl(
+    if (useLegacyBatchProcessing) {
+      // fallback to code known to be working, this is just a fallback option in case new
+      // batch appenderator has some early bugs but we will remove this fallback as soon as
+      // we determine that batch appenderator code is stable
+      return new StreamAppenderator(
+          id,
+          schema,
+          config,
+          metrics,
+          dataSegmentPusher,
+          objectMapper,
+          new NoopDataSegmentAnnouncer(),
+          null,
+          indexIO,
+          indexMerger,
+          null,
+          rowIngestionMeters,
+          parseExceptionHandler
+      );
+    }
+    return new BatchAppenderator(
        id,
        schema,
        config,
        metrics,
        dataSegmentPusher,
        objectMapper,
-        new NoopDataSegmentAnnouncer(),
-        null,
        indexIO,
        indexMerger,
-        null,
        rowIngestionMeters,
-        parseExceptionHandler,
-        batchMemoryMappedIndex // This is a task config (default false) to fallback to "old" code in case of bug with the new memory optimization code
+        parseExceptionHandler
    );
  }
}
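To summarize the hunk above: with the default useLegacyBatchProcessing = false, Appenderators.createOffline now builds a BatchAppenderator, which takes no segment announcer, query walker, or cache; passing true falls back to a StreamAppenderator wired with a NoopDataSegmentAnnouncer and null cache and walker, as the inline comment says, and that fallback is intended to be removed once the new batch appenderator is considered stable.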

View File

@ -98,7 +98,7 @@ public interface AppenderatorsManager
      IndexMerger indexMerger,
      RowIngestionMeters rowIngestionMeters,
      ParseExceptionHandler parseExceptionHandler,
-      boolean batchMemoryMappedIndex
+      boolean useLegacyBatchProcessing
  );
  /**

View File

@ -172,7 +172,7 @@ public abstract class BaseAppenderatorDriver implements Closeable
  /**
   * Allocated segments for a sequence
   */
-  static class SegmentsForSequence
+  public static class SegmentsForSequence
  {
    // Interval Start millis -> List of Segments for this interval
    // there might be multiple segments for a start interval, for example one segment
@ -215,7 +215,7 @@ public abstract class BaseAppenderatorDriver implements Closeable
      return intervalToSegmentStates.get(timestamp);
    }
-    Stream<SegmentWithState> allSegmentStateStream()
+    public Stream<SegmentWithState> allSegmentStateStream()
    {
      return intervalToSegmentStates
          .values()
@ -261,7 +261,7 @@ public abstract class BaseAppenderatorDriver implements Closeable
  }
  @VisibleForTesting
-  Map<String, SegmentsForSequence> getSegments()
+  public Map<String, SegmentsForSequence> getSegments()
  {
    return segments;
  }

View File

@ -90,7 +90,7 @@ public class DummyForInjectionAppenderatorsManager implements AppenderatorsManager
      IndexMerger indexMerger,
      RowIngestionMeters rowIngestionMeters,
      ParseExceptionHandler parseExceptionHandler,
-      boolean batchMemoryMappedIndex
+      boolean useLegacyBatchProcessing
  )
  {
    throw new UOE(ERROR_MSG);

View File

@ -122,7 +122,7 @@ public class PeonAppenderatorsManager implements AppenderatorsManager
      IndexMerger indexMerger,
      RowIngestionMeters rowIngestionMeters,
      ParseExceptionHandler parseExceptionHandler,
-      boolean batchMemoryMappedIndex
+      boolean useLegacyBatchProcessing
  )
  {
    // CompactionTask does run multiple sub-IndexTasks, so we allow multiple batch appenderators
@ -140,7 +140,7 @@ public class PeonAppenderatorsManager implements AppenderatorsManager
        indexMerger,
        rowIngestionMeters,
        parseExceptionHandler,
-        batchMemoryMappedIndex
+        useLegacyBatchProcessing
    );
    return batchAppenderator;
  }

View File

@ -60,7 +60,6 @@ import org.apache.druid.segment.IndexMerger;
import org.apache.druid.segment.QueryableIndex;
import org.apache.druid.segment.QueryableIndexSegment;
import org.apache.druid.segment.ReferenceCountingSegment;
-import org.apache.druid.segment.Segment;
import org.apache.druid.segment.incremental.IncrementalIndexAddResult;
import org.apache.druid.segment.incremental.IndexSizeExceededException;
import org.apache.druid.segment.incremental.ParseExceptionHandler;
@ -72,7 +71,6 @@ import org.apache.druid.segment.realtime.FireHydrant;
import org.apache.druid.segment.realtime.plumber.Sink;
import org.apache.druid.server.coordination.DataSegmentAnnouncer;
import org.apache.druid.timeline.DataSegment;
-import org.apache.druid.timeline.SegmentId;
import org.apache.druid.timeline.VersionedIntervalTimeline;
import org.joda.time.Interval;
@ -86,9 +84,7 @@ import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
-import java.util.Collections;
import java.util.HashMap;
-import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
@ -104,7 +100,7 @@ import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
-public class AppenderatorImpl implements Appenderator
+public class StreamAppenderator implements Appenderator
{
  // Rough estimate of memory footprint of a ColumnHolder based on actual heap dumps
  public static final int ROUGH_OVERHEAD_PER_DIMENSION_COLUMN_HOLDER = 1000;
@ -115,7 +111,7 @@ public class AppenderatorImpl implements Appenderator
  // Rough estimate of memory footprint of empty FireHydrant based on actual heap dumps
  public static final int ROUGH_OVERHEAD_PER_HYDRANT = 1000;
-  private static final EmittingLogger log = new EmittingLogger(AppenderatorImpl.class);
+  private static final EmittingLogger log = new EmittingLogger(StreamAppenderator.class);
  private static final int WARN_DELAY = 1000;
  private static final String IDENTIFIER_FILE_NAME = "identifier.json";
@ -166,19 +162,6 @@ public class AppenderatorImpl implements Appenderator
  private volatile Throwable persistError;
-  private final boolean isRealTime;
-  /**
-   * Use next Map to store metadata (File, SegmentId) for a hydrant for batch appenderator
-   * in order to facilitate the mapping of the QueryableIndex associated with a given hydrant
-   * at merge time. This is necessary since batch appenderator will not map the QueryableIndex
-   * at persist time in order to minimize its memory footprint. This has to be synchronized since the
-   * map may be accessed from multiple threads.
-   * Use {@link IdentityHashMap} to better reflect the fact that the key needs to be interpreted
-   * with reference semantics.
-   */
-  private final Map<FireHydrant, Pair<File, SegmentId>> persistedHydrantMetadata =
-      Collections.synchronizedMap(new IdentityHashMap<>());
  /**
   * This constructor allows the caller to provide its own SinkQuerySegmentWalker.
   *
@ -188,7 +171,7 @@ public class AppenderatorImpl implements Appenderator
   * It is used by UnifiedIndexerAppenderatorsManager which allows queries on data associated with multiple
   * Appenderators.
   */
-  AppenderatorImpl(
+  StreamAppenderator(
      String id,
      DataSchema schema,
      AppenderatorConfig tuningConfig,
@ -201,8 +184,7 @@ public class AppenderatorImpl implements Appenderator
      IndexMerger indexMerger,
      Cache cache,
      RowIngestionMeters rowIngestionMeters,
-      ParseExceptionHandler parseExceptionHandler,
-      boolean isRealTime
+      ParseExceptionHandler parseExceptionHandler
  )
  {
    this.myId = id;
@ -218,7 +200,6 @@ public class AppenderatorImpl implements Appenderator
    this.texasRanger = sinkQuerySegmentWalker;
    this.rowIngestionMeters = Preconditions.checkNotNull(rowIngestionMeters, "rowIngestionMeters");
    this.parseExceptionHandler = Preconditions.checkNotNull(parseExceptionHandler, "parseExceptionHandler");
-    this.isRealTime = isRealTime;
    if (sinkQuerySegmentWalker == null) {
      this.sinkTimeline = new VersionedIntervalTimeline<>(
@ -555,9 +536,6 @@ public class AppenderatorImpl implements Appenderator
      futures.add(abandonSegment(entry.getKey(), entry.getValue(), true));
    }
-    // Re-initialize hydrant map:
-    persistedHydrantMetadata.clear();
    // Await dropping.
    Futures.allAsList(futures).get();
  }
@ -867,34 +845,6 @@ public class AppenderatorImpl implements Appenderator
    Closer closer = Closer.create();
    try {
      for (FireHydrant fireHydrant : sink) {
-        // if batch, swap/persist did not memory map the incremental index, we need it mapped now:
-        if (!isRealTime()) {
-          // sanity
-          Pair<File, SegmentId> persistedMetadata = persistedHydrantMetadata.get(fireHydrant);
-          if (persistedMetadata == null) {
-            throw new ISE("Persisted metadata for batch hydrant [%s] is null!", fireHydrant);
-          }
-          File persistedFile = persistedMetadata.lhs;
-          SegmentId persistedSegmentId = persistedMetadata.rhs;
-          // sanity:
-          if (persistedFile == null) {
-            throw new ISE("Persisted file for batch hydrant [%s] is null!", fireHydrant);
-          } else if (persistedSegmentId == null) {
-            throw new ISE(
-                "Persisted segmentId for batch hydrant in file [%s] is null!",
-                persistedFile.getPath()
-            );
-          }
-          fireHydrant.swapSegment(new QueryableIndexSegment(
-              indexIO.loadIndex(persistedFile),
-              persistedSegmentId
-          ));
-        }
        Pair<ReferenceCountingSegment, Closeable> segmentAndCloseable = fireHydrant.getAndIncrementSegment();
        final QueryableIndex queryableIndex = segmentAndCloseable.lhs.asQueryableIndex();
        log.debug("Segment[%s] adding hydrant[%s]", identifier, fireHydrant);
@ -942,15 +892,6 @@ public class AppenderatorImpl implements Appenderator
          5
      );
-      if (!isRealTime()) {
-        // Drop the queryable indexes behind the hydrants... they are not needed anymore and their
-        // mapped file references
-        // can generate OOMs during merge if enough of them are held back...
-        for (FireHydrant fireHydrant : sink) {
-          fireHydrant.swapSegment(null);
-        }
-      }
      final long pushFinishTime = System.nanoTime();
      objectMapper.writeValue(descriptorFile, segment);
@ -1077,13 +1018,6 @@ public class AppenderatorImpl implements Appenderator
    }
  }
-  @Override
-  public boolean isRealTime()
-  {
-    return isRealTime;
-  }
  private void lockBasePersistDirectory()
  {
    if (basePersistDirLock == null) {
@ -1401,8 +1335,6 @@ public class AppenderatorImpl implements Appenderator
        cache.close(SinkQuerySegmentWalker.makeHydrantCacheIdentifier(hydrant));
      }
      hydrant.swapSegment(null);
-      // remove hydrant from persisted metadata:
-      persistedHydrantMetadata.remove(hydrant);
    }
    if (removeOnDiskData) {
@ -1517,15 +1449,10 @@ public class AppenderatorImpl implements Appenderator
        numRows
    );
-    // Map only when this appenderator is being driven by a real time task:
-    Segment segmentToSwap = null;
-    if (isRealTime()) {
-      segmentToSwap = new QueryableIndexSegment(indexIO.loadIndex(persistedFile), indexToPersist.getSegmentId());
-    } else {
-      // remember file path & segment id to rebuild the queryable index for merge:
-      persistedHydrantMetadata.put(indexToPersist, new Pair<>(persistedFile, indexToPersist.getSegmentId()));
-    }
-    indexToPersist.swapSegment(segmentToSwap);
+    indexToPersist.swapSegment(new QueryableIndexSegment(
+        indexIO.loadIndex(persistedFile),
+        indexToPersist.getSegmentId()
+    ));
    return numRows;
  }
@ -1563,14 +1490,10 @@ public class AppenderatorImpl implements Appenderator
    // These calculations are approximated from actual heap dumps.
    // Memory footprint includes count integer in FireHydrant, shorts in ReferenceCountingSegment,
    // Objects in SimpleQueryableIndex (such as SmooshedFileMapper, each ColumnHolder in column map, etc.)
-    int total;
-    total = Integer.BYTES + (4 * Short.BYTES) + ROUGH_OVERHEAD_PER_HYDRANT;
-    if (isRealTime()) {
-      // for real time add references to byte memory mapped references..
-      total += (hydrant.getSegmentNumDimensionColumns() * ROUGH_OVERHEAD_PER_DIMENSION_COLUMN_HOLDER) +
-               (hydrant.getSegmentNumMetricColumns() * ROUGH_OVERHEAD_PER_METRIC_COLUMN_HOLDER) +
-               ROUGH_OVERHEAD_PER_TIME_COLUMN_HOLDER;
-    }
+    int total = Integer.BYTES + (4 * Short.BYTES) + ROUGH_OVERHEAD_PER_HYDRANT +
+                (hydrant.getSegmentNumDimensionColumns() * ROUGH_OVERHEAD_PER_DIMENSION_COLUMN_HOLDER) +
+                (hydrant.getSegmentNumMetricColumns() * ROUGH_OVERHEAD_PER_METRIC_COLUMN_HOLDER) +
+                ROUGH_OVERHEAD_PER_TIME_COLUMN_HOLDER;
    return total;
  }
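The blocks removed above are the batch-only bookkeeping that AppenderatorImpl used to carry: persist a hydrant to disk, remember its (file, SegmentId) pair in persistedHydrantMetadata instead of keeping it memory-mapped, and re-load the QueryableIndex only when merging; that behavior now lives in BatchAppenderator. As a rough, self-contained illustration of the idea only (ToyHydrant, ToyIndex and PersistThenRemapSketch are made-up names, not Druid classes):

import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.Map;

// Toy stand-ins for FireHydrant / QueryableIndex; the real Druid classes are far richer.
final class ToyIndex
{
  final String data;
  ToyIndex(String data)
  {
    this.data = data;
  }
}

final class ToyHydrant
{
  ToyIndex inMemory; // null once the hydrant has been persisted and "unmapped"
  ToyHydrant(ToyIndex index)
  {
    this.inMemory = index;
  }
}

public class PersistThenRemapSketch
{
  // Mirrors persistedHydrantMetadata: remember where each persisted hydrant lives on disk
  // so it can be re-opened lazily at merge time instead of staying memory-mapped.
  private final Map<ToyHydrant, File> persisted = Collections.synchronizedMap(new IdentityHashMap<>());

  void persist(ToyHydrant hydrant) throws IOException
  {
    File file = File.createTempFile("hydrant", ".dat");
    file.deleteOnExit();
    Files.write(file.toPath(), hydrant.inMemory.data.getBytes());
    persisted.put(hydrant, file);
    hydrant.inMemory = null; // drop the in-memory/mapped representation right away
  }

  ToyIndex reopenForMerge(ToyHydrant hydrant) throws IOException
  {
    File file = persisted.get(hydrant);
    if (file == null) {
      throw new IllegalStateException("hydrant was never persisted");
    }
    return new ToyIndex(new String(Files.readAllBytes(file.toPath()))); // load only while merging
  }

  public static void main(String[] args) throws IOException
  {
    PersistThenRemapSketch sketch = new PersistThenRemapSketch();
    ToyHydrant hydrant = new ToyHydrant(new ToyIndex("rows..."));
    sketch.persist(hydrant);                          // heap/mapped footprint released here
    ToyIndex merged = sketch.reopenForMerge(hydrant); // re-opened only for the merge step
    System.out.println(merged.data);
  }
}

On the heap-accounting change in the last hunk: with the constants visible at the top of the file (roughly 1000 bytes per hydrant and per dimension ColumnHolder), a hydrant with, say, five dimension columns is now always estimated at about 4 + 8 + 1000 + 5 * 1000 = 6012 bytes plus the metric and time column overheads, whereas the old code added the per-column terms only when isRealTime() was true.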

View File

@ -175,7 +175,7 @@ public class UnifiedIndexerAppenderatorsManager implements AppenderatorsManager
        DatasourceBundle::new
    );
-    Appenderator appenderator = new AppenderatorImpl(
+    Appenderator appenderator = new StreamAppenderator(
        taskId,
        schema,
        rewriteAppenderatorConfigMemoryLimits(config),
@ -188,8 +188,7 @@ public class UnifiedIndexerAppenderatorsManager implements AppenderatorsManager
        wrapIndexMerger(indexMerger),
        cache,
        rowIngestionMeters,
-        parseExceptionHandler,
-        true
+        parseExceptionHandler
    );
    datasourceBundle.addAppenderator(taskId, appenderator);
@ -209,7 +208,7 @@ public class UnifiedIndexerAppenderatorsManager implements AppenderatorsManager
      IndexMerger indexMerger,
      RowIngestionMeters rowIngestionMeters,
      ParseExceptionHandler parseExceptionHandler,
-      boolean batchMemoryMappedIndex
+      boolean useLegacyBatchProcessing
  )
  {
    synchronized (this) {
@ -229,7 +228,7 @@ public class UnifiedIndexerAppenderatorsManager implements AppenderatorsManager
          wrapIndexMerger(indexMerger),
          rowIngestionMeters,
          parseExceptionHandler,
-          batchMemoryMappedIndex
+          useLegacyBatchProcessing
      );
      datasourceBundle.addAppenderator(taskId, appenderator);
      return appenderator;
@ -493,7 +492,7 @@ public class UnifiedIndexerAppenderatorsManager implements AppenderatorsManager
  /**
   * This wrapper around IndexMerger limits concurrent calls to the merge/persist methods used by
-   * {@link AppenderatorImpl} with a shared executor service. Merge/persist methods that are not used by
+   * {@link StreamAppenderator} with a shared executor service. Merge/persist methods that are not used by
   * AppenderatorImpl will throw an exception if called.
   */
  public static class LimitedPoolIndexMerger implements IndexMerger

View File

@ -34,11 +34,11 @@ import org.junit.Test;
public class AppenderatorPlumberTest
{
  private final AppenderatorPlumber plumber;
-  private final AppenderatorTester appenderatorTester;
+  private final StreamAppenderatorTester streamAppenderatorTester;
  public AppenderatorPlumberTest() throws Exception
  {
-    this.appenderatorTester = new AppenderatorTester(10);
+    this.streamAppenderatorTester = new StreamAppenderatorTester(10);
    DataSegmentAnnouncer segmentAnnouncer = EasyMock
        .createMock(DataSegmentAnnouncer.class);
    segmentAnnouncer.announceSegment(EasyMock.anyObject());
@ -84,26 +84,27 @@ public class AppenderatorPlumberTest
        null
    );
-    this.plumber = new AppenderatorPlumber(appenderatorTester.getSchema(),
-        tuningConfig, appenderatorTester.getMetrics(),
+    this.plumber = new AppenderatorPlumber(streamAppenderatorTester.getSchema(),
+        tuningConfig, streamAppenderatorTester.getMetrics(),
        segmentAnnouncer, segmentPublisher, handoffNotifier,
-        appenderatorTester.getAppenderator());
+        streamAppenderatorTester.getAppenderator());
  }
  @Test
  public void testSimpleIngestion() throws Exception
  {
-    Appenderator appenderator = appenderatorTester.getAppenderator();
+    Appenderator appenderator = streamAppenderatorTester.getAppenderator();
    // startJob
    Assert.assertEquals(null, plumber.startJob());
    // getDataSource
-    Assert.assertEquals(AppenderatorTester.DATASOURCE, appenderator.getDataSource());
+    Assert.assertEquals(StreamAppenderatorTester.DATASOURCE, appenderator.getDataSource());
-    InputRow[] rows = new InputRow[] {AppenderatorTest.ir("2000", "foo", 1),
-        AppenderatorTest.ir("2000", "bar", 2), AppenderatorTest.ir("2000", "qux", 4)};
+    InputRow[] rows = new InputRow[] {
+        StreamAppenderatorTest.ir("2000", "foo", 1),
+        StreamAppenderatorTest.ir("2000", "bar", 2), StreamAppenderatorTest.ir("2000", "qux", 4)};
    // add
    Assert.assertEquals(1, plumber.add(rows[0], null).getRowCount());

View File

@ -29,21 +29,25 @@ import org.apache.druid.indexing.overlord.SegmentPublishResult;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.granularity.Granularities;
+import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.segment.loading.DataSegmentKiller;
import org.apache.druid.segment.realtime.appenderator.BaseAppenderatorDriver.SegmentsForSequence;
import org.apache.druid.segment.realtime.appenderator.SegmentWithState.SegmentState;
-import org.apache.druid.segment.realtime.appenderator.StreamAppenderatorDriverTest.TestSegmentAllocator;
import org.apache.druid.timeline.partition.NumberedShardSpec;
import org.easymock.EasyMock;
import org.easymock.EasyMockSupport;
+import org.joda.time.DateTime;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.util.Arrays;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.stream.Collectors;
@ -73,7 +77,7 @@ public class BatchAppenderatorDriverTest extends EasyMockSupport
  );
  private SegmentAllocator allocator;
-  private AppenderatorTester appenderatorTester;
+  private BatchAppenderatorTester appenderatorTester;
  private BatchAppenderatorDriver driver;
  private DataSegmentKiller dataSegmentKiller;
@ -84,13 +88,13 @@ public class BatchAppenderatorDriverTest extends EasyMockSupport
  @Before
  public void setup()
  {
-    appenderatorTester = new AppenderatorTester(MAX_ROWS_IN_MEMORY);
+    appenderatorTester = new BatchAppenderatorTester(MAX_ROWS_IN_MEMORY);
    allocator = new TestSegmentAllocator(DATA_SOURCE, Granularities.HOUR);
    dataSegmentKiller = createStrictMock(DataSegmentKiller.class);
    driver = new BatchAppenderatorDriver(
        appenderatorTester.getAppenderator(),
        allocator,
-        new TestUsedSegmentChecker(appenderatorTester),
+        new TestUsedSegmentChecker(appenderatorTester.getPushedSegments()),
        dataSegmentKiller
    );
@ -199,4 +203,38 @@ public class BatchAppenderatorDriverTest extends EasyMockSupport
  {
    return (segmentsToBeOverwritten, segmentsToBeDropped, segmentsToPublish, commitMetadata) -> SegmentPublishResult.ok(ImmutableSet.of());
  }
+  static class TestSegmentAllocator implements SegmentAllocator
+  {
+    private final String dataSource;
+    private final Granularity granularity;
+    private final Map<Long, AtomicInteger> counters = new HashMap<>();
+    public TestSegmentAllocator(String dataSource, Granularity granularity)
+    {
+      this.dataSource = dataSource;
+      this.granularity = granularity;
+    }
+    @Override
+    public SegmentIdWithShardSpec allocate(
+        final InputRow row,
+        final String sequenceName,
+        final String previousSegmentId,
+        final boolean skipSegmentLineageCheck
+    )
+    {
+      DateTime dateTimeTruncated = granularity.bucketStart(row.getTimestamp());
+      final long timestampTruncated = dateTimeTruncated.getMillis();
+      counters.putIfAbsent(timestampTruncated, new AtomicInteger());
+      final int partitionNum = counters.get(timestampTruncated).getAndIncrement();
+      return new SegmentIdWithShardSpec(
+          dataSource,
+          granularity.bucket(dateTimeTruncated),
+          VERSION,
+          new NumberedShardSpec(partitionNum, 0)
+      );
+    }
+  }
}

View File

@ -0,0 +1,918 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.segment.realtime.appenderator;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.MapBasedInputRow;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.segment.incremental.RowIngestionMeters;
import org.apache.druid.segment.incremental.SimpleRowIngestionMeters;
import org.apache.druid.testing.InitializedNullHandlingTest;
import org.apache.druid.timeline.partition.LinearShardSpec;
import org.junit.Assert;
import org.junit.Test;
import java.io.File;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
public class BatchAppenderatorTest extends InitializedNullHandlingTest
{
private static final List<SegmentIdWithShardSpec> IDENTIFIERS = ImmutableList.of(
createSegmentId("2000/2001", "A", 0), // should be in seg_0
createSegmentId("2000/2001", "A", 1), // seg_1
createSegmentId("2001/2002", "A", 0) // seg 2
);
@Test
public void testSimpleIngestion() throws Exception
{
try (final BatchAppenderatorTester tester = new BatchAppenderatorTester(3, true)) {
final Appenderator appenderator = tester.getAppenderator();
// startJob
Assert.assertNull(appenderator.startJob());
// getDataSource
Assert.assertEquals(BatchAppenderatorTester.DATASOURCE, appenderator.getDataSource());
// add #1
Assert.assertEquals(
1,
appenderator.add(IDENTIFIERS.get(0), createInputRow("2000", "foo", 1), null)
.getNumRowsInSegment()
);
// add #2
Assert.assertEquals(
1,
appenderator.add(IDENTIFIERS.get(1), createInputRow("2000", "bar", 2), null)
.getNumRowsInSegment()
);
// getSegments
Assert.assertEquals(
IDENTIFIERS.subList(0, 2),
appenderator.getSegments().stream().sorted().collect(Collectors.toList())
);
// add #3, this hits max rows in memory:
Assert.assertEquals(
2,
appenderator.add(IDENTIFIERS.get(1), createInputRow("2000", "sux", 1), null)
.getNumRowsInSegment()
);
// since we just added three rows and the max rows in memory is three, all the segments (sinks etc)
// above should be cleared now
Assert.assertEquals(
Collections.emptyList(),
((BatchAppenderator) appenderator).getInMemorySegments().stream().sorted().collect(Collectors.toList())
);
// add #4, this will add one more temporary segment:
Assert.assertEquals(
1,
appenderator.add(IDENTIFIERS.get(2), createInputRow("2001", "qux", 4), null)
.getNumRowsInSegment()
);
// push all
final SegmentsAndCommitMetadata segmentsAndCommitMetadata = appenderator.push(
appenderator.getSegments(),
null,
false
).get();
Assert.assertEquals(
IDENTIFIERS.subList(0, 3),
Lists.transform(
segmentsAndCommitMetadata.getSegments(),
SegmentIdWithShardSpec::fromDataSegment
).stream().sorted().collect(Collectors.toList())
);
Assert.assertEquals(
tester.getPushedSegments().stream().sorted().collect(Collectors.toList()),
segmentsAndCommitMetadata.getSegments().stream().sorted().collect(Collectors.toList())
);
appenderator.close();
Assert.assertTrue(appenderator.getSegments().isEmpty());
}
}
@Test
public void testSimpleIngestionWithFallbackCodePath() throws Exception
{
try (final BatchAppenderatorTester tester = new BatchAppenderatorTester(
3,
-1,
null,
true,
new SimpleRowIngestionMeters(),
true,
true
)) {
final Appenderator appenderator = tester.getAppenderator();
// startJob
Assert.assertNull(appenderator.startJob());
// getDataSource
Assert.assertEquals(BatchAppenderatorTester.DATASOURCE, appenderator.getDataSource());
// add #1
Assert.assertEquals(
1,
appenderator.add(IDENTIFIERS.get(0), createInputRow("2000", "foo", 1), null)
.getNumRowsInSegment()
);
// add #2
Assert.assertEquals(
1,
appenderator.add(IDENTIFIERS.get(1), createInputRow("2000", "bar", 2), null)
.getNumRowsInSegment()
);
// getSegments
Assert.assertEquals(
IDENTIFIERS.subList(0, 2),
appenderator.getSegments().stream().sorted().collect(Collectors.toList())
);
// add #3, this hits max rows in memory:
Assert.assertEquals(
2,
appenderator.add(IDENTIFIERS.get(1), createInputRow("2000", "sux", 1), null)
.getNumRowsInSegment()
);
// since we just added three rows and the max rows in memory is three, BUT we are using
// the old fallback code path that does not remove sinks, the segments should still be there
Assert.assertEquals(
2,
appenderator.getSegments().size()
);
// add #4, this will add one more temporary segment:
Assert.assertEquals(
1,
appenderator.add(IDENTIFIERS.get(2), createInputRow("2001", "qux", 4), null)
.getNumRowsInSegment()
);
// push all
final SegmentsAndCommitMetadata segmentsAndCommitMetadata = appenderator.push(
appenderator.getSegments(),
null,
false
).get();
Assert.assertEquals(
IDENTIFIERS.subList(0, 3),
Lists.transform(
segmentsAndCommitMetadata.getSegments(),
SegmentIdWithShardSpec::fromDataSegment
).stream().sorted().collect(Collectors.toList())
);
Assert.assertEquals(
tester.getPushedSegments().stream().sorted().collect(Collectors.toList()),
segmentsAndCommitMetadata.getSegments().stream().sorted().collect(Collectors.toList())
);
appenderator.close();
Assert.assertTrue(appenderator.getSegments().isEmpty());
}
}
@Test
public void testMaxBytesInMemoryWithSkipBytesInMemoryOverheadCheckConfig() throws Exception
{
try (
final BatchAppenderatorTester tester = new BatchAppenderatorTester(
100,
1024,
null,
true,
new SimpleRowIngestionMeters(),
true,
false
)
) {
final Appenderator appenderator = tester.getAppenderator();
appenderator.startJob();
appenderator.add(IDENTIFIERS.get(0), createInputRow("2000", "foo", 1), null);
//expectedSizeInBytes = 44(map overhead) + 28 (TimeAndDims overhead) + 56 (aggregator metrics) + 54 (dimsKeySize) = 182 + 1 byte when null handling is enabled
int nullHandlingOverhead = NullHandling.sqlCompatible() ? 1 : 0;
Assert.assertEquals(
182 + nullHandlingOverhead,
((BatchAppenderator) appenderator).getBytesInMemory(IDENTIFIERS.get(0))
);
appenderator.add(IDENTIFIERS.get(1), createInputRow("2000", "bar", 1), null);
Assert.assertEquals(
182 + nullHandlingOverhead,
((BatchAppenderator) appenderator).getBytesInMemory(IDENTIFIERS.get(1))
);
appenderator.close();
Assert.assertEquals(0, ((BatchAppenderator) appenderator).getRowsInMemory());
}
}
@Test
public void testMaxBytesInMemoryInMultipleSinksWithSkipBytesInMemoryOverheadCheckConfig() throws Exception
{
try (
final BatchAppenderatorTester tester = new BatchAppenderatorTester(
100,
1024,
null,
true,
new SimpleRowIngestionMeters(),
true,
false
)
) {
final Appenderator appenderator = tester.getAppenderator();
appenderator.startJob();
appenderator.add(IDENTIFIERS.get(0), createInputRow("2000", "foo", 1), null);
//expectedSizeInBytes = 44(map overhead) + 28 (TimeAndDims overhead) + 56 (aggregator metrics) + 54 (dimsKeySize) = 182
int nullHandlingOverhead = NullHandling.sqlCompatible() ? 1 : 0;
Assert.assertEquals(182 + nullHandlingOverhead, ((BatchAppenderator) appenderator).getBytesCurrentlyInMemory());
appenderator.add(IDENTIFIERS.get(1), createInputRow("2000", "bar", 1), null);
Assert.assertEquals(
364 + 2 * nullHandlingOverhead,
((BatchAppenderator) appenderator).getBytesCurrentlyInMemory()
);
Assert.assertEquals(2, appenderator.getSegments().size());
appenderator.close();
Assert.assertEquals(0, ((BatchAppenderator) appenderator).getRowsInMemory());
}
}
@Test
public void testMaxBytesInMemory() throws Exception
{
try (final BatchAppenderatorTester tester = new BatchAppenderatorTester(100, 15000, true)) {
final Appenderator appenderator = tester.getAppenderator();
appenderator.startJob();
appenderator.add(IDENTIFIERS.get(0), createInputRow("2000", "foo", 1), null);
// Still under maxSizeInBytes after the add. Hence, we do not persist yet
//expectedSizeInBytes = 44(map overhead) + 28 (TimeAndDims overhead) + 56 (aggregator metrics) + 54 (dimsKeySize) = 182 + 1 byte when null handling is enabled
int nullHandlingOverhead = NullHandling.sqlCompatible() ? 1 : 0;
int currentInMemoryIndexSize = 182 + nullHandlingOverhead;
int sinkSizeOverhead = BatchAppenderator.ROUGH_OVERHEAD_PER_SINK;
// currHydrant in the sink still has > 0 bytesInMemory since we do not persist yet
Assert.assertEquals(
currentInMemoryIndexSize,
((BatchAppenderator) appenderator).getBytesInMemory(IDENTIFIERS.get(0))
);
Assert.assertEquals(
currentInMemoryIndexSize + sinkSizeOverhead,
((BatchAppenderator) appenderator).getBytesCurrentlyInMemory()
);
// We do multiple more adds to the same sink to cause persist.
for (int i = 0; i < 53; i++) {
appenderator.add(IDENTIFIERS.get(0), createInputRow("2000", "bar_" + i, 1), null);
}
sinkSizeOverhead = BatchAppenderator.ROUGH_OVERHEAD_PER_SINK;
// currHydrant size is 0 since we just persisted all indexes to disk.
currentInMemoryIndexSize = 0;
// We are now over maxSizeInBytes after the add. Hence, we do a persist.
// currHydrant in the sink has 0 bytesInMemory since we just did a persist
Assert.assertEquals(
currentInMemoryIndexSize,
((BatchAppenderator) appenderator).getBytesInMemory(IDENTIFIERS.get(0))
);
// no sinks and no hydrants after a persist, so we should have zero bytes currently in memory
Assert.assertEquals(
currentInMemoryIndexSize,
((BatchAppenderator) appenderator).getBytesCurrentlyInMemory()
);
// Add a single row after persisted
appenderator.add(IDENTIFIERS.get(0), createInputRow("2000", "bob", 1), null);
// currHydrant in the sink still has > 0 bytesInMemory since we do not persist yet
currentInMemoryIndexSize = 182 + nullHandlingOverhead;
Assert.assertEquals(
currentInMemoryIndexSize,
((BatchAppenderator) appenderator).getBytesInMemory(IDENTIFIERS.get(0))
);
Assert.assertEquals(
currentInMemoryIndexSize + sinkSizeOverhead,
((BatchAppenderator) appenderator).getBytesCurrentlyInMemory()
);
// We do multiple more adds to the same sink to cause persist.
for (int i = 0; i < 53; i++) {
appenderator.add(IDENTIFIERS.get(0), createInputRow("2000", "bar_" + i, 1), null);
}
// currHydrant size is 0 since we just persisted all indexes to disk.
currentInMemoryIndexSize = 0;
// We are now over maxSizeInBytes after the add. Hence, we do a persist.
// so no sinks & hydrants should be in memory...
Assert.assertEquals(
currentInMemoryIndexSize,
((BatchAppenderator) appenderator).getBytesInMemory(IDENTIFIERS.get(0))
);
Assert.assertEquals(
currentInMemoryIndexSize,
((BatchAppenderator) appenderator).getBytesCurrentlyInMemory()
);
appenderator.close();
Assert.assertEquals(0, ((BatchAppenderator) appenderator).getRowsInMemory());
Assert.assertEquals(0, ((BatchAppenderator) appenderator).getBytesCurrentlyInMemory());
}
}
@Test(expected = RuntimeException.class)
public void testTaskFailAsPersistCannotFreeAnyMoreMemory() throws Exception
{
try (final BatchAppenderatorTester tester =
new BatchAppenderatorTester(100, 5180, true)) {
final Appenderator appenderator = tester.getAppenderator();
appenderator.startJob();
appenderator.add(IDENTIFIERS.get(0), createInputRow("2000", "foo", 1), null);
}
}
@Test
public void testTaskDoesNotFailAsExceededMemoryWithSkipBytesInMemoryOverheadCheckConfig() throws Exception
{
try (
final BatchAppenderatorTester tester = new BatchAppenderatorTester(
100,
10,
null,
true,
new SimpleRowIngestionMeters(),
true,
false
)
) {
final Appenderator appenderator = tester.getAppenderator();
appenderator.startJob();
appenderator.add(IDENTIFIERS.get(0), createInputRow("2000", "foo", 1), null);
// Expected 0 since we persisted after the add
Assert.assertEquals(
0,
((BatchAppenderator) appenderator).getBytesCurrentlyInMemory()
);
appenderator.add(IDENTIFIERS.get(0), createInputRow("2000", "foo", 1), null);
// Expected 0 since we persisted after the add
Assert.assertEquals(
0,
((BatchAppenderator) appenderator).getBytesCurrentlyInMemory()
);
}
}
@Test
public void testTaskCleanupInMemoryCounterAfterCloseWithRowInMemory() throws Exception
{
try (final BatchAppenderatorTester tester =
new BatchAppenderatorTester(100, 10000, true)) {
final Appenderator appenderator = tester.getAppenderator();
appenderator.startJob();
appenderator.add(IDENTIFIERS.get(0), createInputRow("2000", "foo", 1), null);
// Still under maxSizeInBytes after the add. Hence, we do not persist yet
int nullHandlingOverhead = NullHandling.sqlCompatible() ? 1 : 0;
int currentInMemoryIndexSize = 182 + nullHandlingOverhead;
int sinkSizeOverhead = BatchAppenderator.ROUGH_OVERHEAD_PER_SINK;
Assert.assertEquals(
currentInMemoryIndexSize + sinkSizeOverhead,
((BatchAppenderator) appenderator).getBytesCurrentlyInMemory()
);
// Close with row still in memory (no persist)
appenderator.close();
Assert.assertEquals(0, ((BatchAppenderator) appenderator).getRowsInMemory());
Assert.assertEquals(0, ((BatchAppenderator) appenderator).getBytesCurrentlyInMemory());
}
}
@Test
public void testMaxBytesInMemoryInMultipleSinks() throws Exception
{
try (final BatchAppenderatorTester tester =
new BatchAppenderatorTester(1000, 28748, true)) {
final Appenderator appenderator = tester.getAppenderator();
appenderator.startJob();
// next records are 182 bytes
appenderator.add(IDENTIFIERS.get(0), createInputRow("2000", "foo", 1), null);
appenderator.add(IDENTIFIERS.get(1), createInputRow("2000", "bar", 1), null);
// Still under maxSizeInBytes after the add. Hence, we do not persist yet
//expectedSizeInBytes = 44(map overhead) + 28 (TimeAndDims overhead) + 56 (aggregator metrics) + 54 (dimsKeySize) = 182 + 1 byte when null handling is enabled
int nullHandlingOverhead = NullHandling.sqlCompatible() ? 1 : 0;
int currentInMemoryIndexSize = 182 + nullHandlingOverhead;
int sinkSizeOverhead = 2 * BatchAppenderator.ROUGH_OVERHEAD_PER_SINK;
// currHydrant in the sink still has > 0 bytesInMemory since we do not persist yet
Assert.assertEquals(
currentInMemoryIndexSize,
((BatchAppenderator) appenderator).getBytesInMemory(IDENTIFIERS.get(0))
);
Assert.assertEquals(
currentInMemoryIndexSize,
((BatchAppenderator) appenderator).getBytesInMemory(IDENTIFIERS.get(1))
);
Assert.assertEquals(
(2 * currentInMemoryIndexSize) + sinkSizeOverhead,
((BatchAppenderator) appenderator).getBytesCurrentlyInMemory()
);
// We do multiple more adds to the same sink to cause persist.
for (int i = 0; i < 49; i++) {
// these records are 186 bytes
appenderator.add(IDENTIFIERS.get(0), createInputRow("2000", "bar_" + i, 1), null);
appenderator.add(IDENTIFIERS.get(1), createInputRow("2000", "bar_" + i, 1), null);
}
// sinks + currHydrant size is 0 since we just persisted all indexes to disk.
currentInMemoryIndexSize = 0;
// We are now over maxSizeInBytes after the add. Hence, we do a persist.
// currHydrant and the sink has 0 bytesInMemory since we just did a persist
Assert.assertEquals(
currentInMemoryIndexSize,
((BatchAppenderator) appenderator).getBytesInMemory(IDENTIFIERS.get(0))
);
Assert.assertEquals(
currentInMemoryIndexSize,
((BatchAppenderator) appenderator).getBytesInMemory(IDENTIFIERS.get(1))
);
Assert.assertEquals(
currentInMemoryIndexSize,
((BatchAppenderator) appenderator).getBytesCurrentlyInMemory()
);
// Add a single row after persisted to sink 0
appenderator.add(IDENTIFIERS.get(0), createInputRow("2000", "bob", 1), null);
// currHydrant in the sink still has > 0 bytesInMemory since we do not persist yet
currentInMemoryIndexSize = 182 + nullHandlingOverhead;
Assert.assertEquals(
currentInMemoryIndexSize,
((BatchAppenderator) appenderator).getBytesInMemory(IDENTIFIERS.get(0))
);
Assert.assertEquals(
0,
((BatchAppenderator) appenderator).getBytesInMemory(IDENTIFIERS.get(1))
);
// only one sink so far:
sinkSizeOverhead = BatchAppenderator.ROUGH_OVERHEAD_PER_SINK;
Assert.assertEquals(
currentInMemoryIndexSize + sinkSizeOverhead,
((BatchAppenderator) appenderator).getBytesCurrentlyInMemory()
);
// Now add a single row to sink 1
appenderator.add(IDENTIFIERS.get(1), createInputRow("2000", "bob", 1), null);
Assert.assertEquals(
currentInMemoryIndexSize,
((BatchAppenderator) appenderator).getBytesInMemory(IDENTIFIERS.get(0))
);
Assert.assertEquals(
currentInMemoryIndexSize,
((BatchAppenderator) appenderator).getBytesInMemory(IDENTIFIERS.get(1))
);
sinkSizeOverhead += BatchAppenderator.ROUGH_OVERHEAD_PER_SINK;
Assert.assertEquals(
(2 * currentInMemoryIndexSize) + sinkSizeOverhead,
((BatchAppenderator) appenderator).getBytesCurrentlyInMemory()
);
// We do multiple more adds to both sinks to cause a persist.
for (int i = 0; i < 49; i++) {
// 186 bytes
appenderator.add(IDENTIFIERS.get(0), createInputRow("2000", "bar_" + i, 1), null);
appenderator.add(IDENTIFIERS.get(1), createInputRow("2000", "bar_" + i, 1), null);
}
// currHydrant size is 0 since we just persisted all indexes to disk.
currentInMemoryIndexSize = 0;
// We are now over maxSizeInBytes after the add. Hence, we do a persist.
// currHydrant in the sink has 0 bytesInMemory since we just did a persist
Assert.assertEquals(
currentInMemoryIndexSize,
((BatchAppenderator) appenderator).getBytesInMemory(IDENTIFIERS.get(0))
);
Assert.assertEquals(
currentInMemoryIndexSize,
((BatchAppenderator) appenderator).getBytesInMemory(IDENTIFIERS.get(1))
);
// The batch appenderator, unlike the streaming one, does not keep persisted indexes mapped in memory,
// so nothing should remain in memory after the two previous persists even though each segment has
// 1 dimension column, 2 metric columns, and 1 time column.
Assert.assertEquals(
currentInMemoryIndexSize,
((BatchAppenderator) appenderator).getBytesCurrentlyInMemory()
);
appenderator.close();
Assert.assertEquals(0, ((BatchAppenderator) appenderator).getRowsInMemory());
Assert.assertEquals(0, ((BatchAppenderator) appenderator).getBytesCurrentlyInMemory());
}
}
@Test
public void testIgnoreMaxBytesInMemory() throws Exception
{
try (final BatchAppenderatorTester tester =
new BatchAppenderatorTester(100, -1, true)) {
final Appenderator appenderator = tester.getAppenderator();
Assert.assertEquals(0, ((BatchAppenderator) appenderator).getRowsInMemory());
appenderator.startJob();
Assert.assertEquals(0, ((BatchAppenderator) appenderator).getRowsInMemory());
appenderator.add(IDENTIFIERS.get(0), createInputRow("2000", "foo", 1), null);
// we still calculate the size even when ignoring it for the persist decision
int nullHandlingOverhead = NullHandling.sqlCompatible() ? 1 : 0;
Assert.assertEquals(
182 + nullHandlingOverhead,
((BatchAppenderator) appenderator).getBytesInMemory(IDENTIFIERS.get(0))
);
Assert.assertEquals(1, ((BatchAppenderator) appenderator).getRowsInMemory());
appenderator.add(IDENTIFIERS.get(1), createInputRow("2000", "bar", 1), null);
// we added only two rows and specified that maxSizeInBytes should be ignored, so nothing should
// have been persisted:
int sinkSizeOverhead = 2 * BatchAppenderator.ROUGH_OVERHEAD_PER_SINK;
Assert.assertEquals(
(364 + 2 * nullHandlingOverhead) + sinkSizeOverhead,
((BatchAppenderator) appenderator).getBytesCurrentlyInMemory()
);
Assert.assertEquals(2, ((BatchAppenderator) appenderator).getRowsInMemory());
appenderator.close();
Assert.assertEquals(0, ((BatchAppenderator) appenderator).getRowsInMemory());
}
}
@Test
public void testMaxRowsInMemory() throws Exception
{
try (final BatchAppenderatorTester tester = new BatchAppenderatorTester(3, true)) {
final Appenderator appenderator = tester.getAppenderator();
Assert.assertEquals(0, ((BatchAppenderator) appenderator).getRowsInMemory());
appenderator.startJob();
Assert.assertEquals(0, ((BatchAppenderator) appenderator).getRowsInMemory());
appenderator.add(IDENTIFIERS.get(0), createInputRow("2000", "foo", 1), null);
Assert.assertEquals(1, ((BatchAppenderator) appenderator).getRowsInMemory());
appenderator.add(IDENTIFIERS.get(1), createInputRow("2000", "bar", 1), null);
Assert.assertEquals(2, ((BatchAppenderator) appenderator).getRowsInMemory());
appenderator.add(IDENTIFIERS.get(1), createInputRow("2000", "bar", 1), null);
Assert.assertEquals(2, ((BatchAppenderator) appenderator).getRowsInMemory());
// no persist since last add was for a dup record
appenderator.add(IDENTIFIERS.get(1), createInputRow("2000", "bat", 1), null);
// persist expected ^ (3) rows added
Assert.assertEquals(0, ((BatchAppenderator) appenderator).getRowsInMemory());
appenderator.add(IDENTIFIERS.get(0), createInputRow("2000", "baz", 1), null);
Assert.assertEquals(1, ((BatchAppenderator) appenderator).getRowsInMemory());
appenderator.add(IDENTIFIERS.get(1), createInputRow("2000", "qux", 1), null);
Assert.assertEquals(2, ((BatchAppenderator) appenderator).getRowsInMemory());
appenderator.add(IDENTIFIERS.get(0), createInputRow("2000", "bob", 1), null);
Assert.assertEquals(0, ((BatchAppenderator) appenderator).getRowsInMemory());
// persist expected ^ (3) rows added
//appenderator.persistAll(null);
Assert.assertEquals(0, ((BatchAppenderator) appenderator).getRowsInMemory());
appenderator.close();
}
}
@Test
public void testAllHydrantsAreRecovered() throws Exception
{
try (final BatchAppenderatorTester tester = new BatchAppenderatorTester(1, false)) {
final Appenderator appenderator = tester.getAppenderator();
Assert.assertEquals(0, ((BatchAppenderator) appenderator).getRowsInMemory());
appenderator.startJob();
Assert.assertEquals(0, ((BatchAppenderator) appenderator).getRowsInMemory());
appenderator.add(IDENTIFIERS.get(0), createInputRow("2000", "foo", 1), null);
appenderator.add(IDENTIFIERS.get(0), createInputRow("2000", "foo2", 1), null);
appenderator.add(IDENTIFIERS.get(0), createInputRow("2000", "foo3", 1), null);
// Since maxRowsInMemory is one, three hydrants should have been persisted and must be recovered
// just before push; internally the code has a sanity check to verify this and throws an exception
// if it fails
final SegmentsAndCommitMetadata segmentsAndCommitMetadata = appenderator.push(
appenderator.getSegments(),
null,
false
).get();
Assert.assertEquals(
IDENTIFIERS.subList(0, 1),
Lists.transform(
segmentsAndCommitMetadata.getSegments(),
SegmentIdWithShardSpec::fromDataSegment
).stream().sorted().collect(Collectors.toList())
);
Assert.assertEquals(0, ((BatchAppenderator) appenderator).getRowsInMemory());
appenderator.close();
}
}
@Test
public void testTotalRowsPerSegment() throws Exception
{
try (final BatchAppenderatorTester tester = new BatchAppenderatorTester(3, true)) {
final Appenderator appenderator = tester.getAppenderator();
Assert.assertEquals(0, ((BatchAppenderator) appenderator).getRowsInMemory());
appenderator.startJob();
Assert.assertEquals(0, ((BatchAppenderator) appenderator).getRowsInMemory());
Appenderator.AppenderatorAddResult addResult0 =
appenderator.add(IDENTIFIERS.get(0), createInputRow("2000", "foo", 1), null);
Assert.assertEquals(1, ((BatchAppenderator) appenderator).getRowsInMemory());
Assert.assertEquals(1, addResult0.getNumRowsInSegment());
Appenderator.AppenderatorAddResult addResult1 =
appenderator.add(IDENTIFIERS.get(1), createInputRow("2000", "bar", 1), null);
Assert.assertEquals(2, ((BatchAppenderator) appenderator).getRowsInMemory());
Assert.assertEquals(1, addResult1.getNumRowsInSegment());
addResult1 = // dup!
appenderator.add(IDENTIFIERS.get(1), createInputRow("2000", "bar", 1), null);
Assert.assertEquals(2, ((BatchAppenderator) appenderator).getRowsInMemory());
Assert.assertEquals(1, addResult1.getNumRowsInSegment()); // dup record does not count
// no persist since last add was for a dup record
addResult1 =
appenderator.add(IDENTIFIERS.get(1), createInputRow("2000", "bat", 1), null);
Assert.assertEquals(0, ((BatchAppenderator) appenderator).getRowsInMemory());
Assert.assertEquals(2, addResult1.getNumRowsInSegment());
// persist expected ^ (3) rows added
// total rows per segment ought to be preserved even when sinks are removed from memory:
addResult1 =
appenderator.add(IDENTIFIERS.get(1), createInputRow("2000", "bat", 1), null);
Assert.assertEquals(1, ((BatchAppenderator) appenderator).getRowsInMemory());
Assert.assertEquals(3, addResult1.getNumRowsInSegment());
addResult0 =
appenderator.add(IDENTIFIERS.get(0), createInputRow("2000", "baz", 1), null);
Assert.assertEquals(2, ((BatchAppenderator) appenderator).getRowsInMemory());
Assert.assertEquals(2, addResult0.getNumRowsInSegment());
addResult1 =
appenderator.add(IDENTIFIERS.get(1), createInputRow("2000", "qux", 1), null);
Assert.assertEquals(0, ((BatchAppenderator) appenderator).getRowsInMemory());
Assert.assertEquals(4, addResult1.getNumRowsInSegment());
// persist expected ^ (3) rows added
addResult0 =
appenderator.add(IDENTIFIERS.get(0), createInputRow("2000", "bob", 1), null);
Assert.assertEquals(1, ((BatchAppenderator) appenderator).getRowsInMemory());
Assert.assertEquals(3, addResult0.getNumRowsInSegment());
appenderator.close();
Assert.assertEquals(0, ((BatchAppenderator) appenderator).getRowsInMemory());
}
}
@Test
public void testRestoreFromDisk() throws Exception
{
final BatchAppenderatorTester tester = new BatchAppenderatorTester(2, true);
final Appenderator appenderator = tester.getAppenderator();
appenderator.startJob();
appenderator.add(IDENTIFIERS.get(0), createInputRow("2000", "foo", 1), null);
appenderator.add(IDENTIFIERS.get(0), createInputRow("2000", "bar", 2), null);
Assert.assertEquals(0, ((BatchAppenderator) appenderator).getRowsInMemory());
appenderator.add(IDENTIFIERS.get(1), createInputRow("2000", "baz", 3), null);
appenderator.add(IDENTIFIERS.get(1), createInputRow("2000", "qux", 4), null);
Assert.assertEquals(0, ((BatchAppenderator) appenderator).getRowsInMemory());
appenderator.add(IDENTIFIERS.get(2), createInputRow("2001", "bob", 5), null);
Assert.assertEquals(1, ((BatchAppenderator) appenderator).getRowsInMemory());
appenderator.persistAll(null).get();
Assert.assertEquals(0, ((BatchAppenderator) appenderator).getRowsInMemory());
List<File> segmentPaths = ((BatchAppenderator) appenderator).getPersistedidentifierPaths();
Assert.assertNotNull(segmentPaths);
Assert.assertEquals(3, segmentPaths.size());
appenderator.push(IDENTIFIERS, null, false).get();
segmentPaths = ((BatchAppenderator) appenderator).getPersistedidentifierPaths();
Assert.assertNotNull(segmentPaths);
Assert.assertEquals(0, segmentPaths.size());
appenderator.close();
}
@Test
public void testCleanupFromDiskAfterClose() throws Exception
{
final BatchAppenderatorTester tester = new BatchAppenderatorTester(2, true);
final Appenderator appenderator = tester.getAppenderator();
appenderator.startJob();
appenderator.add(IDENTIFIERS.get(0), createInputRow("2000", "foo", 1), null);
appenderator.add(IDENTIFIERS.get(0), createInputRow("2000", "bar", 2), null);
Assert.assertEquals(0, ((BatchAppenderator) appenderator).getRowsInMemory());
Assert.assertEquals(2, appenderator.getTotalRowCount());
appenderator.add(IDENTIFIERS.get(1), createInputRow("2000", "baz", 3), null);
appenderator.add(IDENTIFIERS.get(1), createInputRow("2000", "qux", 4), null);
Assert.assertEquals(0, ((BatchAppenderator) appenderator).getRowsInMemory());
Assert.assertEquals(4, appenderator.getTotalRowCount());
appenderator.add(IDENTIFIERS.get(2), createInputRow("2001", "bob", 5), null);
Assert.assertEquals(1, ((BatchAppenderator) appenderator).getRowsInMemory());
appenderator.persistAll(null).get();
Assert.assertEquals(0, ((BatchAppenderator) appenderator).getRowsInMemory());
Assert.assertEquals(5, appenderator.getTotalRowCount());
List<File> segmentPaths = ((BatchAppenderator) appenderator).getPersistedidentifierPaths();
Assert.assertNotNull(segmentPaths);
Assert.assertEquals(3, segmentPaths.size());
appenderator.close();
segmentPaths = ((BatchAppenderator) appenderator).getPersistedidentifierPaths();
Assert.assertNotNull(segmentPaths);
Assert.assertEquals(0, segmentPaths.size());
Assert.assertEquals(0, ((BatchAppenderator) appenderator).getRowsInMemory());
Assert.assertEquals(0, appenderator.getTotalRowCount());
}
@Test(timeout = 60_000L)
public void testTotalRowCount() throws Exception
{
try (final BatchAppenderatorTester tester = new BatchAppenderatorTester(3, true)) {
final Appenderator appenderator = tester.getAppenderator();
Assert.assertEquals(0, appenderator.getTotalRowCount());
appenderator.startJob();
Assert.assertEquals(0, appenderator.getTotalRowCount());
appenderator.add(IDENTIFIERS.get(0), createInputRow("2000", "foo", 1), null);
Assert.assertEquals(1, appenderator.getTotalRowCount());
appenderator.add(IDENTIFIERS.get(1), createInputRow("2000", "bar", 1), null);
Assert.assertEquals(2, appenderator.getTotalRowCount());
appenderator.persistAll(null).get();
Assert.assertEquals(2, appenderator.getTotalRowCount());
appenderator.drop(IDENTIFIERS.get(0)).get();
Assert.assertEquals(1, appenderator.getTotalRowCount());
appenderator.drop(IDENTIFIERS.get(1)).get();
Assert.assertEquals(0, appenderator.getTotalRowCount());
appenderator.add(IDENTIFIERS.get(2), createInputRow("2001", "bar", 1), null);
Assert.assertEquals(1, appenderator.getTotalRowCount());
appenderator.add(IDENTIFIERS.get(2), createInputRow("2001", "baz", 1), null);
Assert.assertEquals(2, appenderator.getTotalRowCount());
appenderator.add(IDENTIFIERS.get(2), createInputRow("2001", "qux", 1), null);
Assert.assertEquals(3, appenderator.getTotalRowCount());
appenderator.add(IDENTIFIERS.get(2), createInputRow("2001", "bob", 1), null);
Assert.assertEquals(4, appenderator.getTotalRowCount());
appenderator.persistAll(null).get();
Assert.assertEquals(4, appenderator.getTotalRowCount());
appenderator.drop(IDENTIFIERS.get(2)).get();
Assert.assertEquals(0, appenderator.getTotalRowCount());
appenderator.close();
Assert.assertEquals(0, appenderator.getTotalRowCount());
}
}
@Test
public void testVerifyRowIngestionMetrics() throws Exception
{
final RowIngestionMeters rowIngestionMeters = new SimpleRowIngestionMeters();
try (final BatchAppenderatorTester tester =
new BatchAppenderatorTester(5,
10000L,
null, false, rowIngestionMeters
)) {
final Appenderator appenderator = tester.getAppenderator();
appenderator.startJob();
appenderator.add(IDENTIFIERS.get(0), createInputRow("2000",
"foo", "invalid_met"
), null);
appenderator.add(IDENTIFIERS.get(0), createInputRow("2000", "foo", 1), null);
Assert.assertEquals(1, rowIngestionMeters.getProcessed());
Assert.assertEquals(1, rowIngestionMeters.getProcessedWithError());
Assert.assertEquals(0, rowIngestionMeters.getUnparseable());
Assert.assertEquals(0, rowIngestionMeters.getThrownAway());
}
}
@Test
public void testPushContract() throws Exception
{
final RowIngestionMeters rowIngestionMeters = new SimpleRowIngestionMeters();
try (final BatchAppenderatorTester tester =
new BatchAppenderatorTester(1,
50000L,
null, false, rowIngestionMeters
)) {
final Appenderator appenderator = tester.getAppenderator();
appenderator.startJob();
appenderator.add(IDENTIFIERS.get(0), createInputRow("2000", "bar", 1), null);
appenderator.add(IDENTIFIERS.get(0), createInputRow("2000", "bar2", 1), null);
appenderator.add(IDENTIFIERS.get(1), createInputRow("2000", "bar3", 1), null);
// push only a single segment
final SegmentsAndCommitMetadata segmentsAndCommitMetadata = appenderator.push(
Collections.singletonList(IDENTIFIERS.get(0)),
null,
false
).get();
// only one segment must have been pushed:
Assert.assertEquals(
Collections.singletonList(IDENTIFIERS.get(0)),
Lists.transform(
segmentsAndCommitMetadata.getSegments(),
SegmentIdWithShardSpec::fromDataSegment
).stream().sorted().collect(Collectors.toList())
);
Assert.assertEquals(
tester.getPushedSegments().stream().sorted().collect(Collectors.toList()),
segmentsAndCommitMetadata.getSegments().stream().sorted().collect(Collectors.toList())
);
// the responsibility for dropping is in the BatchAppenderatorDriver, so drop manually:
appenderator.drop(IDENTIFIERS.get(0));
// and the segment that was not pushed should still be active
Assert.assertEquals(
Collections.singletonList(IDENTIFIERS.get(1)),
appenderator.getSegments()
);
}
}
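// Test helpers: build a segment identifier for the test datasource and a single-dimension input row.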
private static SegmentIdWithShardSpec createSegmentId(String interval, String version, int partitionNum)
{
return new SegmentIdWithShardSpec(
BatchAppenderatorTester.DATASOURCE,
Intervals.of(interval),
version,
new LinearShardSpec(partitionNum)
);
}
static InputRow createInputRow(String ts, String dim, Object met)
{
return new MapBasedInputRow(
DateTimes.of(ts).getMillis(),
ImmutableList.of("dim"),
ImmutableMap.of(
"dim",
dim,
"met",
met
)
);
}
}

View File

@@ -0,0 +1,503 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.segment.realtime.appenderator;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.JSONParseSpec;
import org.apache.druid.data.input.impl.MapInputRowParser;
import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.indexer.partitions.PartitionsSpec;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.FileUtils;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.java.util.emitter.core.NoopEmitter;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.CountAggregatorFactory;
import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
import org.apache.druid.segment.IndexIO;
import org.apache.druid.segment.IndexMerger;
import org.apache.druid.segment.IndexMergerV9;
import org.apache.druid.segment.IndexSpec;
import org.apache.druid.segment.incremental.AppendableIndexSpec;
import org.apache.druid.segment.incremental.ParseExceptionHandler;
import org.apache.druid.segment.incremental.RowIngestionMeters;
import org.apache.druid.segment.incremental.SimpleRowIngestionMeters;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.indexing.TuningConfig;
import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
import org.apache.druid.segment.loading.DataSegmentPusher;
import org.apache.druid.segment.realtime.FireDepartmentMetrics;
import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
import org.apache.druid.segment.writeout.SegmentWriteOutMediumFactory;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.partition.LinearShardSpec;
import org.joda.time.Period;
import javax.annotation.Nullable;
import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CopyOnWriteArrayList;
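/**
 * Test harness for the batch appenderator: wires up an offline Appenderator with a simple schema,
 * a tuning config backed by a base persist directory, and a DataSegmentPusher that records every
 * pushed segment and can optionally simulate push failures.
 */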
public class BatchAppenderatorTester implements AutoCloseable
{
public static final String DATASOURCE = "foo";
private final DataSchema schema;
private final AppenderatorConfig tuningConfig;
private final FireDepartmentMetrics metrics;
private final ObjectMapper objectMapper;
private final Appenderator appenderator;
private final ServiceEmitter emitter;
private final List<DataSegment> pushedSegments = new CopyOnWriteArrayList<>();
public BatchAppenderatorTester(
final int maxRowsInMemory
)
{
this(maxRowsInMemory, -1, null, false);
}
public BatchAppenderatorTester(
final int maxRowsInMemory,
final boolean enablePushFailure
)
{
this(maxRowsInMemory, -1, null, enablePushFailure);
}
public BatchAppenderatorTester(
final int maxRowsInMemory,
final long maxSizeInBytes,
final boolean enablePushFailure
)
{
this(maxRowsInMemory, maxSizeInBytes, null, enablePushFailure);
}
public BatchAppenderatorTester(
final int maxRowsInMemory,
final long maxSizeInBytes,
final File basePersistDirectory,
final boolean enablePushFailure
)
{
this(
maxRowsInMemory,
maxSizeInBytes,
basePersistDirectory,
enablePushFailure,
new SimpleRowIngestionMeters(),
false,
false
);
}
public BatchAppenderatorTester(
final int maxRowsInMemory,
final long maxSizeInBytes,
@Nullable final File basePersistDirectory,
final boolean enablePushFailure,
final RowIngestionMeters rowIngestionMeters
)
{
this(maxRowsInMemory, maxSizeInBytes, basePersistDirectory, enablePushFailure, rowIngestionMeters,
false, false
);
}
public BatchAppenderatorTester(
final int maxRowsInMemory,
final long maxSizeInBytes,
@Nullable final File basePersistDirectory,
final boolean enablePushFailure,
final RowIngestionMeters rowIngestionMeters,
final boolean skipBytesInMemoryOverheadCheck,
final boolean useLegacyBatchProcessing
)
{
objectMapper = new DefaultObjectMapper();
objectMapper.registerSubtypes(LinearShardSpec.class);
final Map<String, Object> parserMap = objectMapper.convertValue(
new MapInputRowParser(
new JSONParseSpec(
new TimestampSpec("ts", "auto", null),
new DimensionsSpec(null, null, null),
null,
null,
null
)
),
Map.class
);
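// Schema under test: one string dimension ("dim"), a count metric plus a longSum on "met",
// and MINUTE segment granularity.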
schema = new DataSchema(
DATASOURCE,
null,
null,
new AggregatorFactory[]{
new CountAggregatorFactory("count"),
new LongSumAggregatorFactory("met", "met")
},
new UniformGranularitySpec(Granularities.MINUTE, Granularities.NONE, null),
null,
parserMap,
objectMapper
);
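// Tuning config: row/byte limits supplied by the caller (0 bytes means "use the default"),
// an off-heap write-out medium, and a base persist directory (a fresh temp dir unless one is supplied).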
tuningConfig = new TestIndexTuningConfig(
TuningConfig.DEFAULT_APPENDABLE_INDEX,
maxRowsInMemory,
maxSizeInBytes == 0L ? getDefaultMaxBytesInMemory() : maxSizeInBytes,
skipBytesInMemoryOverheadCheck,
new IndexSpec(),
0,
false,
0L,
OffHeapMemorySegmentWriteOutMediumFactory.instance(),
IndexMerger.UNLIMITED_MAX_COLUMNS_TO_MERGE,
basePersistDirectory == null ? createNewBasePersistDirectory() : basePersistDirectory
);
metrics = new FireDepartmentMetrics();
IndexIO indexIO = new IndexIO(
objectMapper,
() -> 0
);
IndexMerger indexMerger = new IndexMergerV9(
objectMapper,
indexIO,
OffHeapMemorySegmentWriteOutMediumFactory.instance()
);
emitter = new ServiceEmitter(
"test",
"test",
new NoopEmitter()
);
emitter.start();
EmittingLogger.registerEmitter(emitter);
DataSegmentPusher dataSegmentPusher = new DataSegmentPusher()
{
private boolean mustFail = true;
@Deprecated
@Override
public String getPathForHadoop(String dataSource)
{
return getPathForHadoop();
}
@Override
public String getPathForHadoop()
{
throw new UnsupportedOperationException();
}
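// Simulates flaky deep storage: when enablePushFailure is set, pushes alternate between
// throwing an IOException and succeeding, starting with a failure.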
@Override
public DataSegment push(File file, DataSegment segment, boolean useUniquePath) throws IOException
{
if (enablePushFailure && mustFail) {
mustFail = false;
throw new IOException("Push failure test");
} else if (enablePushFailure) {
mustFail = true;
}
pushedSegments.add(segment);
return segment;
}
@Override
public Map<String, Object> makeLoadSpec(URI uri)
{
throw new UnsupportedOperationException();
}
};
appenderator = Appenderators.createOffline(
schema.getDataSource(),
schema,
tuningConfig,
metrics,
dataSegmentPusher,
objectMapper,
indexIO,
indexMerger,
rowIngestionMeters,
new ParseExceptionHandler(rowIngestionMeters, false, Integer.MAX_VALUE, 0),
useLegacyBatchProcessing
);
}
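// Default used when the caller passes 0 for maxSizeInBytes: one third of the JVM's total memory.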
private long getDefaultMaxBytesInMemory()
{
return (Runtime.getRuntime().totalMemory()) / 3;
}
public DataSchema getSchema()
{
return schema;
}
public AppenderatorConfig getTuningConfig()
{
return tuningConfig;
}
public FireDepartmentMetrics getMetrics()
{
return metrics;
}
public ObjectMapper getObjectMapper()
{
return objectMapper;
}
public Appenderator getAppenderator()
{
return appenderator;
}
public List<DataSegment> getPushedSegments()
{
return pushedSegments;
}
@Override
public void close() throws Exception
{
appenderator.close();
emitter.close();
FileUtils.deleteDirectory(tuningConfig.getBasePersistDirectory());
}
private static File createNewBasePersistDirectory()
{
return FileUtils.createTempDir("druid-batch-persist");
}
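/**
 * Minimal AppenderatorConfig implementation for these tests: it just holds the values passed in by
 * the tester and effectively disables intermediate persists, which do not apply to batch jobs.
 */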
private static class TestIndexTuningConfig implements AppenderatorConfig
{
private final AppendableIndexSpec appendableIndexSpec;
private final int maxRowsInMemory;
private final long maxBytesInMemory;
private final boolean skipBytesInMemoryOverheadCheck;
private final int maxColumnsToMerge;
private final PartitionsSpec partitionsSpec;
private final IndexSpec indexSpec;
private final File basePersistDirectory;
private final int maxPendingPersists;
private final boolean reportParseExceptions;
private final long pushTimeout;
private final IndexSpec indexSpecForIntermediatePersists;
@Nullable
private final SegmentWriteOutMediumFactory segmentWriteOutMediumFactory;
public TestIndexTuningConfig(
AppendableIndexSpec appendableIndexSpec,
Integer maxRowsInMemory,
Long maxBytesInMemory,
Boolean skipBytesInMemoryOverheadCheck,
IndexSpec indexSpec,
Integer maxPendingPersists,
Boolean reportParseExceptions,
Long pushTimeout,
@Nullable SegmentWriteOutMediumFactory segmentWriteOutMediumFactory,
Integer maxColumnsToMerge,
File basePersistDirectory
)
{
this.appendableIndexSpec = appendableIndexSpec;
this.maxRowsInMemory = maxRowsInMemory;
this.maxBytesInMemory = maxBytesInMemory;
this.skipBytesInMemoryOverheadCheck = skipBytesInMemoryOverheadCheck;
this.indexSpec = indexSpec;
this.maxPendingPersists = maxPendingPersists;
this.reportParseExceptions = reportParseExceptions;
this.pushTimeout = pushTimeout;
this.segmentWriteOutMediumFactory = segmentWriteOutMediumFactory;
this.maxColumnsToMerge = maxColumnsToMerge;
this.basePersistDirectory = basePersistDirectory;
this.partitionsSpec = null;
this.indexSpecForIntermediatePersists = this.indexSpec;
}
@Override
public TestIndexTuningConfig withBasePersistDirectory(File dir)
{
throw new UnsupportedOperationException();
}
@Override
public AppendableIndexSpec getAppendableIndexSpec()
{
return appendableIndexSpec;
}
@Override
public int getMaxRowsInMemory()
{
return maxRowsInMemory;
}
@Override
public long getMaxBytesInMemory()
{
return maxBytesInMemory;
}
@Override
public boolean isSkipBytesInMemoryOverheadCheck()
{
return skipBytesInMemoryOverheadCheck;
}
@Nullable
@Override
public PartitionsSpec getPartitionsSpec()
{
return partitionsSpec;
}
@Override
public IndexSpec getIndexSpec()
{
return indexSpec;
}
@Override
public IndexSpec getIndexSpecForIntermediatePersists()
{
return indexSpecForIntermediatePersists;
}
@Override
public int getMaxPendingPersists()
{
return maxPendingPersists;
}
@Override
public boolean isReportParseExceptions()
{
return reportParseExceptions;
}
@Nullable
@Override
public SegmentWriteOutMediumFactory getSegmentWriteOutMediumFactory()
{
return segmentWriteOutMediumFactory;
}
@Override
public int getMaxColumnsToMerge()
{
return maxColumnsToMerge;
}
@Override
public File getBasePersistDirectory()
{
return basePersistDirectory;
}
@Override
public Period getIntermediatePersistPeriod()
{
return new Period(Integer.MAX_VALUE); // intermediate persist doesn't make much sense for batch jobs
}
@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
TestIndexTuningConfig that = (TestIndexTuningConfig) o;
return Objects.equals(appendableIndexSpec, that.appendableIndexSpec) &&
maxRowsInMemory == that.maxRowsInMemory &&
maxBytesInMemory == that.maxBytesInMemory &&
skipBytesInMemoryOverheadCheck == that.skipBytesInMemoryOverheadCheck &&
maxColumnsToMerge == that.maxColumnsToMerge &&
maxPendingPersists == that.maxPendingPersists &&
reportParseExceptions == that.reportParseExceptions &&
pushTimeout == that.pushTimeout &&
Objects.equals(partitionsSpec, that.partitionsSpec) &&
Objects.equals(indexSpec, that.indexSpec) &&
Objects.equals(indexSpecForIntermediatePersists, that.indexSpecForIntermediatePersists) &&
Objects.equals(basePersistDirectory, that.basePersistDirectory) &&
Objects.equals(segmentWriteOutMediumFactory, that.segmentWriteOutMediumFactory);
}
@Override
public int hashCode()
{
return Objects.hash(
appendableIndexSpec,
maxRowsInMemory,
maxBytesInMemory,
skipBytesInMemoryOverheadCheck,
maxColumnsToMerge,
partitionsSpec,
indexSpec,
indexSpecForIntermediatePersists,
basePersistDirectory,
maxPendingPersists,
reportParseExceptions,
pushTimeout,
segmentWriteOutMediumFactory
);
}
@Override
public String toString()
{
return "IndexTuningConfig{" +
"maxRowsInMemory=" + maxRowsInMemory +
", maxBytesInMemory=" + maxBytesInMemory +
", skipBytesInMemoryOverheadCheck=" + skipBytesInMemoryOverheadCheck +
", maxColumnsToMerge=" + maxColumnsToMerge +
", partitionsSpec=" + partitionsSpec +
", indexSpec=" + indexSpec +
", indexSpecForIntermediatePersists=" + indexSpecForIntermediatePersists +
", basePersistDirectory=" + basePersistDirectory +
", maxPendingPersists=" + maxPendingPersists +
", reportParseExceptions=" + reportParseExceptions +
", pushTimeout=" + pushTimeout +
", segmentWriteOutMediumFactory=" + segmentWriteOutMediumFactory +
'}';
}
}
}

View File

@@ -20,7 +20,6 @@
 package org.apache.druid.segment.realtime.appenderator;
 import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.base.Suppliers;
 import com.google.common.collect.ImmutableList;
 import com.google.inject.Binder;
 import com.google.inject.Injector;
@@ -43,7 +42,6 @@ import org.apache.druid.segment.indexing.DataSchema;
 import org.apache.druid.segment.indexing.RealtimeTuningConfig;
 import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
 import org.apache.druid.segment.realtime.FireDepartmentMetrics;
-import org.apache.druid.segment.realtime.plumber.Committers;
 import org.apache.druid.timeline.partition.LinearShardSpec;
 import org.junit.Assert;
 import org.junit.Rule;
@@ -170,13 +168,13 @@ public class DefaultOfflineAppenderatorFactoryTest
 "A",
 new LinearShardSpec(0)
 );
-Assert.assertEquals(0, ((AppenderatorImpl) appenderator).getRowsInMemory());
-appenderator.add(identifier, AppenderatorTest.ir("2000", "bar", 1), Suppliers.ofInstance(Committers.nil()));
-Assert.assertEquals(1, ((AppenderatorImpl) appenderator).getRowsInMemory());
-appenderator.add(identifier, AppenderatorTest.ir("2000", "baz", 1), Suppliers.ofInstance(Committers.nil()));
-Assert.assertEquals(2, ((AppenderatorImpl) appenderator).getRowsInMemory());
+Assert.assertEquals(0, ((BatchAppenderator) appenderator).getRowsInMemory());
+appenderator.add(identifier, StreamAppenderatorTest.ir("2000", "bar", 1), null);
+Assert.assertEquals(1, ((BatchAppenderator) appenderator).getRowsInMemory());
+appenderator.add(identifier, StreamAppenderatorTest.ir("2000", "baz", 1), null);
+Assert.assertEquals(2, ((BatchAppenderator) appenderator).getRowsInMemory());
 appenderator.close();
-Assert.assertEquals(0, ((AppenderatorImpl) appenderator).getRowsInMemory());
+Assert.assertEquals(0, ((BatchAppenderator) appenderator).getRowsInMemory());
 }
 finally {
 appenderator.close();

View File

@@ -545,11 +545,5 @@ public class StreamAppenderatorDriverFailTest extends EasyMockSupport
 {
 throw new UnsupportedOperationException();
 }
-@Override
-public boolean isRealTime()
-{
-return true;
-}
 }
 }

View File

@@ -95,7 +95,7 @@ public class StreamAppenderatorDriverTest extends EasyMockSupport
 );
 private SegmentAllocator allocator;
-private AppenderatorTester appenderatorTester;
+private StreamAppenderatorTester streamAppenderatorTester;
 private TestSegmentHandoffNotifierFactory segmentHandoffNotifierFactory;
 private StreamAppenderatorDriver driver;
 private DataSegmentKiller dataSegmentKiller;
@@ -107,15 +107,15 @@
 @Before
 public void setUp()
 {
-appenderatorTester = new AppenderatorTester(MAX_ROWS_IN_MEMORY);
+streamAppenderatorTester = new StreamAppenderatorTester(MAX_ROWS_IN_MEMORY);
 allocator = new TestSegmentAllocator(DATA_SOURCE, Granularities.HOUR);
 segmentHandoffNotifierFactory = new TestSegmentHandoffNotifierFactory();
 dataSegmentKiller = createStrictMock(DataSegmentKiller.class);
 driver = new StreamAppenderatorDriver(
-appenderatorTester.getAppenderator(),
+streamAppenderatorTester.getAppenderator(),
 allocator,
 segmentHandoffNotifierFactory,
-new TestUsedSegmentChecker(appenderatorTester),
+new TestUsedSegmentChecker(streamAppenderatorTester.getPushedSegments()),
 dataSegmentKiller,
 OBJECT_MAPPER,
 new FireDepartmentMetrics()

View File

@@ -61,7 +61,7 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicInteger;
-public class AppenderatorTest extends InitializedNullHandlingTest
+public class StreamAppenderatorTest extends InitializedNullHandlingTest
 {
 private static final List<SegmentIdWithShardSpec> IDENTIFIERS = ImmutableList.of(
 si("2000/2001", "A", 0),
@@ -72,7 +72,7 @@ public class AppenderatorTest extends InitializedNullHandlingTest
 @Test
 public void testSimpleIngestion() throws Exception
 {
-try (final AppenderatorTester tester = new AppenderatorTester(2, true)) {
+try (final StreamAppenderatorTester tester = new StreamAppenderatorTester(2, true)) {
 final Appenderator appenderator = tester.getAppenderator();
 boolean thrown;
@@ -83,7 +83,7 @@ public class AppenderatorTest extends InitializedNullHandlingTest
 Assert.assertEquals(null, appenderator.startJob());
 // getDataSource
-Assert.assertEquals(AppenderatorTester.DATASOURCE, appenderator.getDataSource());
+Assert.assertEquals(StreamAppenderatorTester.DATASOURCE, appenderator.getDataSource());
 // add
 commitMetadata.put("x", "1");
@@ -157,7 +157,7 @@ public class AppenderatorTest extends InitializedNullHandlingTest
 public void testMaxBytesInMemoryWithSkipBytesInMemoryOverheadCheckConfig() throws Exception
 {
 try (
-final AppenderatorTester tester = new AppenderatorTester(
+final StreamAppenderatorTester tester = new StreamAppenderatorTester(
 100,
 1024,
 null,
@@ -193,15 +193,15 @@ public class AppenderatorTest extends InitializedNullHandlingTest
 int nullHandlingOverhead = NullHandling.sqlCompatible() ? 1 : 0;
 Assert.assertEquals(
 182 + nullHandlingOverhead,
-((AppenderatorImpl) appenderator).getBytesInMemory(IDENTIFIERS.get(0))
+((StreamAppenderator) appenderator).getBytesInMemory(IDENTIFIERS.get(0))
 );
 appenderator.add(IDENTIFIERS.get(1), ir("2000", "bar", 1), committerSupplier);
 Assert.assertEquals(
 182 + nullHandlingOverhead,
-((AppenderatorImpl) appenderator).getBytesInMemory(IDENTIFIERS.get(1))
+((StreamAppenderator) appenderator).getBytesInMemory(IDENTIFIERS.get(1))
 );
 appenderator.close();
-Assert.assertEquals(0, ((AppenderatorImpl) appenderator).getRowsInMemory());
+Assert.assertEquals(0, ((StreamAppenderator) appenderator).getRowsInMemory());
 }
 }
@@ -209,7 +209,7 @@ public class AppenderatorTest extends InitializedNullHandlingTest
 public void testMaxBytesInMemoryInMultipleSinksWithSkipBytesInMemoryOverheadCheckConfig() throws Exception
 {
 try (
-final AppenderatorTester tester = new AppenderatorTester(
+final StreamAppenderatorTester tester = new StreamAppenderatorTester(
 100,
 1024,
 null,
@@ -243,21 +243,21 @@ public class AppenderatorTest extends InitializedNullHandlingTest
 appenderator.add(IDENTIFIERS.get(0), ir("2000", "foo", 1), committerSupplier);
 //expectedSizeInBytes = 44(map overhead) + 28 (TimeAndDims overhead) + 56 (aggregator metrics) + 54 (dimsKeySize) = 182
 int nullHandlingOverhead = NullHandling.sqlCompatible() ? 1 : 0;
-Assert.assertEquals(182 + nullHandlingOverhead, ((AppenderatorImpl) appenderator).getBytesCurrentlyInMemory());
+Assert.assertEquals(182 + nullHandlingOverhead, ((StreamAppenderator) appenderator).getBytesCurrentlyInMemory());
 appenderator.add(IDENTIFIERS.get(1), ir("2000", "bar", 1), committerSupplier);
 Assert.assertEquals(
 364 + 2 * nullHandlingOverhead,
-((AppenderatorImpl) appenderator).getBytesCurrentlyInMemory()
+((StreamAppenderator) appenderator).getBytesCurrentlyInMemory()
 );
 appenderator.close();
-Assert.assertEquals(0, ((AppenderatorImpl) appenderator).getRowsInMemory());
+Assert.assertEquals(0, ((StreamAppenderator) appenderator).getRowsInMemory());
 }
 }
 @Test
 public void testMaxBytesInMemory() throws Exception
 {
-try (final AppenderatorTester tester = new AppenderatorTester(100, 15000, true)) {
+try (final StreamAppenderatorTester tester = new StreamAppenderatorTester(100, 15000, true)) {
 final Appenderator appenderator = tester.getAppenderator();
 final AtomicInteger eventCount = new AtomicInteger(0);
 final Supplier<Committer> committerSupplier = () -> {
@@ -285,38 +285,38 @@ public class AppenderatorTest extends InitializedNullHandlingTest
 //expectedSizeInBytes = 44(map overhead) + 28 (TimeAndDims overhead) + 56 (aggregator metrics) + 54 (dimsKeySize) = 182 + 1 byte when null handling is enabled
 int nullHandlingOverhead = NullHandling.sqlCompatible() ? 1 : 0;
 int currentInMemoryIndexSize = 182 + nullHandlingOverhead;
-int sinkSizeOverhead = 1 * AppenderatorImpl.ROUGH_OVERHEAD_PER_SINK;
+int sinkSizeOverhead = 1 * StreamAppenderator.ROUGH_OVERHEAD_PER_SINK;
 // currHydrant in the sink still has > 0 bytesInMemory since we do not persist yet
 Assert.assertEquals(
 currentInMemoryIndexSize,
-((AppenderatorImpl) appenderator).getBytesInMemory(IDENTIFIERS.get(0))
+((StreamAppenderator) appenderator).getBytesInMemory(IDENTIFIERS.get(0))
 );
 Assert.assertEquals(
 currentInMemoryIndexSize + sinkSizeOverhead,
-((AppenderatorImpl) appenderator).getBytesCurrentlyInMemory()
+((StreamAppenderator) appenderator).getBytesCurrentlyInMemory()
 );
 // We do multiple more adds to the same sink to cause persist.
 for (int i = 0; i < 53; i++) {
 appenderator.add(IDENTIFIERS.get(0), ir("2000", "bar_" + i, 1), committerSupplier);
 }
-sinkSizeOverhead = 1 * AppenderatorImpl.ROUGH_OVERHEAD_PER_SINK;
+sinkSizeOverhead = 1 * StreamAppenderator.ROUGH_OVERHEAD_PER_SINK;
 // currHydrant size is 0 since we just persist all indexes to disk.
 currentInMemoryIndexSize = 0;
 // We are now over maxSizeInBytes after the add. Hence, we do a persist.
 // currHydrant in the sink has 0 bytesInMemory since we just did a persist
 Assert.assertEquals(
 currentInMemoryIndexSize,
-((AppenderatorImpl) appenderator).getBytesInMemory(IDENTIFIERS.get(0))
+((StreamAppenderator) appenderator).getBytesInMemory(IDENTIFIERS.get(0))
 );
 // Mapped index size is the memory still needed after we persisted indexes. Note that the segments have
 // 1 dimension columns, 2 metric column, 1 time column.
-int mappedIndexSize = 1012 + (2 * AppenderatorImpl.ROUGH_OVERHEAD_PER_METRIC_COLUMN_HOLDER) +
-AppenderatorImpl.ROUGH_OVERHEAD_PER_DIMENSION_COLUMN_HOLDER +
-AppenderatorImpl.ROUGH_OVERHEAD_PER_TIME_COLUMN_HOLDER;
+int mappedIndexSize = 1012 + (2 * StreamAppenderator.ROUGH_OVERHEAD_PER_METRIC_COLUMN_HOLDER) +
+StreamAppenderator.ROUGH_OVERHEAD_PER_DIMENSION_COLUMN_HOLDER +
+StreamAppenderator.ROUGH_OVERHEAD_PER_TIME_COLUMN_HOLDER;
 Assert.assertEquals(
 currentInMemoryIndexSize + sinkSizeOverhead + mappedIndexSize,
-((AppenderatorImpl) appenderator).getBytesCurrentlyInMemory()
+((StreamAppenderator) appenderator).getBytesCurrentlyInMemory()
 );
 // Add a single row after persisted
@@ -325,11 +325,11 @@ public class AppenderatorTest extends InitializedNullHandlingTest
 currentInMemoryIndexSize = 182 + nullHandlingOverhead;
 Assert.assertEquals(
 currentInMemoryIndexSize,
-((AppenderatorImpl) appenderator).getBytesInMemory(IDENTIFIERS.get(0))
+((StreamAppenderator) appenderator).getBytesInMemory(IDENTIFIERS.get(0))
 );
 Assert.assertEquals(
 currentInMemoryIndexSize + sinkSizeOverhead + mappedIndexSize,
-((AppenderatorImpl) appenderator).getBytesCurrentlyInMemory()
+((StreamAppenderator) appenderator).getBytesCurrentlyInMemory()
 );
 // We do multiple more adds to the same sink to cause persist.
@@ -342,28 +342,28 @@ public class AppenderatorTest extends InitializedNullHandlingTest
 // currHydrant in the sink has 0 bytesInMemory since we just did a persist
 Assert.assertEquals(
 currentInMemoryIndexSize,
-((AppenderatorImpl) appenderator).getBytesInMemory(IDENTIFIERS.get(0))
+((StreamAppenderator) appenderator).getBytesInMemory(IDENTIFIERS.get(0))
 );
 // Mapped index size is the memory still needed after we persisted indexes. Note that the segments have
 // 1 dimension columns, 2 metric column, 1 time column. However, we have two indexes now from the two pervious
 // persists.
-mappedIndexSize = 2 * (1012 + (2 * AppenderatorImpl.ROUGH_OVERHEAD_PER_METRIC_COLUMN_HOLDER) +
-AppenderatorImpl.ROUGH_OVERHEAD_PER_DIMENSION_COLUMN_HOLDER +
-AppenderatorImpl.ROUGH_OVERHEAD_PER_TIME_COLUMN_HOLDER);
+mappedIndexSize = 2 * (1012 + (2 * StreamAppenderator.ROUGH_OVERHEAD_PER_METRIC_COLUMN_HOLDER) +
+StreamAppenderator.ROUGH_OVERHEAD_PER_DIMENSION_COLUMN_HOLDER +
+StreamAppenderator.ROUGH_OVERHEAD_PER_TIME_COLUMN_HOLDER);
 Assert.assertEquals(
 currentInMemoryIndexSize + sinkSizeOverhead + mappedIndexSize,
-((AppenderatorImpl) appenderator).getBytesCurrentlyInMemory()
+((StreamAppenderator) appenderator).getBytesCurrentlyInMemory()
 );
 appenderator.close();
-Assert.assertEquals(0, ((AppenderatorImpl) appenderator).getRowsInMemory());
-Assert.assertEquals(0, ((AppenderatorImpl) appenderator).getBytesCurrentlyInMemory());
+Assert.assertEquals(0, ((StreamAppenderator) appenderator).getRowsInMemory());
+Assert.assertEquals(0, ((StreamAppenderator) appenderator).getBytesCurrentlyInMemory());
 }
 }
 @Test(expected = RuntimeException.class)
 public void testTaskFailAsPersistCannotFreeAnyMoreMemory() throws Exception
 {
-try (final AppenderatorTester tester = new AppenderatorTester(100, 5180, true)) {
+try (final StreamAppenderatorTester tester = new StreamAppenderatorTester(100, 5180, true)) {
 final Appenderator appenderator = tester.getAppenderator();
 final AtomicInteger eventCount = new AtomicInteger(0);
 final Supplier<Committer> committerSupplier = () -> {
@@ -394,7 +394,7 @@ public class AppenderatorTest extends InitializedNullHandlingTest
 public void testTaskDoesNotFailAsExceededMemoryWithSkipBytesInMemoryOverheadCheckConfig() throws Exception
 {
 try (
-final AppenderatorTester tester = new AppenderatorTester(
+final StreamAppenderatorTester tester = new StreamAppenderatorTester(
 100,
 10,
 null,
@@ -429,13 +429,13 @@ public class AppenderatorTest extends InitializedNullHandlingTest
 // Expected 0 since we persisted after the add
 Assert.assertEquals(
 0,
-((AppenderatorImpl) appenderator).getBytesCurrentlyInMemory()
+((StreamAppenderator) appenderator).getBytesCurrentlyInMemory()
 );
 appenderator.add(IDENTIFIERS.get(0), ir("2000", "foo", 1), committerSupplier);
 // Expected 0 since we persisted after the add
 Assert.assertEquals(
 0,
-((AppenderatorImpl) appenderator).getBytesCurrentlyInMemory()
+((StreamAppenderator) appenderator).getBytesCurrentlyInMemory()
 );
 }
 }
@@ -443,7 +443,7 @@ public class AppenderatorTest extends InitializedNullHandlingTest
 @Test
 public void testTaskCleanupInMemoryCounterAfterCloseWithRowInMemory() throws Exception
 {
-try (final AppenderatorTester tester = new AppenderatorTester(100, 10000, true)) {
+try (final StreamAppenderatorTester tester = new StreamAppenderatorTester(100, 10000, true)) {
final Appenderator appenderator = tester.getAppenderator(); final Appenderator appenderator = tester.getAppenderator();
final AtomicInteger eventCount = new AtomicInteger(0); final AtomicInteger eventCount = new AtomicInteger(0);
final Supplier<Committer> committerSupplier = () -> { final Supplier<Committer> committerSupplier = () -> {
@ -471,24 +471,24 @@ public class AppenderatorTest extends InitializedNullHandlingTest
// Still under maxSizeInBytes after the add. Hence, we do not persist yet // Still under maxSizeInBytes after the add. Hence, we do not persist yet
int nullHandlingOverhead = NullHandling.sqlCompatible() ? 1 : 0; int nullHandlingOverhead = NullHandling.sqlCompatible() ? 1 : 0;
int currentInMemoryIndexSize = 182 + nullHandlingOverhead; int currentInMemoryIndexSize = 182 + nullHandlingOverhead;
int sinkSizeOverhead = 1 * AppenderatorImpl.ROUGH_OVERHEAD_PER_SINK; int sinkSizeOverhead = 1 * StreamAppenderator.ROUGH_OVERHEAD_PER_SINK;
Assert.assertEquals( Assert.assertEquals(
currentInMemoryIndexSize + sinkSizeOverhead, currentInMemoryIndexSize + sinkSizeOverhead,
((AppenderatorImpl) appenderator).getBytesCurrentlyInMemory() ((StreamAppenderator) appenderator).getBytesCurrentlyInMemory()
); );
// Close with row still in memory (no persist) // Close with row still in memory (no persist)
appenderator.close(); appenderator.close();
Assert.assertEquals(0, ((AppenderatorImpl) appenderator).getRowsInMemory()); Assert.assertEquals(0, ((StreamAppenderator) appenderator).getRowsInMemory());
Assert.assertEquals(0, ((AppenderatorImpl) appenderator).getBytesCurrentlyInMemory()); Assert.assertEquals(0, ((StreamAppenderator) appenderator).getBytesCurrentlyInMemory());
} }
} }
@Test @Test
public void testMaxBytesInMemoryInMultipleSinks() throws Exception public void testMaxBytesInMemoryInMultipleSinks() throws Exception
{ {
try (final AppenderatorTester tester = new AppenderatorTester(100, 31100, true)) { try (final StreamAppenderatorTester tester = new StreamAppenderatorTester(100, 31100, true)) {
final Appenderator appenderator = tester.getAppenderator(); final Appenderator appenderator = tester.getAppenderator();
final AtomicInteger eventCount = new AtomicInteger(0); final AtomicInteger eventCount = new AtomicInteger(0);
final Supplier<Committer> committerSupplier = () -> { final Supplier<Committer> committerSupplier = () -> {
@ -518,19 +518,19 @@ public class AppenderatorTest extends InitializedNullHandlingTest
// expectedSizeInBytes = 44 (map overhead) + 28 (TimeAndDims overhead) + 56 (aggregator metrics) + 54 (dimsKeySize) = 182 + 1 byte when null handling is enabled // expectedSizeInBytes = 44 (map overhead) + 28 (TimeAndDims overhead) + 56 (aggregator metrics) + 54 (dimsKeySize) = 182 + 1 byte when null handling is enabled
int nullHandlingOverhead = NullHandling.sqlCompatible() ? 1 : 0; int nullHandlingOverhead = NullHandling.sqlCompatible() ? 1 : 0;
int currentInMemoryIndexSize = 182 + nullHandlingOverhead; int currentInMemoryIndexSize = 182 + nullHandlingOverhead;
int sinkSizeOverhead = 2 * AppenderatorImpl.ROUGH_OVERHEAD_PER_SINK; int sinkSizeOverhead = 2 * StreamAppenderator.ROUGH_OVERHEAD_PER_SINK;
// currHydrant in the sink still has > 0 bytesInMemory since we do not persist yet // currHydrant in the sink still has > 0 bytesInMemory since we do not persist yet
Assert.assertEquals( Assert.assertEquals(
currentInMemoryIndexSize, currentInMemoryIndexSize,
((AppenderatorImpl) appenderator).getBytesInMemory(IDENTIFIERS.get(0)) ((StreamAppenderator) appenderator).getBytesInMemory(IDENTIFIERS.get(0))
); );
Assert.assertEquals( Assert.assertEquals(
currentInMemoryIndexSize, currentInMemoryIndexSize,
((AppenderatorImpl) appenderator).getBytesInMemory(IDENTIFIERS.get(1)) ((StreamAppenderator) appenderator).getBytesInMemory(IDENTIFIERS.get(1))
); );
Assert.assertEquals( Assert.assertEquals(
(2 * currentInMemoryIndexSize) + sinkSizeOverhead, (2 * currentInMemoryIndexSize) + sinkSizeOverhead,
((AppenderatorImpl) appenderator).getBytesCurrentlyInMemory() ((StreamAppenderator) appenderator).getBytesCurrentlyInMemory()
); );
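The 182-byte figure used throughout these assertions is the per-row in-memory estimate spelled out in the comment above (44 + 28 + 56 + 54), plus one byte when SQL-compatible null handling is enabled. With two sinks each holding one buffered row and nothing persisted yet, the expected total is simply twice that per-row figure plus the per-sink overhead. A small worked check; the per-sink overhead value below is a placeholder, not the real constant:
public class RowSizeSketch
{
  public static void main(String[] args)
  {
    int mapOverhead = 44;
    int timeAndDimsOverhead = 28;
    int aggregatorMetrics = 56;
    int dimsKeySize = 54;
    int perRow = mapOverhead + timeAndDimsOverhead + aggregatorMetrics + dimsKeySize; // 182
    int nullHandlingOverhead = 1;                        // only with SQL-compatible null handling
    int perRowWithNulls = perRow + nullHandlingOverhead; // 183

    long sinkOverhead = 2 * 5_000L;                           // two sinks; 5_000 is a placeholder per-sink overhead
    long expectedTotal = 2L * perRowWithNulls + sinkOverhead; // mirrors (2 * currentInMemoryIndexSize) + sinkSizeOverhead
    System.out.println(perRow + " / " + expectedTotal);
  }
}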
// We do multiple more adds to the same sink to cause persist. // We do multiple more adds to the same sink to cause persist.
@ -538,27 +538,27 @@ public class AppenderatorTest extends InitializedNullHandlingTest
appenderator.add(IDENTIFIERS.get(0), ir("2000", "bar_" + i, 1), committerSupplier); appenderator.add(IDENTIFIERS.get(0), ir("2000", "bar_" + i, 1), committerSupplier);
appenderator.add(IDENTIFIERS.get(1), ir("2000", "bar_" + i, 1), committerSupplier); appenderator.add(IDENTIFIERS.get(1), ir("2000", "bar_" + i, 1), committerSupplier);
} }
sinkSizeOverhead = 2 * AppenderatorImpl.ROUGH_OVERHEAD_PER_SINK; sinkSizeOverhead = 2 * StreamAppenderator.ROUGH_OVERHEAD_PER_SINK;
// currHydrant size is 0 since we just persisted all indexes to disk. // currHydrant size is 0 since we just persisted all indexes to disk.
currentInMemoryIndexSize = 0; currentInMemoryIndexSize = 0;
// We are now over maxSizeInBytes after the add. Hence, we do a persist. // We are now over maxSizeInBytes after the add. Hence, we do a persist.
// currHydrant in the sink has 0 bytesInMemory since we just did a persist // currHydrant in the sink has 0 bytesInMemory since we just did a persist
Assert.assertEquals( Assert.assertEquals(
currentInMemoryIndexSize, currentInMemoryIndexSize,
((AppenderatorImpl) appenderator).getBytesInMemory(IDENTIFIERS.get(0)) ((StreamAppenderator) appenderator).getBytesInMemory(IDENTIFIERS.get(0))
); );
Assert.assertEquals( Assert.assertEquals(
currentInMemoryIndexSize, currentInMemoryIndexSize,
((AppenderatorImpl) appenderator).getBytesInMemory(IDENTIFIERS.get(1)) ((StreamAppenderator) appenderator).getBytesInMemory(IDENTIFIERS.get(1))
); );
// Mapped index size is the memory still needed after we persisted indexes. Note that the segments have // Mapped index size is the memory still needed after we persisted indexes. Note that the segments have
// 1 dimension column, 2 metric columns, 1 time column. // 1 dimension column, 2 metric columns, 1 time column.
int mappedIndexSize = 2 * (1012 + (2 * AppenderatorImpl.ROUGH_OVERHEAD_PER_METRIC_COLUMN_HOLDER) + int mappedIndexSize = 2 * (1012 + (2 * StreamAppenderator.ROUGH_OVERHEAD_PER_METRIC_COLUMN_HOLDER) +
AppenderatorImpl.ROUGH_OVERHEAD_PER_DIMENSION_COLUMN_HOLDER + StreamAppenderator.ROUGH_OVERHEAD_PER_DIMENSION_COLUMN_HOLDER +
AppenderatorImpl.ROUGH_OVERHEAD_PER_TIME_COLUMN_HOLDER); StreamAppenderator.ROUGH_OVERHEAD_PER_TIME_COLUMN_HOLDER);
Assert.assertEquals( Assert.assertEquals(
currentInMemoryIndexSize + sinkSizeOverhead + mappedIndexSize, currentInMemoryIndexSize + sinkSizeOverhead + mappedIndexSize,
((AppenderatorImpl) appenderator).getBytesCurrentlyInMemory() ((StreamAppenderator) appenderator).getBytesCurrentlyInMemory()
); );
// Add a single row to sink 0 after the persist // Add a single row to sink 0 after the persist
@ -567,29 +567,29 @@ public class AppenderatorTest extends InitializedNullHandlingTest
currentInMemoryIndexSize = 182 + nullHandlingOverhead; currentInMemoryIndexSize = 182 + nullHandlingOverhead;
Assert.assertEquals( Assert.assertEquals(
currentInMemoryIndexSize, currentInMemoryIndexSize,
((AppenderatorImpl) appenderator).getBytesInMemory(IDENTIFIERS.get(0)) ((StreamAppenderator) appenderator).getBytesInMemory(IDENTIFIERS.get(0))
); );
Assert.assertEquals( Assert.assertEquals(
0, 0,
((AppenderatorImpl) appenderator).getBytesInMemory(IDENTIFIERS.get(1)) ((StreamAppenderator) appenderator).getBytesInMemory(IDENTIFIERS.get(1))
); );
Assert.assertEquals( Assert.assertEquals(
currentInMemoryIndexSize + sinkSizeOverhead + mappedIndexSize, currentInMemoryIndexSize + sinkSizeOverhead + mappedIndexSize,
((AppenderatorImpl) appenderator).getBytesCurrentlyInMemory() ((StreamAppenderator) appenderator).getBytesCurrentlyInMemory()
); );
// Now add a single row to sink 1 // Now add a single row to sink 1
appenderator.add(IDENTIFIERS.get(1), ir("2000", "bob", 1), committerSupplier); appenderator.add(IDENTIFIERS.get(1), ir("2000", "bob", 1), committerSupplier);
Assert.assertEquals( Assert.assertEquals(
currentInMemoryIndexSize, currentInMemoryIndexSize,
((AppenderatorImpl) appenderator).getBytesInMemory(IDENTIFIERS.get(0)) ((StreamAppenderator) appenderator).getBytesInMemory(IDENTIFIERS.get(0))
); );
Assert.assertEquals( Assert.assertEquals(
currentInMemoryIndexSize, currentInMemoryIndexSize,
((AppenderatorImpl) appenderator).getBytesInMemory(IDENTIFIERS.get(1)) ((StreamAppenderator) appenderator).getBytesInMemory(IDENTIFIERS.get(1))
); );
Assert.assertEquals( Assert.assertEquals(
(2 * currentInMemoryIndexSize) + sinkSizeOverhead + mappedIndexSize, (2 * currentInMemoryIndexSize) + sinkSizeOverhead + mappedIndexSize,
((AppenderatorImpl) appenderator).getBytesCurrentlyInMemory() ((StreamAppenderator) appenderator).getBytesCurrentlyInMemory()
); );
// We do multiple more adds to both sinks to cause persist. // We do multiple more adds to both sinks to cause persist.
@ -603,32 +603,32 @@ public class AppenderatorTest extends InitializedNullHandlingTest
// currHydrant in the sink has 0 bytesInMemory since we just did a persist // currHydrant in the sink has 0 bytesInMemory since we just did a persist
Assert.assertEquals( Assert.assertEquals(
currentInMemoryIndexSize, currentInMemoryIndexSize,
((AppenderatorImpl) appenderator).getBytesInMemory(IDENTIFIERS.get(0)) ((StreamAppenderator) appenderator).getBytesInMemory(IDENTIFIERS.get(0))
); );
Assert.assertEquals( Assert.assertEquals(
currentInMemoryIndexSize, currentInMemoryIndexSize,
((AppenderatorImpl) appenderator).getBytesInMemory(IDENTIFIERS.get(1)) ((StreamAppenderator) appenderator).getBytesInMemory(IDENTIFIERS.get(1))
); );
// Mapped index size is the memory still needed after we persisted indexes. Note that the segments have // Mapped index size is the memory still needed after we persisted indexes. Note that the segments have
// 1 dimension column, 2 metric columns, 1 time column. However, we have two indexes now from the two previous // 1 dimension column, 2 metric columns, 1 time column. However, we have two indexes now from the two previous
// persists. // persists.
mappedIndexSize = 2 * (2 * (1012 + (2 * AppenderatorImpl.ROUGH_OVERHEAD_PER_METRIC_COLUMN_HOLDER) + mappedIndexSize = 2 * (2 * (1012 + (2 * StreamAppenderator.ROUGH_OVERHEAD_PER_METRIC_COLUMN_HOLDER) +
AppenderatorImpl.ROUGH_OVERHEAD_PER_DIMENSION_COLUMN_HOLDER + StreamAppenderator.ROUGH_OVERHEAD_PER_DIMENSION_COLUMN_HOLDER +
AppenderatorImpl.ROUGH_OVERHEAD_PER_TIME_COLUMN_HOLDER)); StreamAppenderator.ROUGH_OVERHEAD_PER_TIME_COLUMN_HOLDER));
Assert.assertEquals( Assert.assertEquals(
currentInMemoryIndexSize + sinkSizeOverhead + mappedIndexSize, currentInMemoryIndexSize + sinkSizeOverhead + mappedIndexSize,
((AppenderatorImpl) appenderator).getBytesCurrentlyInMemory() ((StreamAppenderator) appenderator).getBytesCurrentlyInMemory()
); );
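Note that the mapped-index term scales multiplicatively: every persist of every sink leaves one memory-mapped index behind, so roughly mappedIndexSize = (sinks) * (persists per sink) * (1012 + 2 * metric-holder overhead + dimension-holder overhead + time-holder overhead). With two sinks and two persists each, that per-index estimate is counted four times, which is exactly the 2 * (2 * (...)) expression above.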
appenderator.close(); appenderator.close();
Assert.assertEquals(0, ((AppenderatorImpl) appenderator).getRowsInMemory()); Assert.assertEquals(0, ((StreamAppenderator) appenderator).getRowsInMemory());
Assert.assertEquals(0, ((AppenderatorImpl) appenderator).getBytesCurrentlyInMemory()); Assert.assertEquals(0, ((StreamAppenderator) appenderator).getBytesCurrentlyInMemory());
} }
} }
@Test @Test
public void testIgnoreMaxBytesInMemory() throws Exception public void testIgnoreMaxBytesInMemory() throws Exception
{ {
try (final AppenderatorTester tester = new AppenderatorTester(100, -1, true)) { try (final StreamAppenderatorTester tester = new StreamAppenderatorTester(100, -1, true)) {
final Appenderator appenderator = tester.getAppenderator(); final Appenderator appenderator = tester.getAppenderator();
final AtomicInteger eventCount = new AtomicInteger(0); final AtomicInteger eventCount = new AtomicInteger(0);
final Supplier<Committer> committerSupplier = () -> { final Supplier<Committer> committerSupplier = () -> {
@ -650,33 +650,33 @@ public class AppenderatorTest extends InitializedNullHandlingTest
}; };
}; };
Assert.assertEquals(0, ((AppenderatorImpl) appenderator).getRowsInMemory()); Assert.assertEquals(0, ((StreamAppenderator) appenderator).getRowsInMemory());
appenderator.startJob(); appenderator.startJob();
Assert.assertEquals(0, ((AppenderatorImpl) appenderator).getRowsInMemory()); Assert.assertEquals(0, ((StreamAppenderator) appenderator).getRowsInMemory());
appenderator.add(IDENTIFIERS.get(0), ir("2000", "foo", 1), committerSupplier); appenderator.add(IDENTIFIERS.get(0), ir("2000", "foo", 1), committerSupplier);
// we still calculate the size even when ignoring it to make the persist decision // we still calculate the size even when ignoring it to make the persist decision
int nullHandlingOverhead = NullHandling.sqlCompatible() ? 1 : 0; int nullHandlingOverhead = NullHandling.sqlCompatible() ? 1 : 0;
Assert.assertEquals( Assert.assertEquals(
182 + nullHandlingOverhead, 182 + nullHandlingOverhead,
((AppenderatorImpl) appenderator).getBytesInMemory(IDENTIFIERS.get(0)) ((StreamAppenderator) appenderator).getBytesInMemory(IDENTIFIERS.get(0))
); );
Assert.assertEquals(1, ((AppenderatorImpl) appenderator).getRowsInMemory()); Assert.assertEquals(1, ((StreamAppenderator) appenderator).getRowsInMemory());
appenderator.add(IDENTIFIERS.get(1), ir("2000", "bar", 1), committerSupplier); appenderator.add(IDENTIFIERS.get(1), ir("2000", "bar", 1), committerSupplier);
int sinkSizeOverhead = 2 * AppenderatorImpl.ROUGH_OVERHEAD_PER_SINK; int sinkSizeOverhead = 2 * StreamAppenderator.ROUGH_OVERHEAD_PER_SINK;
Assert.assertEquals( Assert.assertEquals(
(364 + 2 * nullHandlingOverhead) + sinkSizeOverhead, (364 + 2 * nullHandlingOverhead) + sinkSizeOverhead,
((AppenderatorImpl) appenderator).getBytesCurrentlyInMemory() ((StreamAppenderator) appenderator).getBytesCurrentlyInMemory()
); );
Assert.assertEquals(2, ((AppenderatorImpl) appenderator).getRowsInMemory()); Assert.assertEquals(2, ((StreamAppenderator) appenderator).getRowsInMemory());
appenderator.close(); appenderator.close();
Assert.assertEquals(0, ((AppenderatorImpl) appenderator).getRowsInMemory()); Assert.assertEquals(0, ((StreamAppenderator) appenderator).getRowsInMemory());
} }
} }
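This test constructs the tester with maxSizeInBytes = -1 and expects the bytes-based persist trigger to be ignored: the size is still tracked (as asserted above), yet two rows remain in memory without forcing a persist. A minimal, illustrative-only sketch of that kind of trigger check; the names and structure below are hypothetical and not taken from StreamAppenderator:
final class PersistTriggerSketch
{
  private final int maxRowsInMemory;
  private final long maxBytesInMemory; // a non-positive value means the bytes check is skipped

  PersistTriggerSketch(int maxRowsInMemory, long maxBytesInMemory)
  {
    this.maxRowsInMemory = maxRowsInMemory;
    this.maxBytesInMemory = maxBytesInMemory;
  }

  boolean shouldPersist(int rowsInMemory, long bytesInMemory)
  {
    boolean rowLimitHit = rowsInMemory >= maxRowsInMemory;
    boolean byteLimitHit = maxBytesInMemory > 0 && bytesInMemory >= maxBytesInMemory;
    return rowLimitHit || byteLimitHit;
  }
}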
@Test @Test
public void testMaxRowsInMemory() throws Exception public void testMaxRowsInMemory() throws Exception
{ {
try (final AppenderatorTester tester = new AppenderatorTester(3, true)) { try (final StreamAppenderatorTester tester = new StreamAppenderatorTester(3, true)) {
final Appenderator appenderator = tester.getAppenderator(); final Appenderator appenderator = tester.getAppenderator();
final AtomicInteger eventCount = new AtomicInteger(0); final AtomicInteger eventCount = new AtomicInteger(0);
final Supplier<Committer> committerSupplier = new Supplier<Committer>() final Supplier<Committer> committerSupplier = new Supplier<Committer>()
@ -703,23 +703,23 @@ public class AppenderatorTest extends InitializedNullHandlingTest
} }
}; };
Assert.assertEquals(0, ((AppenderatorImpl) appenderator).getRowsInMemory()); Assert.assertEquals(0, ((StreamAppenderator) appenderator).getRowsInMemory());
appenderator.startJob(); appenderator.startJob();
Assert.assertEquals(0, ((AppenderatorImpl) appenderator).getRowsInMemory()); Assert.assertEquals(0, ((StreamAppenderator) appenderator).getRowsInMemory());
appenderator.add(IDENTIFIERS.get(0), ir("2000", "foo", 1), committerSupplier); appenderator.add(IDENTIFIERS.get(0), ir("2000", "foo", 1), committerSupplier);
Assert.assertEquals(1, ((AppenderatorImpl) appenderator).getRowsInMemory()); Assert.assertEquals(1, ((StreamAppenderator) appenderator).getRowsInMemory());
appenderator.add(IDENTIFIERS.get(1), ir("2000", "bar", 1), committerSupplier); appenderator.add(IDENTIFIERS.get(1), ir("2000", "bar", 1), committerSupplier);
Assert.assertEquals(2, ((AppenderatorImpl) appenderator).getRowsInMemory()); Assert.assertEquals(2, ((StreamAppenderator) appenderator).getRowsInMemory());
appenderator.add(IDENTIFIERS.get(1), ir("2000", "bar", 1), committerSupplier); appenderator.add(IDENTIFIERS.get(1), ir("2000", "bar", 1), committerSupplier);
Assert.assertEquals(2, ((AppenderatorImpl) appenderator).getRowsInMemory()); Assert.assertEquals(2, ((StreamAppenderator) appenderator).getRowsInMemory());
appenderator.add(IDENTIFIERS.get(0), ir("2000", "baz", 1), committerSupplier); appenderator.add(IDENTIFIERS.get(0), ir("2000", "baz", 1), committerSupplier);
Assert.assertEquals(0, ((AppenderatorImpl) appenderator).getRowsInMemory()); Assert.assertEquals(0, ((StreamAppenderator) appenderator).getRowsInMemory());
appenderator.add(IDENTIFIERS.get(1), ir("2000", "qux", 1), committerSupplier); appenderator.add(IDENTIFIERS.get(1), ir("2000", "qux", 1), committerSupplier);
Assert.assertEquals(1, ((AppenderatorImpl) appenderator).getRowsInMemory()); Assert.assertEquals(1, ((StreamAppenderator) appenderator).getRowsInMemory());
appenderator.add(IDENTIFIERS.get(0), ir("2000", "bob", 1), committerSupplier); appenderator.add(IDENTIFIERS.get(0), ir("2000", "bob", 1), committerSupplier);
Assert.assertEquals(2, ((AppenderatorImpl) appenderator).getRowsInMemory()); Assert.assertEquals(2, ((StreamAppenderator) appenderator).getRowsInMemory());
appenderator.persistAll(committerSupplier.get()); appenderator.persistAll(committerSupplier.get());
Assert.assertEquals(0, ((AppenderatorImpl) appenderator).getRowsInMemory()); Assert.assertEquals(0, ((StreamAppenderator) appenderator).getRowsInMemory());
appenderator.close(); appenderator.close();
} }
} }
@ -727,7 +727,7 @@ public class AppenderatorTest extends InitializedNullHandlingTest
@Test @Test
public void testMaxRowsInMemoryDisallowIncrementalPersists() throws Exception public void testMaxRowsInMemoryDisallowIncrementalPersists() throws Exception
{ {
try (final AppenderatorTester tester = new AppenderatorTester(3, false)) { try (final StreamAppenderatorTester tester = new StreamAppenderatorTester(3, false)) {
final Appenderator appenderator = tester.getAppenderator(); final Appenderator appenderator = tester.getAppenderator();
final AtomicInteger eventCount = new AtomicInteger(0); final AtomicInteger eventCount = new AtomicInteger(0);
final Supplier<Committer> committerSupplier = () -> { final Supplier<Committer> committerSupplier = () -> {
@ -749,23 +749,23 @@ public class AppenderatorTest extends InitializedNullHandlingTest
}; };
}; };
Assert.assertEquals(0, ((AppenderatorImpl) appenderator).getRowsInMemory()); Assert.assertEquals(0, ((StreamAppenderator) appenderator).getRowsInMemory());
appenderator.startJob(); appenderator.startJob();
Assert.assertEquals(0, ((AppenderatorImpl) appenderator).getRowsInMemory()); Assert.assertEquals(0, ((StreamAppenderator) appenderator).getRowsInMemory());
appenderator.add(IDENTIFIERS.get(0), ir("2000", "foo", 1), committerSupplier, false); appenderator.add(IDENTIFIERS.get(0), ir("2000", "foo", 1), committerSupplier, false);
Assert.assertEquals(1, ((AppenderatorImpl) appenderator).getRowsInMemory()); Assert.assertEquals(1, ((StreamAppenderator) appenderator).getRowsInMemory());
appenderator.add(IDENTIFIERS.get(1), ir("2000", "bar", 1), committerSupplier, false); appenderator.add(IDENTIFIERS.get(1), ir("2000", "bar", 1), committerSupplier, false);
Assert.assertEquals(2, ((AppenderatorImpl) appenderator).getRowsInMemory()); Assert.assertEquals(2, ((StreamAppenderator) appenderator).getRowsInMemory());
appenderator.add(IDENTIFIERS.get(1), ir("2000", "bar", 1), committerSupplier, false); appenderator.add(IDENTIFIERS.get(1), ir("2000", "bar", 1), committerSupplier, false);
Assert.assertEquals(2, ((AppenderatorImpl) appenderator).getRowsInMemory()); Assert.assertEquals(2, ((StreamAppenderator) appenderator).getRowsInMemory());
appenderator.add(IDENTIFIERS.get(0), ir("2000", "baz", 1), committerSupplier, false); appenderator.add(IDENTIFIERS.get(0), ir("2000", "baz", 1), committerSupplier, false);
Assert.assertEquals(3, ((AppenderatorImpl) appenderator).getRowsInMemory()); Assert.assertEquals(3, ((StreamAppenderator) appenderator).getRowsInMemory());
appenderator.add(IDENTIFIERS.get(1), ir("2000", "qux", 1), committerSupplier, false); appenderator.add(IDENTIFIERS.get(1), ir("2000", "qux", 1), committerSupplier, false);
Assert.assertEquals(4, ((AppenderatorImpl) appenderator).getRowsInMemory()); Assert.assertEquals(4, ((StreamAppenderator) appenderator).getRowsInMemory());
appenderator.add(IDENTIFIERS.get(0), ir("2000", "bob", 1), committerSupplier, false); appenderator.add(IDENTIFIERS.get(0), ir("2000", "bob", 1), committerSupplier, false);
Assert.assertEquals(5, ((AppenderatorImpl) appenderator).getRowsInMemory()); Assert.assertEquals(5, ((StreamAppenderator) appenderator).getRowsInMemory());
appenderator.persistAll(committerSupplier.get()); appenderator.persistAll(committerSupplier.get());
Assert.assertEquals(0, ((AppenderatorImpl) appenderator).getRowsInMemory()); Assert.assertEquals(0, ((StreamAppenderator) appenderator).getRowsInMemory());
appenderator.close(); appenderator.close();
} }
} }
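With allowIncrementalPersists = false on add(), rows keep accumulating past maxRowsInMemory (the count reaches 5 above with a limit of 3) and are only flushed by an explicit persistAll(). A sketch of how a caller might use that overload, reusing the identifiers, the ir() helper, and the committer supplier from this test:
// Sketch only: buffer several rows without incremental persists, then flush once.
appenderator.startJob();
appenderator.add(IDENTIFIERS.get(0), ir("2000", "foo", 1), committerSupplier, false); // no incremental persist
appenderator.add(IDENTIFIERS.get(1), ir("2000", "bar", 1), committerSupplier, false); // rows keep accumulating
// ... further adds may exceed maxRowsInMemory without triggering a persist ...
appenderator.persistAll(committerSupplier.get()); // single explicit flush drops the buffered rows
appenderator.close();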
@ -774,7 +774,7 @@ public class AppenderatorTest extends InitializedNullHandlingTest
public void testRestoreFromDisk() throws Exception public void testRestoreFromDisk() throws Exception
{ {
final RealtimeTuningConfig tuningConfig; final RealtimeTuningConfig tuningConfig;
try (final AppenderatorTester tester = new AppenderatorTester(2, true)) { try (final StreamAppenderatorTester tester = new StreamAppenderatorTester(2, true)) {
final Appenderator appenderator = tester.getAppenderator(); final Appenderator appenderator = tester.getAppenderator();
tuningConfig = tester.getTuningConfig(); tuningConfig = tester.getTuningConfig();
@ -816,7 +816,7 @@ public class AppenderatorTest extends InitializedNullHandlingTest
appenderator.add(IDENTIFIERS.get(0), ir("2000", "bob", 5), committerSupplier); appenderator.add(IDENTIFIERS.get(0), ir("2000", "bob", 5), committerSupplier);
appenderator.close(); appenderator.close();
try (final AppenderatorTester tester2 = new AppenderatorTester( try (final StreamAppenderatorTester tester2 = new StreamAppenderatorTester(
2, 2,
-1, -1,
tuningConfig.getBasePersistDirectory(), tuningConfig.getBasePersistDirectory(),
@ -833,7 +833,7 @@ public class AppenderatorTest extends InitializedNullHandlingTest
@Test(timeout = 60_000L) @Test(timeout = 60_000L)
public void testTotalRowCount() throws Exception public void testTotalRowCount() throws Exception
{ {
try (final AppenderatorTester tester = new AppenderatorTester(3, true)) { try (final StreamAppenderatorTester tester = new StreamAppenderatorTester(3, true)) {
final Appenderator appenderator = tester.getAppenderator(); final Appenderator appenderator = tester.getAppenderator();
final ConcurrentMap<String, String> commitMetadata = new ConcurrentHashMap<>(); final ConcurrentMap<String, String> commitMetadata = new ConcurrentHashMap<>();
final Supplier<Committer> committerSupplier = committerSupplierFromConcurrentMap(commitMetadata); final Supplier<Committer> committerSupplier = committerSupplierFromConcurrentMap(commitMetadata);
@ -876,7 +876,7 @@ public class AppenderatorTest extends InitializedNullHandlingTest
public void testVerifyRowIngestionMetrics() throws Exception public void testVerifyRowIngestionMetrics() throws Exception
{ {
final RowIngestionMeters rowIngestionMeters = new SimpleRowIngestionMeters(); final RowIngestionMeters rowIngestionMeters = new SimpleRowIngestionMeters();
try (final AppenderatorTester tester = new AppenderatorTester(5, 10000L, null, false, rowIngestionMeters)) { try (final StreamAppenderatorTester tester = new StreamAppenderatorTester(5, 10000L, null, false, rowIngestionMeters)) {
final Appenderator appenderator = tester.getAppenderator(); final Appenderator appenderator = tester.getAppenderator();
appenderator.startJob(); appenderator.startJob();
appenderator.add(IDENTIFIERS.get(0), ir("2000", "foo", "invalid_met"), Committers.nilSupplier()); appenderator.add(IDENTIFIERS.get(0), ir("2000", "foo", "invalid_met"), Committers.nilSupplier());
@ -892,7 +892,7 @@ public class AppenderatorTest extends InitializedNullHandlingTest
@Test @Test
public void testQueryByIntervals() throws Exception public void testQueryByIntervals() throws Exception
{ {
try (final AppenderatorTester tester = new AppenderatorTester(2, true)) { try (final StreamAppenderatorTester tester = new StreamAppenderatorTester(2, true)) {
final Appenderator appenderator = tester.getAppenderator(); final Appenderator appenderator = tester.getAppenderator();
appenderator.startJob(); appenderator.startJob();
@ -906,7 +906,7 @@ public class AppenderatorTest extends InitializedNullHandlingTest
// Query1: 2000/2001 // Query1: 2000/2001
final TimeseriesQuery query1 = Druids.newTimeseriesQueryBuilder() final TimeseriesQuery query1 = Druids.newTimeseriesQueryBuilder()
.dataSource(AppenderatorTester.DATASOURCE) .dataSource(StreamAppenderatorTester.DATASOURCE)
.intervals(ImmutableList.of(Intervals.of("2000/2001"))) .intervals(ImmutableList.of(Intervals.of("2000/2001")))
.aggregators( .aggregators(
Arrays.asList( Arrays.asList(
@ -932,7 +932,7 @@ public class AppenderatorTest extends InitializedNullHandlingTest
// Query2: 2000/2002 // Query2: 2000/2002
final TimeseriesQuery query2 = Druids.newTimeseriesQueryBuilder() final TimeseriesQuery query2 = Druids.newTimeseriesQueryBuilder()
.dataSource(AppenderatorTester.DATASOURCE) .dataSource(StreamAppenderatorTester.DATASOURCE)
.intervals(ImmutableList.of(Intervals.of("2000/2002"))) .intervals(ImmutableList.of(Intervals.of("2000/2002")))
.aggregators( .aggregators(
Arrays.asList( Arrays.asList(
@ -962,7 +962,7 @@ public class AppenderatorTest extends InitializedNullHandlingTest
// Query3: 2000/2001T01 // Query3: 2000/2001T01
final TimeseriesQuery query3 = Druids.newTimeseriesQueryBuilder() final TimeseriesQuery query3 = Druids.newTimeseriesQueryBuilder()
.dataSource(AppenderatorTester.DATASOURCE) .dataSource(StreamAppenderatorTester.DATASOURCE)
.intervals(ImmutableList.of(Intervals.of("2000/2001T01"))) .intervals(ImmutableList.of(Intervals.of("2000/2001T01")))
.aggregators( .aggregators(
Arrays.asList( Arrays.asList(
@ -991,7 +991,7 @@ public class AppenderatorTest extends InitializedNullHandlingTest
// Query4: 2000/2001T01, 2001T03/2001T04 // Query4: 2000/2001T01, 2001T03/2001T04
final TimeseriesQuery query4 = Druids.newTimeseriesQueryBuilder() final TimeseriesQuery query4 = Druids.newTimeseriesQueryBuilder()
.dataSource(AppenderatorTester.DATASOURCE) .dataSource(StreamAppenderatorTester.DATASOURCE)
.intervals( .intervals(
ImmutableList.of( ImmutableList.of(
Intervals.of("2000/2001T01"), Intervals.of("2000/2001T01"),
@ -1028,7 +1028,7 @@ public class AppenderatorTest extends InitializedNullHandlingTest
@Test @Test
public void testQueryBySegments() throws Exception public void testQueryBySegments() throws Exception
{ {
try (final AppenderatorTester tester = new AppenderatorTester(2, true)) { try (final StreamAppenderatorTester tester = new StreamAppenderatorTester(2, true)) {
final Appenderator appenderator = tester.getAppenderator(); final Appenderator appenderator = tester.getAppenderator();
appenderator.startJob(); appenderator.startJob();
@ -1042,7 +1042,7 @@ public class AppenderatorTest extends InitializedNullHandlingTest
// Query1: segment #2 // Query1: segment #2
final TimeseriesQuery query1 = Druids.newTimeseriesQueryBuilder() final TimeseriesQuery query1 = Druids.newTimeseriesQueryBuilder()
.dataSource(AppenderatorTester.DATASOURCE) .dataSource(StreamAppenderatorTester.DATASOURCE)
.aggregators( .aggregators(
Arrays.asList( Arrays.asList(
new LongSumAggregatorFactory("count", "count"), new LongSumAggregatorFactory("count", "count"),
@ -1078,7 +1078,7 @@ public class AppenderatorTest extends InitializedNullHandlingTest
// Query2: segment #2, partial // Query2: segment #2, partial
final TimeseriesQuery query2 = Druids.newTimeseriesQueryBuilder() final TimeseriesQuery query2 = Druids.newTimeseriesQueryBuilder()
.dataSource(AppenderatorTester.DATASOURCE) .dataSource(StreamAppenderatorTester.DATASOURCE)
.aggregators( .aggregators(
Arrays.asList( Arrays.asList(
new LongSumAggregatorFactory("count", "count"), new LongSumAggregatorFactory("count", "count"),
@ -1114,7 +1114,7 @@ public class AppenderatorTest extends InitializedNullHandlingTest
// Query3: segment #2, two disjoint intervals // Query3: segment #2, two disjoint intervals
final TimeseriesQuery query3 = Druids.newTimeseriesQueryBuilder() final TimeseriesQuery query3 = Druids.newTimeseriesQueryBuilder()
.dataSource(AppenderatorTester.DATASOURCE) .dataSource(StreamAppenderatorTester.DATASOURCE)
.aggregators( .aggregators(
Arrays.asList( Arrays.asList(
new LongSumAggregatorFactory("count", "count"), new LongSumAggregatorFactory("count", "count"),
@ -1154,7 +1154,7 @@ public class AppenderatorTest extends InitializedNullHandlingTest
); );
final ScanQuery query4 = Druids.newScanQueryBuilder() final ScanQuery query4 = Druids.newScanQueryBuilder()
.dataSource(AppenderatorTester.DATASOURCE) .dataSource(StreamAppenderatorTester.DATASOURCE)
.intervals( .intervals(
new MultipleSpecificSegmentSpec( new MultipleSpecificSegmentSpec(
ImmutableList.of( ImmutableList.of(
@ -1194,7 +1194,7 @@ public class AppenderatorTest extends InitializedNullHandlingTest
private static SegmentIdWithShardSpec si(String interval, String version, int partitionNum) private static SegmentIdWithShardSpec si(String interval, String version, int partitionNum)
{ {
return new SegmentIdWithShardSpec( return new SegmentIdWithShardSpec(
AppenderatorTester.DATASOURCE, StreamAppenderatorTester.DATASOURCE,
Intervals.of(interval), Intervals.of(interval),
version, version,
new LinearShardSpec(partitionNum) new LinearShardSpec(partitionNum)

View File

@ -77,7 +77,7 @@ import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
public class AppenderatorTester implements AutoCloseable public class StreamAppenderatorTester implements AutoCloseable
{ {
public static final String DATASOURCE = "foo"; public static final String DATASOURCE = "foo";
@ -94,14 +94,14 @@ public class AppenderatorTester implements AutoCloseable
private final List<DataSegment> pushedSegments = new CopyOnWriteArrayList<>(); private final List<DataSegment> pushedSegments = new CopyOnWriteArrayList<>();
public AppenderatorTester( public StreamAppenderatorTester(
final int maxRowsInMemory final int maxRowsInMemory
) )
{ {
this(maxRowsInMemory, -1, null, false); this(maxRowsInMemory, -1, null, false);
} }
public AppenderatorTester( public StreamAppenderatorTester(
final int maxRowsInMemory, final int maxRowsInMemory,
final boolean enablePushFailure final boolean enablePushFailure
) )
@ -109,7 +109,7 @@ public class AppenderatorTester implements AutoCloseable
this(maxRowsInMemory, -1, null, enablePushFailure); this(maxRowsInMemory, -1, null, enablePushFailure);
} }
public AppenderatorTester( public StreamAppenderatorTester(
final int maxRowsInMemory, final int maxRowsInMemory,
final long maxSizeInBytes, final long maxSizeInBytes,
final boolean enablePushFailure final boolean enablePushFailure
@ -118,7 +118,7 @@ public class AppenderatorTester implements AutoCloseable
this(maxRowsInMemory, maxSizeInBytes, null, enablePushFailure); this(maxRowsInMemory, maxSizeInBytes, null, enablePushFailure);
} }
public AppenderatorTester( public StreamAppenderatorTester(
final int maxRowsInMemory, final int maxRowsInMemory,
final long maxSizeInBytes, final long maxSizeInBytes,
final File basePersistDirectory, final File basePersistDirectory,
@ -128,7 +128,7 @@ public class AppenderatorTester implements AutoCloseable
this(maxRowsInMemory, maxSizeInBytes, basePersistDirectory, enablePushFailure, new SimpleRowIngestionMeters(), false); this(maxRowsInMemory, maxSizeInBytes, basePersistDirectory, enablePushFailure, new SimpleRowIngestionMeters(), false);
} }
public AppenderatorTester( public StreamAppenderatorTester(
final int maxRowsInMemory, final int maxRowsInMemory,
final long maxSizeInBytes, final long maxSizeInBytes,
final File basePersistDirectory, final File basePersistDirectory,
@ -139,7 +139,7 @@ public class AppenderatorTester implements AutoCloseable
this(maxRowsInMemory, maxSizeInBytes, basePersistDirectory, enablePushFailure, rowIngestionMeters, false); this(maxRowsInMemory, maxSizeInBytes, basePersistDirectory, enablePushFailure, rowIngestionMeters, false);
} }
public AppenderatorTester( public StreamAppenderatorTester(
final int maxRowsInMemory, final int maxRowsInMemory,
final long maxSizeInBytes, final long maxSizeInBytes,
final File basePersistDirectory, final File basePersistDirectory,

View File

@ -26,22 +26,23 @@ import org.apache.druid.timeline.VersionedIntervalTimeline;
import org.apache.druid.timeline.partition.PartitionChunk; import org.apache.druid.timeline.partition.PartitionChunk;
import java.util.HashSet; import java.util.HashSet;
import java.util.List;
import java.util.Set; import java.util.Set;
public class TestUsedSegmentChecker implements UsedSegmentChecker public class TestUsedSegmentChecker implements UsedSegmentChecker
{ {
private final AppenderatorTester appenderatorTester; private final List<DataSegment> pushedSegments;
public TestUsedSegmentChecker(AppenderatorTester appenderatorTester) public TestUsedSegmentChecker(List<DataSegment> pushedSegments)
{ {
this.appenderatorTester = appenderatorTester; this.pushedSegments = pushedSegments;
} }
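With this change the checker only needs the list of segments that were actually pushed, rather than a reference to the whole tester. A hypothetical call site; the tester variable and the identifier below are placeholders standing in for objects from the tests above:
// Hypothetical usage in a test, assuming `tester` is a StreamAppenderatorTester
// and `someIdentifier` is a SegmentIdWithShardSpec for a pushed segment.
List<DataSegment> pushed = tester.getPushedSegments();
UsedSegmentChecker checker = new TestUsedSegmentChecker(pushed);
Set<DataSegment> used = checker.findUsedSegments(Collections.singleton(someIdentifier));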
@Override @Override
public Set<DataSegment> findUsedSegments(Set<SegmentIdWithShardSpec> identifiers) public Set<DataSegment> findUsedSegments(Set<SegmentIdWithShardSpec> identifiers)
{ {
final VersionedIntervalTimeline<String, DataSegment> timeline = new VersionedIntervalTimeline<>(Ordering.natural()); final VersionedIntervalTimeline<String, DataSegment> timeline = new VersionedIntervalTimeline<>(Ordering.natural());
VersionedIntervalTimeline.addSegments(timeline, appenderatorTester.getPushedSegments().iterator()); VersionedIntervalTimeline.addSegments(timeline, pushedSegments.iterator());
final Set<DataSegment> retVal = new HashSet<>(); final Set<DataSegment> retVal = new HashSet<>();
for (SegmentIdWithShardSpec identifier : identifiers) { for (SegmentIdWithShardSpec identifier : identifiers) {