commit 63bdda4b5b
Author: Mark Payne
Date: 2015-06-05 15:06:35 -04:00
33 changed files with 729 additions and 192 deletions

View File

@ -243,11 +243,11 @@ language governing permissions and limitations under the License. -->
<nifi.content.claim.max.appendable.size>10 MB</nifi.content.claim.max.appendable.size>
<nifi.content.claim.max.flow.files>100</nifi.content.claim.max.flow.files>
<nifi.content.repository.directory.default>./content_repository</nifi.content.repository.directory.default>
<nifi.content.repository.archive.max.retention.period />
<nifi.content.repository.archive.max.usage.percentage />
<nifi.content.repository.archive.enabled>false</nifi.content.repository.archive.enabled>
<nifi.content.repository.archive.max.retention.period>12 hours</nifi.content.repository.archive.max.retention.period>
<nifi.content.repository.archive.max.usage.percentage>50%</nifi.content.repository.archive.max.usage.percentage>
<nifi.content.repository.archive.enabled>true</nifi.content.repository.archive.enabled>
<nifi.content.repository.always.sync>false</nifi.content.repository.always.sync>
<nifi.content.viewer.url />
<nifi.content.viewer.url>/nifi-content-viewer/</nifi.content.viewer.url>
<nifi.restore.directory />
<nifi.ui.banner.text />
@ -267,11 +267,11 @@ language governing permissions and limitations under the License. -->
<nifi.provenance.repository.directory.default>./provenance_repository</nifi.provenance.repository.directory.default>
<nifi.provenance.repository.max.storage.time>24 hours</nifi.provenance.repository.max.storage.time>
<nifi.provenance.repository.max.storage.size>1 GB</nifi.provenance.repository.max.storage.size>
<nifi.provenance.repository.rollover.time>5 mins</nifi.provenance.repository.rollover.time>
<nifi.provenance.repository.rollover.time>30 secs</nifi.provenance.repository.rollover.time>
<nifi.provenance.repository.rollover.size>100 MB</nifi.provenance.repository.rollover.size>
<nifi.provenance.repository.query.threads>2</nifi.provenance.repository.query.threads>
<nifi.provenance.repository.compress.on.rollover>true</nifi.provenance.repository.compress.on.rollover>
<nifi.provenance.repository.indexed.fields>EventType, FlowFileUUID, Filename, ProcessorID</nifi.provenance.repository.indexed.fields>
<nifi.provenance.repository.indexed.fields>EventType, FlowFileUUID, Filename, ProcessorID, Relationship</nifi.provenance.repository.indexed.fields>
<nifi.provenance.repository.indexed.attributes />
<nifi.provenance.repository.index.shard.size>500 MB</nifi.provenance.repository.index.shard.size>
<nifi.provenance.repository.always.sync>false</nifi.provenance.repository.always.sync>
@ -282,8 +282,8 @@ language governing permissions and limitations under the License. -->
<!-- Component status repository properties -->
<nifi.components.status.repository.implementation>org.apache.nifi.controller.status.history.VolatileComponentStatusRepository</nifi.components.status.repository.implementation>
<nifi.components.status.repository.buffer.size>288</nifi.components.status.repository.buffer.size>
<nifi.components.status.snapshot.frequency>5 mins</nifi.components.status.snapshot.frequency>
<nifi.components.status.repository.buffer.size>1440</nifi.components.status.repository.buffer.size>
<nifi.components.status.snapshot.frequency>1 min</nifi.components.status.snapshot.frequency>
<!-- nifi.properties: web properties -->
<nifi.web.war.directory>./lib</nifi.web.war.directory>
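
In the assembled distribution these Maven properties are filtered into conf/nifi.properties under the same keys, so the changed defaults surface roughly as follows (a sketch of the generated configuration under that assumption, not part of the diff itself):

nifi.content.repository.archive.max.retention.period=12 hours
nifi.content.repository.archive.max.usage.percentage=50%
nifi.content.repository.archive.enabled=true
nifi.content.viewer.url=/nifi-content-viewer/
nifi.provenance.repository.rollover.time=30 secs
nifi.provenance.repository.indexed.fields=EventType, FlowFileUUID, Filename, ProcessorID, Relationship
nifi.components.status.repository.buffer.size=1440
nifi.components.status.snapshot.frequency=1 min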

View File

@ -0,0 +1,38 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-external</artifactId>
<version>0.2.0-incubating-SNAPSHOT</version>
</parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-storm-spout</artifactId>
<dependencies>
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-core</artifactId>
<version>0.9.4</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-site-to-site-client</artifactId>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.storm;
import java.util.Map;
/**
* <p>
* The NiFiDataPacket provides a wrapper around a NiFi FlowFile. It packages both
* a FlowFile's content and its attributes so that they can be processed by
* Storm.
* </p>
*/
public interface NiFiDataPacket {
/**
* @return the contents of a NiFi FlowFile
*/
byte[] getContent();
/**
* @return a Map of attributes that are associated with the NiFi FlowFile
*/
Map<String, String> getAttributes();
}
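
A minimal implementation of this interface might look like the following sketch (the class name StandardNiFiDataPacket is hypothetical and not part of this commit; the spout below builds its packets as anonymous classes):

package org.apache.nifi.storm;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Simple immutable holder for a FlowFile's content and attributes.
public class StandardNiFiDataPacket implements NiFiDataPacket {
    private final byte[] content;
    private final Map<String, String> attributes;

    public StandardNiFiDataPacket(final byte[] content, final Map<String, String> attributes) {
        this.content = content;
        this.attributes = Collections.unmodifiableMap(new HashMap<>(attributes));
    }

    @Override
    public byte[] getContent() {
        return content;
    }

    @Override
    public Map<String, String> getAttributes() {
        return attributes;
    }
}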

View File

@ -0,0 +1,232 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.storm;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;
import org.apache.nifi.remote.Transaction;
import org.apache.nifi.remote.TransferDirection;
import org.apache.nifi.remote.client.SiteToSiteClient;
import org.apache.nifi.remote.client.SiteToSiteClientConfig;
import org.apache.nifi.remote.protocol.DataPacket;
import org.apache.nifi.stream.io.StreamUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;
/**
* <p>
* The <code>NiFiSpout</code> provides a way to pull data from Apache NiFi so
* that it can be processed by Apache Storm. The NiFi Spout connects to a NiFi
* instance provided in the config and requests data from the named OutputPort.
* In NiFi, when an OutputPort is added to the root process group,
* it acts as a queue of data for remote clients. This spout is then able to
* pull that data from NiFi reliably.
* </p>
*
* <p>
* It is important to note that if pulling data from a NiFi cluster, the URL
* that should be used is that of the NiFi Cluster Manager. The spout will
* automatically handle determining the nodes in that cluster and pull from
* those nodes as appropriate.
* </p>
*
* <p>
* In order to use the NiFiSpout, you will need to first build a
* {@link SiteToSiteClientConfig} to provide to the constructor. This can be
* achieved by using the {@link SiteToSiteClient.Builder}. Below is an example
* snippet of driver code to pull data from NiFi that is running on
* localhost:8080. This example assumes that NiFi exposes an OutputPort on the
* root group named "Data for Storm". Additionally, it assumes that the data
* that it will receive from this OutputPort is text data, as it will map the
* byte array received from NiFi to a UTF-8 encoded string.
* </p>
*
* <code>
* <pre>
* {@code
*
* // Build a Site-To-Site client config
* SiteToSiteClientConfig clientConfig = new SiteToSiteClient.Builder()
* .url("http://localhost:8080/nifi")
* .portName("Data for Storm")
* .buildConfig();
*
* // Build a topology starting with a NiFiSpout
* TopologyBuilder builder = new TopologyBuilder();
* builder.setSpout("nifi", new NiFiSpout(clientConfig));
*
* // Add a bolt that prints the attributes and content
* builder.setBolt("print", new BaseBasicBolt() {
* @Override
* public void execute(Tuple tuple, BasicOutputCollector collector) {
* NiFiDataPacket dp = (NiFiDataPacket) tuple.getValueByField("nifiDataPacket");
* System.out.println("Attributes: " + dp.getAttributes());
* System.out.println("Content: " + new String(dp.getContent()));
* }
*
* @Override
* public void declareOutputFields(OutputFieldsDeclarer declarer) {}
*
* }).shuffleGrouping("nifi");
*
* // Submit the topology running in local mode
* Config conf = new Config();
* LocalCluster cluster = new LocalCluster();
* cluster.submitTopology("test", conf, builder.createTopology());
*
* Utils.sleep(90000);
* cluster.shutdown();
* }
* </pre>
* </code>
*/
public class NiFiSpout extends BaseRichSpout {
private static final long serialVersionUID = 3067274587595578836L;
public static final Logger LOGGER = LoggerFactory.getLogger(NiFiSpout.class);
private NiFiSpoutReceiver spoutReceiver;
private LinkedBlockingQueue<NiFiDataPacket> queue;
private SpoutOutputCollector spoutOutputCollector;
private final SiteToSiteClientConfig clientConfig;
public NiFiSpout(SiteToSiteClientConfig clientConfig) {
this.clientConfig = clientConfig;
}
@Override
public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
this.spoutOutputCollector = spoutOutputCollector;
this.queue = new LinkedBlockingQueue<>(1000);
this.spoutReceiver = new NiFiSpoutReceiver();
this.spoutReceiver.setDaemon(true);
this.spoutReceiver.setName("NiFi Spout Receiver");
this.spoutReceiver.start();
}
@Override
public void nextTuple() {
NiFiDataPacket data = queue.poll();
if (data == null) {
Utils.sleep(50);
} else {
spoutOutputCollector.emit(new Values(data));
}
}
@Override
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
outputFieldsDeclarer.declare(new Fields("nifiDataPacket"));
}
@Override
public void close() {
super.close();
spoutReceiver.shutdown();
}
class NiFiSpoutReceiver extends Thread {
private boolean shutdown = false;
public synchronized void shutdown() {
this.shutdown = true;
}
@Override
public void run() {
try {
final SiteToSiteClient client = new SiteToSiteClient.Builder().fromConfig(clientConfig).build();
try {
while (!shutdown) {
final Transaction transaction = client.createTransaction(TransferDirection.RECEIVE);
DataPacket dataPacket = transaction.receive();
if (dataPacket == null) {
transaction.confirm();
transaction.complete();
// no data available. Wait a bit and try again
try {
Thread.sleep(1000L);
} catch (InterruptedException e) {
}
continue;
}
final List<NiFiDataPacket> dataPackets = new ArrayList<>();
do {
// Read the data into a byte array and wrap it along with the attributes
// into a NiFiDataPacket.
final InputStream inStream = dataPacket.getData();
final byte[] data = new byte[(int) dataPacket.getSize()];
StreamUtils.fillBuffer(inStream, data);
final Map<String, String> attributes = dataPacket.getAttributes();
final NiFiDataPacket niFiDataPacket = new NiFiDataPacket() {
@Override
public byte[] getContent() {
return data;
}
@Override
public Map<String, String> getAttributes() {
return attributes;
}
};
dataPackets.add(niFiDataPacket);
dataPacket = transaction.receive();
} while (dataPacket != null);
// Confirm transaction to verify the data
transaction.confirm();
for (NiFiDataPacket dp : dataPackets) {
queue.offer(dp);
}
transaction.complete();
}
} finally {
try {
client.close();
} catch (final IOException ioe) {
LOGGER.error("Failed to close client", ioe);
}
}
} catch (final IOException ioe) {
LOGGER.error("Failed to receive data from NiFi", ioe);
}
}
}
}

View File

@ -25,5 +25,6 @@
<packaging>pom</packaging>
<modules>
<module>nifi-spark-receiver</module>
<module>nifi-storm-spout</module>
</modules>
</project>

View File

@ -35,7 +35,9 @@ public class XmlValidator {
public static void assertXmlValid(String xml) {
try {
DocumentBuilderFactory.newInstance().newDocumentBuilder().parse(new InputSource(new StringReader(xml)));
final DocumentBuilderFactory dbf = DocumentBuilderFactory.newInstance();
dbf.setNamespaceAware(true);
dbf.newDocumentBuilder().parse(new InputSource(new StringReader(xml)));
} catch (SAXException | IOException | ParserConfigurationException e) {
Assert.fail(e.getMessage());
}
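
The same namespace-aware DocumentBuilderFactory setup recurs across the files changed below; a minimal helper capturing the pattern might look like this (a sketch; the helper class and its placement are hypothetical, not part of this commit):

import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.parsers.ParserConfigurationException;

final class NamespaceAwareBuilders {

    private NamespaceAwareBuilders() {
    }

    // Namespace awareness must be enabled on the factory before
    // newDocumentBuilder() is called; the JAXP default leaves it off.
    static DocumentBuilder newNamespaceAwareBuilder() throws ParserConfigurationException {
        final DocumentBuilderFactory dbf = DocumentBuilderFactory.newInstance();
        dbf.setNamespaceAware(true);
        return dbf.newDocumentBuilder();
    }
}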

View File

@ -16,7 +16,6 @@
*/
package org.apache.nifi.cluster.protocol;
import org.apache.nifi.cluster.protocol.DataFlow;
import java.io.Serializable;
import java.util.Arrays;
@ -41,12 +40,15 @@ public class StandardDataFlow implements Serializable, DataFlow {
* Constructs an instance.
*
* @param flow a valid flow as bytes, which cannot be null
* @param templateBytes an XML representation of templates
* @param snippetBytes an XML representation of snippets
* @param templateBytes an XML representation of templates. May be null.
* @param snippetBytes an XML representation of snippets. May be null.
*
* @throws NullPointerException if any argument is null
* @throws NullPointerException if flow is null
*/
public StandardDataFlow(final byte[] flow, final byte[] templateBytes, final byte[] snippetBytes) {
if(flow == null){
throw new NullPointerException("Flow cannot be null");
}
this.flow = flow;
this.templateBytes = templateBytes;
this.snippetBytes = snippetBytes;
@ -63,31 +65,22 @@ public class StandardDataFlow implements Serializable, DataFlow {
return bytes == null ? null : Arrays.copyOf(bytes, bytes.length);
}
/**
* @return the raw byte array of the flow
*/
@Override
public byte[] getFlow() {
return flow;
}
/**
* @return the raw byte array of the templates
*/
@Override
public byte[] getTemplates() {
return templateBytes;
}
/**
* @return the raw byte array of the snippets
*/
@Override
public byte[] getSnippets() {
return snippetBytes;
}
/**
* @return true if processors should be automatically started at application
* startup; false otherwise
*/
@Override
public boolean isAutoStartProcessors() {
return autoStartProcessors;
}

View File

@ -406,7 +406,9 @@ public class DataFlowDaoImpl implements DataFlowDao {
private byte[] getEmptyFlowBytes() throws IOException {
try {
final DocumentBuilder docBuilder = DocumentBuilderFactory.newInstance().newDocumentBuilder();
final DocumentBuilderFactory docBuilderFactory = DocumentBuilderFactory.newInstance();
docBuilderFactory.setNamespaceAware(true);
final DocumentBuilder docBuilder = docBuilderFactory.newDocumentBuilder();
final Document document = docBuilder.newDocument();
final Element controller = document.createElement("flowController");

View File

@ -1085,8 +1085,9 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
private Document parse(final byte[] serialized) throws SAXException, ParserConfigurationException, IOException {
final DocumentBuilderFactory docFactory = DocumentBuilderFactory.newInstance();
final DocumentBuilder builder = docFactory.newDocumentBuilder();
docFactory.setNamespaceAware(true);
final DocumentBuilder builder = docFactory.newDocumentBuilder();
builder.setErrorHandler(new org.xml.sax.ErrorHandler() {
@Override
public void fatalError(final SAXParseException err) throws SAXException {
@ -1483,6 +1484,8 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
private byte[] serializeControllerServices() throws ParserConfigurationException, TransformerException {
final DocumentBuilderFactory docFactory = DocumentBuilderFactory.newInstance();
docFactory.setNamespaceAware(true);
final DocumentBuilder docBuilder = docFactory.newDocumentBuilder();
final Document document = docBuilder.newDocument();
final Element rootElement = document.createElement("controllerServices");
@ -1497,6 +1500,8 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
private byte[] serializeReportingTasks() throws ParserConfigurationException, TransformerException {
final DocumentBuilderFactory docFactory = DocumentBuilderFactory.newInstance();
docFactory.setNamespaceAware(true);
final DocumentBuilder docBuilder = docFactory.newDocumentBuilder();
final Document document = docBuilder.newDocument();
final Element rootElement = document.createElement("reportingTasks");

View File

@ -131,7 +131,10 @@ public class DataFlowManagementServiceImplTest {
private void verifyFlow() throws ParserConfigurationException, SAXException, IOException {
final byte[] flowBytes = service.loadDataFlow().getDataFlow().getFlow();
final DocumentBuilder docBuilder = DocumentBuilderFactory.newInstance().newDocumentBuilder();
final DocumentBuilderFactory docFactory = DocumentBuilderFactory.newInstance();
docFactory.setNamespaceAware(true);
final DocumentBuilder docBuilder = docFactory.newDocumentBuilder();
final Document doc = docBuilder.parse(new ByteArrayInputStream(flowBytes));
final Element controller = (Element) doc.getElementsByTagName("flowController").item(0);
final Element rootGroup = (Element) controller.getElementsByTagName("rootGroup").item(0);

View File

@ -54,8 +54,9 @@ public class FlowUnmarshaller {
}
final DocumentBuilderFactory dbf = DocumentBuilderFactory.newInstance();
final DocumentBuilder docBuilder = dbf.newDocumentBuilder();
dbf.setNamespaceAware(true);
final DocumentBuilder docBuilder = dbf.newDocumentBuilder();
final Document document = docBuilder.parse(new ByteArrayInputStream(flowContents));
final FlowSnippetDTO flowDto = new FlowSnippetDTO();

View File

@ -71,6 +71,8 @@ public class StandardFlowSerializer implements FlowSerializer {
try {
// create a new, empty document
final DocumentBuilderFactory docFactory = DocumentBuilderFactory.newInstance();
docFactory.setNamespaceAware(true);
final DocumentBuilder docBuilder = docFactory.newDocumentBuilder();
final Document doc = docBuilder.newDocument();

View File

@ -345,7 +345,9 @@ public class StandardFlowSynchronizer implements FlowSynchronizer {
final SchemaFactory schemaFactory = SchemaFactory.newInstance(XMLConstants.W3C_XML_SCHEMA_NS_URI);
final Schema schema = schemaFactory.newSchema(FLOW_XSD_RESOURCE);
final DocumentBuilderFactory docFactory = DocumentBuilderFactory.newInstance();
docFactory.setNamespaceAware(true);
docFactory.setSchema(schema);
final DocumentBuilder docBuilder = docFactory.newDocumentBuilder();
// parse flow

View File

@ -276,7 +276,7 @@ public class ProvenanceResource extends ApplicationResource {
*/
@GET
@Consumes(MediaType.WILDCARD)
@Produces(MediaType.APPLICATION_OCTET_STREAM)
@Produces(MediaType.WILDCARD)
@Path("/events/{id}/content/input")
@PreAuthorize("hasRole('ROLE_PROVENANCE')")
@ApiOperation(
@ -375,7 +375,7 @@ public class ProvenanceResource extends ApplicationResource {
*/
@GET
@Consumes(MediaType.WILDCARD)
@Produces(MediaType.APPLICATION_OCTET_STREAM)
@Produces(MediaType.WILDCARD)
@Path("/events/{id}/content/output")
@PreAuthorize("hasRole('ROLE_PROVENANCE')")
@ApiOperation(

View File

@ -132,9 +132,13 @@ public class ContentViewerController extends HttpServlet {
}
// buffer the content to support resetting in case we need to detect the content type or char encoding
final BufferedInputStream bis = new BufferedInputStream(downloadableContent.getContent());
try (final BufferedInputStream bis = new BufferedInputStream(downloadableContent.getContent());) {
final String mimeType;
// detect the content type
// when standalone, an unknown type is null because we were able to access the content directly, bypassing the REST endpoint;
// when clustered, an unknown type is set to octet-stream because the content was retrieved from the node's REST endpoint
if (downloadableContent.getType() == null || downloadableContent.getType().equals(MediaType.OCTET_STREAM.toString())) {
// attempt to detect the content type since we don't know what it is
final DefaultDetector detector = new DefaultDetector();
// create the stream for tika to process, buffered to support resetting
@ -146,7 +150,10 @@ public class ContentViewerController extends HttpServlet {
// Get mime type
final MediaType mediatype = detector.detect(tikaStream, metadata);
final String mimeType = mediatype.toString();
mimeType = mediatype.toString();
} else {
mimeType = downloadableContent.getType();
}
// add attributes needed for the header
request.setAttribute("filename", downloadableContent.getFilename());
@ -257,6 +264,7 @@ public class ContentViewerController extends HttpServlet {
// generate footer
request.getRequestDispatcher("/WEB-INF/jsp/footer.jsp").include(request, response);
}
}
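
For reference, the detection path above amounts to the following Tika usage (a self-contained sketch assuming only a content stream and an optional filename; the helper class is hypothetical, not the servlet's exact code):

import java.io.BufferedInputStream;
import java.io.IOException;
import java.io.InputStream;

import org.apache.tika.detect.DefaultDetector;
import org.apache.tika.io.TikaInputStream;
import org.apache.tika.metadata.Metadata;
import org.apache.tika.mime.MediaType;

final class MimeTypeSniffer {

    // Detects the MIME type of a content stream, optionally hinted by its filename.
    static String detectMimeType(final InputStream content, final String filename) throws IOException {
        final DefaultDetector detector = new DefaultDetector();
        try (final InputStream tikaStream = TikaInputStream.get(new BufferedInputStream(content))) {
            final Metadata metadata = new Metadata();
            if (filename != null) {
                metadata.set(Metadata.RESOURCE_NAME_KEY, filename);
            }
            final MediaType mediaType = detector.detect(tikaStream, metadata);
            return mediaType.toString();
        }
    }
}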
/**
* @param request request

View File

@ -16,33 +16,62 @@
*/
package org.apache.nifi.processors.hl7;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import java.io.IOException;
import java.nio.file.Paths;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;
import org.apache.nifi.processors.hl7.ExtractHL7Attributes;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.Test;
public class TestExtractHL7Attributes {
@BeforeClass
public static void setup() {
System.setProperty("org.slf4j.simpleLogger.log.org.apache.nifi", "DEBUG");
}
@Test
public void testExtract() throws IOException {
System.setProperty("org.slf4j.simpleLogger.log.org.apache.nifi", "DEBUG");
final TestRunner runner = TestRunners.newTestRunner(ExtractHL7Attributes.class);
runner.enqueue(Paths.get("src/test/resources/hypoglycemia.hl7"));
runner.run();
runner.assertAllFlowFilesTransferred(ExtractHL7Attributes.REL_SUCCESS, 1);
final MockFlowFile out = runner.getFlowFilesForRelationship(ExtractHL7Attributes.REL_SUCCESS).get(0);
final SortedMap<String, String> sortedAttrs = new TreeMap<>(out.getAttributes());
int mshSegmentCount = 0;
int obrSegmentCount = 0;
int obxSegmentCount = 0;
int pidSegmentCount = 0;
for (final Map.Entry<String, String> entry : sortedAttrs.entrySet()) {
System.out.println(entry.getKey() + " : " + entry.getValue());
final String entryKey = entry.getKey();
if (entryKey.startsWith("MSH")) {
mshSegmentCount++;
continue;
} else if (entryKey.startsWith("OBR")) {
obrSegmentCount++;
continue;
} else if (entryKey.startsWith("OBX")) {
obxSegmentCount++;
continue;
} else if (entryKey.startsWith("PID")) {
pidSegmentCount++;
continue;
}
}
Assert.assertEquals("Did not have the proper number of MSH segments", 8, mshSegmentCount);
Assert.assertEquals("Did not have the proper number of OBR segments", 9, obrSegmentCount);
Assert.assertEquals("Did not have the proper number of OBX segments", 9, obxSegmentCount);
Assert.assertEquals("Did not have the proper number of PID segments", 6, pidSegmentCount);
}
}

View File

@ -106,8 +106,7 @@ abstract class AbstractKiteProcessor extends AbstractProcessor {
};
/**
* Resolves a {@link Schema} for the given string, either a URI or a JSON
* literal.
* Resolves a {@link Schema} for the given string, either a URI or a JSON literal.
*/
protected static Schema getSchema(String uriOrLiteral, Configuration conf) {
URI uri;

View File

@ -38,6 +38,7 @@ import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.OutputStreamCallback;
import org.apache.nifi.processor.io.StreamCallback;
import org.kitesdk.data.DatasetException;
import org.kitesdk.data.DatasetIOException;
@ -97,22 +98,22 @@ public class ConvertJSONToAvro extends AbstractKiteProcessor {
}
@Override
public void onTrigger(ProcessContext context, final ProcessSession session)
public void onTrigger(final ProcessContext context, final ProcessSession session)
throws ProcessException {
FlowFile flowFile = session.get();
if (flowFile == null) {
FlowFile successfulRecords = session.get();
if (successfulRecords == null) {
return;
}
String schemaProperty = context.getProperty(SCHEMA)
.evaluateAttributeExpressions(flowFile)
.evaluateAttributeExpressions(successfulRecords)
.getValue();
final Schema schema;
try {
schema = getSchema(schemaProperty, DefaultConfiguration.get());
} catch (SchemaNotFoundException e) {
getLogger().error("Cannot find schema: " + schemaProperty);
session.transfer(flowFile, FAILURE);
session.transfer(successfulRecords, FAILURE);
return;
}
@ -121,21 +122,31 @@ public class ConvertJSONToAvro extends AbstractKiteProcessor {
writer.setCodec(CodecFactory.snappyCodec());
try {
flowFile = session.write(flowFile, new StreamCallback() {
successfulRecords = session.write(successfulRecords, new StreamCallback() {
@Override
public void process(InputStream in, OutputStream out) throws IOException {
FlowFile failedRecords = session.create();
long written = 0L;
long errors = 0L;
long total = 0L;
try (JSONFileReader<Record> reader = new JSONFileReader<>(
in, schema, Record.class)) {
reader.initialize();
try (DataFileWriter<Record> w = writer.create(schema, out)) {
while (reader.hasNext()) {
total += 1;
try {
Record record = reader.next();
w.append(record);
written += 1;
} catch (DatasetRecordException e) {
} catch (final DatasetRecordException e) {
failedRecords = session.append(failedRecords, new OutputStreamCallback() {
@Override
public void process(OutputStream out) throws IOException {
out.write((e.getMessage() + " ["
+ e.getCause().getMessage() + "]\n").getBytes());
}
});
errors += 1;
}
}
@ -144,20 +155,25 @@ public class ConvertJSONToAvro extends AbstractKiteProcessor {
false /* update only if file transfer is successful */);
session.adjustCounter("Conversion errors", errors,
false /* update only if file transfer is successful */);
if (errors > 0L) {
getLogger().warn("Failed to convert " + errors + '/' + total + " records from JSON to Avro");
}
}
session.transfer(failedRecords, FAILURE);
}
});
session.transfer(flowFile, SUCCESS);
session.transfer(successfulRecords, SUCCESS);
//session.getProvenanceReporter().send(flowFile, target.getUri().toString());
} catch (ProcessException | DatasetIOException e) {
getLogger().error("Failed reading or writing", e);
session.transfer(flowFile, FAILURE);
session.transfer(successfulRecords, FAILURE);
} catch (DatasetException e) {
getLogger().error("Failed to read FlowFile", e);
session.transfer(flowFile, FAILURE);
session.transfer(successfulRecords, FAILURE);
}
}

View File

@ -19,8 +19,10 @@
package org.apache.nifi.processors.kite;
import java.io.IOException;
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.Assert;
@ -38,9 +40,13 @@ public class TestJSONToAvroProcessor {
public static final String JSON_CONTENT = ""
+ "{\"id\": 1,\"color\": \"green\"}"
+ "{\"id\": \"120V\", \"color\": \"blue\"}\n" + // invalid, ID is a string
+ "{\"id\": \"120V\", \"color\": \"blue\"}\n" // invalid, ID is a string
+ "{\"id\": 10, \"color\": 15.23}\n" + // invalid, color as double
"{\"id\": 2, \"color\": \"grey\", \"price\": 12.95 }";
public static final String FAILURE_CONTENT = "Cannot convert field id [Cannot convert to long: \"120V\"]\n"
+ "Cannot convert field color [Cannot convert to string: 15.23]\n";
@Test
public void testBasicConversion() throws IOException {
TestRunner runner = TestRunners.newTestRunner(ConvertJSONToAvro.class);
@ -54,8 +60,13 @@ public class TestJSONToAvroProcessor {
long converted = runner.getCounterValue("Converted records");
long errors = runner.getCounterValue("Conversion errors");
Assert.assertEquals("Should convert 2 rows", 2, converted);
Assert.assertEquals("Should reject 1 row", 1, errors);
Assert.assertEquals("Should reject 2 rows", 2, errors);
runner.assertAllFlowFilesTransferred("success", 1);
runner.assertTransferCount("success", 1);
runner.assertTransferCount("failure", 1);
String failureContent = Bytes.toString(runner.getContentAsByteArray(
runner.getFlowFilesForRelationship("failure").get(0)));
Assert.assertEquals("Should reject an invalid string and double", FAILURE_CONTENT, failureContent);
}
}

View File

@ -248,6 +248,7 @@
<exclude>src/test/resources/CompressedData/SampleFileConcat.txt.bz2</exclude>
<exclude>src/test/resources/ExecuteCommand/TestIngestAndUpdate.jar</exclude>
<exclude>src/test/resources/ExecuteCommand/TestSuccess.jar</exclude>
<exclude>src/test/resources/ExecuteCommand/TestDynamicEnvironment.jar</exclude>
<exclude>src/test/resources/TestIdentifyMimeType/1.jar</exclude>
<exclude>src/test/resources/TestIdentifyMimeType/1.tar</exclude>
<exclude>src/test/resources/TestIdentifyMimeType/1.tar.gz</exclude>

View File

@ -49,7 +49,7 @@ import org.apache.nifi.util.StopWatch;
@EventDriven
@SideEffectFree
@SupportsBatching
@Tags({"experimental", "encode", "base64"})
@Tags({"encode", "base64"})
@CapabilityDescription("Encodes or decodes content to and from base64")
public class Base64EncodeContent extends AbstractProcessor {

View File

@ -171,17 +171,17 @@ public abstract class BinFiles extends AbstractSessionFactoryProcessor {
@Override
public final void onTrigger(final ProcessContext context, final ProcessSessionFactory sessionFactory) throws ProcessException {
int binsAdded = binFlowFiles(context, sessionFactory);
getLogger().debug("Binned {} FlowFiles", new Object[]{binsAdded});
final int flowFilesBinned = binFlowFiles(context, sessionFactory);
getLogger().debug("Binned {} FlowFiles", new Object[]{flowFilesBinned});
if (!isScheduled()) {
return;
}
binsAdded += migrateBins(context);
final int binsMigrated = migrateBins(context);
final int binsProcessed = processBins(context, sessionFactory);
if (binsProcessed == 0 && binsAdded == 0) {
//If we accomplished nothing then let's yield
if (flowFilesBinned == 0 && binsMigrated == 0 && binsProcessed == 0) {
context.yield();
}
}
@ -203,7 +203,6 @@ public abstract class BinFiles extends AbstractSessionFactoryProcessor {
this.readyBins.add(bin);
}
}
return added;
}
@ -251,16 +250,16 @@ public abstract class BinFiles extends AbstractSessionFactoryProcessor {
}
private int binFlowFiles(final ProcessContext context, final ProcessSessionFactory sessionFactory) {
int binsAdded = 0;
while (binManager.getBinCount() < context.getProperty(MAX_BIN_COUNT).asInteger().intValue()) {
int flowFilesBinned = 0;
while (binManager.getBinCount() <= context.getProperty(MAX_BIN_COUNT).asInteger().intValue()) {
if (!isScheduled()) {
return binsAdded;
break;
}
final ProcessSession session = sessionFactory.createSession();
FlowFile flowFile = session.get();
if (flowFile == null) {
return binsAdded;
break;
}
flowFile = this.preprocessFlowFile(context, session, flowFile);
@ -276,10 +275,10 @@ public abstract class BinFiles extends AbstractSessionFactoryProcessor {
this.readyBins.add(bin);
}
binsAdded++;
flowFilesBinned++;
}
return binsAdded;
return flowFilesBinned;
}
@OnScheduled

View File

@ -51,7 +51,7 @@ import org.apache.nifi.processor.util.StandardValidators;
@EventDriven
@SupportsBatching
@Tags({"experimental", "hash", "dupe", "duplicate", "dedupe"})
@Tags({"hash", "dupe", "duplicate", "dedupe"})
@CapabilityDescription("Caches a value, computed from FlowFile attributes, for each incoming FlowFile and determines if the cached value has already been seen. "
+ "If so, routes the FlowFile to 'duplicate' with an attribute named 'original.identifier' that specifies the original FlowFile's"
+ "\"description\", which is specified in the <FlowFile Description> property. If the FlowFile is not determined to be a duplicate, the Processor "

View File

@ -55,7 +55,7 @@ import org.apache.nifi.util.StopWatch;
@EventDriven
@SideEffectFree
@SupportsBatching
@Tags({"experimental", "encode", "decode", "base64", "hex"})
@Tags({"encode", "decode", "base64", "hex"})
@CapabilityDescription("Encodes the FlowFile content in base64")
public class EncodeContent extends AbstractProcessor {

View File

@ -81,7 +81,7 @@ import org.xml.sax.InputSource;
@EventDriven
@SideEffectFree
@SupportsBatching
@Tags({"XML", "evaluate", "XPath", "XQuery", "experimental"})
@Tags({"XML", "evaluate", "XPath", "XQuery"})
@CapabilityDescription(
"Evaluates one or more XQueries against the content of a FlowFile. The results of those XQueries are assigned "
+ "to FlowFile Attributes or are written to the content of the FlowFile itself, depending on configuration of "

View File

@ -33,6 +33,7 @@ import java.util.Set;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.behavior.DynamicProperty;
import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
@ -88,7 +89,13 @@ import org.apache.nifi.stream.io.StreamUtils;
* <li>Supports expression language: true</li>
* </ul>
* </li>
*
* <li>Ignore STDIN
* <ul>
* <li>Indicates whether or not the flowfile's contents should be streamed as part of STDIN</li>
* <li>Default value: false (this means that the contents of a flowfile will be sent as STDIN to your command)</li>
* <li>Supports expression language: false</li>
* </ul>
* </li>
* </ul>
*
* <p>
@ -113,6 +120,7 @@ import org.apache.nifi.stream.io.StreamUtils;
@SupportsBatching
@Tags({"command execution", "command", "stream", "execute"})
@CapabilityDescription("Executes an external command on the contents of a flow file, and creates a new flow file with the results of the command.")
@DynamicProperty(name = "An environment variable name", value = "An environment variable value", description = "These environment variables are passed to the process spawned by this Processor")
@WritesAttributes({
@WritesAttribute(attribute = "execution.command", description = "The name of the command executed to create the new FlowFile"),
@WritesAttribute(attribute = "execution.command.args", description = "The semi-colon delimited list of arguments"),
@ -175,12 +183,22 @@ public class ExecuteStreamCommand extends AbstractProcessor {
.required(false)
.build();
static final PropertyDescriptor IGNORE_STDIN = new PropertyDescriptor.Builder()
.name("Ignore STDIN")
.description("If true, the contents of the incoming flowfile will not be passed to the executing command")
.addValidator(Validator.VALID)
.allowableValues("true", "false")
.defaultValue("false")
.build();
private static final List<PropertyDescriptor> PROPERTIES;
static {
List<PropertyDescriptor> props = new ArrayList<>();
props.add(EXECUTION_ARGUMENTS);
props.add(EXECUTION_COMMAND);
props.add(IGNORE_STDIN);
props.add(WORKING_DIR);
PROPERTIES = Collections.unmodifiableList(props);
}
@ -202,6 +220,16 @@ public class ExecuteStreamCommand extends AbstractProcessor {
return PROPERTIES;
}
@Override
protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
return new PropertyDescriptor.Builder()
.name(propertyDescriptorName)
.description("Sets the environment variable '" + propertyDescriptorName + "' for the process' environment")
.dynamic(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
}
@Override
public void onTrigger(ProcessContext context, final ProcessSession session) throws ProcessException {
FlowFile flowFile = session.get();
@ -213,6 +241,7 @@ public class ExecuteStreamCommand extends AbstractProcessor {
final String executeCommand = context.getProperty(EXECUTION_COMMAND).evaluateAttributeExpressions(flowFile).getValue();
args.add(executeCommand);
final String commandArguments = context.getProperty(EXECUTION_ARGUMENTS).getValue();
final boolean ignoreStdin = Boolean.parseBoolean(context.getProperty(IGNORE_STDIN).getValue());
if (!StringUtils.isBlank(commandArguments)) {
for (String arg : commandArguments.split(";")) {
args.add(context.newPropertyValue(arg).evaluateAttributeExpressions(flowFile).getValue());
@ -230,6 +259,13 @@ public class ExecuteStreamCommand extends AbstractProcessor {
logger.warn("Failed to create working directory {}, using current working directory {}", new Object[]{workingDir, System.getProperty("user.dir")});
}
}
final Map<String, String> environment = new HashMap<>();
for (final Map.Entry<PropertyDescriptor, String> entry : context.getProperties().entrySet()) {
if (entry.getKey().isDynamic()) {
environment.put(entry.getKey().getName(), entry.getValue());
}
}
builder.environment().putAll(environment);
builder.command(args);
builder.directory(dir);
builder.redirectInput(Redirect.PIPE);
@ -250,7 +286,11 @@ public class ExecuteStreamCommand extends AbstractProcessor {
final BufferedOutputStream bos = new BufferedOutputStream(pos);
FlowFile outputStreamFlowFile = session.create(flowFile);
StdInWriterCallback callback = new StdInWriterCallback(bos, bis, logger, session, outputStreamFlowFile, process);
if (ignoreStdin) {
session.read(outputStreamFlowFile, callback);
} else {
session.read(flowFile, callback);
}
outputStreamFlowFile = callback.outputStreamFlowFile;
exitCode = callback.exitCode;
logger.debug("Execution complete for command: {}. Exited with code: {}", new Object[]{executeCommand, exitCode});

View File

@ -261,7 +261,7 @@ public class ScanContent extends AbstractProcessor {
@Override
public SearchTerm<byte[]> nextTerm() throws IOException {
final String nextLine = reader.readLine();
if (nextLine == null) {
if (nextLine == null || nextLine.isEmpty()) {
return null;
}
return new SearchTerm<>(nextLine.getBytes("UTF-8"));

View File

@ -193,8 +193,6 @@ public class SplitJson extends AbstractJsonPathProcessor {
segments.add(split);
}
processSession.getProvenanceReporter().fork(original, segments);
processSession.transfer(segments, REL_SPLIT);
processSession.transfer(original, REL_ORIGINAL);
logger.info("Split {} into {} FlowFiles", new Object[]{original, segments.size()});

View File

@ -0,0 +1,29 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import java.util.Map;
public class TestDynamicEnvironment {
public static void main(String[] args) {
// iterate through current environment and print out all properties starting with NIFI
for (Map.Entry<String, String> env: System.getenv().entrySet()) {
if (env.getKey().startsWith("NIFI")) {
System.out.println(env.getKey() + "=" + env.getValue());
}
}
}
}

View File

@ -157,6 +157,29 @@ public class TestExecuteStreamCommand {
|| result.contains(File.separator + "nifi-standard-processors" + File.separator + "target:ModifiedResult\n"));
}
@Test
public void testIgnoredStdin() throws IOException {
File exJar = new File("src/test/resources/ExecuteCommand/TestIngestAndUpdate.jar");
File dummy = new File("src/test/resources/ExecuteCommand/1000bytes.txt");
String jarPath = exJar.getAbsolutePath();
exJar.setExecutable(true);
final TestRunner controller = TestRunners.newTestRunner(ExecuteStreamCommand.class);
controller.setValidateExpressionUsage(false);
controller.enqueue(dummy.toPath());
controller.setProperty(ExecuteStreamCommand.WORKING_DIR, "target");
controller.setProperty(ExecuteStreamCommand.EXECUTION_COMMAND, "java");
controller.setProperty(ExecuteStreamCommand.EXECUTION_ARGUMENTS, "-jar;" + jarPath);
controller.setProperty(ExecuteStreamCommand.IGNORE_STDIN, "true");
controller.run(1);
controller.assertTransferCount(ExecuteStreamCommand.ORIGINAL_RELATIONSHIP, 1);
controller.assertTransferCount(ExecuteStreamCommand.OUTPUT_STREAM_RELATIONSHIP, 1);
List<MockFlowFile> flowFiles = controller.getFlowFilesForRelationship(ExecuteStreamCommand.OUTPUT_STREAM_RELATIONSHIP);
byte[] byteArray = flowFiles.get(0).toByteArray();
String result = new String(byteArray);
assertTrue("TestIngestAndUpdate.jar should not have received anything to modify",
result.endsWith("target:ModifiedResult\n"));
}
// this is dependent on Windows with Cygwin... so it's not enabled
@Ignore
@Test
@ -182,4 +205,30 @@ public class TestExecuteStreamCommand {
assertEquals(0, flowFiles.get(0).getSize());
}
@Test
public void testDynamicEnvironment() throws Exception {
File exJar = new File("src/test/resources/ExecuteCommand/TestDynamicEnvironment.jar");
File dummy = new File("src/test/resources/ExecuteCommand/1000bytes.txt");
String jarPath = exJar.getAbsolutePath();
exJar.setExecutable(true);
final TestRunner controller = TestRunners.newTestRunner(ExecuteStreamCommand.class);
controller.setProperty("NIFI_TEST_1", "testvalue1");
controller.setProperty("NIFI_TEST_2", "testvalue2");
controller.setValidateExpressionUsage(false);
controller.enqueue(dummy.toPath());
controller.setProperty(ExecuteStreamCommand.WORKING_DIR, "target");
controller.setProperty(ExecuteStreamCommand.EXECUTION_COMMAND, "java");
controller.setProperty(ExecuteStreamCommand.EXECUTION_ARGUMENTS, "-jar;" + jarPath);
controller.run(1);
controller.assertTransferCount(ExecuteStreamCommand.ORIGINAL_RELATIONSHIP, 1);
controller.assertTransferCount(ExecuteStreamCommand.OUTPUT_STREAM_RELATIONSHIP, 1);
List<MockFlowFile> flowFiles = controller.getFlowFilesForRelationship(ExecuteStreamCommand.OUTPUT_STREAM_RELATIONSHIP);
byte[] byteArray = flowFiles.get(0).toByteArray();
String result = new String(byteArray);
String[] dynamicEnvironment = result.split("\n");
assertEquals("Should contain two environment variables starting with NIFI", 2, dynamicEnvironment.length);
assertEquals("NIFI_TEST_2 environment variable is missing", "NIFI_TEST_2=testvalue2", dynamicEnvironment[0]);
assertEquals("NIFI_TEST_1 environment variable is missing", "NIFI_TEST_1=testvalue1", dynamicEnvironment[1]);
}
}

View File

@ -74,6 +74,26 @@ public class TestMergeContent {
bundle.assertAttributeEquals(CoreAttributes.MIME_TYPE.key(), "application/plain-text");
}
@Test
public void testSimpleBinaryConcatSingleBin() throws IOException, InterruptedException {
final TestRunner runner = TestRunners.newTestRunner(new MergeContent());
runner.setProperty(MergeContent.MAX_BIN_AGE, "1 sec");
runner.setProperty(MergeContent.MERGE_FORMAT, MergeContent.MERGE_FORMAT_CONCAT);
runner.setProperty(MergeContent.MAX_BIN_COUNT, "1");
createFlowFiles(runner);
runner.run();
runner.assertQueueEmpty();
runner.assertTransferCount(MergeContent.REL_MERGED, 1);
runner.assertTransferCount(MergeContent.REL_FAILURE, 0);
runner.assertTransferCount(MergeContent.REL_ORIGINAL, 3);
final MockFlowFile bundle = runner.getFlowFilesForRelationship(MergeContent.REL_MERGED).get(0);
bundle.assertContentEquals("Hello, World!".getBytes("UTF-8"));
bundle.assertAttributeEquals(CoreAttributes.MIME_TYPE.key(), "application/plain-text");
}
@Test
public void testSimpleBinaryConcatWithTextDelimiters() throws IOException, InterruptedException {
final TestRunner runner = TestRunners.newTestRunner(new MergeContent());

View File

@ -34,6 +34,24 @@ import org.junit.Test;
public class TestScanContent {
@Test
public void testBlankLineInDictionaryTextEncoding() throws IOException {
final String dictionaryWithBlankLine = "Line1\n\nLine3";
final byte[] dictionaryBytes = dictionaryWithBlankLine.getBytes(ScanContent.UTF8);
final Path dictionaryPath = Paths.get("target/dictionary");
Files.write(dictionaryPath, dictionaryBytes, StandardOpenOption.CREATE, StandardOpenOption.WRITE);
final TestRunner runner = TestRunners.newTestRunner(new ScanContent());
runner.setThreadCount(1);
runner.setProperty(ScanContent.DICTIONARY, dictionaryPath.toString());
runner.setProperty(ScanContent.DICTIONARY_ENCODING, ScanContent.TEXT_ENCODING);
runner.enqueue(new byte[0]);
runner.run();
runner.assertTransferCount(ScanContent.REL_NO_MATCH, 1);
}
@Ignore("This test has a race condition/ordering problem")
@Test
public void testBinaryScan() throws IOException {