diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaSamplerSpecTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaSamplerSpecTest.java index dd712d9db56..4420d2f3434 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaSamplerSpecTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaSamplerSpecTest.java @@ -35,6 +35,7 @@ import org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorSpec; import org.apache.druid.indexing.kafka.test.TestBroker; import org.apache.druid.indexing.overlord.sampler.InputSourceSampler; import org.apache.druid.indexing.overlord.sampler.SamplerConfig; +import org.apache.druid.indexing.overlord.sampler.SamplerException; import org.apache.druid.indexing.overlord.sampler.SamplerTestUtils; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.granularity.Granularities; @@ -51,7 +52,9 @@ import org.apache.kafka.clients.producer.ProducerRecord; import org.junit.AfterClass; import org.junit.Assert; import org.junit.BeforeClass; +import org.junit.Rule; import org.junit.Test; +import org.junit.rules.ExpectedException; import java.util.Arrays; import java.util.Iterator; @@ -59,6 +62,9 @@ import java.util.List; public class KafkaSamplerSpecTest extends InitializedNullHandlingTest { + @Rule + public ExpectedException expectedException = ExpectedException.none(); + private static final ObjectMapper OBJECT_MAPPER = TestHelper.makeJsonMapper(); private static final String TOPIC = "sampling"; private static final DataSchema DATA_SCHEMA = new DataSchema( @@ -288,4 +294,56 @@ public class KafkaSamplerSpecTest extends InitializedNullHandlingTest throw new RuntimeException(e); } } + + @Test + public void testInvalidKafkaConfig() + { + KafkaSupervisorSpec supervisorSpec = new KafkaSupervisorSpec( + null, + DATA_SCHEMA, + null, + new KafkaSupervisorIOConfig( + TOPIC, + new JsonInputFormat(JSONPathSpec.DEFAULT, null, null), + null, + null, + null, + + // invalid bootstrap server + ImmutableMap.of("bootstrap.servers", "127.0.0.1"), + + null, + null, + null, + null, + true, + null, + null, + null, + null + ), + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null + ); + + KafkaSamplerSpec samplerSpec = new KafkaSamplerSpec( + supervisorSpec, + new SamplerConfig(5, null), + new InputSourceSampler(), + OBJECT_MAPPER + ); + + expectedException.expect(SamplerException.class); + expectedException.expectMessage("Invalid url in bootstrap.servers"); + samplerSpec.sample(); + } } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/sampler/SamplerException.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/sampler/SamplerException.java index b7937b87574..e1f903c0194 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/sampler/SamplerException.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/sampler/SamplerException.java @@ -21,6 +21,9 @@ package org.apache.druid.indexing.overlord.sampler; import org.apache.druid.java.util.common.StringUtils; +/** + * This exception will be mapped to a JSON object that will be returned to the client by {@link SamplerExceptionMapper} + */ public class SamplerException extends RuntimeException { public SamplerException(Throwable cause, String formatText, Object... arguments) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/sampler/SamplerExceptionMapper.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/sampler/SamplerExceptionMapper.java index d35e5ab5066..bade3e5be72 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/sampler/SamplerExceptionMapper.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/sampler/SamplerExceptionMapper.java @@ -20,6 +20,7 @@ package org.apache.druid.indexing.overlord.sampler; import com.google.common.collect.ImmutableMap; +import org.apache.druid.java.util.common.logger.Logger; import javax.ws.rs.core.Response; import javax.ws.rs.ext.ExceptionMapper; @@ -28,13 +29,20 @@ import javax.ws.rs.ext.Provider; @Provider public class SamplerExceptionMapper implements ExceptionMapper { + private static final Logger LOG = new Logger(SamplerExceptionMapper.class); + @Override public Response toResponse(SamplerException exception) { + String message = exception.getMessage() == null ? "The sampler encountered an issue" : exception.getMessage(); + + // Logging the stack trace and returning the exception message in the response + LOG.error(exception, message); + return Response.status(Response.Status.BAD_REQUEST) .entity(ImmutableMap.of( "error", - exception.getMessage() == null ? "The sampler encountered an issue" : exception.getMessage() + message )) .build(); } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamSamplerSpec.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamSamplerSpec.java index 106098bc525..3be7a4feb19 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamSamplerSpec.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamSamplerSpec.java @@ -20,6 +20,7 @@ package org.apache.druid.indexing.seekablestream; import com.google.common.base.Preconditions; +import com.google.common.base.Throwables; import org.apache.druid.client.indexing.SamplerResponse; import org.apache.druid.client.indexing.SamplerSpec; import org.apache.druid.data.input.ByteBufferInputRowParser; @@ -38,6 +39,7 @@ import org.apache.druid.data.input.impl.InputRowParser; import org.apache.druid.data.input.impl.StringInputRowParser; import org.apache.druid.indexing.overlord.sampler.InputSourceSampler; import org.apache.druid.indexing.overlord.sampler.SamplerConfig; +import org.apache.druid.indexing.overlord.sampler.SamplerException; import org.apache.druid.indexing.seekablestream.common.RecordSupplier; import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorIOConfig; import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorSpec; @@ -92,9 +94,18 @@ public abstract class SeekableStreamSamplerSpec recordSupplier; + + try { + recordSupplier = createRecordSupplier(); + } + catch (Exception e) { + throw new SamplerException(e, "Unable to create RecordSupplier: %s", Throwables.getRootCause(e).getMessage()); + } + inputSource = new RecordSupplierInputSource<>( ioConfig.getStream(), - createRecordSupplier(), + recordSupplier, ioConfig.isUseEarliestSequenceNumber() ); inputFormat = Preconditions.checkNotNull(