add javadoc and test (#10938)

This commit is contained in:
Maytas Monsereenusorn 2021-03-02 19:34:00 -08:00 committed by GitHub
parent 7d9a61cf7f
commit 23333914c7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 40 additions and 5 deletions

View File

@ -110,15 +110,29 @@ public class ITCompactionTaskTest extends AbstractIndexerTest
checkQueryGranularity(SEGMENT_METADATA_QUERY_RESOURCE, GranularityType.SECOND.name(), 4); checkQueryGranularity(SEGMENT_METADATA_QUERY_RESOURCE, GranularityType.SECOND.name(), 4);
String queryResponseTemplate = getQueryResponseTemplate(INDEX_QUERIES_RESOURCE); String queryResponseTemplate = getQueryResponseTemplate(INDEX_QUERIES_RESOURCE);
queryHelper.testQueriesFromString(queryResponseTemplate); queryHelper.testQueriesFromString(queryResponseTemplate);
// QueryGranularity was SECOND, now we will change it to HOUR // QueryGranularity was SECOND, now we will change it to HOUR (QueryGranularity changed to coarser)
compactData(COMPACTION_TASK_WITH_GRANULARITY_SPEC, null, GranularityType.HOUR); compactData(COMPACTION_TASK_WITH_GRANULARITY_SPEC, null, GranularityType.HOUR);
// The original 4 segments should be compacted into 2 new segments // The original 4 segments should be compacted into 2 new segments since data only has 2 days and the compaction
// segmentGranularity is DAY
checkNumberOfSegments(2); checkNumberOfSegments(2);
queryResponseTemplate = getQueryResponseTemplate(INDEX_QUERIES_HOUR_RESOURCE); queryResponseTemplate = getQueryResponseTemplate(INDEX_QUERIES_HOUR_RESOURCE);
queryHelper.testQueriesFromString(queryResponseTemplate); queryHelper.testQueriesFromString(queryResponseTemplate);
checkQueryGranularity(SEGMENT_METADATA_QUERY_RESOURCE, GranularityType.HOUR.name(), 2); checkQueryGranularity(SEGMENT_METADATA_QUERY_RESOURCE, GranularityType.HOUR.name(), 2);
checkCompactionIntervals(expectedIntervalAfterCompaction); checkCompactionIntervals(expectedIntervalAfterCompaction);
// QueryGranularity was HOUR, now we will change it to MINUTE (QueryGranularity changed to finer)
compactData(COMPACTION_TASK_WITH_GRANULARITY_SPEC, null, GranularityType.MINUTE);
// There will be no change in number of segments as compaction segmentGranularity is the same and data interval
// is the same. Since QueryGranularity is changed to finer qranularity, the data will remains the same. (data
// will just be bucketed to a finer qranularity but roll up will not be different
// i.e. 2020-10-29T05:00 will just be bucketed to 2020-10-29T05:00:00)
checkNumberOfSegments(2);
queryResponseTemplate = getQueryResponseTemplate(INDEX_QUERIES_HOUR_RESOURCE);
queryHelper.testQueriesFromString(queryResponseTemplate);
checkQueryGranularity(SEGMENT_METADATA_QUERY_RESOURCE, GranularityType.MINUTE.name(), 2);
checkCompactionIntervals(expectedIntervalAfterCompaction);
} }
} }

View File

@ -22,9 +22,20 @@ package org.apache.druid.client.indexing;
import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.druid.java.util.common.granularity.Granularity; import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.segment.indexing.granularity.GranularitySpec;
import java.util.Objects; import java.util.Objects;
/**
* Spec containing Granularity configs for Compaction Task.
* This class mimics JSON field names for fields supported in compaction task with
* the corresponding fields in {@link GranularitySpec}.
* This is done for end-user ease of use. Basically, end-user will use the same syntax / JSON structure to set
* Granularity configs for Compaction task as they would for any other ingestion task.
* Note that this class is not the same as {@link GranularitySpec}. This class simply holds Granularity configs
* and use it to generate index task specs (Compaction task internally creates index task).
* This class does not do bucketing, group events or knows how to partition data.
*/
public class ClientCompactionTaskGranularitySpec public class ClientCompactionTaskGranularitySpec
{ {
private final Granularity segmentGranularity; private final Granularity segmentGranularity;

View File

@ -333,11 +333,11 @@ public class AppenderatorImpl implements Appenderator
// persistAll clears rowsCurrentlyInMemory, no need to update it. // persistAll clears rowsCurrentlyInMemory, no need to update it.
log.info("Flushing in-memory data to disk because %s.", String.join(",", persistReasons)); log.info("Flushing in-memory data to disk because %s.", String.join(",", persistReasons));
long bytesPersisted = 0L; long bytesToBePersisted = 0L;
for (Map.Entry<SegmentIdWithShardSpec, Sink> entry : sinks.entrySet()) { for (Map.Entry<SegmentIdWithShardSpec, Sink> entry : sinks.entrySet()) {
final Sink sinkEntry = entry.getValue(); final Sink sinkEntry = entry.getValue();
if (sinkEntry != null) { if (sinkEntry != null) {
bytesPersisted += sinkEntry.getBytesInMemory(); bytesToBePersisted += sinkEntry.getBytesInMemory();
if (sinkEntry.swappable()) { if (sinkEntry.swappable()) {
// After swapping the sink, we use memory mapped segment instead. However, the memory mapped segment still consumes memory. // After swapping the sink, we use memory mapped segment instead. However, the memory mapped segment still consumes memory.
// These memory mapped segments are held in memory throughout the ingestion phase and permanently add to the bytesCurrentlyInMemory // These memory mapped segments are held in memory throughout the ingestion phase and permanently add to the bytesCurrentlyInMemory
@ -347,7 +347,7 @@ public class AppenderatorImpl implements Appenderator
} }
} }
if (!skipBytesInMemoryOverheadCheck && bytesCurrentlyInMemory.get() - bytesPersisted > maxBytesTuningConfig) { if (!skipBytesInMemoryOverheadCheck && bytesCurrentlyInMemory.get() - bytesToBePersisted > maxBytesTuningConfig) {
// We are still over maxBytesTuningConfig even after persisting. // We are still over maxBytesTuningConfig even after persisting.
// This means that we ran out of all available memory to ingest (due to overheads created as part of ingestion) // This means that we ran out of all available memory to ingest (due to overheads created as part of ingestion)
final String alertMessage = StringUtils.format( final String alertMessage = StringUtils.format(

View File

@ -22,9 +22,19 @@ package org.apache.druid.server.coordinator;
import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.druid.java.util.common.granularity.Granularity; import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.segment.indexing.granularity.GranularitySpec;
import java.util.Objects; import java.util.Objects;
/**
* Spec containing Granularity configs for Auto Compaction.
* This class mimics JSON field names for fields supported in auto compaction with
* the corresponding fields in {@link GranularitySpec}.
* This is done for end-user ease of use. Basically, end-user will use the same syntax / JSON structure to set
* Granularity configs for Auto Compaction as they would for any other ingestion task.
* Note that this class is not the same as {@link GranularitySpec}. This class simply holds Granularity configs
* and pass it to compaction task spec. This class does not do bucketing, group events or knows how to partition data.
*/
public class UserCompactionTaskGranularityConfig public class UserCompactionTaskGranularityConfig
{ {
private final Granularity segmentGranularity; private final Granularity segmentGranularity;