From d09d58ddf2e0eed7164ff92ed41fb4a76bd30aed Mon Sep 17 00:00:00 2001 From: bbende Date: Fri, 17 Apr 2015 18:25:23 -0400 Subject: [PATCH] Improving error handling to detect when a SolrServerException is caused by an IOException --- .../processors/solr/PutSolrContentStream.java | 27 +++++++++++--- .../nifi/processors/solr/SolrProcessor.java | 4 +-- .../solr/TestPutSolrContentStream.java | 36 +++++++++++++++++-- 3 files changed, 59 insertions(+), 8 deletions(-) diff --git a/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/PutSolrContentStream.java b/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/PutSolrContentStream.java index 89b510c401..52a3d904cd 100644 --- a/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/PutSolrContentStream.java +++ b/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/PutSolrContentStream.java @@ -149,8 +149,8 @@ public class PutSolrContentStream extends SolrProcessor { return; } - final ObjectHolder error = new ObjectHolder<>(null); - final ObjectHolder connectionError = new ObjectHolder<>(null); + final ObjectHolder error = new ObjectHolder<>(null); + final ObjectHolder connectionError = new ObjectHolder<>(null); final boolean isSolrCloud = SOLR_TYPE_CLOUD.equals(context.getProperty(SOLR_TYPE).getValue()); final String collection = context.getProperty(COLLECTION_PARAM_NAME).evaluateAttributeExpressions(flowFile).getValue(); @@ -205,6 +205,12 @@ public class PutSolrContentStream extends SolrProcessor { } catch (SolrException e) { error.set(e); } catch (SolrServerException e) { + if (causedByIOException(e)) { + connectionError.set(e); + } else { + error.set(e); + } + } catch (IOException e) { connectionError.set(e); } } @@ -212,8 +218,8 @@ public class PutSolrContentStream extends SolrProcessor { timer.stop(); if (error.get() != null) { - getLogger().error("Failed to send {} to Solr due to {} with status code {}; routing to failure", - new Object[]{flowFile, error.get(), error.get().code()}); + getLogger().error("Failed to send {} to Solr due to {}; routing to failure", + new Object[]{flowFile, error.get()}); session.transfer(flowFile, REL_FAILURE); } else if (connectionError.get() != null) { getLogger().error("Failed to send {} to Solr due to {}; routing to connection_failure", @@ -234,6 +240,19 @@ public class PutSolrContentStream extends SolrProcessor { } } + private boolean causedByIOException(SolrServerException e) { + boolean foundIOException = false; + Throwable cause = e.getCause(); + while (cause != null) { + if (cause instanceof IOException) { + foundIOException = true; + break; + } + cause = cause.getCause(); + } + return foundIOException; + } + // get all of the dynamic properties and values into a Map for later adding to the Solr request private Map getRequestParams(ProcessContext context, FlowFile flowFile) { final Map paramsMap = new HashMap<>(); diff --git a/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/SolrProcessor.java b/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/SolrProcessor.java index 9cad214b76..27f208aa3d 100644 --- a/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/SolrProcessor.java +++ b/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/SolrProcessor.java @@ -57,8 +57,8 @@ public abstract class SolrProcessor extends AbstractProcessor { public static final PropertyDescriptor SOLR_LOCATION = new PropertyDescriptor .Builder().name("Solr Location") - .description("The Solr url for a Solr Type of Standard, " + - "or the ZooKeeper hosts for a Solr Type of Cloud.") + .description("The Solr url for a Solr Type of Standard (ex: http://localhost:8984/solr/gettingstarted), " + + "or the ZooKeeper hosts for a Solr Type of Cloud (ex: localhost:9983).") .required(true) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .build(); diff --git a/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/java/org/apache/nifi/processors/solr/TestPutSolrContentStream.java b/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/java/org/apache/nifi/processors/solr/TestPutSolrContentStream.java index 4ce6928f19..56fbfcb7b8 100644 --- a/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/java/org/apache/nifi/processors/solr/TestPutSolrContentStream.java +++ b/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/java/org/apache/nifi/processors/solr/TestPutSolrContentStream.java @@ -173,8 +173,24 @@ public class TestPutSolrContentStream { } @Test - public void testSolrServerExceptionShouldRouteToConnectionFailure() throws IOException, SolrServerException { - final Throwable throwable = new SolrServerException("Error communicating with Solr"); + public void testSolrServerExceptionShouldRouteToFailure() throws IOException, SolrServerException { + final Throwable throwable = new SolrServerException("Invalid Document"); + final ExceptionThrowingProcessor proc = new ExceptionThrowingProcessor(throwable); + + final TestRunner runner = createDefaultTestRunner(proc); + + try (FileInputStream fileIn = new FileInputStream(CUSTOM_JSON_SINGLE_DOC_FILE)) { + runner.enqueue(fileIn); + runner.run(); + + runner.assertAllFlowFilesTransferred(PutSolrContentStream.REL_FAILURE, 1); + verify(proc.getSolrClient(), times(1)).request(any(SolrRequest.class), eq((String)null)); + } + } + + @Test + public void testSolrServerExceptionCausedByIOExceptionShouldRouteToConnectionFailure() throws IOException, SolrServerException { + final Throwable throwable = new SolrServerException(new IOException("Error communicating with Solr")); final ExceptionThrowingProcessor proc = new ExceptionThrowingProcessor(throwable); final TestRunner runner = createDefaultTestRunner(proc); @@ -221,6 +237,22 @@ public class TestPutSolrContentStream { } } + @Test + public void testIOExceptionShouldRouteToConnectionFailure() throws IOException, SolrServerException { + final Throwable throwable = new IOException("Error communicating with Solr"); + final ExceptionThrowingProcessor proc = new ExceptionThrowingProcessor(throwable); + + final TestRunner runner = createDefaultTestRunner(proc); + + try (FileInputStream fileIn = new FileInputStream(CUSTOM_JSON_SINGLE_DOC_FILE)) { + runner.enqueue(fileIn); + runner.run(); + + runner.assertAllFlowFilesTransferred(PutSolrContentStream.REL_CONNECTION_FAILURE, 1); + verify(proc.getSolrClient(), times(1)).request(any(SolrRequest.class), eq((String)null)); + } + } + @Test public void testSolrTypeCloudShouldRequireCollection() { final TestRunner runner = TestRunners.newTestRunner(PutSolrContentStream.class);