mirror of https://github.com/apache/nifi.git
Improving error handling to detect when a SolrServerException is caused by an IOException
This commit is contained in:
parent
e9402a0ddc
commit
d09d58ddf2
|
@ -149,8 +149,8 @@ public class PutSolrContentStream extends SolrProcessor {
|
|||
return;
|
||||
}
|
||||
|
||||
final ObjectHolder<SolrException> error = new ObjectHolder<>(null);
|
||||
final ObjectHolder<SolrServerException> connectionError = new ObjectHolder<>(null);
|
||||
final ObjectHolder<Exception> error = new ObjectHolder<>(null);
|
||||
final ObjectHolder<Exception> connectionError = new ObjectHolder<>(null);
|
||||
|
||||
final boolean isSolrCloud = SOLR_TYPE_CLOUD.equals(context.getProperty(SOLR_TYPE).getValue());
|
||||
final String collection = context.getProperty(COLLECTION_PARAM_NAME).evaluateAttributeExpressions(flowFile).getValue();
|
||||
|
@ -205,6 +205,12 @@ public class PutSolrContentStream extends SolrProcessor {
|
|||
} catch (SolrException e) {
|
||||
error.set(e);
|
||||
} catch (SolrServerException e) {
|
||||
if (causedByIOException(e)) {
|
||||
connectionError.set(e);
|
||||
} else {
|
||||
error.set(e);
|
||||
}
|
||||
} catch (IOException e) {
|
||||
connectionError.set(e);
|
||||
}
|
||||
}
|
||||
|
@ -212,8 +218,8 @@ public class PutSolrContentStream extends SolrProcessor {
|
|||
timer.stop();
|
||||
|
||||
if (error.get() != null) {
|
||||
getLogger().error("Failed to send {} to Solr due to {} with status code {}; routing to failure",
|
||||
new Object[]{flowFile, error.get(), error.get().code()});
|
||||
getLogger().error("Failed to send {} to Solr due to {}; routing to failure",
|
||||
new Object[]{flowFile, error.get()});
|
||||
session.transfer(flowFile, REL_FAILURE);
|
||||
} else if (connectionError.get() != null) {
|
||||
getLogger().error("Failed to send {} to Solr due to {}; routing to connection_failure",
|
||||
|
@ -234,6 +240,19 @@ public class PutSolrContentStream extends SolrProcessor {
|
|||
}
|
||||
}
|
||||
|
||||
private boolean causedByIOException(SolrServerException e) {
|
||||
boolean foundIOException = false;
|
||||
Throwable cause = e.getCause();
|
||||
while (cause != null) {
|
||||
if (cause instanceof IOException) {
|
||||
foundIOException = true;
|
||||
break;
|
||||
}
|
||||
cause = cause.getCause();
|
||||
}
|
||||
return foundIOException;
|
||||
}
|
||||
|
||||
// get all of the dynamic properties and values into a Map for later adding to the Solr request
|
||||
private Map<String, String[]> getRequestParams(ProcessContext context, FlowFile flowFile) {
|
||||
final Map<String,String[]> paramsMap = new HashMap<>();
|
||||
|
|
|
@ -57,8 +57,8 @@ public abstract class SolrProcessor extends AbstractProcessor {
|
|||
|
||||
public static final PropertyDescriptor SOLR_LOCATION = new PropertyDescriptor
|
||||
.Builder().name("Solr Location")
|
||||
.description("The Solr url for a Solr Type of Standard, " +
|
||||
"or the ZooKeeper hosts for a Solr Type of Cloud.")
|
||||
.description("The Solr url for a Solr Type of Standard (ex: http://localhost:8984/solr/gettingstarted), " +
|
||||
"or the ZooKeeper hosts for a Solr Type of Cloud (ex: localhost:9983).")
|
||||
.required(true)
|
||||
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
||||
.build();
|
||||
|
|
|
@ -173,8 +173,24 @@ public class TestPutSolrContentStream {
|
|||
}
|
||||
|
||||
@Test
|
||||
public void testSolrServerExceptionShouldRouteToConnectionFailure() throws IOException, SolrServerException {
|
||||
final Throwable throwable = new SolrServerException("Error communicating with Solr");
|
||||
public void testSolrServerExceptionShouldRouteToFailure() throws IOException, SolrServerException {
|
||||
final Throwable throwable = new SolrServerException("Invalid Document");
|
||||
final ExceptionThrowingProcessor proc = new ExceptionThrowingProcessor(throwable);
|
||||
|
||||
final TestRunner runner = createDefaultTestRunner(proc);
|
||||
|
||||
try (FileInputStream fileIn = new FileInputStream(CUSTOM_JSON_SINGLE_DOC_FILE)) {
|
||||
runner.enqueue(fileIn);
|
||||
runner.run();
|
||||
|
||||
runner.assertAllFlowFilesTransferred(PutSolrContentStream.REL_FAILURE, 1);
|
||||
verify(proc.getSolrClient(), times(1)).request(any(SolrRequest.class), eq((String)null));
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSolrServerExceptionCausedByIOExceptionShouldRouteToConnectionFailure() throws IOException, SolrServerException {
|
||||
final Throwable throwable = new SolrServerException(new IOException("Error communicating with Solr"));
|
||||
final ExceptionThrowingProcessor proc = new ExceptionThrowingProcessor(throwable);
|
||||
|
||||
final TestRunner runner = createDefaultTestRunner(proc);
|
||||
|
@ -221,6 +237,22 @@ public class TestPutSolrContentStream {
|
|||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testIOExceptionShouldRouteToConnectionFailure() throws IOException, SolrServerException {
|
||||
final Throwable throwable = new IOException("Error communicating with Solr");
|
||||
final ExceptionThrowingProcessor proc = new ExceptionThrowingProcessor(throwable);
|
||||
|
||||
final TestRunner runner = createDefaultTestRunner(proc);
|
||||
|
||||
try (FileInputStream fileIn = new FileInputStream(CUSTOM_JSON_SINGLE_DOC_FILE)) {
|
||||
runner.enqueue(fileIn);
|
||||
runner.run();
|
||||
|
||||
runner.assertAllFlowFilesTransferred(PutSolrContentStream.REL_CONNECTION_FAILURE, 1);
|
||||
verify(proc.getSolrClient(), times(1)).request(any(SolrRequest.class), eq((String)null));
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSolrTypeCloudShouldRequireCollection() {
|
||||
final TestRunner runner = TestRunners.newTestRunner(PutSolrContentStream.class);
|
||||
|
|
Loading…
Reference in New Issue