mirror of https://github.com/apache/druid.git
Improve exception message when loading data from web-console (#11723)
* Improve exception handling

* Revert some changes

* Resolve comments

* Update indexing-service/src/main/java/org/apache/druid/indexing/overlord/sampler/SamplerExceptionMapper.java

Co-authored-by: Karan Kumar <karankumar1100@gmail.com>

* Update indexing-service/src/main/java/org/apache/druid/indexing/overlord/sampler/SamplerExceptionMapper.java

Co-authored-by: Karan Kumar <karankumar1100@gmail.com>

* Address review comments

Co-authored-by: Karan Kumar <karankumar1100@gmail.com>
This commit is contained in:
parent 4631a66723
commit c2cea25a6b
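Before the diff, a minimal sketch of how the new pieces fit together (an illustration, not part of the commit; it assumes the Druid classes changed below are on the classpath): SeekableStreamSamplerSpec now wraps record-supplier failures in a SamplerException, and SamplerExceptionMapper turns that exception into an HTTP 400 whose body carries the message, which is what the web console surfaces to the user.

import org.apache.druid.indexing.overlord.sampler.SamplerException;
import org.apache.druid.indexing.overlord.sampler.SamplerExceptionMapper;

import javax.ws.rs.core.Response;

public class SamplerErrorSketch
{
  public static void main(String[] args)
  {
    // A failure like the one raised when the Kafka record supplier cannot be
    // created, e.g. because bootstrap.servers is not a valid host:port list.
    Exception cause = new IllegalArgumentException("Invalid url in bootstrap.servers");
    SamplerException samplerException =
        new SamplerException(cause, "Unable to create RecordSupplier: %s", cause.getMessage());

    // The JAX-RS mapper converts it into a 400 response whose entity is a map
    // like {"error": "Unable to create RecordSupplier: Invalid url in bootstrap.servers"}.
    Response response = new SamplerExceptionMapper().toResponse(samplerException);
    System.out.println(response.getStatus());  // 400
    System.out.println(response.getEntity());  // {error=Unable to create RecordSupplier: ...}
  }
}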
@@ -35,6 +35,7 @@ import org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorSpec;
 import org.apache.druid.indexing.kafka.test.TestBroker;
 import org.apache.druid.indexing.overlord.sampler.InputSourceSampler;
 import org.apache.druid.indexing.overlord.sampler.SamplerConfig;
+import org.apache.druid.indexing.overlord.sampler.SamplerException;
 import org.apache.druid.indexing.overlord.sampler.SamplerTestUtils;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.granularity.Granularities;
@@ -51,7 +52,9 @@ import org.apache.kafka.clients.producer.ProducerRecord;
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.BeforeClass;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.ExpectedException;
 
 import java.util.Arrays;
 import java.util.Iterator;
@@ -59,6 +62,9 @@ import java.util.List;
 
 public class KafkaSamplerSpecTest extends InitializedNullHandlingTest
 {
+  @Rule
+  public ExpectedException expectedException = ExpectedException.none();
+
   private static final ObjectMapper OBJECT_MAPPER = TestHelper.makeJsonMapper();
   private static final String TOPIC = "sampling";
   private static final DataSchema DATA_SCHEMA = new DataSchema(
@@ -288,4 +294,56 @@ public class KafkaSamplerSpecTest extends InitializedNullHandlingTest
       throw new RuntimeException(e);
     }
   }
+
+  @Test
+  public void testInvalidKafkaConfig()
+  {
+    KafkaSupervisorSpec supervisorSpec = new KafkaSupervisorSpec(
+        null,
+        DATA_SCHEMA,
+        null,
+        new KafkaSupervisorIOConfig(
+            TOPIC,
+            new JsonInputFormat(JSONPathSpec.DEFAULT, null, null),
+            null,
+            null,
+            null,
+
+            // invalid bootstrap server
+            ImmutableMap.of("bootstrap.servers", "127.0.0.1"),
+
+            null,
+            null,
+            null,
+            null,
+            true,
+            null,
+            null,
+            null,
+            null
+        ),
+        null,
+        null,
+        null,
+        null,
+        null,
+        null,
+        null,
+        null,
+        null,
+        null,
+        null
+    );
+
+    KafkaSamplerSpec samplerSpec = new KafkaSamplerSpec(
+        supervisorSpec,
+        new SamplerConfig(5, null),
+        new InputSourceSampler(),
+        OBJECT_MAPPER
+    );
+
+    expectedException.expect(SamplerException.class);
+    expectedException.expectMessage("Invalid url in bootstrap.servers");
+    samplerSpec.sample();
+  }
 }
@@ -21,6 +21,9 @@ package org.apache.druid.indexing.overlord.sampler;
 
 import org.apache.druid.java.util.common.StringUtils;
 
+/**
+ * This exception will be mapped to a JSON object that will be returned to the client by {@link SamplerExceptionMapper}
+ */
 public class SamplerException extends RuntimeException
 {
   public SamplerException(Throwable cause, String formatText, Object... arguments)
@@ -20,6 +20,7 @@
 package org.apache.druid.indexing.overlord.sampler;
 
 import com.google.common.collect.ImmutableMap;
+import org.apache.druid.java.util.common.logger.Logger;
 
 import javax.ws.rs.core.Response;
 import javax.ws.rs.ext.ExceptionMapper;
@@ -28,13 +29,20 @@ import javax.ws.rs.ext.Provider;
 @Provider
 public class SamplerExceptionMapper implements ExceptionMapper<SamplerException>
 {
+  private static final Logger LOG = new Logger(SamplerExceptionMapper.class);
+
   @Override
   public Response toResponse(SamplerException exception)
   {
+    String message = exception.getMessage() == null ? "The sampler encountered an issue" : exception.getMessage();
+
+    // Logging the stack trace and returning the exception message in the response
+    LOG.error(exception, message);
+
     return Response.status(Response.Status.BAD_REQUEST)
                    .entity(ImmutableMap.of(
                        "error",
-                       exception.getMessage() == null ? "The sampler encountered an issue" : exception.getMessage()
+                       message
                    ))
                    .build();
   }
@@ -20,6 +20,7 @@
 package org.apache.druid.indexing.seekablestream;
 
 import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
 import org.apache.druid.client.indexing.SamplerResponse;
 import org.apache.druid.client.indexing.SamplerSpec;
 import org.apache.druid.data.input.ByteBufferInputRowParser;
@@ -38,6 +39,7 @@ import org.apache.druid.data.input.impl.InputRowParser;
 import org.apache.druid.data.input.impl.StringInputRowParser;
 import org.apache.druid.indexing.overlord.sampler.InputSourceSampler;
 import org.apache.druid.indexing.overlord.sampler.SamplerConfig;
+import org.apache.druid.indexing.overlord.sampler.SamplerException;
 import org.apache.druid.indexing.seekablestream.common.RecordSupplier;
 import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorIOConfig;
 import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorSpec;
@@ -92,9 +94,18 @@ public abstract class SeekableStreamSamplerSpec<PartitionIdType, SequenceOffsetT
       );
       inputFormat = null;
     } else {
+      RecordSupplier<PartitionIdType, SequenceOffsetType, RecordType> recordSupplier;
+
+      try {
+        recordSupplier = createRecordSupplier();
+      }
+      catch (Exception e) {
+        throw new SamplerException(e, "Unable to create RecordSupplier: %s", Throwables.getRootCause(e).getMessage());
+      }
+
       inputSource = new RecordSupplierInputSource<>(
           ioConfig.getStream(),
-          createRecordSupplier(),
+          recordSupplier,
           ioConfig.isUseEarliestSequenceNumber()
       );
       inputFormat = Preconditions.checkNotNull(
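A small illustration of why Throwables.getRootCause is used above (an assumption about the typical failure shape, not code from the commit): a misconfigured consumer usually surfaces as a wrapped exception, and the innermost cause carries the actionable message that the sampler now reports.

import com.google.common.base.Throwables;

public class RootCauseSketch
{
  public static void main(String[] args)
  {
    // Hypothetical layered failure, similar in shape to what Kafka client
    // construction can produce when bootstrap.servers is invalid.
    Exception root = new IllegalArgumentException("Invalid url in bootstrap.servers");
    Exception wrapped = new RuntimeException("Failed to construct consumer", root);

    // getRootCause walks the cause chain to the innermost exception, so the
    // SamplerException message shows the specific configuration problem.
    System.out.println(Throwables.getRootCause(wrapped).getMessage());
    // prints: Invalid url in bootstrap.servers
  }
}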