Revert "remove maxRows"

This reverts commit 4d4de51f4f.
This commit is contained in:
Zoltan Haindrich 2024-09-25 09:31:25 +00:00
parent 4d4de51f4f
commit e366192fb5
2 changed files with 50 additions and 9 deletions

View File

@ -59,7 +59,7 @@ public class KttmNestedComponentSupplier extends StandardComponentSupplier
public static class Micro extends KttmNestedComponentSupplier {
public Micro(TempDirProducer tempDirProducer)
{
super(tempDirProducer, Fraction.getFraction(1, 10000));
super(tempDirProducer, Fraction.getFraction(1, 10000), Integer.MAX_VALUE);
}
}
@ -68,13 +68,14 @@ public class KttmNestedComponentSupplier extends StandardComponentSupplier
public KttmNestedComponentSupplier(TempDirProducer tempDirProducer)
{
this(tempDirProducer, Fraction.ONE);
this(tempDirProducer, Fraction.ONE, Integer.MAX_VALUE);
}
public KttmNestedComponentSupplier(TempDirProducer tempDirProducer, Fraction fraction)
public KttmNestedComponentSupplier(TempDirProducer tempDirProducer, Fraction fraction, int maxRows)
{
super(tempDirProducer);
this.fraction = fraction;
this.maxRows = maxRows;
}
@Override

View File

@ -41,10 +41,12 @@ public class ScaledResoureInputDataSource extends AbstractInputSource
{
private final ResourceInputSource resourceInputSource;
private final Fraction fraction;
private final int maxRows;
public ScaledResoureInputDataSource(Fraction fraction, ResourceInputSource resourceInputSource)
public ScaledResoureInputDataSource(Fraction fraction, int maxRows, ResourceInputSource resourceInputSource)
{
this.fraction = fraction;
this.maxRows = maxRows;
this.resourceInputSource = resourceInputSource;
}
@ -64,7 +66,7 @@ public class ScaledResoureInputDataSource extends AbstractInputSource
public InputSourceReader reader(InputRowSchema inputRowSchema, InputFormat inputFormat, File temporaryDirectory)
{
InputSourceReader reader = resourceInputSource.reader(inputRowSchema, inputFormat, temporaryDirectory);
return new FilteredReader(reader, this::filterPredicate);
return new FilteredReader(reader, this::filterPredicate, maxRows);
}
public boolean filterPredicate(InputRow inputRow)
@ -74,24 +76,27 @@ public class ScaledResoureInputDataSource extends AbstractInputSource
static class FilteredReader implements InputSourceReader
{
private final InputSourceReader reader;
private final Predicate<InputRow> filterPredicate;
private InputSourceReader reader;
private Predicate<InputRow> filterPredicate;
private int maxRows;
public FilteredReader(InputSourceReader reader, Predicate<InputRow> filterPredicate)
public FilteredReader(InputSourceReader reader, Predicate<InputRow> filterPredicate, int maxRows)
{
this.reader = reader;
this.filterPredicate = filterPredicate;
this.maxRows = maxRows;
}
@Override
public CloseableIterator<InputRow> read(InputStats inputStats) throws IOException
{
return new FilteringCloseableInputRowIterator(
FilteringCloseableInputRowIterator filteredIterator = new FilteringCloseableInputRowIterator(
reader.read(inputStats),
filterPredicate,
NoopRowIngestionMeters.INSTANCE,
new ParseExceptionHandler(NoopRowIngestionMeters.INSTANCE, false, 0, 0)
);
return new LimitedCloseableIterator<>(filteredIterator, maxRows);
}
@Override
@ -100,4 +105,39 @@ public class ScaledResoureInputDataSource extends AbstractInputSource
return reader.sample();
}
}
static class LimitedCloseableIterator<InputRow> implements CloseableIterator<InputRow>
{
private final CloseableIterator<InputRow> delegate;
private final int maxRows;
private int count = 0;
public LimitedCloseableIterator(CloseableIterator<InputRow> delegate, int maxRows)
{
this.delegate = delegate;
this.maxRows = maxRows;
}
@Override
public boolean hasNext()
{
return count < maxRows && delegate.hasNext();
}
@Override
public InputRow next()
{
if (count >= maxRows) {
throw new IllegalStateException("Exceeded maxRows");
}
count++;
return delegate.next();
}
@Override
public void close() throws IOException
{
delegate.close();
}
}
}