From bd9782cb43ece682c853d9a75279ebe76f968044 Mon Sep 17 00:00:00 2001 From: danbress Date: Sat, 27 Jun 2015 14:38:45 -0400 Subject: [PATCH 1/3] NIFI-735: DocGenerator should on @OnUnscheduled and @OnStopped after its done generating the docs --- .../ConfigurableComponentInitializer.java | 9 ++++++++- .../org/apache/nifi/documentation/DocGenerator.java | 2 ++ .../init/ControllerServiceInitializer.java | 11 +++++++++-- .../nifi/documentation/init/ProcessorInitializer.java | 11 +++++++++-- .../init/ReportingTaskingInitializer.java | 11 +++++++++-- .../html/HtmlDocumentationWriterTest.java | 2 ++ .../html/ProcessorDocumentationWriterTest.java | 3 +++ 7 files changed, 42 insertions(+), 7 deletions(-) diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/main/java/org/apache/nifi/documentation/ConfigurableComponentInitializer.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/main/java/org/apache/nifi/documentation/ConfigurableComponentInitializer.java index bd07ab54d8..ad21f2124a 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/main/java/org/apache/nifi/documentation/ConfigurableComponentInitializer.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/main/java/org/apache/nifi/documentation/ConfigurableComponentInitializer.java @@ -20,7 +20,7 @@ import org.apache.nifi.components.ConfigurableComponent; import org.apache.nifi.reporting.InitializationException; /** - * An interface for initializing a ConfigurableComponent. It is up to the + * An interface for initializing and tearing down a ConfigurableComponent. It is up to the * implementer to call "init" so that you can call * ConfigurableComponent.getPropertyDescriptors() * @@ -35,4 +35,11 @@ public interface ConfigurableComponentInitializer { * @throws InitializationException if the component could not be initialized */ void initialize(ConfigurableComponent component) throws InitializationException; + + /** + * Calls the lifecycle methods that should be called when a flow is shutdown. + * + * @param component the component to initialize + */ + void teardown(ConfigurableComponent component); } diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/main/java/org/apache/nifi/documentation/DocGenerator.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/main/java/org/apache/nifi/documentation/DocGenerator.java index 7c32bf8004..327da045f2 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/main/java/org/apache/nifi/documentation/DocGenerator.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/main/java/org/apache/nifi/documentation/DocGenerator.java @@ -114,6 +114,8 @@ public class DocGenerator { try (final OutputStream output = new BufferedOutputStream(new FileOutputStream(baseDocumenationFile))) { writer.write(component, output, hasAdditionalInfo(directory)); } + + initializer.teardown(component); } /** diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/main/java/org/apache/nifi/documentation/init/ControllerServiceInitializer.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/main/java/org/apache/nifi/documentation/init/ControllerServiceInitializer.java index 976c7a5ea4..7839254977 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/main/java/org/apache/nifi/documentation/init/ControllerServiceInitializer.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/main/java/org/apache/nifi/documentation/init/ControllerServiceInitializer.java @@ -42,11 +42,18 @@ public class ControllerServiceInitializer implements ConfigurableComponentInitia try (NarCloseable narCloseable = NarCloseable.withNarLoader()) { controllerService.initialize(new MockControllerServiceInitializationContext()); + } + } + + @Override + public void teardown(ConfigurableComponent component) { + try (NarCloseable narCloseable = NarCloseable.withNarLoader()) { + ControllerService controllerService = (ControllerService) component; final ProcessorLog logger = new MockProcessorLogger(); final MockConfigurationContext context = new MockConfigurationContext(); - ReflectionUtils.quietlyInvokeMethodsWithAnnotations(OnRemoved.class, null, controllerService, logger, context); - ReflectionUtils.quietlyInvokeMethodsWithAnnotations(OnShutdown.class, null, controllerService, logger, context); + ReflectionUtils.quietlyInvokeMethodsWithAnnotations(OnRemoved.class, org.apache.nifi.processor.annotation.OnRemoved.class, controllerService, logger, context); + ReflectionUtils.quietlyInvokeMethodsWithAnnotations(OnShutdown.class, org.apache.nifi.processor.annotation.OnShutdown.class, controllerService, logger, context); } } } diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/main/java/org/apache/nifi/documentation/init/ProcessorInitializer.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/main/java/org/apache/nifi/documentation/init/ProcessorInitializer.java index 7f3357c30f..5808c3558b 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/main/java/org/apache/nifi/documentation/init/ProcessorInitializer.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/main/java/org/apache/nifi/documentation/init/ProcessorInitializer.java @@ -40,11 +40,18 @@ public class ProcessorInitializer implements ConfigurableComponentInitializer { Processor processor = (Processor) component; try (NarCloseable narCloseable = NarCloseable.withNarLoader()) { processor.initialize(new MockProcessorInitializationContext()); + } + } + + @Override + public void teardown(ConfigurableComponent component) { + Processor processor = (Processor) component; + try (NarCloseable narCloseable = NarCloseable.withNarLoader()) { final ProcessorLog logger = new MockProcessorLogger(); final MockProcessContext context = new MockProcessContext(); - ReflectionUtils.quietlyInvokeMethodsWithAnnotations(OnRemoved.class, null, processor, logger, context); - ReflectionUtils.quietlyInvokeMethodsWithAnnotations(OnShutdown.class, null, processor, logger, context); + ReflectionUtils.quietlyInvokeMethodsWithAnnotations(OnRemoved.class, org.apache.nifi.processor.annotation.OnRemoved.class, processor, logger, context); + ReflectionUtils.quietlyInvokeMethodsWithAnnotations(OnShutdown.class, org.apache.nifi.processor.annotation.OnShutdown.class, processor, logger, context); } } } diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/main/java/org/apache/nifi/documentation/init/ReportingTaskingInitializer.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/main/java/org/apache/nifi/documentation/init/ReportingTaskingInitializer.java index 90642e4828..23b45009ff 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/main/java/org/apache/nifi/documentation/init/ReportingTaskingInitializer.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/main/java/org/apache/nifi/documentation/init/ReportingTaskingInitializer.java @@ -40,10 +40,17 @@ public class ReportingTaskingInitializer implements ConfigurableComponentInitial ReportingTask reportingTask = (ReportingTask) component; try (NarCloseable narCloseable = NarCloseable.withNarLoader()) { reportingTask.initialize(new MockReportingInitializationContext()); + } + } + + @Override + public void teardown(ConfigurableComponent component) { + ReportingTask reportingTask = (ReportingTask) component; + try (NarCloseable narCloseable = NarCloseable.withNarLoader()) { final MockConfigurationContext context = new MockConfigurationContext(); - ReflectionUtils.quietlyInvokeMethodsWithAnnotations(OnRemoved.class, null, reportingTask, new MockProcessorLogger(), context); - ReflectionUtils.quietlyInvokeMethodsWithAnnotations(OnShutdown.class, null, reportingTask, new MockProcessorLogger(), context); + ReflectionUtils.quietlyInvokeMethodsWithAnnotations(OnRemoved.class, org.apache.nifi.processor.annotation.OnRemoved.class, reportingTask, new MockProcessorLogger(), context); + ReflectionUtils.quietlyInvokeMethodsWithAnnotations(OnShutdown.class, org.apache.nifi.processor.annotation.OnShutdown.class, reportingTask, new MockProcessorLogger(), context); } } } diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/test/java/org/apache/nifi/documentation/html/HtmlDocumentationWriterTest.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/test/java/org/apache/nifi/documentation/html/HtmlDocumentationWriterTest.java index d85e19b7e5..5a138dc785 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/test/java/org/apache/nifi/documentation/html/HtmlDocumentationWriterTest.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/test/java/org/apache/nifi/documentation/html/HtmlDocumentationWriterTest.java @@ -58,6 +58,7 @@ public class HtmlDocumentationWriterTest { ByteArrayOutputStream baos = new ByteArrayOutputStream(); writer.write(controllerService, baos, false); + initializer.teardown(controllerService); String results = new String(baos.toByteArray()); XmlValidator.assertXmlValid(results); @@ -96,6 +97,7 @@ public class HtmlDocumentationWriterTest { ByteArrayOutputStream baos = new ByteArrayOutputStream(); writer.write(reportingTask, baos, false); + initializer.teardown(reportingTask); String results = new String(baos.toByteArray()); XmlValidator.assertXmlValid(results); diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/test/java/org/apache/nifi/documentation/html/ProcessorDocumentationWriterTest.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/test/java/org/apache/nifi/documentation/html/ProcessorDocumentationWriterTest.java index fda81fd4e7..31fe73e477 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/test/java/org/apache/nifi/documentation/html/ProcessorDocumentationWriterTest.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/test/java/org/apache/nifi/documentation/html/ProcessorDocumentationWriterTest.java @@ -44,6 +44,7 @@ public class ProcessorDocumentationWriterTest { ByteArrayOutputStream baos = new ByteArrayOutputStream(); writer.write(processor, baos, false); + initializer.teardown(processor); String results = new String(baos.toByteArray()); XmlValidator.assertXmlValid(results); @@ -92,6 +93,7 @@ public class ProcessorDocumentationWriterTest { ByteArrayOutputStream baos = new ByteArrayOutputStream(); writer.write(processor, baos, false); + initializer.teardown(processor); String results = new String(baos.toByteArray()); XmlValidator.assertXmlValid(results); @@ -121,6 +123,7 @@ public class ProcessorDocumentationWriterTest { ByteArrayOutputStream baos = new ByteArrayOutputStream(); writer.write(processor, baos, false); + initializer.teardown(processor); String results = new String(baos.toByteArray()); XmlValidator.assertXmlValid(results); From c769742d9b10145c26a0b6cd75b9ae75bbdc9589 Mon Sep 17 00:00:00 2001 From: danbress Date: Sun, 28 Jun 2015 16:42:52 -0400 Subject: [PATCH 2/3] NIFI-735: removing call to OnRemoved and updating unit tests --- .../documentation/init/ControllerServiceInitializer.java | 2 -- .../nifi/documentation/init/ProcessorInitializer.java | 2 -- .../documentation/init/ReportingTaskingInitializer.java | 2 -- .../documentation/html/HtmlDocumentationWriterTest.java | 8 ++++---- .../html/ProcessorDocumentationWriterTest.java | 4 ++-- 5 files changed, 6 insertions(+), 12 deletions(-) diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/main/java/org/apache/nifi/documentation/init/ControllerServiceInitializer.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/main/java/org/apache/nifi/documentation/init/ControllerServiceInitializer.java index 7839254977..441033c090 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/main/java/org/apache/nifi/documentation/init/ControllerServiceInitializer.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/main/java/org/apache/nifi/documentation/init/ControllerServiceInitializer.java @@ -16,7 +16,6 @@ */ package org.apache.nifi.documentation.init; -import org.apache.nifi.annotation.lifecycle.OnRemoved; import org.apache.nifi.annotation.lifecycle.OnShutdown; import org.apache.nifi.components.ConfigurableComponent; import org.apache.nifi.controller.ControllerService; @@ -52,7 +51,6 @@ public class ControllerServiceInitializer implements ConfigurableComponentInitia final ProcessorLog logger = new MockProcessorLogger(); final MockConfigurationContext context = new MockConfigurationContext(); - ReflectionUtils.quietlyInvokeMethodsWithAnnotations(OnRemoved.class, org.apache.nifi.processor.annotation.OnRemoved.class, controllerService, logger, context); ReflectionUtils.quietlyInvokeMethodsWithAnnotations(OnShutdown.class, org.apache.nifi.processor.annotation.OnShutdown.class, controllerService, logger, context); } } diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/main/java/org/apache/nifi/documentation/init/ProcessorInitializer.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/main/java/org/apache/nifi/documentation/init/ProcessorInitializer.java index 5808c3558b..7a0752d3b9 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/main/java/org/apache/nifi/documentation/init/ProcessorInitializer.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/main/java/org/apache/nifi/documentation/init/ProcessorInitializer.java @@ -16,7 +16,6 @@ */ package org.apache.nifi.documentation.init; -import org.apache.nifi.annotation.lifecycle.OnRemoved; import org.apache.nifi.annotation.lifecycle.OnShutdown; import org.apache.nifi.components.ConfigurableComponent; import org.apache.nifi.documentation.ConfigurableComponentInitializer; @@ -50,7 +49,6 @@ public class ProcessorInitializer implements ConfigurableComponentInitializer { final ProcessorLog logger = new MockProcessorLogger(); final MockProcessContext context = new MockProcessContext(); - ReflectionUtils.quietlyInvokeMethodsWithAnnotations(OnRemoved.class, org.apache.nifi.processor.annotation.OnRemoved.class, processor, logger, context); ReflectionUtils.quietlyInvokeMethodsWithAnnotations(OnShutdown.class, org.apache.nifi.processor.annotation.OnShutdown.class, processor, logger, context); } } diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/main/java/org/apache/nifi/documentation/init/ReportingTaskingInitializer.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/main/java/org/apache/nifi/documentation/init/ReportingTaskingInitializer.java index 23b45009ff..e9642e3cd9 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/main/java/org/apache/nifi/documentation/init/ReportingTaskingInitializer.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/main/java/org/apache/nifi/documentation/init/ReportingTaskingInitializer.java @@ -16,7 +16,6 @@ */ package org.apache.nifi.documentation.init; -import org.apache.nifi.annotation.lifecycle.OnRemoved; import org.apache.nifi.annotation.lifecycle.OnShutdown; import org.apache.nifi.components.ConfigurableComponent; import org.apache.nifi.documentation.ConfigurableComponentInitializer; @@ -49,7 +48,6 @@ public class ReportingTaskingInitializer implements ConfigurableComponentInitial try (NarCloseable narCloseable = NarCloseable.withNarLoader()) { final MockConfigurationContext context = new MockConfigurationContext(); - ReflectionUtils.quietlyInvokeMethodsWithAnnotations(OnRemoved.class, org.apache.nifi.processor.annotation.OnRemoved.class, reportingTask, new MockProcessorLogger(), context); ReflectionUtils.quietlyInvokeMethodsWithAnnotations(OnShutdown.class, org.apache.nifi.processor.annotation.OnShutdown.class, reportingTask, new MockProcessorLogger(), context); } } diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/test/java/org/apache/nifi/documentation/html/HtmlDocumentationWriterTest.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/test/java/org/apache/nifi/documentation/html/HtmlDocumentationWriterTest.java index 5a138dc785..ae04f46b53 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/test/java/org/apache/nifi/documentation/html/HtmlDocumentationWriterTest.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/test/java/org/apache/nifi/documentation/html/HtmlDocumentationWriterTest.java @@ -78,8 +78,8 @@ public class HtmlDocumentationWriterTest { assertContains(results, "Sensitive Property: true"); // verify the right OnRemoved and OnShutdown methods were called - Assert.assertEquals(1, controllerService.getOnRemovedArgs()); - Assert.assertEquals(1, controllerService.getOnRemovedNoArgs()); + Assert.assertEquals(0, controllerService.getOnRemovedArgs()); + Assert.assertEquals(0, controllerService.getOnRemovedNoArgs()); Assert.assertEquals(1, controllerService.getOnShutdownArgs()); Assert.assertEquals(1, controllerService.getOnShutdownNoArgs()); @@ -115,8 +115,8 @@ public class HtmlDocumentationWriterTest { assertContains(results, "false"); // verify the right OnRemoved and OnShutdown methods were called - Assert.assertEquals(1, reportingTask.getOnRemovedArgs()); - Assert.assertEquals(1, reportingTask.getOnRemovedNoArgs()); + Assert.assertEquals(0, reportingTask.getOnRemovedArgs()); + Assert.assertEquals(0, reportingTask.getOnRemovedNoArgs()); Assert.assertEquals(1, reportingTask.getOnShutdownArgs()); Assert.assertEquals(1, reportingTask.getOnShutdownNoArgs()); diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/test/java/org/apache/nifi/documentation/html/ProcessorDocumentationWriterTest.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/test/java/org/apache/nifi/documentation/html/ProcessorDocumentationWriterTest.java index 31fe73e477..447e2e9ccd 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/test/java/org/apache/nifi/documentation/html/ProcessorDocumentationWriterTest.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/test/java/org/apache/nifi/documentation/html/ProcessorDocumentationWriterTest.java @@ -75,8 +75,8 @@ public class ProcessorDocumentationWriterTest { assertNotContains(results, "Additional Details..."); // verify the right OnRemoved and OnShutdown methods were called - Assert.assertEquals(1, processor.getOnRemovedArgs()); - Assert.assertEquals(1, processor.getOnRemovedNoArgs()); + Assert.assertEquals(0, processor.getOnRemovedArgs()); + Assert.assertEquals(0, processor.getOnRemovedNoArgs()); Assert.assertEquals(1, processor.getOnShutdownArgs()); Assert.assertEquals(1, processor.getOnShutdownNoArgs()); From 0d2842e4f9df2c1734c409d80511601765d8dc1b Mon Sep 17 00:00:00 2001 From: Bryan Bende Date: Sat, 4 Jul 2015 12:38:33 -0400 Subject: [PATCH 3/3] NIFI-750 adding a way to specify attribute names when constructing the spout --- nifi/nifi-external/nifi-storm-spout/pom.xml | 2 +- .../java/org/apache/nifi/storm/NiFiSpout.java | 38 ++++++++++++++++++- 2 files changed, 37 insertions(+), 3 deletions(-) diff --git a/nifi/nifi-external/nifi-storm-spout/pom.xml b/nifi/nifi-external/nifi-storm-spout/pom.xml index c55c698a43..353dd2d55f 100644 --- a/nifi/nifi-external/nifi-storm-spout/pom.xml +++ b/nifi/nifi-external/nifi-storm-spout/pom.xml @@ -27,7 +27,7 @@ org.apache.storm storm-core - 0.9.4 + 0.9.5 provided diff --git a/nifi/nifi-external/nifi-storm-spout/src/main/java/org/apache/nifi/storm/NiFiSpout.java b/nifi/nifi-external/nifi-storm-spout/src/main/java/org/apache/nifi/storm/NiFiSpout.java index 50631231c2..64dac6f0a2 100644 --- a/nifi/nifi-external/nifi-storm-spout/src/main/java/org/apache/nifi/storm/NiFiSpout.java +++ b/nifi/nifi-external/nifi-storm-spout/src/main/java/org/apache/nifi/storm/NiFiSpout.java @@ -112,14 +112,35 @@ public class NiFiSpout extends BaseRichSpout { public static final Logger LOGGER = LoggerFactory.getLogger(NiFiSpout.class); + public static final String NIFI_DATA_PACKET = "nifiDataPacket"; + private NiFiSpoutReceiver spoutReceiver; private LinkedBlockingQueue queue; private SpoutOutputCollector spoutOutputCollector; private final SiteToSiteClientConfig clientConfig; + private final List attributeNames; + /** + * @param clientConfig + * configuration used to build the SiteToSiteClient + */ public NiFiSpout(SiteToSiteClientConfig clientConfig) { + this(clientConfig, null); + } + + /** + * + * @param clientConfig + * configuration used to build the SiteToSiteClient + * @param attributeNames + * names of FlowFile attributes to be added as values to each tuple, in addition + * to the nifiDataPacket value on all tuples + * + */ + public NiFiSpout(SiteToSiteClientConfig clientConfig, List attributeNames) { this.clientConfig = clientConfig; + this.attributeNames = (attributeNames == null ? new ArrayList() : attributeNames); } @Override @@ -139,13 +160,26 @@ public class NiFiSpout extends BaseRichSpout { if (data == null) { Utils.sleep(50); } else { - spoutOutputCollector.emit(new Values(data)); + // always start with the data packet + Values values = new Values(data); + + // add additional values based on the specified attribute names + for (String attributeName : attributeNames) { + if (data.getAttributes().containsKey(attributeName)) { + values.add(data.getAttributes().get(attributeName)); + } + } + + spoutOutputCollector.emit(values); } } @Override public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { - outputFieldsDeclarer.declare(new Fields("nifiDataPacket")); + final List fieldNames = new ArrayList<>(); + fieldNames.add(NIFI_DATA_PACKET); + fieldNames.addAll(attributeNames); + outputFieldsDeclarer.declare(new Fields(fieldNames)); } @Override