Make RecordSupplierInputSource respect sampler timeout when stream is empty (#13296)

* Make RecordSupplierInputSource respect sampler timeout when stream is empty

* Rename timeout param, make it nullable, add timeout test
This commit is contained in:
Jonathan Wei 2022-11-03 17:45:35 -05:00 committed by GitHub
parent 2a757b64e8
commit 2fdaa2fcab
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 58 additions and 5 deletions

View File

@ -31,6 +31,7 @@ import org.apache.druid.indexing.overlord.sampler.SamplerException;
import org.apache.druid.indexing.seekablestream.common.OrderedPartitionableRecord; import org.apache.druid.indexing.seekablestream.common.OrderedPartitionableRecord;
import org.apache.druid.indexing.seekablestream.common.RecordSupplier; import org.apache.druid.indexing.seekablestream.common.RecordSupplier;
import org.apache.druid.indexing.seekablestream.common.StreamPartition; import org.apache.druid.indexing.seekablestream.common.StreamPartition;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.java.util.common.parsers.CloseableIterator; import org.apache.druid.java.util.common.parsers.CloseableIterator;
import javax.annotation.Nullable; import javax.annotation.Nullable;
@ -45,19 +46,28 @@ import java.util.stream.Collectors;
*/ */
public class RecordSupplierInputSource<PartitionIdType, SequenceOffsetType, RecordType extends ByteEntity> extends AbstractInputSource public class RecordSupplierInputSource<PartitionIdType, SequenceOffsetType, RecordType extends ByteEntity> extends AbstractInputSource
{ {
private static final Logger LOG = new Logger(RecordSupplierInputSource.class);
private final String topic; private final String topic;
private final RecordSupplier<PartitionIdType, SequenceOffsetType, RecordType> recordSupplier; private final RecordSupplier<PartitionIdType, SequenceOffsetType, RecordType> recordSupplier;
private final boolean useEarliestOffset; private final boolean useEarliestOffset;
/**
* Maximum amount of time in which the entity iterator will return results. If null, no timeout is applied.
*/
private final Integer iteratorTimeoutMs;
public RecordSupplierInputSource( public RecordSupplierInputSource(
String topic, String topic,
RecordSupplier<PartitionIdType, SequenceOffsetType, RecordType> recordSupplier, RecordSupplier<PartitionIdType, SequenceOffsetType, RecordType> recordSupplier,
boolean useEarliestOffset boolean useEarliestOffset,
Integer iteratorTimeoutMs
) )
{ {
this.topic = topic; this.topic = topic;
this.recordSupplier = recordSupplier; this.recordSupplier = recordSupplier;
this.useEarliestOffset = useEarliestOffset; this.useEarliestOffset = useEarliestOffset;
this.iteratorTimeoutMs = iteratorTimeoutMs;
try { try {
assignAndSeek(recordSupplier); assignAndSeek(recordSupplier);
} }
@ -123,13 +133,24 @@ public class RecordSupplierInputSource<PartitionIdType, SequenceOffsetType, Reco
private Iterator<OrderedPartitionableRecord<PartitionIdType, SequenceOffsetType, RecordType>> recordIterator; private Iterator<OrderedPartitionableRecord<PartitionIdType, SequenceOffsetType, RecordType>> recordIterator;
private Iterator<? extends ByteEntity> bytesIterator; private Iterator<? extends ByteEntity> bytesIterator;
private volatile boolean closed; private volatile boolean closed;
private final long createTime = System.currentTimeMillis();
private final Long terminationTime = iteratorTimeoutMs != null ? createTime + iteratorTimeoutMs : null;
private void waitNextIteratorIfNecessary() private void waitNextIteratorIfNecessary()
{ {
while (!closed && (bytesIterator == null || !bytesIterator.hasNext())) { while (!closed && (bytesIterator == null || !bytesIterator.hasNext())) {
while (!closed && (recordIterator == null || !recordIterator.hasNext())) { while (!closed && (recordIterator == null || !recordIterator.hasNext())) {
if (terminationTime != null && System.currentTimeMillis() > terminationTime) {
LOG.info(
"Configured sampler timeout [%s] has been exceeded, returning without a bytesIterator.",
iteratorTimeoutMs
);
bytesIterator = null;
return;
}
recordIterator = recordSupplier.poll(SeekableStreamSamplerSpec.POLL_TIMEOUT_MS).iterator(); recordIterator = recordSupplier.poll(SeekableStreamSamplerSpec.POLL_TIMEOUT_MS).iterator();
} }
if (!closed) { if (!closed) {
bytesIterator = recordIterator.next().getData().iterator(); bytesIterator = recordIterator.next().getData().iterator();
} }
@ -152,6 +173,7 @@ public class RecordSupplierInputSource<PartitionIdType, SequenceOffsetType, Reco
@Override @Override
public void close() public void close()
{ {
LOG.info("Closing entity iterator.");
closed = true; closed = true;
recordSupplier.close(); recordSupplier.close();
} }

View File

@ -106,7 +106,8 @@ public abstract class SeekableStreamSamplerSpec<PartitionIdType, SequenceOffsetT
inputSource = new RecordSupplierInputSource<>( inputSource = new RecordSupplierInputSource<>(
ioConfig.getStream(), ioConfig.getStream(),
recordSupplier, recordSupplier,
ioConfig.isUseEarliestSequenceNumber() ioConfig.isUseEarliestSequenceNumber(),
samplerConfig.getTimeoutMs() <= 0 ? null : samplerConfig.getTimeoutMs()
); );
inputFormat = Preconditions.checkNotNull( inputFormat = Preconditions.checkNotNull(
ioConfig.getInputFormat(), ioConfig.getInputFormat(),
@ -173,7 +174,8 @@ public abstract class SeekableStreamSamplerSpec<PartitionIdType, SequenceOffsetT
RecordSupplierInputSource<PartitionIdType, SequenceOffsetType, RecordType> inputSource = new RecordSupplierInputSource<>( RecordSupplierInputSource<PartitionIdType, SequenceOffsetType, RecordType> inputSource = new RecordSupplierInputSource<>(
ioConfig.getStream(), ioConfig.getStream(),
createRecordSupplier(), createRecordSupplier(),
ioConfig.isUseEarliestSequenceNumber() ioConfig.isUseEarliestSequenceNumber(),
samplerConfig.getTimeoutMs() <= 0 ? null : samplerConfig.getTimeoutMs()
); );
this.entityIterator = inputSource.createEntityIterator(); this.entityIterator = inputSource.createEntityIterator();
} }

View File

@ -1230,7 +1230,7 @@ public class InputSourceSamplerTest extends InitializedNullHandlingTest
); );
SamplerResponse response = inputSourceSampler.sample( SamplerResponse response = inputSourceSampler.sample(
new RecordSupplierInputSource("topicName", new TestRecordSupplier(jsonBlockList), true), new RecordSupplierInputSource("topicName", new TestRecordSupplier(jsonBlockList), true, 3000),
createInputFormat(), createInputFormat(),
dataSchema, dataSchema,
new SamplerConfig(200, 3000/*default timeout is 10s, shorten it to speed up*/, null, null) new SamplerConfig(200, 3000/*default timeout is 10s, shorten it to speed up*/, null, null)

View File

@ -72,7 +72,7 @@ public class RecordSupplierInputSourceTest extends InitializedNullHandlingTest
public void testRead() throws IOException public void testRead() throws IOException
{ {
final RandomCsvSupplier supplier = new RandomCsvSupplier(); final RandomCsvSupplier supplier = new RandomCsvSupplier();
final InputSource inputSource = new RecordSupplierInputSource<>("topic", supplier, false); final InputSource inputSource = new RecordSupplierInputSource<>("topic", supplier, false, null);
final List<String> colNames = IntStream.range(0, NUM_COLS) final List<String> colNames = IntStream.range(0, NUM_COLS)
.mapToObj(i -> StringUtils.format("col_%d", i)) .mapToObj(i -> StringUtils.format("col_%d", i))
.collect(Collectors.toList()); .collect(Collectors.toList());
@ -100,6 +100,35 @@ public class RecordSupplierInputSourceTest extends InitializedNullHandlingTest
Assert.assertTrue(supplier.isClosed()); Assert.assertTrue(supplier.isClosed());
} }
@Test
public void testReadTimeout() throws IOException
{
final RandomCsvSupplier supplier = new RandomCsvSupplier();
final InputSource inputSource = new RecordSupplierInputSource<>("topic", supplier, false, -1000);
final List<String> colNames = IntStream.range(0, NUM_COLS)
.mapToObj(i -> StringUtils.format("col_%d", i))
.collect(Collectors.toList());
final InputFormat inputFormat = new CsvInputFormat(colNames, null, null, false, 0);
final InputSourceReader reader = inputSource.reader(
new InputRowSchema(
new TimestampSpec("col_0", "auto", null),
new DimensionsSpec(DimensionsSpec.getDefaultSchemas(colNames.subList(1, colNames.size()))),
ColumnsFilter.all()
),
inputFormat,
temporaryFolder.newFolder()
);
int read = 0;
try (CloseableIterator<InputRow> iterator = reader.read()) {
for (; read < NUM_ROWS && iterator.hasNext(); read++) {
iterator.next();
}
}
Assert.assertEquals(0, read);
Assert.assertTrue(supplier.isClosed());
}
private static class RandomCsvSupplier implements RecordSupplier<Integer, Integer, ByteEntity> private static class RandomCsvSupplier implements RecordSupplier<Integer, Integer, ByteEntity>
{ {
private static final int STR_LEN = 8; private static final int STR_LEN = 8;