diff --git a/nifi/nifi-external/nifi-storm-spout/pom.xml b/nifi/nifi-external/nifi-storm-spout/pom.xml
index c55c698a43..353dd2d55f 100644
--- a/nifi/nifi-external/nifi-storm-spout/pom.xml
+++ b/nifi/nifi-external/nifi-storm-spout/pom.xml
@@ -27,7 +27,7 @@
org.apache.storm
storm-core
- 0.9.4
+ 0.9.5
provided
diff --git a/nifi/nifi-external/nifi-storm-spout/src/main/java/org/apache/nifi/storm/NiFiSpout.java b/nifi/nifi-external/nifi-storm-spout/src/main/java/org/apache/nifi/storm/NiFiSpout.java
index 50631231c2..64dac6f0a2 100644
--- a/nifi/nifi-external/nifi-storm-spout/src/main/java/org/apache/nifi/storm/NiFiSpout.java
+++ b/nifi/nifi-external/nifi-storm-spout/src/main/java/org/apache/nifi/storm/NiFiSpout.java
@@ -112,14 +112,35 @@ public class NiFiSpout extends BaseRichSpout {
public static final Logger LOGGER = LoggerFactory.getLogger(NiFiSpout.class);
+ public static final String NIFI_DATA_PACKET = "nifiDataPacket";
+
private NiFiSpoutReceiver spoutReceiver;
private LinkedBlockingQueue queue;
private SpoutOutputCollector spoutOutputCollector;
private final SiteToSiteClientConfig clientConfig;
+ private final List attributeNames;
+ /**
+ * @param clientConfig
+ * configuration used to build the SiteToSiteClient
+ */
public NiFiSpout(SiteToSiteClientConfig clientConfig) {
+ this(clientConfig, null);
+ }
+
+ /**
+ *
+ * @param clientConfig
+ * configuration used to build the SiteToSiteClient
+ * @param attributeNames
+ * names of FlowFile attributes to be added as values to each tuple, in addition
+ * to the nifiDataPacket value on all tuples
+ *
+ */
+ public NiFiSpout(SiteToSiteClientConfig clientConfig, List attributeNames) {
this.clientConfig = clientConfig;
+ this.attributeNames = (attributeNames == null ? new ArrayList() : attributeNames);
}
@Override
@@ -139,13 +160,26 @@ public class NiFiSpout extends BaseRichSpout {
if (data == null) {
Utils.sleep(50);
} else {
- spoutOutputCollector.emit(new Values(data));
+ // always start with the data packet
+ Values values = new Values(data);
+
+ // add additional values based on the specified attribute names
+ for (String attributeName : attributeNames) {
+ if (data.getAttributes().containsKey(attributeName)) {
+ values.add(data.getAttributes().get(attributeName));
+ }
+ }
+
+ spoutOutputCollector.emit(values);
}
}
@Override
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
- outputFieldsDeclarer.declare(new Fields("nifiDataPacket"));
+ final List fieldNames = new ArrayList<>();
+ fieldNames.add(NIFI_DATA_PACKET);
+ fieldNames.addAll(attributeNames);
+ outputFieldsDeclarer.declare(new Fields(fieldNames));
}
@Override