use fraction+maxRows

This commit is contained in:
Zoltan Haindrich 2024-09-25 09:29:51 +00:00
parent 0e61dfaeb2
commit bdf67180cb
3 changed files with 63 additions and 12 deletions

View File

@ -20,7 +20,7 @@
package org.apache.druid.quidem; package org.apache.druid.quidem;
import com.google.inject.Injector; import com.google.inject.Injector;
import org.apache.commons.lang.math.Fraction; import org.apache.commons.lang3.math.Fraction;
import org.apache.druid.data.input.InputSource; import org.apache.druid.data.input.InputSource;
import org.apache.druid.data.input.ResourceInputSource; import org.apache.druid.data.input.ResourceInputSource;
import org.apache.druid.data.input.impl.DimensionSchema; import org.apache.druid.data.input.impl.DimensionSchema;
@ -56,17 +56,26 @@ import java.util.UUID;
public class KttmNestedComponentSupplier extends StandardComponentSupplier public class KttmNestedComponentSupplier extends StandardComponentSupplier
{ {
private Fraction fraction; public static class Micro extends KttmNestedComponentSupplier {
public Micro(TempDirProducer tempDirProducer)
{
super(tempDirProducer, Fraction.getFraction(1, 10000), Integer.MAX_VALUE);
}
}
private final Fraction fraction;
private final int maxRows;
public KttmNestedComponentSupplier(TempDirProducer tempDirProducer) public KttmNestedComponentSupplier(TempDirProducer tempDirProducer)
{ {
this(tempDirProducer, Fraction.ONE); this(tempDirProducer, Fraction.ONE, Integer.MAX_VALUE);
} }
public KttmNestedComponentSupplier(TempDirProducer tempDirProducer, Fraction fraction) public KttmNestedComponentSupplier(TempDirProducer tempDirProducer, Fraction fraction, int maxRows)
{ {
super(tempDirProducer); super(tempDirProducer);
this.fraction = fraction; this.fraction = fraction;
this.maxRows = maxRows;
} }
@Override @Override
@ -132,6 +141,7 @@ public class KttmNestedComponentSupplier extends StandardComponentSupplier
try { try {
InputSource inputSource = new ScaledResoureInputDataSource( InputSource inputSource = new ScaledResoureInputDataSource(
fraction, fraction,
maxRows,
ResourceInputSource.of( ResourceInputSource.of(
TestIndex.class.getClassLoader(), TestIndex.class.getClassLoader(),
"kttm-nested-v2-2019-08-25.json" "kttm-nested-v2-2019-08-25.json"

View File

@ -19,7 +19,7 @@
package org.apache.druid.quidem; package org.apache.druid.quidem;
import org.apache.commons.lang.math.Fraction; import org.apache.commons.lang3.math.Fraction;
import org.apache.druid.data.input.AbstractInputSource; import org.apache.druid.data.input.AbstractInputSource;
import org.apache.druid.data.input.InputFormat; import org.apache.druid.data.input.InputFormat;
import org.apache.druid.data.input.InputRow; import org.apache.druid.data.input.InputRow;
@ -32,18 +32,21 @@ import org.apache.druid.indexing.common.task.FilteringCloseableInputRowIterator;
import org.apache.druid.java.util.common.parsers.CloseableIterator; import org.apache.druid.java.util.common.parsers.CloseableIterator;
import org.apache.druid.segment.incremental.NoopRowIngestionMeters; import org.apache.druid.segment.incremental.NoopRowIngestionMeters;
import org.apache.druid.segment.incremental.ParseExceptionHandler; import org.apache.druid.segment.incremental.ParseExceptionHandler;
import java.io.File; import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.util.function.Predicate; import java.util.function.Predicate;
public class ScaledResoureInputDataSource extends AbstractInputSource public class ScaledResoureInputDataSource extends AbstractInputSource
{ {
private ResourceInputSource resourceInputSource; private final ResourceInputSource resourceInputSource;
private Fraction fraction; private final Fraction fraction;
private final int maxRows;
public ScaledResoureInputDataSource(Fraction fraction, ResourceInputSource resourceInputSource) public ScaledResoureInputDataSource(Fraction fraction, int maxRows, ResourceInputSource resourceInputSource)
{ {
this.fraction = fraction; this.fraction = fraction;
this.maxRows = maxRows;
this.resourceInputSource = resourceInputSource; this.resourceInputSource = resourceInputSource;
} }
@ -63,7 +66,7 @@ public class ScaledResoureInputDataSource extends AbstractInputSource
public InputSourceReader reader(InputRowSchema inputRowSchema, InputFormat inputFormat, File temporaryDirectory) public InputSourceReader reader(InputRowSchema inputRowSchema, InputFormat inputFormat, File temporaryDirectory)
{ {
InputSourceReader reader = resourceInputSource.reader(inputRowSchema, inputFormat, temporaryDirectory); InputSourceReader reader = resourceInputSource.reader(inputRowSchema, inputFormat, temporaryDirectory);
return new FilteredReader(reader, this::filterPredicate); return new FilteredReader(reader, this::filterPredicate, maxRows);
} }
public boolean filterPredicate(InputRow inputRow) public boolean filterPredicate(InputRow inputRow)
@ -75,22 +78,25 @@ public class ScaledResoureInputDataSource extends AbstractInputSource
{ {
private InputSourceReader reader; private InputSourceReader reader;
private Predicate<InputRow> filterPredicate; private Predicate<InputRow> filterPredicate;
private int maxRows;
public FilteredReader(InputSourceReader reader, Predicate<InputRow> filterPredicate) public FilteredReader(InputSourceReader reader, Predicate<InputRow> filterPredicate, int maxRows)
{ {
this.reader = reader; this.reader = reader;
this.filterPredicate = filterPredicate; this.filterPredicate = filterPredicate;
this.maxRows = maxRows;
} }
@Override @Override
public CloseableIterator<InputRow> read(InputStats inputStats) throws IOException public CloseableIterator<InputRow> read(InputStats inputStats) throws IOException
{ {
return new FilteringCloseableInputRowIterator( FilteringCloseableInputRowIterator filteredIterator = new FilteringCloseableInputRowIterator(
reader.read(inputStats), reader.read(inputStats),
filterPredicate, filterPredicate,
NoopRowIngestionMeters.INSTANCE, NoopRowIngestionMeters.INSTANCE,
new ParseExceptionHandler(NoopRowIngestionMeters.INSTANCE, false, 0, 0) new ParseExceptionHandler(NoopRowIngestionMeters.INSTANCE, false, 0, 0)
); );
return new LimitedCloseableIterator<>(filteredIterator, maxRows);
} }
@Override @Override
@ -99,4 +105,39 @@ public class ScaledResoureInputDataSource extends AbstractInputSource
return reader.sample(); return reader.sample();
} }
} }
static class LimitedCloseableIterator<InputRow> implements CloseableIterator<InputRow>
{
private final CloseableIterator<InputRow> delegate;
private final int maxRows;
private int count = 0;
public LimitedCloseableIterator(CloseableIterator<InputRow> delegate, int maxRows)
{
this.delegate = delegate;
this.maxRows = maxRows;
}
@Override
public boolean hasNext()
{
return count < maxRows && delegate.hasNext();
}
@Override
public InputRow next()
{
if (count >= maxRows) {
throw new IllegalStateException("Exceeded maxRows");
}
count++;
return delegate.next();
}
@Override
public void close() throws IOException
{
delegate.close();
}
}
} }

View File

@ -24,7 +24,7 @@ import org.apache.druid.sql.calcite.BaseCalciteQueryTest;
import org.apache.druid.sql.calcite.SqlTestFrameworkConfig.ComponentSupplier; import org.apache.druid.sql.calcite.SqlTestFrameworkConfig.ComponentSupplier;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
@ComponentSupplier(KttmNestedComponentSupplier.class) @ComponentSupplier(KttmNestedComponentSupplier.Micro.class)
public class KttmNestedComponentSupplierTest extends BaseCalciteQueryTest public class KttmNestedComponentSupplierTest extends BaseCalciteQueryTest
{ {
@Test @Test