NIFI-12: Remove Processors even if their @OnRemoved methods throw Exceptions

This commit is contained in:
Mark Payne 2014-12-10 12:49:31 -05:00
parent f60a97b026
commit cb2e855fc7
2 changed files with 17 additions and 9 deletions

View File

@ -46,6 +46,7 @@ import org.apache.nifi.controller.label.Label;
import org.apache.nifi.controller.service.ControllerServiceProvider;
import org.apache.nifi.logging.LogRepositoryFactory;
import org.apache.nifi.nar.NarCloseable;
import org.apache.nifi.processor.SimpleProcessLogger;
import org.apache.nifi.processor.StandardProcessContext;
import org.apache.nifi.processor.annotation.OnRemoved;
import org.apache.nifi.processor.annotation.OnShutdown;
@ -53,7 +54,6 @@ import org.apache.nifi.remote.RemoteGroupPort;
import org.apache.nifi.remote.RootGroupPort;
import org.apache.nifi.util.NiFiProperties;
import org.apache.nifi.util.ReflectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.builder.HashCodeBuilder;
import org.apache.commons.lang3.builder.ToStringBuilder;
@ -664,7 +664,7 @@ public final class StandardProcessGroup implements ProcessGroup {
try (final NarCloseable x = NarCloseable.withNarLoader()) {
final StandardProcessContext processContext = new StandardProcessContext(processor, controllerServiceProvider, encryptor);
ReflectionUtils.invokeMethodsWithAnnotation(OnRemoved.class, processor.getProcessor(), processContext);
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnRemoved.class, processor.getProcessor(), processContext);
} catch (final Exception e) {
throw new ProcessorLifeCycleException("Failed to invoke 'OnRemoved' methods of " + processor, e);
}

View File

@ -76,7 +76,6 @@ import org.apache.nifi.util.FlowFilePackagerV1;
import org.apache.nifi.util.FlowFilePackagerV2;
import org.apache.nifi.util.FlowFilePackagerV3;
import org.apache.nifi.util.ObjectHolder;
import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream;
@ -317,6 +316,7 @@ public class MergeContent extends AbstractSessionFactoryProcessor {
return Files.readAllBytes(Paths.get(filename));
}
@Override
public void onTrigger(final ProcessContext context, final ProcessSessionFactory sessionFactory) throws ProcessException {
int binsAdded = binFlowFiles(context, sessionFactory);
@ -332,6 +332,7 @@ public class MergeContent extends AbstractSessionFactoryProcessor {
}
}
private int migrateBins(final ProcessContext context) {
int added = 0;
for (final Bin bin : binManager.removeReadyBins(true)) {
@ -548,20 +549,27 @@ public class MergeContent extends AbstractSessionFactoryProcessor {
public void onScheduled(final ProcessContext context) throws IOException {
binManager.setMinimumSize(context.getProperty(MIN_SIZE).asDataSize(DataUnit.B).longValue());
if (context.getProperty(MAX_BIN_AGE).getValue() != null) {
if (context.getProperty(MAX_BIN_AGE).isSet() ) {
binManager.setMaxBinAge(context.getProperty(MAX_BIN_AGE).asTimePeriod(TimeUnit.SECONDS).intValue());
} else {
binManager.setMaxBinAge(Integer.MAX_VALUE);
}
if (context.getProperty(MAX_SIZE).getValue() != null) {
if ( context.getProperty(MAX_SIZE).isSet() ) {
binManager.setMaximumSize(context.getProperty(MAX_SIZE).asDataSize(DataUnit.B).longValue());
} else {
binManager.setMaximumSize(Long.MAX_VALUE);
}
if (MERGE_STRATEGY_DEFRAGMENT.equals(context.getProperty(MERGE_STRATEGY).getValue())) {
binManager.setFileCountAttribute(FRAGMENT_COUNT_ATTRIBUTE);
} else {
binManager.setMinimumEntries(context.getProperty(MIN_ENTRIES).asInteger());
if (context.getProperty(MAX_ENTRIES).getValue() != null) {
binManager.setMaximumEntries(context.getProperty(MAX_ENTRIES).asInteger());
if ( context.getProperty(MAX_ENTRIES).isSet() ) {
binManager.setMaximumEntries(context.getProperty(MAX_ENTRIES).asInteger().intValue());
} else {
binManager.setMaximumEntries(Integer.MAX_VALUE);
}
}