diff --git a/processing/src/main/java/org/apache/druid/segment/incremental/NoopRowIngestionMeters.java b/processing/src/main/java/org/apache/druid/segment/incremental/NoopRowIngestionMeters.java index bff4f2e6de3..f29219de954 100644 --- a/processing/src/main/java/org/apache/druid/segment/incremental/NoopRowIngestionMeters.java +++ b/processing/src/main/java/org/apache/druid/segment/incremental/NoopRowIngestionMeters.java @@ -31,6 +31,8 @@ public class NoopRowIngestionMeters implements RowIngestionMeters { private static final RowIngestionMetersTotals EMPTY_TOTALS = new RowIngestionMetersTotals(0, 0, 0, 0, 0); + public static final NoopRowIngestionMeters INSTANCE = new NoopRowIngestionMeters(); + @Override public long getProcessed() { diff --git a/quidem-ut/src/main/java/org/apache/druid/quidem/ScaledResoureInputDataSource.java b/quidem-ut/src/main/java/org/apache/druid/quidem/ScaledResoureInputDataSource.java new file mode 100644 index 00000000000..c8605fbe050 --- /dev/null +++ b/quidem-ut/src/main/java/org/apache/druid/quidem/ScaledResoureInputDataSource.java @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.quidem; + +import org.apache.druid.data.input.AbstractInputSource; +import org.apache.druid.data.input.InputFormat; +import org.apache.druid.data.input.InputRow; +import org.apache.druid.data.input.InputRowListPlusRawValues; +import org.apache.druid.data.input.InputRowSchema; +import org.apache.druid.data.input.InputSourceReader; +import org.apache.druid.data.input.InputStats; +import org.apache.druid.data.input.ResourceInputSource; +import org.apache.druid.indexing.common.task.FilteringCloseableInputRowIterator; +import org.apache.druid.java.util.common.parsers.CloseableIterator; +import org.apache.druid.segment.incremental.NoopRowIngestionMeters; +import org.apache.druid.segment.incremental.ParseExceptionHandler; +import java.io.File; +import java.io.IOException; +import java.util.function.Predicate; + +public class ScaledResoureInputDataSource extends AbstractInputSource +{ + private int k; + private int n; + private ResourceInputSource resourceInputSource; + + public ScaledResoureInputDataSource(int k, int n, ResourceInputSource resourceInputSource) + { + this.k = k; + this.n = n; + this.resourceInputSource = resourceInputSource; + } + + @Override + public boolean isSplittable() + { + return false; + } + + @Override + public boolean needsFormat() + { + return true; + } + + @Override + public InputSourceReader reader(InputRowSchema inputRowSchema, InputFormat inputFormat, File temporaryDirectory) + { + InputSourceReader reader = resourceInputSource.reader(inputRowSchema, inputFormat, temporaryDirectory); + return new FilteredReader(reader, this::filterPredicate); + } + + public boolean filterPredicate(InputRow inputRow) + { + return (Math.abs(inputRow.hashCode()) % n) < k; + } + + static class FilteredReader implements InputSourceReader + { + private InputSourceReader reader; + private Predicate filterPredicate; + + public FilteredReader(InputSourceReader reader, Predicate filterPredicate) + { + this.reader = reader; + this.filterPredicate = filterPredicate; + } + + @Override + public CloseableIterator read(InputStats inputStats) throws IOException + { + return new FilteringCloseableInputRowIterator( + reader.read(inputStats), + filterPredicate, + NoopRowIngestionMeters.INSTANCE, + new ParseExceptionHandler(NoopRowIngestionMeters.INSTANCE, false, 0, 0) + ); + } + + @Override + public CloseableIterator sample() throws IOException + { + return reader.sample(); + } + } +}