Handle all types of exceptions when initializing input source in sampler API (#14355)

The sampler API returns a `400 Bad Request` response if it encounters a `SamplerException`.
Otherwise, it returns a generic `500 Internal Server Error` response with the message
"The RuntimeException could not be mapped to a response, re-throwing to the HTTP container".

This commit updates `RecordSupplierInputSource` to handle all types of exceptions, not just
`InterruptedException`, and to wrap them in a `SamplerException` so that the actual error is
propagated back to the user.
Author: Andreas Maechler (committed via GitHub)
Date: 2023-06-02 08:13:53 -06:00
Parent: b482fda503
Commit: 45014bd5b4
3 changed files with 47 additions and 20 deletions
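
The 400-vs-500 behavior described in the commit message is the kind of thing a JAX-RS
ExceptionMapper provides: exceptions of a mapped type become a structured response, while
unmapped RuntimeExceptions fall through to the container's generic 500 handler. A minimal
sketch of such a mapping, assuming an illustrative class name (SamplerExceptionMapperSketch)
and a plain-text response body rather than Druid's actual mapper:

import org.apache.druid.indexing.overlord.sampler.SamplerException;

import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import javax.ws.rs.ext.ExceptionMapper;
import javax.ws.rs.ext.Provider;

// Hypothetical mapper: any SamplerException thrown while handling a sampler request
// becomes a 400 response carrying the exception's message.
@Provider
public class SamplerExceptionMapperSketch implements ExceptionMapper<SamplerException>
{
  @Override
  public Response toResponse(SamplerException exception)
  {
    return Response.status(Response.Status.BAD_REQUEST)
                   .entity(exception.getMessage())
                   .type(MediaType.TEXT_PLAIN)
                   .build();
  }
}

Wrapping initialization failures in `SamplerException` (the change below) is what routes them
through this 400 path instead of the generic 500 handler.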

Changed file: SamplerConfig.java

@@ -55,7 +55,7 @@ public class SamplerConfig
   /**
    * The maximum number of rows to return in a response. The actual number of returned rows may be less if:
-   * - The sampled source contains less data.
+   * - The sampled source contains fewer rows
    * - {@link SamplerConfig#timeoutMs} elapses before this value is reached
    * - {@link org.apache.druid.segment.indexing.granularity.GranularitySpec#isRollup()} is true and input rows get
    *   rolled-up into fewer indexed rows.
@@ -85,7 +85,7 @@ public class SamplerConfig
   /**
    * Maximum number of bytes in memory that the {@link org.apache.druid.segment.incremental.IncrementalIndex} used by
-   * {@link InputSourceSampler#sample(org.apache.druid.data.input.InputSource, org.apache.druid.data.input.InputFormat, org.apache.druid.segment.indexing.DataSchema, SamplerConfig})
+   * {@link InputSourceSampler#sample(org.apache.druid.data.input.InputSource, org.apache.druid.data.input.InputFormat, org.apache.druid.segment.indexing.DataSchema, SamplerConfig)}
    * will be allowed to accumulate before aborting sampling. Particularly useful for limiting footprint of sample
    * operations as well as overall response size from sample requests. However, it is not directly correlated to
    * response size since it also contains the "raw" input data, so actual responses will likely be at least twice the

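As an aside on the maxBytesInMemory behavior documented above: sampling stops once the
accumulated in-memory estimate crosses the budget, even if fewer than the maximum number of
rows have been collected. A minimal sketch of that pattern, using hypothetical names (a
String row type and row.length() as a crude stand-in size estimator) rather than Druid's
actual IncrementalIndex accounting:

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

public class ByteBudgetedSamplerSketch
{
  // Collect rows until either the row limit or the byte budget is hit.
  public static List<String> sample(Iterator<String> rows, int maxRows, long maxBytesInMemory)
  {
    final List<String> sampled = new ArrayList<>();
    long bytesAccumulated = 0;
    while (rows.hasNext() && sampled.size() < maxRows) {
      final String row = rows.next();
      bytesAccumulated += row.length();  // stand-in for a per-row size estimate
      if (bytesAccumulated > maxBytesInMemory) {
        break;  // abort sampling once the in-memory budget is exceeded
      }
      sampled.add(row);
    }
    return sampled;
  }
}

Per the javadoc's caveat, the HTTP response also carries the "raw" input alongside the parsed
rows, so actual responses will likely be at least twice this size.
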
Changed file: RecordSupplierInputSource.java

@@ -19,6 +19,7 @@
 package org.apache.druid.indexing.seekablestream;

+import com.google.common.base.Throwables;
 import org.apache.druid.data.input.AbstractInputSource;
 import org.apache.druid.data.input.InputEntity;
 import org.apache.druid.data.input.InputFormat;
@@ -68,29 +69,35 @@ public class RecordSupplierInputSource<PartitionIdType, SequenceOffsetType, RecordType>
     this.recordSupplier = recordSupplier;
     this.useEarliestOffset = useEarliestOffset;
     this.iteratorTimeoutMs = iteratorTimeoutMs;
-    try {
-      assignAndSeek(recordSupplier);
-    }
-    catch (InterruptedException e) {
-      throw new SamplerException(e, "Exception while seeking to partitions");
-    }
+    assignAndSeek(recordSupplier);
   }

   private void assignAndSeek(RecordSupplier<PartitionIdType, SequenceOffsetType, RecordType> recordSupplier)
-      throws InterruptedException
   {
-    final Set<StreamPartition<PartitionIdType>> partitions = recordSupplier
-        .getPartitionIds(topic)
-        .stream()
-        .map(partitionId -> StreamPartition.of(topic, partitionId))
-        .collect(Collectors.toSet());
+    try {
+      final Set<StreamPartition<PartitionIdType>> partitions = recordSupplier
+          .getPartitionIds(topic)
+          .stream()
+          .map(partitionId -> StreamPartition.of(topic, partitionId))
+          .collect(Collectors.toSet());

-    recordSupplier.assign(partitions);
+      recordSupplier.assign(partitions);

-    if (useEarliestOffset) {
-      recordSupplier.seekToEarliest(partitions);
-    } else {
-      recordSupplier.seekToLatest(partitions);
+      if (useEarliestOffset) {
+        recordSupplier.seekToEarliest(partitions);
+      } else {
+        recordSupplier.seekToLatest(partitions);
+      }
     }
+    catch (Exception e) {
+      throw new SamplerException(
+          e,
+          "Exception while seeking to the [%s] offset of partitions in topic [%s]: %s",
+          useEarliestOffset ? "earliest" : "latest",
+          topic,
+          Throwables.getRootCause(e).getMessage()
+      );
+    }
   }

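One detail worth noting in the new catch block: the message embeds
Throwables.getRootCause(e).getMessage(), so the user sees the innermost failure rather than
the text of whatever wrapper (for example a StreamException) it arrived in. A self-contained
sketch of that Guava behavior, using an exception nesting that mirrors the test below:

import com.google.common.base.Throwables;

public class RootCauseDemo
{
  public static void main(String[] args)
  {
    // A wrapper exception around the exception that actually describes the failure.
    final RuntimeException wrapped =
        new RuntimeException("wrapper", new Exception("Something bad happened"));

    // getRootCause() walks getCause() down to the innermost throwable,
    // so its message is the one surfaced in the SamplerException.
    System.out.println(Throwables.getRootCause(wrapped).getMessage());  // prints "Something bad happened"
  }
}
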
Changed file: RecordSupplierInputSourceTest.java

@@ -33,9 +33,11 @@ import org.apache.druid.data.input.impl.CsvInputFormat;
 import org.apache.druid.data.input.impl.DimensionsSpec;
 import org.apache.druid.data.input.impl.InputStatsImpl;
 import org.apache.druid.data.input.impl.TimestampSpec;
+import org.apache.druid.indexing.overlord.sampler.SamplerException;
 import org.apache.druid.indexing.seekablestream.common.OrderedPartitionableRecord;
 import org.apache.druid.indexing.seekablestream.common.OrderedSequenceNumber;
 import org.apache.druid.indexing.seekablestream.common.RecordSupplier;
+import org.apache.druid.indexing.seekablestream.common.StreamException;
 import org.apache.druid.indexing.seekablestream.common.StreamPartition;
 import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.java.util.common.StringUtils;
@@ -45,6 +47,7 @@ import org.junit.Assert;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
+import org.mockito.Mockito;

 import javax.annotation.Nullable;
 import javax.validation.constraints.NotNull;
@@ -62,7 +65,6 @@ import java.util.stream.IntStream;
 public class RecordSupplierInputSourceTest extends InitializedNullHandlingTest
 {
   private static final int NUM_COLS = 16;
   private static final int NUM_ROWS = 128;
   private static final String TIMESTAMP_STRING = "2019-01-01";
@@ -135,6 +137,24 @@ public class RecordSupplierInputSourceTest extends InitializedNullHandlingTest
     Assert.assertTrue(supplier.isClosed());
   }

+  @Test
+  public void testRecordSupplierInputSourceThrowsSamplerExceptionWhenExceptionDuringSeek()
+  {
+    final RecordSupplier<?, ?, ?> supplier = Mockito.mock(RecordSupplier.class);
+    Mockito.when(supplier.getPartitionIds("test-stream"))
+           .thenThrow(new StreamException(new Exception("Something bad happened")));
+    //noinspection ResultOfObjectAllocationIgnored
+    final SamplerException exception = Assert.assertThrows(
+        SamplerException.class,
+        () -> new RecordSupplierInputSource<>("test-stream", supplier, false, null)
+    );
+
+    Assert.assertEquals(
+        "Exception while seeking to the [latest] offset of partitions in topic [test-stream]: Something bad happened",
+        exception.getMessage()
+    );
+  }
+
   private static class RandomCsvSupplier implements RecordSupplier<Integer, Integer, ByteEntity>
   {
     private static final int STR_LEN = 8;