From cb2e855fc7c42536887b055baadf93683d764a47 Mon Sep 17 00:00:00 2001 From: Mark Payne Date: Wed, 10 Dec 2014 12:49:31 -0500 Subject: [PATCH] NIFI-12: Remove Processors even if their @OnRemoved methods throw Exceptions --- .../nifi/groups/StandardProcessGroup.java | 4 ++-- .../processors/standard/MergeContent.java | 22 +++++++++++++------ 2 files changed, 17 insertions(+), 9 deletions(-) diff --git a/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java b/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java index 8aafb58a96..1064536ad8 100644 --- a/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java +++ b/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java @@ -46,6 +46,7 @@ import org.apache.nifi.controller.label.Label; import org.apache.nifi.controller.service.ControllerServiceProvider; import org.apache.nifi.logging.LogRepositoryFactory; import org.apache.nifi.nar.NarCloseable; +import org.apache.nifi.processor.SimpleProcessLogger; import org.apache.nifi.processor.StandardProcessContext; import org.apache.nifi.processor.annotation.OnRemoved; import org.apache.nifi.processor.annotation.OnShutdown; @@ -53,7 +54,6 @@ import org.apache.nifi.remote.RemoteGroupPort; import org.apache.nifi.remote.RootGroupPort; import org.apache.nifi.util.NiFiProperties; import org.apache.nifi.util.ReflectionUtils; - import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.builder.HashCodeBuilder; import org.apache.commons.lang3.builder.ToStringBuilder; @@ -664,7 +664,7 @@ public final class StandardProcessGroup implements ProcessGroup { try (final NarCloseable x = NarCloseable.withNarLoader()) { final StandardProcessContext processContext = new StandardProcessContext(processor, controllerServiceProvider, encryptor); - ReflectionUtils.invokeMethodsWithAnnotation(OnRemoved.class, processor.getProcessor(), processContext); + ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnRemoved.class, processor.getProcessor(), processContext); } catch (final Exception e) { throw new ProcessorLifeCycleException("Failed to invoke 'OnRemoved' methods of " + processor, e); } diff --git a/nar-bundles/standard-bundle/standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeContent.java b/nar-bundles/standard-bundle/standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeContent.java index d443e00481..9a932f0954 100644 --- a/nar-bundles/standard-bundle/standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeContent.java +++ b/nar-bundles/standard-bundle/standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeContent.java @@ -76,7 +76,6 @@ import org.apache.nifi.util.FlowFilePackagerV1; import org.apache.nifi.util.FlowFilePackagerV2; import org.apache.nifi.util.FlowFilePackagerV3; import org.apache.nifi.util.ObjectHolder; - import org.apache.commons.compress.archivers.tar.TarArchiveEntry; import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream; @@ -317,6 +316,7 @@ public class MergeContent extends AbstractSessionFactoryProcessor { return Files.readAllBytes(Paths.get(filename)); } + @Override public void onTrigger(final ProcessContext context, final ProcessSessionFactory sessionFactory) throws ProcessException { int binsAdded = binFlowFiles(context, sessionFactory); @@ -331,6 +331,7 @@ public class MergeContent extends AbstractSessionFactoryProcessor { context.yield(); } } + private int migrateBins(final ProcessContext context) { int added = 0; @@ -548,20 +549,27 @@ public class MergeContent extends AbstractSessionFactoryProcessor { public void onScheduled(final ProcessContext context) throws IOException { binManager.setMinimumSize(context.getProperty(MIN_SIZE).asDataSize(DataUnit.B).longValue()); - if (context.getProperty(MAX_BIN_AGE).getValue() != null) { + if (context.getProperty(MAX_BIN_AGE).isSet() ) { binManager.setMaxBinAge(context.getProperty(MAX_BIN_AGE).asTimePeriod(TimeUnit.SECONDS).intValue()); + } else { + binManager.setMaxBinAge(Integer.MAX_VALUE); } - - if (context.getProperty(MAX_SIZE).getValue() != null) { + + if ( context.getProperty(MAX_SIZE).isSet() ) { binManager.setMaximumSize(context.getProperty(MAX_SIZE).asDataSize(DataUnit.B).longValue()); + } else { + binManager.setMaximumSize(Long.MAX_VALUE); } - + if (MERGE_STRATEGY_DEFRAGMENT.equals(context.getProperty(MERGE_STRATEGY).getValue())) { binManager.setFileCountAttribute(FRAGMENT_COUNT_ATTRIBUTE); } else { binManager.setMinimumEntries(context.getProperty(MIN_ENTRIES).asInteger()); - if (context.getProperty(MAX_ENTRIES).getValue() != null) { - binManager.setMaximumEntries(context.getProperty(MAX_ENTRIES).asInteger()); + + if ( context.getProperty(MAX_ENTRIES).isSet() ) { + binManager.setMaximumEntries(context.getProperty(MAX_ENTRIES).asInteger().intValue()); + } else { + binManager.setMaximumEntries(Integer.MAX_VALUE); } }