- Capture Exception to prevent failed evaluations from yielding processor
- Further capture evaluation exceptions as per PR feedback
- Adjust jUnit to new exception behavior
- This closes #1644
This commit is contained in:
Andre F de Miranda 2017-04-01 19:12:58 +11:00 committed by Matt Gilman
parent f87d2a2f57
commit 451a88df43
No known key found for this signature in database
GPG Key ID: DF61EC19432AEE37
2 changed files with 32 additions and 7 deletions

View File

@ -579,8 +579,9 @@ public class UpdateAttribute extends AbstractProcessor implements Searchable {
try { try {
// evaluate the expression for the given flow file // evaluate the expression for the given flow file
return getPropertyValue(condition.getExpression(), context).evaluateAttributeExpressions(flowfile, null, null, statefulAttributes).asBoolean(); return getPropertyValue(condition.getExpression(), context).evaluateAttributeExpressions(flowfile, null, null, statefulAttributes).asBoolean();
} catch (final ProcessException pe) { } catch (final Exception e) {
throw new ProcessException(String.format("Unable to evaluate condition '%s': %s.", condition.getExpression(), pe), pe); getLogger().error(String.format("Could not evaluate the condition '%s' while processing Flowfile '%s'", condition.getExpression(), flowfile));
throw new ProcessException(String.format("Unable to evaluate condition '%s': %s.", condition.getExpression(), e), e);
} }
} }
@ -643,8 +644,9 @@ public class UpdateAttribute extends AbstractProcessor implements Searchable {
// No point in updating if they will be removed // No point in updating if they will be removed
attributesToUpdate.keySet().removeAll(attributesToDelete); attributesToUpdate.keySet().removeAll(attributesToDelete);
} }
} catch (final ProcessException pe) { } catch (final Exception e) {
throw new ProcessException(String.format("Unable to delete attribute '%s': %s.", attribute, pe), pe); logger.error(String.format("Unable to delete attribute '%s' while processing FlowFile '%s' .", attribute, flowfile));
throw new ProcessException(String.format("Unable to delete attribute '%s': %s.", attribute, e), e);
} }
} else { } else {
boolean notDeleted = !attributesToDelete.contains(attribute); boolean notDeleted = !attributesToDelete.contains(attribute);
@ -667,8 +669,11 @@ public class UpdateAttribute extends AbstractProcessor implements Searchable {
if (notDeleted) { if (notDeleted) {
attributesToUpdate.put(attribute, newAttributeValue); attributesToUpdate.put(attribute, newAttributeValue);
} }
} catch (final ProcessException pe) { // Capture Exception thrown when evaluating the Expression Language
throw new ProcessException(String.format("Unable to evaluate new value for attribute '%s': %s.", attribute, pe), pe); } catch (final Exception e) {
logger.error(String.format("Could not evaluate the FlowFile '%s' against expression '%s' " +
"defined by DynamicProperty '%s' due to '%s'", flowfile, action.getValue(), attribute, e.getLocalizedMessage()));
throw new ProcessException(String.format("Unable to evaluate new value for attribute '%s': %s.", attribute, e), e);
} }
} }
} }

View File

@ -29,6 +29,7 @@ import java.util.regex.PatternSyntaxException;
import org.apache.nifi.components.state.Scope; import org.apache.nifi.components.state.Scope;
import org.apache.nifi.processor.ProcessSessionFactory; import org.apache.nifi.processor.ProcessSessionFactory;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processors.attributes.UpdateAttribute; import org.apache.nifi.processors.attributes.UpdateAttribute;
import org.apache.nifi.state.MockStateManager; import org.apache.nifi.state.MockStateManager;
import org.apache.nifi.update.attributes.serde.CriteriaSerDe; import org.apache.nifi.update.attributes.serde.CriteriaSerDe;
@ -36,6 +37,7 @@ import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners; import org.apache.nifi.util.TestRunners;
import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
import static org.apache.nifi.processors.attributes.UpdateAttribute.STORE_STATE_LOCALLY; import static org.apache.nifi.processors.attributes.UpdateAttribute.STORE_STATE_LOCALLY;
@ -983,7 +985,25 @@ public class TestUpdateAttribute {
try { try {
runner.run(); runner.run();
} catch (Throwable t) { } catch (Throwable t) {
assertEquals(t.getCause().getClass(), PatternSyntaxException.class); assertEquals(ProcessException.class, t.getCause().getClass());
} }
} }
@Test
public void testDataIsTooShort() {
final TestRunner runner = TestRunners.newTestRunner(new UpdateAttribute());
runner.setProperty("attribute.1", "${test:substring(1, 20)}");
runner.assertValid();
final Map<String, String> attributes = new HashMap<>();
attributes.put("test", "chocolate");
runner.enqueue(new byte[0], attributes);
try {
runner.run();
} catch (AssertionError e) {
Assert.assertTrue(e.getMessage().contains("org.apache.nifi.processor.exception.ProcessException"));
}
}
} }