Merge branch 'NIFI-333' of https://github.com/danbress/incubator-nifi into develop

Mark Payne 2015-02-23 15:34:13 -05:00
commit d8954ab013
9 changed files with 148 additions and 87 deletions
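
NIFI-333's pattern, repeated across every file below: a blanket catch (Exception e) inside onTrigger is narrowed to the specific exceptions the processor can actually recover from, usually ProcessException, which the framework uses to wrap I/O errors raised inside session read/write callbacks. Expected content failures still route to 'failure'; unexpected RuntimeExceptions now escape onTrigger so the framework rolls back the session and retries the FlowFile instead of routing a bug to 'failure'. A minimal before/after sketch, with placeholder relationship and callback names rather than code from any one file:

    // Before: any error, including programming bugs, ends up on the failure relationship.
    try {
        flowFile = session.write(flowFile, callback);
        session.transfer(flowFile, REL_SUCCESS);
    } catch (Exception e) {
        session.transfer(flowFile, REL_FAILURE);
    }

    // After: only ProcessException (content/IO failures) is routed; anything else
    // propagates and the framework rolls the session back.
    try {
        flowFile = session.write(flowFile, callback);
        session.transfer(flowFile, REL_SUCCESS);
    } catch (ProcessException e) {
        session.transfer(flowFile, REL_FAILURE);
    }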

CreateHadoopSequenceFile.java View File

@@ -22,6 +22,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Set;
+import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.nifi.annotation.behavior.SideEffectFree;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
@@ -33,7 +34,6 @@ import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processors.hadoop.util.SequenceFileWriter;
-import org.apache.hadoop.io.SequenceFile.CompressionType;
/**
* <p>
@@ -167,7 +167,7 @@ public class CreateHadoopSequenceFile extends AbstractHadoopProcessor {
flowFile = sequenceFileWriter.writeSequenceFile(flowFile, session, hdfsResources.get().getKey(), compressionType);
session.transfer(flowFile, RELATIONSHIP_SUCCESS);
getLogger().info("Transferred flowfile {} to {}", new Object[]{flowFile, RELATIONSHIP_SUCCESS});
-} catch (Exception e) {
+} catch (ProcessException e) {
getLogger().error("Failed to create Sequence File. Transferring {} to 'failure'", new Object[]{flowFile}, e);
session.transfer(flowFile, RELATIONSHIP_FAILURE);
}

Base64EncodeContent.java View File

@@ -28,6 +28,11 @@ import java.util.concurrent.TimeUnit;
import org.apache.commons.codec.binary.Base64InputStream;
import org.apache.commons.codec.binary.Base64OutputStream;
+import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.annotation.behavior.SideEffectFree;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ProcessorLog;
@@ -36,11 +41,7 @@ import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
-import org.apache.nifi.annotation.documentation.CapabilityDescription;
-import org.apache.nifi.annotation.behavior.EventDriven;
-import org.apache.nifi.annotation.behavior.SideEffectFree;
-import org.apache.nifi.annotation.behavior.SupportsBatching;
-import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.StreamCallback;
import org.apache.nifi.processors.standard.util.ValidatingBase64InputStream;
import org.apache.nifi.util.StopWatch;
@@ -136,7 +137,7 @@ public class Base64EncodeContent extends AbstractProcessor {
logger.info("Successfully {} {}", new Object[]{encode ? "encoded" : "decoded", flowFile});
session.getProvenanceReporter().modifyContent(flowFile, stopWatch.getElapsed(TimeUnit.MILLISECONDS));
session.transfer(flowFile, REL_SUCCESS);
-} catch (Exception e) {
+} catch (ProcessException e) {
logger.error("Failed to {} {} due to {}", new Object[]{encode ? "encode" : "decode", flowFile, e});
session.transfer(flowFile, REL_FAILURE);
}

CompressContent.java View File

@@ -32,30 +32,31 @@ import java.util.concurrent.TimeUnit;
import lzma.sdk.lzma.Decoder;
import lzma.streams.LzmaInputStream;
import lzma.streams.LzmaOutputStream;
+import org.apache.commons.compress.compressors.CompressorStreamFactory;
+import org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream;
+import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream;
+import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.annotation.behavior.SideEffectFree;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
-import org.apache.nifi.stream.io.BufferedInputStream;
-import org.apache.nifi.stream.io.BufferedOutputStream;
-import org.apache.nifi.stream.io.GZIPOutputStream;
import org.apache.nifi.logging.ProcessorLog;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
-import org.apache.nifi.annotation.documentation.CapabilityDescription;
-import org.apache.nifi.annotation.behavior.EventDriven;
-import org.apache.nifi.annotation.behavior.SideEffectFree;
-import org.apache.nifi.annotation.behavior.SupportsBatching;
-import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.StreamCallback;
+import org.apache.nifi.stream.io.BufferedInputStream;
+import org.apache.nifi.stream.io.BufferedOutputStream;
+import org.apache.nifi.stream.io.GZIPOutputStream;
import org.apache.nifi.util.ObjectHolder;
import org.apache.nifi.util.StopWatch;
-import org.apache.commons.compress.compressors.CompressorStreamFactory;
-import org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream;
-import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream;
import org.tukaani.xz.LZMA2Options;
import org.tukaani.xz.XZInputStream;
import org.tukaani.xz.XZOutputStream;
@@ -290,7 +291,7 @@ public class CompressContent extends AbstractProcessor {
compressionMode.toLowerCase(), flowFile, compressionFormat, sizeBeforeCompression, sizeAfterCompression});
session.getProvenanceReporter().modifyContent(flowFile, stopWatch.getDuration(TimeUnit.MILLISECONDS));
session.transfer(flowFile, REL_SUCCESS);
-} catch (final Exception e) {
+} catch (final ProcessException e) {
logger.error("Unable to {} {} using {} compression format due to {}; routing to failure", new Object[]{compressionMode.toLowerCase(), flowFile, compressionFormat, e});
session.transfer(flowFile, REL_FAILURE);
}
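
An aside on the class these imports serve: CompressContent picks a codec stream per the configured format. A minimal standalone sketch of the decompression dispatch, assuming the property values 'gzip', 'bzip2', 'xz-lzma2', and 'lzma'; the constructors are the real commons-compress, XZ for Java, and lzma-java APIs, while the helper itself is hypothetical:

    import java.io.BufferedInputStream;
    import java.io.IOException;
    import java.io.InputStream;
    import lzma.sdk.lzma.Decoder;
    import lzma.streams.LzmaInputStream;
    import org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream;
    import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream;
    import org.tukaani.xz.XZInputStream;

    final class DecompressionStreams {
        // Wraps the raw content stream in a decoder for the chosen format.
        static InputStream open(final String format, final InputStream raw) throws IOException {
            final InputStream in = new BufferedInputStream(raw);
            switch (format) {
                case "gzip":
                    return new GzipCompressorInputStream(in);
                case "bzip2":
                    return new BZip2CompressorInputStream(in);
                case "xz-lzma2":
                    return new XZInputStream(in);
                case "lzma":
                    return new LzmaInputStream(in, new Decoder());
                default:
                    throw new IOException("unsupported compression format: " + format);
            }
        }
    }

A bad header in any of these formats surfaces as an IOException from the constructor or first read, which the session wraps in the ProcessException that the narrowed catch above routes to failure.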

HashContent.java View File

@@ -28,23 +28,23 @@ import java.util.HashSet;
import java.util.List;
import java.util.Set;
+import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
-import org.apache.nifi.stream.io.NullOutputStream;
-import org.apache.nifi.stream.io.StreamUtils;
import org.apache.nifi.logging.ProcessorLog;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
-import org.apache.nifi.annotation.documentation.CapabilityDescription;
-import org.apache.nifi.annotation.behavior.EventDriven;
-import org.apache.nifi.annotation.behavior.SupportsBatching;
-import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.stream.io.NullOutputStream;
+import org.apache.nifi.stream.io.StreamUtils;
import org.apache.nifi.util.ObjectHolder;
@EventDriven
@@ -143,7 +143,7 @@ public class HashContent extends AbstractProcessor {
logger.info("Successfully added attribute '{}' to {} with a value of {}; routing to success", new Object[]{attributeName, flowFile, hashValueHolder.get()});
session.getProvenanceReporter().modifyAttributes(flowFile);
session.transfer(flowFile, REL_SUCCESS);
-} catch (final Exception e) {
+} catch (final ProcessException e) {
logger.error("Failed to process {} due to {}; routing to failure", new Object[]{flowFile, e});
session.transfer(flowFile, REL_FAILURE);
}

PutEmail.java View File

@@ -32,6 +32,7 @@ import java.util.Set;
import javax.activation.DataHandler;
import javax.mail.Message;
import javax.mail.Message.RecipientType;
+import javax.mail.MessagingException;
import javax.mail.Session;
import javax.mail.URLName;
import javax.mail.internet.AddressException;
@@ -56,9 +57,9 @@ import org.apache.nifi.processor.Relationship;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.commons.codec.binary.Base64;
import com.sun.mail.smtp.SMTPTransport;
@@ -263,7 +264,7 @@ public class PutEmail extends AbstractProcessor {
session.getProvenanceReporter().send(flowFile, "mailto:" + message.getAllRecipients()[0].toString());
session.transfer(flowFile, REL_SUCCESS);
logger.info("Sent email as a result of receiving {}", new Object[]{flowFile});
-} catch (final Exception e) {
+} catch (final ProcessException | MessagingException | IOException e) {
context.yield();
logger.error("Failed to send email for {}: {}; routing to failure", new Object[]{flowFile, e});
session.transfer(flowFile, REL_FAILURE);

SegmentContent.java View File

@@ -25,6 +25,11 @@ import java.util.Map;
import java.util.Set;
import java.util.UUID;
+import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.annotation.behavior.SideEffectFree;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
@@ -34,12 +39,6 @@ import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
-import org.apache.nifi.annotation.documentation.CapabilityDescription;
-import org.apache.nifi.annotation.behavior.EventDriven;
-import org.apache.nifi.annotation.behavior.SideEffectFree;
-import org.apache.nifi.annotation.behavior.SupportsBatching;
-import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
@EventDriven
@@ -102,62 +101,57 @@ public class SegmentContent extends AbstractProcessor {
return;
}
-        try {
-            final String segmentId = UUID.randomUUID().toString();
-            final long segmentSize = context.getProperty(SIZE).asDataSize(DataUnit.B).longValue();
-
-            final String originalFileName = flowFile.getAttribute(CoreAttributes.FILENAME.key());
-
-            if (flowFile.getSize() <= segmentSize) {
-                flowFile = session.putAttribute(flowFile, SEGMENT_ID, segmentId);
-                flowFile = session.putAttribute(flowFile, SEGMENT_INDEX, "1");
-                flowFile = session.putAttribute(flowFile, SEGMENT_COUNT, "1");
-                flowFile = session.putAttribute(flowFile, SEGMENT_ORIGINAL_FILENAME, originalFileName);
-
-                flowFile = session.putAttribute(flowFile, FRAGMENT_ID, segmentId);
-                flowFile = session.putAttribute(flowFile, FRAGMENT_INDEX, "1");
-                flowFile = session.putAttribute(flowFile, FRAGMENT_COUNT, "1");
-
-                FlowFile clone = session.clone(flowFile);
-                session.transfer(flowFile, REL_ORIGINAL);
-                session.transfer(clone, REL_SEGMENTS);
-                return;
-            }
-
-            int totalSegments = (int) (flowFile.getSize() / segmentSize);
-            if (totalSegments * segmentSize < flowFile.getSize()) {
-                totalSegments++;
-            }
-
-            final Map<String, String> segmentAttributes = new HashMap<>();
-            segmentAttributes.put(SEGMENT_ID, segmentId);
-            segmentAttributes.put(SEGMENT_COUNT, String.valueOf(totalSegments));
-            segmentAttributes.put(SEGMENT_ORIGINAL_FILENAME, originalFileName);
-
-            segmentAttributes.put(FRAGMENT_ID, segmentId);
-            segmentAttributes.put(FRAGMENT_COUNT, String.valueOf(totalSegments));
-
-            final Set<FlowFile> segmentSet = new HashSet<>();
-            for (int i = 1; i <= totalSegments; i++) {
-                final long segmentOffset = segmentSize * (i - 1);
-                FlowFile segment = session.clone(flowFile, segmentOffset, Math.min(segmentSize, flowFile.getSize() - segmentOffset));
-                segmentAttributes.put(SEGMENT_INDEX, String.valueOf(i));
-                segmentAttributes.put(FRAGMENT_INDEX, String.valueOf(i));
-                segment = session.putAllAttributes(segment, segmentAttributes);
-                segmentSet.add(segment);
-            }
-
-            session.transfer(segmentSet, REL_SEGMENTS);
-            session.transfer(flowFile, REL_ORIGINAL);
-
-            if (totalSegments <= 10) {
-                getLogger().info("Segmented {} into {} segments: {}", new Object[]{flowFile, totalSegments, segmentSet});
-            } else {
-                getLogger().info("Segmented {} into {} segments", new Object[]{flowFile, totalSegments});
-            }
-        } catch (final Exception e) {
-            throw new ProcessException(e);
-        }
+        final String segmentId = UUID.randomUUID().toString();
+        final long segmentSize = context.getProperty(SIZE).asDataSize(DataUnit.B).longValue();
+
+        final String originalFileName = flowFile.getAttribute(CoreAttributes.FILENAME.key());
+
+        if (flowFile.getSize() <= segmentSize) {
+            flowFile = session.putAttribute(flowFile, SEGMENT_ID, segmentId);
+            flowFile = session.putAttribute(flowFile, SEGMENT_INDEX, "1");
+            flowFile = session.putAttribute(flowFile, SEGMENT_COUNT, "1");
+            flowFile = session.putAttribute(flowFile, SEGMENT_ORIGINAL_FILENAME, originalFileName);
+
+            flowFile = session.putAttribute(flowFile, FRAGMENT_ID, segmentId);
+            flowFile = session.putAttribute(flowFile, FRAGMENT_INDEX, "1");
+            flowFile = session.putAttribute(flowFile, FRAGMENT_COUNT, "1");
+
+            FlowFile clone = session.clone(flowFile);
+            session.transfer(flowFile, REL_ORIGINAL);
+            session.transfer(clone, REL_SEGMENTS);
+            return;
+        }
+
+        int totalSegments = (int) (flowFile.getSize() / segmentSize);
+        if (totalSegments * segmentSize < flowFile.getSize()) {
+            totalSegments++;
+        }
+
+        final Map<String, String> segmentAttributes = new HashMap<>();
+        segmentAttributes.put(SEGMENT_ID, segmentId);
+        segmentAttributes.put(SEGMENT_COUNT, String.valueOf(totalSegments));
+        segmentAttributes.put(SEGMENT_ORIGINAL_FILENAME, originalFileName);
+
+        segmentAttributes.put(FRAGMENT_ID, segmentId);
+        segmentAttributes.put(FRAGMENT_COUNT, String.valueOf(totalSegments));
+
+        final Set<FlowFile> segmentSet = new HashSet<>();
+        for (int i = 1; i <= totalSegments; i++) {
+            final long segmentOffset = segmentSize * (i - 1);
+            FlowFile segment = session.clone(flowFile, segmentOffset, Math.min(segmentSize, flowFile.getSize() - segmentOffset));
+            segmentAttributes.put(SEGMENT_INDEX, String.valueOf(i));
+            segmentAttributes.put(FRAGMENT_INDEX, String.valueOf(i));
+            segment = session.putAllAttributes(segment, segmentAttributes);
+            segmentSet.add(segment);
+        }
+
+        session.transfer(segmentSet, REL_SEGMENTS);
+        session.transfer(flowFile, REL_ORIGINAL);
+
+        if (totalSegments <= 10) {
+            getLogger().info("Segmented {} into {} segments: {}", new Object[]{flowFile, totalSegments, segmentSet});
+        } else {
+            getLogger().info("Segmented {} into {} segments", new Object[]{flowFile, totalSegments});
+        }
    }
}
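
The arithmetic in the rewritten block deserves a note: integer division floors, so any remainder earns one extra segment, and 1-based segment i starts at segmentSize * (i - 1). A hypothetical helper isolating just that math:

    // Returns {offset, length} pairs covering [0, fileSize) in segmentSize slices.
    static long[][] segmentRanges(final long fileSize, final long segmentSize) {
        int totalSegments = (int) (fileSize / segmentSize);      // floor
        if (totalSegments * segmentSize < fileSize) {
            totalSegments++;                                     // remainder gets its own segment
        }
        final long[][] ranges = new long[totalSegments][2];
        for (int i = 1; i <= totalSegments; i++) {
            final long offset = segmentSize * (i - 1);
            ranges[i - 1][0] = offset;
            ranges[i - 1][1] = Math.min(segmentSize, fileSize - offset);  // last slice may be short
        }
        return ranges;
    }

For a 25-byte file and a 10-byte segment size this yields lengths 10, 10, and 5, the same ranges session.clone(flowFile, offset, length) is asked to slice above.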

TransformXml.java View File

@@ -51,6 +51,7 @@ import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.SideEffectFree;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.StreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.util.StopWatch;
@@ -152,7 +153,7 @@ public class TransformXml extends AbstractProcessor {
session.transfer(transformed, REL_SUCCESS);
session.getProvenanceReporter().modifyContent(transformed, stopWatch.getElapsed(TimeUnit.MILLISECONDS));
logger.info("Transformed {}", new Object[]{original});
-} catch (Exception e) {
+} catch (ProcessException e) {
logger.error("Unable to transform {} due to {}", new Object[]{original, e});
session.transfer(original, REL_FAILURE);
}

TestCompressContent.java View File

@@ -109,4 +109,22 @@ public class TestCompressContent {
flowFile.assertAttributeEquals("filename", "SampleFile.txt.gz");
}
+
+    @Test
+    public void testDecompressFailure() throws IOException {
+        final TestRunner runner = TestRunners.newTestRunner(CompressContent.class);
+        runner.setProperty(CompressContent.MODE, "decompress");
+        runner.setProperty(CompressContent.COMPRESSION_FORMAT, "gzip");
+
+        byte[] data = new byte[]{1,2,3,4,5,6,7,8,9,10};
+        runner.enqueue(data);
+        assertTrue(runner.setProperty(CompressContent.UPDATE_FILENAME, "true").isValid());
+        runner.run();
+
+        runner.assertQueueEmpty();
+        runner.assertAllFlowFilesTransferred(CompressContent.REL_FAILURE, 1);
+        runner.getFlowFilesForRelationship(CompressContent.REL_FAILURE).get(0).assertContentEquals(data);
+    }
}
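
Why the new test passes: ten arbitrary bytes are not a gzip stream, so the decoder rejects the missing 0x1f 0x8b magic bytes with an IOException, the session wraps that in a ProcessException, and CompressContent's narrowed catch routes the untouched FlowFile to failure. A self-contained illustration of the header rejection using plain commons-compress, independent of NiFi:

    import java.io.ByteArrayInputStream;
    import java.io.IOException;
    import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream;

    public class GzipHeaderCheck {
        public static void main(String[] args) {
            final byte[] data = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10};
            try (GzipCompressorInputStream in =
                    new GzipCompressorInputStream(new ByteArrayInputStream(data))) {
                in.read();
            } catch (IOException e) {
                // Thrown while parsing the header: input lacks the gzip magic bytes.
                System.out.println("not gzip: " + e.getMessage());
            }
        }
    }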

TestPutEmail.java View File

@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.Test;
+
+public class TestPutEmail {
+
+    @Test
+    public void testHotNotFound() {
+        // verifies that files are routed to failure when the SMTP host doesn't exist
+        final TestRunner runner = TestRunners.newTestRunner(new PutEmail());
+        runner.setProperty(PutEmail.SMTP_HOSTNAME, "host-doesnt-exist123");
+        runner.setProperty(PutEmail.FROM, "test@apache.org");
+        runner.setProperty(PutEmail.TO, "test@apache.org");
+        runner.setProperty(PutEmail.MESSAGE, "Message Body");
+
+        final Map<String, String> attributes = new HashMap<>();
+        runner.enqueue("Some Text".getBytes(), attributes);
+        runner.run();
+
+        runner.assertQueueEmpty();
+        runner.assertAllFlowFilesTransferred(PutEmail.REL_FAILURE);
+    }
+}
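
For the record on why this routes to failure: 'host-doesnt-exist123' never resolves, so the JavaMail send fails with a MessagingException, which PutEmail's widened catch above now handles by yielding the context and transferring to REL_FAILURE. If stronger assertions were wanted, the TestRunner calls already used in TestCompressContent would apply here as well, e.g.:

    runner.assertAllFlowFilesTransferred(PutEmail.REL_FAILURE, 1);
    runner.getFlowFilesForRelationship(PutEmail.REL_FAILURE).get(0).assertContentEquals("Some Text".getBytes());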
}