NIFI-750 adding a way to specify attribute names when constructing the spout

This commit is contained in:
Bryan Bende 2015-07-04 12:38:33 -04:00
parent 208402472d
commit 0d2842e4f9
2 changed files with 37 additions and 3 deletions

View File

@ -27,7 +27,7 @@
<dependency> <dependency>
<groupId>org.apache.storm</groupId> <groupId>org.apache.storm</groupId>
<artifactId>storm-core</artifactId> <artifactId>storm-core</artifactId>
<version>0.9.4</version> <version>0.9.5</version>
<scope>provided</scope> <scope>provided</scope>
</dependency> </dependency>
<dependency> <dependency>

View File

@ -112,14 +112,35 @@ public class NiFiSpout extends BaseRichSpout {
public static final Logger LOGGER = LoggerFactory.getLogger(NiFiSpout.class); public static final Logger LOGGER = LoggerFactory.getLogger(NiFiSpout.class);
public static final String NIFI_DATA_PACKET = "nifiDataPacket";
private NiFiSpoutReceiver spoutReceiver; private NiFiSpoutReceiver spoutReceiver;
private LinkedBlockingQueue<NiFiDataPacket> queue; private LinkedBlockingQueue<NiFiDataPacket> queue;
private SpoutOutputCollector spoutOutputCollector; private SpoutOutputCollector spoutOutputCollector;
private final SiteToSiteClientConfig clientConfig; private final SiteToSiteClientConfig clientConfig;
private final List<String> attributeNames;
/**
* @param clientConfig
* configuration used to build the SiteToSiteClient
*/
public NiFiSpout(SiteToSiteClientConfig clientConfig) { public NiFiSpout(SiteToSiteClientConfig clientConfig) {
this(clientConfig, null);
}
/**
*
* @param clientConfig
* configuration used to build the SiteToSiteClient
* @param attributeNames
* names of FlowFile attributes to be added as values to each tuple, in addition
* to the nifiDataPacket value on all tuples
*
*/
public NiFiSpout(SiteToSiteClientConfig clientConfig, List<String> attributeNames) {
this.clientConfig = clientConfig; this.clientConfig = clientConfig;
this.attributeNames = (attributeNames == null ? new ArrayList<String>() : attributeNames);
} }
@Override @Override
@ -139,13 +160,26 @@ public class NiFiSpout extends BaseRichSpout {
if (data == null) { if (data == null) {
Utils.sleep(50); Utils.sleep(50);
} else { } else {
spoutOutputCollector.emit(new Values(data)); // always start with the data packet
Values values = new Values(data);
// add additional values based on the specified attribute names
for (String attributeName : attributeNames) {
if (data.getAttributes().containsKey(attributeName)) {
values.add(data.getAttributes().get(attributeName));
}
}
spoutOutputCollector.emit(values);
} }
} }
@Override @Override
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
outputFieldsDeclarer.declare(new Fields("nifiDataPacket")); final List<String> fieldNames = new ArrayList<>();
fieldNames.add(NIFI_DATA_PACKET);
fieldNames.addAll(attributeNames);
outputFieldsDeclarer.declare(new Fields(fieldNames));
} }
@Override @Override