Merge remote-tracking branch 'upstream/develop' into nifi-solr-bundle

This commit is contained in:
bbende 2015-04-17 18:06:14 -04:00
commit e9402a0ddc
5 changed files with 61 additions and 21 deletions

View File

@ -25,9 +25,6 @@
<artifactId>nifi-hl7-query-language</artifactId> <artifactId>nifi-hl7-query-language</artifactId>
<packaging>jar</packaging> <packaging>jar</packaging>
<name>NiFi Health Level 7 (HL7) Query Language</name>
<build> <build>
<plugins> <plugins>
<plugin> <plugin>

View File

@ -48,18 +48,22 @@ public class DocsReader {
return Collections.emptySet(); return Collections.emptySet();
} }
final List<Document> docs = new ArrayList<>(); final int numDocs = Math.min(topDocs.scoreDocs.length, maxResults);
final List<Document> docs = new ArrayList<>(numDocs);
for (ScoreDoc scoreDoc : topDocs.scoreDocs) { for (final ScoreDoc scoreDoc : topDocs.scoreDocs) {
final int docId = scoreDoc.doc; final int docId = scoreDoc.doc;
final Document d = indexReader.document(docId); final Document d = indexReader.document(docId);
docs.add(d); docs.add(d);
if ( retrievalCount.incrementAndGet() >= maxResults ) {
break;
}
} }
return read(docs, allProvenanceLogFiles, retrievalCount, maxResults); return read(docs, allProvenanceLogFiles);
} }
public Set<ProvenanceEventRecord> read(final List<Document> docs, final Collection<Path> allProvenanceLogFiles, final AtomicInteger retrievalCount, final int maxResults) throws IOException { public Set<ProvenanceEventRecord> read(final List<Document> docs, final Collection<Path> allProvenanceLogFiles) throws IOException {
LuceneUtil.sortDocsForRetrieval(docs); LuceneUtil.sortDocsForRetrieval(docs);
RecordReader reader = null; RecordReader reader = null;
@ -79,9 +83,6 @@ public class DocsReader {
reader.skipTo(byteOffset); reader.skipTo(byteOffset);
final StandardProvenanceEventRecord record = reader.nextRecord(); final StandardProvenanceEventRecord record = reader.nextRecord();
matchingRecords.add(record); matchingRecords.add(record);
if (retrievalCount.incrementAndGet() >= maxResults) {
break;
}
} catch (final IOException e) { } catch (final IOException e) {
throw new FileNotFoundException("Could not find Provenance Log File with basename " + storageFilename + " in the Provenance Repository"); throw new FileNotFoundException("Could not find Provenance Log File with basename " + storageFilename + " in the Provenance Repository");
} }
@ -91,7 +92,7 @@ public class DocsReader {
reader.close(); reader.close();
} }
final List<File> potentialFiles = LuceneUtil.getProvenanceLogFiles(storageFilename, allProvenanceLogFiles); List<File> potentialFiles = LuceneUtil.getProvenanceLogFiles(storageFilename, allProvenanceLogFiles);
if (potentialFiles.isEmpty()) { if (potentialFiles.isEmpty()) {
throw new FileNotFoundException("Could not find Provenance Log File with basename " + storageFilename + " in the Provenance Repository"); throw new FileNotFoundException("Could not find Provenance Log File with basename " + storageFilename + " in the Provenance Repository");
} }
@ -108,9 +109,6 @@ public class DocsReader {
final StandardProvenanceEventRecord record = reader.nextRecord(); final StandardProvenanceEventRecord record = reader.nextRecord();
matchingRecords.add(record); matchingRecords.add(record);
if (retrievalCount.incrementAndGet() >= maxResults) {
break;
}
} catch (final IOException e) { } catch (final IOException e) {
throw new IOException("Failed to retrieve record from Provenance File " + file + " due to " + e, e); throw new IOException("Failed to retrieve record from Provenance File " + file + " due to " + e, e);
} }

View File

@ -33,6 +33,8 @@ import org.apache.nifi.provenance.lucene.LuceneUtil;
public class RecordReaders { public class RecordReaders {
public static RecordReader newRecordReader(File file, final Collection<Path> provenanceLogFiles) throws IOException { public static RecordReader newRecordReader(File file, final Collection<Path> provenanceLogFiles) throws IOException {
final File originalFile = file;
if (!file.exists()) { if (!file.exists()) {
if (provenanceLogFiles == null) { if (provenanceLogFiles == null) {
throw new FileNotFoundException(file.toString()); throw new FileNotFoundException(file.toString());
@ -47,11 +49,44 @@ public class RecordReaders {
} }
} }
if (file == null || !file.exists()) { InputStream fis = null;
throw new FileNotFoundException(file.toString()); if ( file.exists() ) {
try {
fis = new FileInputStream(file);
} catch (final FileNotFoundException fnfe) {
fis = null;
}
} }
final InputStream fis = new FileInputStream(file); openStream: while ( fis == null ) {
final File dir = file.getParentFile();
final String baseName = LuceneUtil.substringBefore(file.getName(), ".");
// depending on which rollover actions have occurred, we could have 3 possibilities for the
// filename that we need. The majority of the time, we will use the extension ".prov.indexed.gz"
// because most often we are compressing on rollover and most often we have already finished
// compressing by the time that we are querying the data.
for ( final String extension : new String[] {".indexed.prov.gz", ".indexed.prov", ".prov"} ) {
file = new File(dir, baseName + extension);
if ( file.exists() ) {
try {
fis = new FileInputStream(file);
break openStream;
} catch (final FileNotFoundException fnfe) {
// file was modified by a RolloverAction after we verified that it exists but before we could
// create an InputStream for it. Start over.
fis = null;
continue openStream;
}
}
}
break;
}
if ( fis == null ) {
throw new FileNotFoundException("Unable to locate file " + originalFile);
}
final InputStream readableStream; final InputStream readableStream;
if (file.getName().endsWith(".gz")) { if (file.getName().endsWith(".gz")) {
readableStream = new BufferedInputStream(new GZIPInputStream(fis)); readableStream = new BufferedInputStream(new GZIPInputStream(fis));

View File

@ -132,6 +132,14 @@ public class PutEmail extends AbstractProcessor {
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.defaultValue("NiFi") .defaultValue("NiFi")
.build(); .build();
public static final PropertyDescriptor CONTENT_TYPE = new PropertyDescriptor.Builder()
.name("Content Type")
.description("Mime Type used to interpret the contents of the email, such as text/plain or text/html")
.required(true)
.expressionLanguageSupported(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.defaultValue("text/plain")
.build();
public static final PropertyDescriptor FROM = new PropertyDescriptor.Builder() public static final PropertyDescriptor FROM = new PropertyDescriptor.Builder()
.name("From") .name("From")
.description("Specifies the Email address to use as the sender") .description("Specifies the Email address to use as the sender")
@ -223,6 +231,7 @@ public class PutEmail extends AbstractProcessor {
properties.add(SMTP_TLS); properties.add(SMTP_TLS);
properties.add(SMTP_SOCKET_FACTORY); properties.add(SMTP_SOCKET_FACTORY);
properties.add(HEADER_XMAILER); properties.add(HEADER_XMAILER);
properties.add(CONTENT_TYPE);
properties.add(FROM); properties.add(FROM);
properties.add(TO); properties.add(TO);
properties.add(CC); properties.add(CC);
@ -298,7 +307,8 @@ public class PutEmail extends AbstractProcessor {
messageText = formatAttributes(flowFile, messageText); messageText = formatAttributes(flowFile, messageText);
} }
message.setText(messageText); String contentType = context.getProperty(CONTENT_TYPE).evaluateAttributeExpressions(flowFile).getValue();
message.setContent(messageText, contentType);
message.setSentDate(new Date()); message.setSentDate(new Date());
if (context.getProperty(ATTACH_FILE).asBoolean()) { if (context.getProperty(ATTACH_FILE).asBoolean()) {

View File

@ -84,7 +84,7 @@
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding> <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<org.slf4j.version>1.7.10</org.slf4j.version> <org.slf4j.version>1.7.10</org.slf4j.version>
<jetty.version>9.2.5.v20141112</jetty.version> <jetty.version>9.2.10.v20150310</jetty.version>
<lucene.version>4.10.3</lucene.version> <lucene.version>4.10.3</lucene.version>
<spring.version>4.1.4.RELEASE</spring.version> <spring.version>4.1.4.RELEASE</spring.version>
<spring.security.version>3.2.5.RELEASE</spring.security.version> <spring.security.version>3.2.5.RELEASE</spring.security.version>