From 2fdaa2fcabc7ceb91568ce1e6b1fcede2da7602c Mon Sep 17 00:00:00 2001 From: Jonathan Wei Date: Thu, 3 Nov 2022 17:45:35 -0500 Subject: [PATCH] Make RecordSupplierInputSource respect sampler timeout when stream is empty (#13296) * Make RecordSupplierInputSource respect sampler timeout when stream is empty * Rename timeout param, make it nullable, add timeout test --- .../RecordSupplierInputSource.java | 24 +++++++++++++- .../SeekableStreamSamplerSpec.java | 6 ++-- .../sampler/InputSourceSamplerTest.java | 2 +- .../RecordSupplierInputSourceTest.java | 31 ++++++++++++++++++- 4 files changed, 58 insertions(+), 5 deletions(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/RecordSupplierInputSource.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/RecordSupplierInputSource.java index c3875715078..ee54f2ac221 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/RecordSupplierInputSource.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/RecordSupplierInputSource.java @@ -31,6 +31,7 @@ import org.apache.druid.indexing.overlord.sampler.SamplerException; import org.apache.druid.indexing.seekablestream.common.OrderedPartitionableRecord; import org.apache.druid.indexing.seekablestream.common.RecordSupplier; import org.apache.druid.indexing.seekablestream.common.StreamPartition; +import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.java.util.common.parsers.CloseableIterator; import javax.annotation.Nullable; @@ -45,19 +46,28 @@ import java.util.stream.Collectors; */ public class RecordSupplierInputSource extends AbstractInputSource { + private static final Logger LOG = new Logger(RecordSupplierInputSource.class); + private final String topic; private final RecordSupplier recordSupplier; private final boolean useEarliestOffset; + /** + * Maximum amount of time in which the entity iterator will return results. If null, no timeout is applied. + */ + private final Integer iteratorTimeoutMs; + public RecordSupplierInputSource( String topic, RecordSupplier recordSupplier, - boolean useEarliestOffset + boolean useEarliestOffset, + Integer iteratorTimeoutMs ) { this.topic = topic; this.recordSupplier = recordSupplier; this.useEarliestOffset = useEarliestOffset; + this.iteratorTimeoutMs = iteratorTimeoutMs; try { assignAndSeek(recordSupplier); } @@ -123,13 +133,24 @@ public class RecordSupplierInputSource> recordIterator; private Iterator bytesIterator; private volatile boolean closed; + private final long createTime = System.currentTimeMillis(); + private final Long terminationTime = iteratorTimeoutMs != null ? createTime + iteratorTimeoutMs : null; private void waitNextIteratorIfNecessary() { while (!closed && (bytesIterator == null || !bytesIterator.hasNext())) { while (!closed && (recordIterator == null || !recordIterator.hasNext())) { + if (terminationTime != null && System.currentTimeMillis() > terminationTime) { + LOG.info( + "Configured sampler timeout [%s] has been exceeded, returning without a bytesIterator.", + iteratorTimeoutMs + ); + bytesIterator = null; + return; + } recordIterator = recordSupplier.poll(SeekableStreamSamplerSpec.POLL_TIMEOUT_MS).iterator(); } + if (!closed) { bytesIterator = recordIterator.next().getData().iterator(); } @@ -152,6 +173,7 @@ public class RecordSupplierInputSource( ioConfig.getStream(), recordSupplier, - ioConfig.isUseEarliestSequenceNumber() + ioConfig.isUseEarliestSequenceNumber(), + samplerConfig.getTimeoutMs() <= 0 ? null : samplerConfig.getTimeoutMs() ); inputFormat = Preconditions.checkNotNull( ioConfig.getInputFormat(), @@ -173,7 +174,8 @@ public abstract class SeekableStreamSamplerSpec inputSource = new RecordSupplierInputSource<>( ioConfig.getStream(), createRecordSupplier(), - ioConfig.isUseEarliestSequenceNumber() + ioConfig.isUseEarliestSequenceNumber(), + samplerConfig.getTimeoutMs() <= 0 ? null : samplerConfig.getTimeoutMs() ); this.entityIterator = inputSource.createEntityIterator(); } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/sampler/InputSourceSamplerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/sampler/InputSourceSamplerTest.java index af612572b5b..cbd58d1adfb 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/sampler/InputSourceSamplerTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/sampler/InputSourceSamplerTest.java @@ -1230,7 +1230,7 @@ public class InputSourceSamplerTest extends InitializedNullHandlingTest ); SamplerResponse response = inputSourceSampler.sample( - new RecordSupplierInputSource("topicName", new TestRecordSupplier(jsonBlockList), true), + new RecordSupplierInputSource("topicName", new TestRecordSupplier(jsonBlockList), true, 3000), createInputFormat(), dataSchema, new SamplerConfig(200, 3000/*default timeout is 10s, shorten it to speed up*/, null, null) diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/RecordSupplierInputSourceTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/RecordSupplierInputSourceTest.java index eb3a0835274..34464030727 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/RecordSupplierInputSourceTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/RecordSupplierInputSourceTest.java @@ -72,7 +72,7 @@ public class RecordSupplierInputSourceTest extends InitializedNullHandlingTest public void testRead() throws IOException { final RandomCsvSupplier supplier = new RandomCsvSupplier(); - final InputSource inputSource = new RecordSupplierInputSource<>("topic", supplier, false); + final InputSource inputSource = new RecordSupplierInputSource<>("topic", supplier, false, null); final List colNames = IntStream.range(0, NUM_COLS) .mapToObj(i -> StringUtils.format("col_%d", i)) .collect(Collectors.toList()); @@ -100,6 +100,35 @@ public class RecordSupplierInputSourceTest extends InitializedNullHandlingTest Assert.assertTrue(supplier.isClosed()); } + @Test + public void testReadTimeout() throws IOException + { + final RandomCsvSupplier supplier = new RandomCsvSupplier(); + final InputSource inputSource = new RecordSupplierInputSource<>("topic", supplier, false, -1000); + final List colNames = IntStream.range(0, NUM_COLS) + .mapToObj(i -> StringUtils.format("col_%d", i)) + .collect(Collectors.toList()); + final InputFormat inputFormat = new CsvInputFormat(colNames, null, null, false, 0); + final InputSourceReader reader = inputSource.reader( + new InputRowSchema( + new TimestampSpec("col_0", "auto", null), + new DimensionsSpec(DimensionsSpec.getDefaultSchemas(colNames.subList(1, colNames.size()))), + ColumnsFilter.all() + ), + inputFormat, + temporaryFolder.newFolder() + ); + + int read = 0; + try (CloseableIterator iterator = reader.read()) { + for (; read < NUM_ROWS && iterator.hasNext(); read++) { + iterator.next(); + } + } + Assert.assertEquals(0, read); + Assert.assertTrue(supplier.isClosed()); + } + private static class RandomCsvSupplier implements RecordSupplier { private static final int STR_LEN = 8;