Refactor: Simplify creation of input row filter predicate in various batch tasks (#16196)

Changes:
- Simplify method `AbstractBatchIndexTask.defaultRowFilter()` and rename it to
  `allowNonNullRowsStrictlyWithinInputIntervalsOf()`
- Add method `allowNonNullRowsWithinInputIntervalsOf()`, which also passes all
  non-null rows when the granularity spec declares no input intervals (both
  behaviors are illustrated in the sketch below, after the commit metadata)
- Add javadocs
Kashif Faraz 2024-03-26 04:54:07 +05:30 committed by GitHub
parent a16092b16a
commit e7dc00b86d
6 changed files with 33 additions and 42 deletions
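
To make the behavioral difference between the two new predicates concrete, here is a minimal, self-contained sketch of the same logic (Java 17 records used for brevity). The Row and Interval records are simplified stand-ins for Druid's InputRow and for the input intervals of a GranularitySpec, not the real classes:

import java.util.List;
import java.util.function.Predicate;

public class RowFilterSketch
{
  // Stand-in for an input row; only the timestamp matters for interval bucketing.
  record Row(long timestampMillis) {}

  // Stand-in for a half-open interval [start, end) in millis.
  record Interval(long start, long end)
  {
    boolean contains(long t)
    {
      return t >= start && t < end;
    }
  }

  /** Strict variant: when no input intervals are configured, every row is dropped. */
  static Predicate<Row> allowNonNullRowsStrictlyWithin(List<Interval> inputIntervals)
  {
    return row -> row != null
                  && inputIntervals.stream().anyMatch(i -> i.contains(row.timestampMillis()));
  }

  /** Lenient variant: when no input intervals are configured, every non-null row passes. */
  static Predicate<Row> allowNonNullRowsWithin(List<Interval> inputIntervals)
  {
    return row -> row != null
                  && (inputIntervals.isEmpty()
                      || inputIntervals.stream().anyMatch(i -> i.contains(row.timestampMillis())));
  }

  public static void main(String[] args)
  {
    Row row = new Row(5_000L);
    List<Interval> noIntervals = List.of();
    List<Interval> nonMatching = List.of(new Interval(10_000L, 20_000L));

    System.out.println(allowNonNullRowsStrictlyWithin(noIntervals).test(row)); // false: nothing to bucket into
    System.out.println(allowNonNullRowsWithin(noIntervals).test(row));         // true: intervals not specified
    System.out.println(allowNonNullRowsWithin(nonMatching).test(row));         // false: outside every interval
    System.out.println(allowNonNullRowsWithin(nonMatching).test(null));        // false: null rows never pass
  }
}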

View File

@@ -225,15 +225,35 @@ public abstract class AbstractBatchIndexTask extends AbstractTask
     );
   }
-  protected static Predicate<InputRow> defaultRowFilter(GranularitySpec granularitySpec)
+  /**
+   * Creates a predicate that is true for input rows which (a) are non-null and
+   * (b) can be bucketed into an interval using the given granularity spec.
+   * This predicate filters out all rows if the granularity spec has no
+   * input intervals.
+   */
+  protected static Predicate<InputRow> allowNonNullRowsStrictlyWithinInputIntervalsOf(
+      GranularitySpec granularitySpec
+  )
   {
-    return inputRow -> {
-      if (inputRow == null) {
-        return false;
-      }
-      final Optional<Interval> optInterval = granularitySpec.bucketInterval(inputRow.getTimestamp());
-      return optInterval.isPresent();
-    };
+    return inputRow ->
+        inputRow != null
+        && granularitySpec.bucketInterval(inputRow.getTimestamp()).isPresent();
   }
+  /**
+   * Creates a predicate that is true for input rows which (a) are non-null and
+   * (b) can be bucketed into an interval using the given granularity spec.
+   * This predicate allows all non-null rows if the granularity spec has
+   * no input intervals.
+   */
+  protected static Predicate<InputRow> allowNonNullRowsWithinInputIntervalsOf(
+      GranularitySpec granularitySpec
+  )
+  {
+    return inputRow ->
+        inputRow != null
+        && (granularitySpec.inputIntervals().isEmpty()
+            || granularitySpec.bucketInterval(inputRow.getTimestamp()).isPresent());
+  }
   /**
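
The call sites in the files below previously handled the empty-intervals case themselves, either with an inline determineIntervals/explicitIntervals branch or with the ternary `determineIntervals ? Objects::nonNull : defaultRowFilter(granularitySpec)`. The lenient helper is intended as a drop-in replacement for all of these. A small, self-contained truth-table check of that equivalence, with plain booleans standing in for the null check and for `bucketInterval(...).isPresent()`:

public class RowFilterEquivalenceCheck
{
  public static void main(String[] args)
  {
    // rowIsNull:     the input row reference is null
    // hasIntervals:  the granularity spec declares explicit input intervals
    // inBucket:      bucketInterval(row.getTimestamp()) is present
    for (boolean rowIsNull : new boolean[]{true, false}) {
      for (boolean hasIntervals : new boolean[]{true, false}) {
        for (boolean inBucket : new boolean[]{true, false}) {
          // Old call-site pattern: with no intervals, only the null check applies;
          // otherwise apply the strict filter (non-null AND bucketable).
          boolean oldFilter = !hasIntervals ? !rowIsNull : (!rowIsNull && inBucket);
          // New lenient predicate: non-null AND (no intervals OR bucketable).
          boolean newFilter = !rowIsNull && (!hasIntervals || inBucket);
          if (oldFilter != newFilter) {
            throw new AssertionError("Predicates disagree for some combination of inputs");
          }
        }
      }
    }
    System.out.println("Old and new row filters agree on all 8 combinations.");
  }
}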

View File

@@ -136,7 +136,6 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Function;
-import java.util.function.Predicate;
 import java.util.stream.Collectors;
 public class IndexTask extends AbstractBatchIndexTask implements ChatHandler
@@ -800,23 +799,12 @@ public class IndexTask extends AbstractBatchIndexTask implements ChatHandler
         Comparators.intervalsByStartThenEnd()
     );
     final Granularity queryGranularity = granularitySpec.getQueryGranularity();
-    final Predicate<InputRow> rowFilter = inputRow -> {
-      if (inputRow == null) {
-        return false;
-      }
-      if (determineIntervals) {
-        return true;
-      }
-      final Optional<Interval> optInterval = granularitySpec.bucketInterval(inputRow.getTimestamp());
-      return optInterval.isPresent();
-    };
     try (final CloseableIterator<InputRow> inputRowIterator = AbstractBatchIndexTask.inputSourceReader(
         tmpDir,
         ingestionSchema.getDataSchema(),
         inputSource,
         inputSource.needsFormat() ? getInputFormat(ingestionSchema) : null,
-        rowFilter,
+        allowNonNullRowsWithinInputIntervalsOf(granularitySpec),
         determinePartitionsMeters,
         determinePartitionsParseExceptionHandler
     )) {

View File

@@ -84,7 +84,7 @@ public class InputSourceProcessor
         dataSchema,
         inputSource,
         inputFormat,
-        AbstractBatchIndexTask.defaultRowFilter(granularitySpec),
+        AbstractBatchIndexTask.allowNonNullRowsStrictlyWithinInputIntervalsOf(granularitySpec),
         buildSegmentsMeters,
         parseExceptionHandler
     );

View File

@@ -59,7 +59,6 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Objects;
 import java.util.Set;
 import java.util.stream.Collectors;
@@ -185,15 +184,13 @@ public class PartialDimensionCardinalityTask extends PerfectRollupWorkerTask
         tuningConfig.getMaxParseExceptions(),
         tuningConfig.getMaxSavedParseExceptions()
     );
-    final boolean determineIntervals = granularitySpec.inputIntervals().isEmpty();
     try (
         final CloseableIterator<InputRow> inputRowIterator = AbstractBatchIndexTask.inputSourceReader(
             toolbox.getIndexingTmpDir(),
             dataSchema,
             inputSource,
             inputFormat,
-            determineIntervals ? Objects::nonNull : AbstractBatchIndexTask.defaultRowFilter(granularitySpec),
+            allowNonNullRowsWithinInputIntervalsOf(granularitySpec),
             buildSegmentsMeters,
             parseExceptionHandler
         );

View File

@@ -62,7 +62,6 @@ import javax.annotation.Nullable;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Objects;
 import java.util.Set;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
@@ -232,15 +231,13 @@ public class PartialDimensionDistributionTask extends PerfectRollupWorkerTask
         tuningConfig.getMaxParseExceptions(),
         tuningConfig.getMaxSavedParseExceptions()
     );
-    final boolean determineIntervals = granularitySpec.inputIntervals().isEmpty();
     try (
         final CloseableIterator<InputRow> inputRowIterator = AbstractBatchIndexTask.inputSourceReader(
             toolbox.getIndexingTmpDir(),
             dataSchema,
             inputSource,
             inputFormat,
-            determineIntervals ? Objects::nonNull : AbstractBatchIndexTask.defaultRowFilter(granularitySpec),
+            allowNonNullRowsWithinInputIntervalsOf(granularitySpec),
             buildSegmentsMeters,
             parseExceptionHandler
         );

View File

@@ -23,7 +23,6 @@ import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Throwables;
 import com.google.common.collect.FluentIterable;
@@ -383,7 +382,6 @@ public class SinglePhaseSubTask extends AbstractBatchSubtask implements ChatHand
     final ParallelIndexTuningConfig tuningConfig = ingestionSchema.getTuningConfig();
     final DynamicPartitionsSpec partitionsSpec = (DynamicPartitionsSpec) tuningConfig.getGivenOrDefaultPartitionsSpec();
     final long pushTimeout = tuningConfig.getPushTimeout();
-    final boolean explicitIntervals = !granularitySpec.inputIntervals().isEmpty();
     final boolean useLineageBasedSegmentAllocation = getContextValue(
         SinglePhaseParallelIndexTaskRunner.CTX_USE_LINEAGE_BASED_SEGMENT_ALLOCATION_KEY,
         SinglePhaseParallelIndexTaskRunner.LEGACY_DEFAULT_USE_LINEAGE_BASED_SEGMENT_ALLOCATION
@@ -427,16 +425,7 @@ public class SinglePhaseSubTask extends AbstractBatchSubtask implements ChatHand
         dataSchema,
         inputSource,
         inputSource.needsFormat() ? ParallelIndexSupervisorTask.getInputFormat(ingestionSchema) : null,
-        inputRow -> {
-          if (inputRow == null) {
-            return false;
-          }
-          if (explicitIntervals) {
-            final Optional<Interval> optInterval = granularitySpec.bucketInterval(inputRow.getTimestamp());
-            return optInterval.isPresent();
-          }
-          return true;
-        },
+        allowNonNullRowsWithinInputIntervalsOf(granularitySpec),
         rowIngestionMeters,
         parseExceptionHandler
     )