From 878e950accd1822a50c792a6466645208bb4dffb Mon Sep 17 00:00:00 2001 From: Brian Ghigiarelli Date: Fri, 27 Mar 2015 14:28:21 -0400 Subject: [PATCH 1/4] Adding HTML support to the PutEmail service by allowing the content mime type to be configurable with a PropertyDescriptor Signed-off-by: Mark Payne --- .../nifi/processors/standard/PutEmail.java | 16 +++++++++++++--- 1 file changed, 13 insertions(+), 3 deletions(-) diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutEmail.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutEmail.java index f6320005f1..144dd5b7a1 100644 --- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutEmail.java +++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutEmail.java @@ -132,6 +132,14 @@ public class PutEmail extends AbstractProcessor { .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .defaultValue("NiFi") .build(); + public static final PropertyDescriptor CONTENT_TYPE = new PropertyDescriptor.Builder() + .name("Content Type") + .description("Mime Type used to interpret the contents of the email, such as text/plain or text/html") + .required(true) + .expressionLanguageSupported(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .defaultValue("text/plain") + .build(); public static final PropertyDescriptor FROM = new PropertyDescriptor.Builder() .name("From") .description("Specifies the Email address to use as the sender") @@ -223,6 +231,7 @@ public class PutEmail extends AbstractProcessor { properties.add(SMTP_TLS); properties.add(SMTP_SOCKET_FACTORY); properties.add(HEADER_XMAILER); + properties.add(CONTENT_TYPE); properties.add(FROM); properties.add(TO); properties.add(CC); @@ -297,10 +306,11 @@ public class PutEmail extends AbstractProcessor { if (context.getProperty(INCLUDE_ALL_ATTRIBUTES).asBoolean()) { messageText = formatAttributes(flowFile, messageText); } - - message.setText(messageText); + + String contentType = context.getProperty(CONTENT_TYPE).evaluateAttributeExpressions(flowFile).getValue(); + message.setContent(messageText, contentType); message.setSentDate(new Date()); - + if (context.getProperty(ATTACH_FILE).asBoolean()) { final MimeBodyPart mimeText = new PreencodedMimeBodyPart("base64"); mimeText.setDataHandler(new DataHandler(new ByteArrayDataSource(Base64.encodeBase64(messageText.getBytes("UTF-8")), "text/plain; charset=\"utf-8\""))); From 509933f631533a659551c2f0c5c9d12e3f47b8e5 Mon Sep 17 00:00:00 2001 From: Mark Payne Date: Tue, 31 Mar 2015 08:34:23 -0400 Subject: [PATCH 2/4] NIFI-478: Fixed bug that caused byte sequence to be dropped for last split under certain circumstances; added new unit tests --- .../processors/standard/SplitContent.java | 93 +++++++++++-- .../processors/standard/TestSplitContent.java | 123 ++++++++++++++++++ 2 files changed, 202 insertions(+), 14 deletions(-) diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitContent.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitContent.java index 9838af7011..419e12d2f1 100644 --- 
a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitContent.java +++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitContent.java @@ -18,7 +18,9 @@ package org.apache.nifi.processors.standard; import java.io.IOException; import java.io.InputStream; +import java.nio.charset.StandardCharsets; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; @@ -28,6 +30,7 @@ import java.util.Set; import java.util.UUID; import java.util.concurrent.atomic.AtomicReference; +import org.apache.commons.codec.DecoderException; import org.apache.commons.codec.binary.Hex; import org.apache.nifi.annotation.behavior.EventDriven; import org.apache.nifi.annotation.behavior.SideEffectFree; @@ -35,8 +38,10 @@ import org.apache.nifi.annotation.behavior.SupportsBatching; import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.SeeAlso; import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; import org.apache.nifi.annotation.behavior.WritesAttribute; import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.components.AllowableValue; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.ValidationContext; import org.apache.nifi.components.ValidationResult; @@ -50,6 +55,7 @@ import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.ProcessorInitializationContext; import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.io.InputStreamCallback; +import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.stream.io.BufferedInputStream; import org.apache.nifi.util.NaiveSearchRingBuffer; import org.apache.nifi.util.Tuple; @@ -72,19 +78,39 @@ public class SplitContent extends AbstractProcessor { public static final String FRAGMENT_COUNT = "fragment.count"; public static final String SEGMENT_ORIGINAL_FILENAME = "segment.original.filename"; + static final AllowableValue HEX_FORMAT = new AllowableValue("Hexadecimal", "Hexadecimal", "The Byte Sequence will be interpreted as a hexadecimal representation of bytes"); + static final AllowableValue UTF8_FORMAT = new AllowableValue("Text", "Text", "The Byte Sequence will be interpreted as UTF-8 Encoded text"); + + static final AllowableValue TRAILING_POSITION = new AllowableValue("Trailing", "Trailing", "Keep the Byte Sequence at the end of the first split if <Keep Byte Sequence> is true"); + static final AllowableValue LEADING_POSITION = new AllowableValue("Leading", "Leading", "Keep the Byte Sequence at the beginning of the second split if <Keep Byte Sequence> is true"); + + public static final PropertyDescriptor FORMAT = new PropertyDescriptor.Builder() + .name("Byte Sequence Format") + .description("Specifies how the <Byte Sequence> property should be interpreted") + .required(true) + .allowableValues(HEX_FORMAT, UTF8_FORMAT) + .defaultValue(HEX_FORMAT.getValue()) + .build(); public static final PropertyDescriptor BYTE_SEQUENCE = new PropertyDescriptor.Builder() .name("Byte Sequence") - .description("A hex representation of bytes to look for and upon which to split the source file into separate files") - .addValidator(new HexStringPropertyValidator()) + .description("A representation of bytes to look for and upon which to split the source file into separate files") +
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .required(true) .build(); public static final PropertyDescriptor KEEP_SEQUENCE = new PropertyDescriptor.Builder() .name("Keep Byte Sequence") - .description("Determines whether or not the Byte Sequence should be included at the end of each Split") + .description("Determines whether or not the Byte Sequence should be included with each Split") .required(true) .allowableValues("true", "false") .defaultValue("false") .build(); + public static final PropertyDescriptor BYTE_SEQUENCE_LOCATION = new PropertyDescriptor.Builder() + .name("Byte Sequence Location") + .description("If <Keep Byte Sequence> is set to true, specifies whether the byte sequence should be added to the end of the first split or the beginning of the second; if <Keep Byte Sequence> is false, this property is ignored.") + .required(true) + .allowableValues(TRAILING_POSITION, LEADING_POSITION) + .defaultValue(TRAILING_POSITION.getValue()) + .build(); public static final Relationship REL_SPLITS = new Relationship.Builder() .name("splits") @@ -108,8 +134,10 @@ public class SplitContent extends AbstractProcessor { this.relationships = Collections.unmodifiableSet(relationships); final List<PropertyDescriptor> properties = new ArrayList<>(); + properties.add(FORMAT); properties.add(BYTE_SEQUENCE); properties.add(KEEP_SEQUENCE); + properties.add(BYTE_SEQUENCE_LOCATION); this.properties = Collections.unmodifiableList(properties); } @@ -124,13 +152,27 @@ public class SplitContent extends AbstractProcessor { } @Override - public void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) { - if (descriptor.equals(BYTE_SEQUENCE)) { - try { - this.byteSequence.set(Hex.decodeHex(newValue.toCharArray())); - } catch (final Exception e) { - this.byteSequence.set(null); - } + protected Collection<ValidationResult> customValidate(final ValidationContext validationContext) { + final List<ValidationResult> results = new ArrayList<>(1); + final String format = validationContext.getProperty(FORMAT).getValue(); + if ( HEX_FORMAT.getValue().equals(format) ) { + final String byteSequence = validationContext.getProperty(BYTE_SEQUENCE).getValue(); + final ValidationResult result = new HexStringPropertyValidator().validate(BYTE_SEQUENCE.getName(), byteSequence, validationContext); + results.add(result); + } + return results; + } + + + @OnScheduled + public void initializeByteSequence(final ProcessContext context) throws DecoderException { + final String bytePattern = context.getProperty(BYTE_SEQUENCE).getValue(); + + final String format = context.getProperty(FORMAT).getValue(); + if ( HEX_FORMAT.getValue().equals(format) ) { + this.byteSequence.set(Hex.decodeHex(bytePattern.toCharArray())); + } else { + this.byteSequence.set(bytePattern.getBytes(StandardCharsets.UTF_8)); } } @@ -143,6 +185,21 @@ public class SplitContent extends AbstractProcessor { final ProcessorLog logger = getLogger(); final boolean keepSequence = context.getProperty(KEEP_SEQUENCE).asBoolean(); + final boolean keepTrailingSequence; + final boolean keepLeadingSequence; + if ( keepSequence ) { + if ( context.getProperty(BYTE_SEQUENCE_LOCATION).getValue().equals(TRAILING_POSITION.getValue()) ) { + keepTrailingSequence = true; + keepLeadingSequence = false; + } else { + keepTrailingSequence = false; + keepLeadingSequence = true; + } + } else { + keepTrailingSequence = false; + keepLeadingSequence = false; + } + final byte[] byteSequence = this.byteSequence.get(); if (byteSequence == null) { // should never happen. But just in case...
logger.error("{} Unable to obtain Byte Sequence", new Object[]{this}); @@ -169,15 +226,20 @@ public class SplitContent extends AbstractProcessor { bytesRead++; boolean matched = buffer.addAndCompare((byte) (nextByte & 0xFF)); if (matched) { - final long splitLength; + long splitLength; - if (keepSequence) { + if (keepTrailingSequence) { splitLength = bytesRead - startOffset; } else { splitLength = bytesRead - startOffset - byteSequence.length; } - splits.add(new Tuple<>(startOffset, splitLength)); + if ( keepLeadingSequence && startOffset > 0 ) { + splitLength += byteSequence.length; + } + + final long splitStart = (keepLeadingSequence && startOffset > 0) ? startOffset - byteSequence.length : startOffset; + splits.add(new Tuple<>(splitStart, splitLength)); startOffset = bytesRead; buffer.clear(); } @@ -207,8 +269,11 @@ public class SplitContent extends AbstractProcessor { lastOffsetPlusSize = offset + size; } + // lastOffsetPlusSize indicates the ending position of the last split. + // if the data didn't end with the byte sequence, we need one final split to run from the end + // of the last split to the end of the content. long finalSplitOffset = lastOffsetPlusSize; - if (!keepSequence) { + if (!keepTrailingSequence && !keepLeadingSequence) { finalSplitOffset += byteSequence.length; } if (finalSplitOffset > -1L && finalSplitOffset < flowFile.getSize()) { diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestSplitContent.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestSplitContent.java index 2e0806244d..07c255b3c1 100644 --- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestSplitContent.java +++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestSplitContent.java @@ -29,6 +29,129 @@ import org.junit.Test; public class TestSplitContent { + @Test + public void testTextFormatLeadingPosition() { + final TestRunner runner = TestRunners.newTestRunner(new SplitContent()); + runner.setProperty(SplitContent.FORMAT, SplitContent.UTF8_FORMAT.getValue()); + runner.setProperty(SplitContent.BYTE_SEQUENCE, "ub"); + runner.setProperty(SplitContent.KEEP_SEQUENCE, "true"); + runner.setProperty(SplitContent.BYTE_SEQUENCE_LOCATION, SplitContent.LEADING_POSITION.getValue()); + + runner.enqueue("rub-a-dub-dub".getBytes()); + runner.run(); + + runner.assertTransferCount(SplitContent.REL_ORIGINAL, 1); + runner.assertTransferCount(SplitContent.REL_SPLITS, 4); + + runner.assertQueueEmpty(); + final List splits = runner.getFlowFilesForRelationship(SplitContent.REL_SPLITS); + splits.get(0).assertContentEquals("r"); + splits.get(1).assertContentEquals("ub-a-d"); + splits.get(2).assertContentEquals("ub-d"); + splits.get(3).assertContentEquals("ub"); + } + + + @Test + public void testTextFormatSplits() { + final TestRunner runner = TestRunners.newTestRunner(new SplitContent()); + runner.setProperty(SplitContent.FORMAT, SplitContent.UTF8_FORMAT.getValue()); + runner.setProperty(SplitContent.BYTE_SEQUENCE, "test"); + runner.setProperty(SplitContent.KEEP_SEQUENCE, "true"); + runner.setProperty(SplitContent.BYTE_SEQUENCE_LOCATION, SplitContent.LEADING_POSITION.getValue()); + + final byte[] input = "This is a test. This is another test. And this is yet another test. 
Finally this is the last Test.".getBytes(); + runner.enqueue(input); + runner.run(); + + runner.assertTransferCount(SplitContent.REL_ORIGINAL, 1); + runner.assertTransferCount(SplitContent.REL_SPLITS, 4); + + runner.assertQueueEmpty(); + List<MockFlowFile> splits = runner.getFlowFilesForRelationship(SplitContent.REL_SPLITS); + splits.get(0).assertContentEquals("This is a "); + splits.get(1).assertContentEquals("test. This is another "); + splits.get(2).assertContentEquals("test. And this is yet another "); + splits.get(3).assertContentEquals("test. Finally this is the last Test."); + runner.clearTransferState(); + + runner.setProperty(SplitContent.KEEP_SEQUENCE, "false"); + runner.enqueue(input); + runner.run(); + runner.assertTransferCount(SplitContent.REL_ORIGINAL, 1); + runner.assertTransferCount(SplitContent.REL_SPLITS, 4); + splits = runner.getFlowFilesForRelationship(SplitContent.REL_SPLITS); + splits.get(0).assertContentEquals("This is a "); + splits.get(1).assertContentEquals(". This is another "); + splits.get(2).assertContentEquals(". And this is yet another "); + splits.get(3).assertContentEquals(". Finally this is the last Test."); + runner.clearTransferState(); + + runner.setProperty(SplitContent.KEEP_SEQUENCE, "true"); + runner.setProperty(SplitContent.BYTE_SEQUENCE_LOCATION, SplitContent.TRAILING_POSITION.getValue()); + runner.enqueue(input); + runner.run(); + runner.assertTransferCount(SplitContent.REL_ORIGINAL, 1); + runner.assertTransferCount(SplitContent.REL_SPLITS, 4); + splits = runner.getFlowFilesForRelationship(SplitContent.REL_SPLITS); + splits.get(0).assertContentEquals("This is a test"); + splits.get(1).assertContentEquals(". This is another test"); + splits.get(2).assertContentEquals(". And this is yet another test"); + splits.get(3).assertContentEquals(". Finally this is the last Test."); + runner.clearTransferState(); + + runner.setProperty(SplitContent.KEEP_SEQUENCE, "true"); + runner.setProperty(SplitContent.BYTE_SEQUENCE_LOCATION, SplitContent.TRAILING_POSITION.getValue()); + runner.enqueue("This is a test. This is another test. And this is yet another test. Finally this is the last test".getBytes()); + runner.run(); + runner.assertTransferCount(SplitContent.REL_ORIGINAL, 1); + runner.assertTransferCount(SplitContent.REL_SPLITS, 4); + splits = runner.getFlowFilesForRelationship(SplitContent.REL_SPLITS); + splits.get(0).assertContentEquals("This is a test"); + splits.get(1).assertContentEquals(". This is another test"); + splits.get(2).assertContentEquals(". And this is yet another test"); + splits.get(3).assertContentEquals(". Finally this is the last test"); + runner.clearTransferState(); + + runner.setProperty(SplitContent.KEEP_SEQUENCE, "true"); + runner.setProperty(SplitContent.BYTE_SEQUENCE_LOCATION, SplitContent.LEADING_POSITION.getValue()); + runner.enqueue("This is a test. This is another test. And this is yet another test. Finally this is the last test".getBytes()); + runner.run(); + runner.assertTransferCount(SplitContent.REL_ORIGINAL, 1); + runner.assertTransferCount(SplitContent.REL_SPLITS, 5); + splits = runner.getFlowFilesForRelationship(SplitContent.REL_SPLITS); + splits.get(0).assertContentEquals("This is a "); + splits.get(1).assertContentEquals("test. This is another "); + splits.get(2).assertContentEquals("test. And this is yet another "); + splits.get(3).assertContentEquals("test.
Finally this is the last "); + splits.get(4).assertContentEquals("test"); + + runner.clearTransferState(); + } + + + @Test + public void testTextFormatTrailingPosition() { + final TestRunner runner = TestRunners.newTestRunner(new SplitContent()); + runner.setProperty(SplitContent.FORMAT, SplitContent.UTF8_FORMAT.getValue()); + runner.setProperty(SplitContent.BYTE_SEQUENCE, "ub"); + runner.setProperty(SplitContent.KEEP_SEQUENCE, "true"); + runner.setProperty(SplitContent.BYTE_SEQUENCE_LOCATION, SplitContent.TRAILING_POSITION.getValue()); + + runner.enqueue("rub-a-dub-dub".getBytes()); + runner.run(); + + runner.assertTransferCount(SplitContent.REL_ORIGINAL, 1); + runner.assertTransferCount(SplitContent.REL_SPLITS, 3); + + runner.assertQueueEmpty(); + final List splits = runner.getFlowFilesForRelationship(SplitContent.REL_SPLITS); + splits.get(0).assertContentEquals("rub"); + splits.get(1).assertContentEquals("-a-dub"); + splits.get(2).assertContentEquals("-dub"); + } + + @Test public void testSmallSplits() throws IOException { final TestRunner runner = TestRunners.newTestRunner(new SplitContent()); From ce5a654e96ef257d900a555b595831dd944cb616 Mon Sep 17 00:00:00 2001 From: joewitt Date: Fri, 17 Apr 2015 07:59:54 -0400 Subject: [PATCH 3/4] NIFI-522 updated Jetty version and removed extraneous name element from unrelated pom --- nifi/nifi-commons/nifi-hl7-query-language/pom.xml | 3 --- nifi/pom.xml | 2 +- 2 files changed, 1 insertion(+), 4 deletions(-) diff --git a/nifi/nifi-commons/nifi-hl7-query-language/pom.xml b/nifi/nifi-commons/nifi-hl7-query-language/pom.xml index 9d0003fcec..7daa400835 100644 --- a/nifi/nifi-commons/nifi-hl7-query-language/pom.xml +++ b/nifi/nifi-commons/nifi-hl7-query-language/pom.xml @@ -25,9 +25,6 @@ nifi-hl7-query-language jar - - NiFi Health Level 7 (HL7) Query Language - diff --git a/nifi/pom.xml b/nifi/pom.xml index 9b8bfb441d..d500cbe8ce 100644 --- a/nifi/pom.xml +++ b/nifi/pom.xml @@ -84,7 +84,7 @@ UTF-8 UTF-8 1.7.10 - 9.2.5.v20141112 + 9.2.10.v20150310 4.10.3 4.1.4.RELEASE 3.2.5.RELEASE From a06c25373fbd4103c2c9ba1ba0d75d94726700d2 Mon Sep 17 00:00:00 2001 From: Mark Payne Date: Fri, 17 Apr 2015 09:13:57 -0400 Subject: [PATCH 4/4] NIFI-523: Do not read all lucene documents when we dont need to --- .../nifi/provenance/lucene/DocsReader.java | 20 ++++----- .../serialization/RecordReaders.java | 41 +++++++++++++++++-- 2 files changed, 47 insertions(+), 14 deletions(-) diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/DocsReader.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/DocsReader.java index af5fe5079f..6446a35497 100644 --- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/DocsReader.java +++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/DocsReader.java @@ -48,18 +48,22 @@ public class DocsReader { return Collections.emptySet(); } - final List docs = new ArrayList<>(); + final int numDocs = Math.min(topDocs.scoreDocs.length, maxResults); + final List docs = new ArrayList<>(numDocs); - for (ScoreDoc scoreDoc : topDocs.scoreDocs) { + for (final ScoreDoc scoreDoc : topDocs.scoreDocs) { final int docId = scoreDoc.doc; final Document d = indexReader.document(docId); 
docs.add(d); + if ( retrievalCount.incrementAndGet() >= maxResults ) { + break; + } } - return read(docs, allProvenanceLogFiles, retrievalCount, maxResults); + return read(docs, allProvenanceLogFiles); } - public Set<ProvenanceEventRecord> read(final List<Document> docs, final Collection<Path> allProvenanceLogFiles, final AtomicInteger retrievalCount, final int maxResults) throws IOException { + public Set<ProvenanceEventRecord> read(final List<Document> docs, final Collection<Path> allProvenanceLogFiles) throws IOException { LuceneUtil.sortDocsForRetrieval(docs); RecordReader reader = null; @@ -79,9 +83,6 @@ public class DocsReader { reader.skipTo(byteOffset); final StandardProvenanceEventRecord record = reader.nextRecord(); matchingRecords.add(record); - if (retrievalCount.incrementAndGet() >= maxResults) { - break; - } } catch (final IOException e) { throw new FileNotFoundException("Could not find Provenance Log File with basename " + storageFilename + " in the Provenance Repository"); } @@ -91,7 +92,7 @@ public class DocsReader { reader.close(); } - final List<File> potentialFiles = LuceneUtil.getProvenanceLogFiles(storageFilename, allProvenanceLogFiles); + List<File> potentialFiles = LuceneUtil.getProvenanceLogFiles(storageFilename, allProvenanceLogFiles); if (potentialFiles.isEmpty()) { throw new FileNotFoundException("Could not find Provenance Log File with basename " + storageFilename + " in the Provenance Repository"); } @@ -108,9 +109,6 @@ public class DocsReader { final StandardProvenanceEventRecord record = reader.nextRecord(); matchingRecords.add(record); - if (retrievalCount.incrementAndGet() >= maxResults) { - break; - } } catch (final IOException e) { throw new IOException("Failed to retrieve record from Provenance File " + file + " due to " + e, e); } diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordReaders.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordReaders.java index f902b9275f..8f0699529c 100644 --- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordReaders.java +++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordReaders.java @@ -33,6 +33,8 @@ import org.apache.nifi.provenance.lucene.LuceneUtil; public class RecordReaders { public static RecordReader newRecordReader(File file, final Collection<Path> provenanceLogFiles) throws IOException { + final File originalFile = file; + if (!file.exists()) { if (provenanceLogFiles == null) { throw new FileNotFoundException(file.toString()); } @@ -47,11 +49,44 @@ public class RecordReaders { } } - if (file == null || !file.exists()) { - throw new FileNotFoundException(file.toString()); + InputStream fis = null; + if ( file.exists() ) { + try { + fis = new FileInputStream(file); + } catch (final FileNotFoundException fnfe) { + fis = null; + } + } + + openStream: while ( fis == null ) { + final File dir = file.getParentFile(); + final String baseName = LuceneUtil.substringBefore(file.getName(), "."); + + // depending on which rollover actions have occurred, we could have 3 possibilities for the + // filename that we need.
The majority of the time, we will use the extension ".indexed.prov.gz" + // because most often we are compressing on rollover and most often we have already finished + // compressing by the time that we are querying the data. + for ( final String extension : new String[] {".indexed.prov.gz", ".indexed.prov", ".prov"} ) { + file = new File(dir, baseName + extension); + if ( file.exists() ) { + try { + fis = new FileInputStream(file); + break openStream; + } catch (final FileNotFoundException fnfe) { + // file was modified by a RolloverAction after we verified that it exists but before we could + // create an InputStream for it. Start over. + fis = null; + continue openStream; + } + } + } + + break; } - final InputStream fis = new FileInputStream(file); + if ( fis == null ) { + throw new FileNotFoundException("Unable to locate file " + originalFile); + } final InputStream readableStream; if (file.getName().endsWith(".gz")) { readableStream = new BufferedInputStream(new GZIPInputStream(fis));
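
Review note (editor's addition, not part of the patch series): PATCH 1/4 adds the Content Type property to PutEmail but ships no accompanying test, so below is a minimal sketch of how the property might be exercised with NiFi's TestRunner. The CONTENT_TYPE, FROM, TO, and CC constants appear in the patch; SMTP_HOSTNAME, SMTP_PORT, SUBJECT, and MESSAGE are assumed names for the existing descriptors, and the localhost SMTP endpoint is a stand-in for a real or embedded test mail server.

// Hypothetical usage sketch for the new PutEmail "Content Type" property.
// Assumes a test SMTP server listens on localhost:2525 and that SMTP_HOSTNAME,
// SMTP_PORT, SUBJECT, and MESSAGE exist under these constant names; only
// CONTENT_TYPE, FROM, TO, and CC are visible in the patch above.
import org.apache.nifi.processors.standard.PutEmail;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;

public class PutEmailHtmlSketch {
    public static void main(final String[] args) {
        final TestRunner runner = TestRunners.newTestRunner(new PutEmail());
        runner.setProperty(PutEmail.SMTP_HOSTNAME, "localhost");          // assumed constant name
        runner.setProperty(PutEmail.SMTP_PORT, "2525");                   // assumed constant name
        runner.setProperty(PutEmail.FROM, "nifi@example.com");
        runner.setProperty(PutEmail.TO, "ops@example.com");
        runner.setProperty(PutEmail.SUBJECT, "HTML rendering check");     // assumed constant name
        runner.setProperty(PutEmail.MESSAGE, "<h1>Hello from NiFi</h1>"); // assumed constant name

        // The property added by PATCH 1/4; it supports Expression Language,
        // so an attribute-driven value such as ${mime.type} would also work.
        runner.setProperty(PutEmail.CONTENT_TYPE, "text/html");

        runner.enqueue(new byte[0]);
        runner.run();
        // With a reachable SMTP server, the flowfile is expected to route to the
        // processor's success relationship and the body to arrive as text/html.
    }
}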