From 1263e87cdbe55554f8c133be2e1932b6fc3d7581 Mon Sep 17 00:00:00 2001
From: Aldrin Piri
Date: Mon, 2 Mar 2015 19:54:57 -0500
Subject: [PATCH 01/13] Correcting error in EvaluateJsonPath erroneously
referring to XPath in its usage guide.
---
.../index.html | 4 ++--
1 file changed, 2 insertions(+), 2 deletions(-)
diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.EvaluateJsonPath/index.html b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.EvaluateJsonPath/index.html
index 58da666403..f30e1b750a 100644
--- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.EvaluateJsonPath/index.html
+++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.EvaluateJsonPath/index.html
@@ -43,7 +43,7 @@
result evaluations in memory. Accordingly, it is important to consider the anticipated profile of content being
evaluated by this processor and the hardware supporting it especially when working against large JSON documents.
-
+
Properties:
@@ -89,7 +89,7 @@
Return Type
-
Indicates the desired return type of the Xpath expressions.
+
Indicates the desired return type of the JsonPath expressions.
Selecting 'auto-detect' will set the return type to 'json' for a
Destination of 'flowfile-content', and 'scalar' for a Destination of
'flowfile-attribute'.")
From edb46ef311d66e49959684114bd81aeece8882e4 Mon Sep 17 00:00:00 2001
From: Aldrin Piri
Date: Mon, 2 Mar 2015 19:59:33 -0500
Subject: [PATCH 02/13] Removing extraneous, autogenerated tag from
EvaluateJsonPath usage guide
---
.../index.html | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.EvaluateJsonPath/index.html b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.EvaluateJsonPath/index.html
index f30e1b750a..82f9614a37 100644
--- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.EvaluateJsonPath/index.html
+++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.EvaluateJsonPath/index.html
@@ -43,7 +43,7 @@
result evaluations in memory. Accordingly, it is important to consider the anticipated profile of content being
evaluated by this processor and the hardware supporting it especially when working against large JSON documents.
-
+
Properties:
From 3629d2441bace2a3221fa59c549eea1e6aabb786 Mon Sep 17 00:00:00 2001
From: danbress
Date: Sun, 8 Mar 2015 11:12:47 -0400
Subject: [PATCH 03/13] NIFI-333 catching ProcessException instead of Exception
for HDFS helper classes
---
.../nifi/processors/hadoop/KeyValueReader.java | 16 ++++++++--------
.../nifi/processors/hadoop/ValueReader.java | 16 ++++++++--------
2 files changed, 16 insertions(+), 16 deletions(-)
diff --git a/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/KeyValueReader.java b/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/KeyValueReader.java
index d7a679f0bc..40ef5fa246 100644
--- a/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/KeyValueReader.java
+++ b/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/KeyValueReader.java
@@ -24,13 +24,6 @@ import java.util.HashSet;
import java.util.Set;
import java.util.regex.Pattern;
-import org.apache.nifi.flowfile.FlowFile;
-import org.apache.nifi.flowfile.attributes.CoreAttributes;
-import org.apache.nifi.processor.ProcessSession;
-import org.apache.nifi.processor.io.OutputStreamCallback;
-import org.apache.nifi.processors.hadoop.util.OutputStreamWritable;
-import org.apache.nifi.processors.hadoop.util.SequenceFileReader;
-
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
@@ -39,6 +32,13 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.SequenceFile.Reader;
import org.apache.hadoop.io.Text;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.io.OutputStreamCallback;
+import org.apache.nifi.processors.hadoop.util.OutputStreamWritable;
+import org.apache.nifi.processors.hadoop.util.SequenceFileReader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -90,7 +90,7 @@ public class KeyValueReader implements SequenceFileReader> {
try {
flowFile = session.write(flowFile, callback);
flowFiles.add(flowFile);
- } catch (Exception e) {
+ } catch (ProcessException e) {
LOG.error("Could not write to flowfile {}", new Object[]{flowFile}, e);
session.remove(flowFile);
}
diff --git a/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ValueReader.java b/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ValueReader.java
index 0fa0307b3e..e334582ec6 100644
--- a/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ValueReader.java
+++ b/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ValueReader.java
@@ -23,13 +23,6 @@ import java.util.HashSet;
import java.util.Set;
import java.util.regex.Pattern;
-import org.apache.nifi.flowfile.FlowFile;
-import org.apache.nifi.flowfile.attributes.CoreAttributes;
-import org.apache.nifi.processor.ProcessSession;
-import org.apache.nifi.processor.io.OutputStreamCallback;
-import org.apache.nifi.processors.hadoop.util.OutputStreamWritable;
-import org.apache.nifi.processors.hadoop.util.SequenceFileReader;
-
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
@@ -38,6 +31,13 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.SequenceFile.Reader;
import org.apache.hadoop.io.Text;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.io.OutputStreamCallback;
+import org.apache.nifi.processors.hadoop.util.OutputStreamWritable;
+import org.apache.nifi.processors.hadoop.util.SequenceFileReader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -85,7 +85,7 @@ public class ValueReader implements SequenceFileReader> {
try {
flowFile = session.write(flowFile, writer);
flowFiles.add(flowFile);
- } catch (Exception e) {
+ } catch (ProcessException e) {
LOG.error("Could not write to flowfile {}", new Object[]{flowFile}, e);
session.remove(flowFile);
}
From e4ab0f4a76224305180f95567ce436cd52505d66 Mon Sep 17 00:00:00 2001
From: Mark Payne
Date: Tue, 24 Feb 2015 21:30:18 -0500
Subject: [PATCH 04/13] NIFI-380: Initial import of ExecuteProcess processor
Signed-off-by: joewitt
---
.../processors/standard/ExecuteProcess.java | 494 ++++++++++++++++++
.../org.apache.nifi.processor.Processor | 1 +
.../standard/TestExecuteProcess.java | 78 +++
3 files changed, 573 insertions(+)
create mode 100644 nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteProcess.java
create mode 100644 nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteProcess.java
diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteProcess.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteProcess.java
new file mode 100644
index 0000000000..6f68f7d209
--- /dev/null
+++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteProcess.java
@@ -0,0 +1,494 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ProcessorLog;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.io.OutputStreamCallback;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.stream.io.StreamUtils;
+
+
+@Tags({"command", "process", "source", "external", "invoke", "script"})
+@CapabilityDescription("Runs an operating system command specified by the user and writes the output of that command to a FlowFile. If the command is expected "
+ + "to be long-running, the Processor can output the partial data on a specified interval. When this option is used, the output is expected to be in textual "
+ + "format, as it typically does not make sense to split binary data on arbitrary time-based intervals.")
+public class ExecuteProcess extends AbstractProcessor {
+
+ public static final Relationship REL_SUCCESS = new Relationship.Builder()
+ .name("success")
+ .description("All created FlowFiles are routed to this relationship")
+ .build();
+
+ public static final PropertyDescriptor COMMAND = new PropertyDescriptor.Builder()
+ .name("Command Path")
+ .description("Specifies the command to be executed; if just the name of an executable is provided, it must be in the user's environment PATH.")
+ .required(true)
+ .expressionLanguageSupported(false)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .build();
+
+ public static final PropertyDescriptor COMMAND_ARGUMENTS = new PropertyDescriptor.Builder()
+ .name("Command Arguments")
+ .description("The arguments to supply to the executable delimited by white space. White space can be escaped by enclosing it in double-quotes.")
+ .required(false)
+ .expressionLanguageSupported(false)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .build();
+
+ public static final PropertyDescriptor WORKING_DIR = new PropertyDescriptor.Builder()
+ .name("Working Directory")
+ .description("The directory to use as the current working directory when executing the command")
+ .expressionLanguageSupported(false)
+ .addValidator(StandardValidators.createDirectoryExistsValidator(false, true))
+ .required(false)
+ .build();
+
+ public static final PropertyDescriptor BATCH_DURATION = new PropertyDescriptor.Builder()
+ .name("Batch Duration")
+ .description("If the process is expected to be long-running and produce textual output, a batch duration can be specified so "
+ + "that the output will be captured for this amount of time and a FlowFile will then be sent out with the results "
+ + "and a new FlowFile will be started, rather than waiting for the process to finish before sending out the results")
+ .required(false)
+ .expressionLanguageSupported(false)
+ .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+ .build();
+
+ private volatile ExecutorService executor;
+
+ @Override
+ public Set getRelationships() {
+ return Collections.singleton(REL_SUCCESS);
+ }
+
+ @Override
+ protected List getSupportedPropertyDescriptors() {
+ final List properties = new ArrayList<>();
+ properties.add(COMMAND);
+ properties.add(COMMAND_ARGUMENTS);
+ properties.add(BATCH_DURATION);
+ return properties;
+ }
+
+ @Override
+ protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
+ return new PropertyDescriptor.Builder()
+ .name(propertyDescriptorName)
+ .description("Sets the environment variable '" + propertyDescriptorName + "' for the process' environment")
+ .dynamic(true)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .build();
+ }
+
+ static List splitArgs(final String input) {
+ if ( input == null ) {
+ return Collections.emptyList();
+ }
+
+ final List args = new ArrayList<>();
+
+ final String trimmed = input.trim();
+ boolean inQuotes = false;
+
+ final StringBuilder sb = new StringBuilder();
+ for (int i=0; i < trimmed.length(); i++) {
+ final char c = trimmed.charAt(i);
+ switch (c) {
+ case ' ':
+ case '\t':
+ case '\r':
+ case '\n': {
+ if ( inQuotes ) {
+ sb.append(c);
+ } else {
+ final String arg = sb.toString().trim();
+ if ( !arg.isEmpty() ) {
+ args.add(arg);
+ }
+ sb.setLength(0);
+ }
+ break;
+ }
+ case '"':
+ inQuotes = !inQuotes;
+ break;
+ default:
+ sb.append(c);
+ break;
+ }
+ }
+
+ final String finalArg = sb.toString().trim();
+ if ( !finalArg.isEmpty() ) {
+ args.add(finalArg);
+ }
+
+ return args;
+ }
+
+ @OnScheduled
+ public void setupExecutor(final ProcessContext context) {
+ executor = Executors.newFixedThreadPool(context.getMaxConcurrentTasks() * 2, new ThreadFactory() {
+ private final ThreadFactory defaultFactory = Executors.defaultThreadFactory();
+
+ @Override
+ public Thread newThread(final Runnable r) {
+ final Thread t = defaultFactory.newThread(r);
+ t.setName("ExecuteProcess " + getIdentifier() + " Task");
+ return t;
+ }
+ });
+ }
+
+ @OnUnscheduled
+ public void shutdownExecutor() {
+ executor.shutdown();
+ }
+
+ @Override
+ public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+ final String command = context.getProperty(COMMAND).getValue();
+ final List args = splitArgs(context.getProperty(COMMAND_ARGUMENTS).getValue());
+
+ final List commandStrings = new ArrayList<>(args.size() + 1);
+ commandStrings.add(command);
+ commandStrings.addAll(args);
+
+ final String commandString = StringUtils.join(commandStrings, " ");
+
+ final ProcessBuilder builder = new ProcessBuilder(commandStrings);
+ final String workingDirName = context.getProperty(WORKING_DIR).getValue();
+ if ( workingDirName != null ) {
+ builder.directory(new File(workingDirName));
+ }
+
+ final Map environment = new HashMap<>();
+ for ( final Map.Entry entry : context.getProperties().entrySet() ) {
+ if ( entry.getKey().isDynamic() ) {
+ environment.put(entry.getKey().getName(), entry.getValue());
+ }
+ }
+
+ if ( !environment.isEmpty() ) {
+ builder.environment().putAll(environment);
+ }
+
+ final long startNanos = System.nanoTime();
+ final Process process;
+ try {
+ process = builder.start();
+ } catch (final IOException ioe) {
+ getLogger().error("Failed to create process due to {}", new Object[] {ioe});
+ context.yield();
+ return;
+ }
+
+ final Long batchNanos = context.getProperty(BATCH_DURATION).asTimePeriod(TimeUnit.NANOSECONDS);
+
+ // Submit task to read error stream from process
+ final AtomicReference errorStream = new AtomicReference<>();
+ executor.submit(new Runnable() {
+ @Override
+ public void run() {
+ final StringBuilder sb = new StringBuilder();
+ try (final BufferedReader reader = new BufferedReader(new InputStreamReader(process.getErrorStream()))) {
+ String line;
+ while ((line = reader.readLine()) != null) {
+ if ( sb.length() < 4000 ) {
+ sb.append(line);
+ sb.append("\n");
+ }
+ }
+ } catch (final IOException ioe) {
+ }
+
+ errorStream.set(sb.toString());
+ }
+ });
+
+ // Submit task to read output of Process and write to FlowFile.
+ final ProxyOutputStream proxyOut = new ProxyOutputStream(getLogger());
+ final AtomicBoolean failure = new AtomicBoolean(false);
+ final AtomicBoolean finishedCopying = new AtomicBoolean(false);
+ final Future> future = executor.submit(new Callable
-
- Batch Duration>
-
-
- If the process is expected to be long-running and produce textual output, a batch duration can be specified so
- that the output will be captured for this amount of time and a FlowFile will then be sent out with the results
- and a new FlowFile will be started, rather than waiting for the process to finish before sending out the results.
- If no value is provided, the process will run to completion and the entire output of the process will be written
- to a single FlowFile.
-
-
Default value: none
+
Batch Duration
+
+
+ If the process is expected to be long-running and produce textual output, a batch duration can be specified so
+ that the output will be captured for this amount of time and a FlowFile will then be sent out with the results
+ and a new FlowFile will be started, rather than waiting for the process to finish before sending out the results.
+ If no value is provided, the process will run to completion and the entire output of the process will be written
+ to a single FlowFile.
+
+
Default value: none
Supports expression language: false
-
-
+
+
+
Redirect Error Stream
+
+
+ If true will redirect any error stream output of the process to the output stream.
+ This is particularly helpful for processes which write extensively to the error stream or for troubleshooting.
+
+
Default value: false
+
Allowed Values: true, false
+
Supports expression language: false
+
+
diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteProcess.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteProcess.java
index 0aa2ee3237..897973c398 100644
--- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteProcess.java
+++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteProcess.java
@@ -59,12 +59,12 @@ public class TestExecuteProcess {
}
@Test
- public void testPing() {
+ public void testEcho() {
System.setProperty("org.slf4j.simpleLogger.log.org.apache.nifi", "TRACE");
final TestRunner runner = TestRunners.newTestRunner(ExecuteProcess.class);
- runner.setProperty(ExecuteProcess.COMMAND, "ping");
- runner.setProperty(ExecuteProcess.COMMAND_ARGUMENTS, "127.0.0.1");
+ runner.setProperty(ExecuteProcess.COMMAND, "echo");
+ runner.setProperty(ExecuteProcess.COMMAND_ARGUMENTS, "test-args");
runner.setProperty(ExecuteProcess.BATCH_DURATION, "500 millis");
runner.run();
From 4f009fc3fb57a698dd21b7322975b02df4a4a0dc Mon Sep 17 00:00:00 2001
From: joewitt
Date: Mon, 9 Mar 2015 10:50:17 -0400
Subject: [PATCH 07/13] NIFI-333 This closes #33
From 4e731254416307c5d027a34ad7910c1d43de1abb Mon Sep 17 00:00:00 2001
From: joewitt
Date: Mon, 9 Mar 2015 13:13:34 -0400
Subject: [PATCH 08/13] NIFI-403 addressed licensing/notice and trivial
pom/source issues
---
nifi/nifi-assembly/LICENSE | 4 +-
nifi/nifi-assembly/NOTICE | 27 ++
.../nifi-kite-bundle/nifi-kite-nar/pom.xml | 82 +++--
.../src/main/resources/META-INF/LICENSE | 240 ++++++++++++++
.../src/main/resources/META-INF/NOTICE | 78 +++++
.../nifi-kite-processors/pom.xml | 255 ++++++++-------
.../kite/AbstractKiteProcessor.java | 303 +++++++++---------
.../apache/nifi/processors/kite/AvroUtil.java | 3 -
.../processors/kite/ConvertCSVToAvro.java | 9 +-
.../processors/kite/ConvertJSONToAvro.java | 178 +++++-----
.../processors/kite/StoreInKiteDataset.java | 6 +-
.../nifi-nar-bundles/nifi-kite-bundle/pom.xml | 105 +++---
12 files changed, 812 insertions(+), 478 deletions(-)
create mode 100644 nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-nar/src/main/resources/META-INF/LICENSE
create mode 100644 nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-nar/src/main/resources/META-INF/NOTICE
diff --git a/nifi/nifi-assembly/LICENSE b/nifi/nifi-assembly/LICENSE
index bf9e6490ea..38386b9a81 100644
--- a/nifi/nifi-assembly/LICENSE
+++ b/nifi/nifi-assembly/LICENSE
@@ -589,8 +589,8 @@ This product bundles 'reset.css' which is available in the 'public domain'.
OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
- The binary distribution of this product bundles 'Paranamer Core' which is available
- under a BSD style license.
+ The binary distribution of this product bundles 'ParaNamer' and 'Paranamer Core'
+ which is available under a BSD style license.
Copyright (c) 2006 Paul Hammant & ThoughtWorks Inc
All rights reserved.
diff --git a/nifi/nifi-assembly/NOTICE b/nifi/nifi-assembly/NOTICE
index 025d265138..4ead623d50 100644
--- a/nifi/nifi-assembly/NOTICE
+++ b/nifi/nifi-assembly/NOTICE
@@ -90,6 +90,11 @@ The following binary components are provided under the Apache Software License v
This product includes software from the Spring Framework,
under the Apache License 2.0 (see: StringUtils.containsWhitespace())
+ (ASLv2) Apache Commons JEXL
+ The following NOTICE information applies:
+ Apache Commons JEXL
+ Copyright 2001-2011 The Apache Software Foundation
+
(ASLv2) Spring Framework
The following NOTICE information applies:
Spring Framework 4.1.4.RELEASE
@@ -470,6 +475,28 @@ The following binary components are provided under the Apache Software License v
The following NOTICE information applies:
Copyright 2011 JsonPath authors
+ (ASLv2) Kite SDK
+ The following NOTICE information applies:
+ This product includes software developed by Cloudera, Inc.
+ (http://www.cloudera.com/).
+
+ This product includes software developed at
+ The Apache Software Foundation (http://www.apache.org/).
+
+ This product includes software developed by
+ Saxonica (http://www.saxonica.com/).
+
+ (ASLv2) Parquet MR
+ The following NOTICE information applies:
+ Parquet MR
+ Copyright 2012 Twitter, Inc.
+
+ This project includes code from https://github.com/lemire/JavaFastPFOR
+ parquet-column/src/main/java/parquet/column/values/bitpacking/LemireBitPacking.java
+ Apache License Version 2.0 http://www.apache.org/licenses/.
+ (c) Daniel Lemire, http://lemire.me/en/
+
+
************************
Common Development and Distribution License 1.1
************************
diff --git a/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-nar/pom.xml b/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-nar/pom.xml
index 78d8bf28a8..2f64cafd04 100644
--- a/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-nar/pom.xml
+++ b/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-nar/pom.xml
@@ -1,47 +1,45 @@
-
- 4.0.0
+
+ 4.0.0
-
- org.apache.nifi
- nifi-kite-bundle
- 0.0.2-incubating-SNAPSHOT
-
+
+ org.apache.nifi
+ nifi-kite-bundle
+ 0.0.2-incubating-SNAPSHOT
+
- nifi-kite-nar
- nar
+ nifi-kite-nar
+ nar
- Kite NAR
-
-
-
- org.apache.nifi
- nifi-hadoop-libraries-nar
- nar
-
-
- org.apache.nifi
- nifi-kite-processors
-
-
-
- org.apache.hadoop
- hadoop-client
-
-
-
-
+
+
+ org.apache.nifi
+ nifi-hadoop-libraries-nar
+ nar
+
+
+ org.apache.nifi
+ nifi-kite-processors
+
+
+
+ org.apache.hadoop
+ hadoop-client
+
+
+
+
diff --git a/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-nar/src/main/resources/META-INF/LICENSE b/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-nar/src/main/resources/META-INF/LICENSE
new file mode 100644
index 0000000000..6fb2525c34
--- /dev/null
+++ b/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-nar/src/main/resources/META-INF/LICENSE
@@ -0,0 +1,240 @@
+
+ Apache License
+ Version 2.0, January 2004
+ http://www.apache.org/licenses/
+
+ TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
+
+ 1. Definitions.
+
+ "License" shall mean the terms and conditions for use, reproduction,
+ and distribution as defined by Sections 1 through 9 of this document.
+
+ "Licensor" shall mean the copyright owner or entity authorized by
+ the copyright owner that is granting the License.
+
+ "Legal Entity" shall mean the union of the acting entity and all
+ other entities that control, are controlled by, or are under common
+ control with that entity. For the purposes of this definition,
+ "control" means (i) the power, direct or indirect, to cause the
+ direction or management of such entity, whether by contract or
+ otherwise, or (ii) ownership of fifty percent (50%) or more of the
+ outstanding shares, or (iii) beneficial ownership of such entity.
+
+ "You" (or "Your") shall mean an individual or Legal Entity
+ exercising permissions granted by this License.
+
+ "Source" form shall mean the preferred form for making modifications,
+ including but not limited to software source code, documentation
+ source, and configuration files.
+
+ "Object" form shall mean any form resulting from mechanical
+ transformation or translation of a Source form, including but
+ not limited to compiled object code, generated documentation,
+ and conversions to other media types.
+
+ "Work" shall mean the work of authorship, whether in Source or
+ Object form, made available under the License, as indicated by a
+ copyright notice that is included in or attached to the work
+ (an example is provided in the Appendix below).
+
+ "Derivative Works" shall mean any work, whether in Source or Object
+ form, that is based on (or derived from) the Work and for which the
+ editorial revisions, annotations, elaborations, or other modifications
+ represent, as a whole, an original work of authorship. For the purposes
+ of this License, Derivative Works shall not include works that remain
+ separable from, or merely link (or bind by name) to the interfaces of,
+ the Work and Derivative Works thereof.
+
+ "Contribution" shall mean any work of authorship, including
+ the original version of the Work and any modifications or additions
+ to that Work or Derivative Works thereof, that is intentionally
+ submitted to Licensor for inclusion in the Work by the copyright owner
+ or by an individual or Legal Entity authorized to submit on behalf of
+ the copyright owner. For the purposes of this definition, "submitted"
+ means any form of electronic, verbal, or written communication sent
+ to the Licensor or its representatives, including but not limited to
+ communication on electronic mailing lists, source code control systems,
+ and issue tracking systems that are managed by, or on behalf of, the
+ Licensor for the purpose of discussing and improving the Work, but
+ excluding communication that is conspicuously marked or otherwise
+ designated in writing by the copyright owner as "Not a Contribution."
+
+ "Contributor" shall mean Licensor and any individual or Legal Entity
+ on behalf of whom a Contribution has been received by Licensor and
+ subsequently incorporated within the Work.
+
+ 2. Grant of Copyright License. Subject to the terms and conditions of
+ this License, each Contributor hereby grants to You a perpetual,
+ worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+ copyright license to reproduce, prepare Derivative Works of,
+ publicly display, publicly perform, sublicense, and distribute the
+ Work and such Derivative Works in Source or Object form.
+
+ 3. Grant of Patent License. Subject to the terms and conditions of
+ this License, each Contributor hereby grants to You a perpetual,
+ worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+ (except as stated in this section) patent license to make, have made,
+ use, offer to sell, sell, import, and otherwise transfer the Work,
+ where such license applies only to those patent claims licensable
+ by such Contributor that are necessarily infringed by their
+ Contribution(s) alone or by combination of their Contribution(s)
+ with the Work to which such Contribution(s) was submitted. If You
+ institute patent litigation against any entity (including a
+ cross-claim or counterclaim in a lawsuit) alleging that the Work
+ or a Contribution incorporated within the Work constitutes direct
+ or contributory patent infringement, then any patent licenses
+ granted to You under this License for that Work shall terminate
+ as of the date such litigation is filed.
+
+ 4. Redistribution. You may reproduce and distribute copies of the
+ Work or Derivative Works thereof in any medium, with or without
+ modifications, and in Source or Object form, provided that You
+ meet the following conditions:
+
+ (a) You must give any other recipients of the Work or
+ Derivative Works a copy of this License; and
+
+ (b) You must cause any modified files to carry prominent notices
+ stating that You changed the files; and
+
+ (c) You must retain, in the Source form of any Derivative Works
+ that You distribute, all copyright, patent, trademark, and
+ attribution notices from the Source form of the Work,
+ excluding those notices that do not pertain to any part of
+ the Derivative Works; and
+
+ (d) If the Work includes a "NOTICE" text file as part of its
+ distribution, then any Derivative Works that You distribute must
+ include a readable copy of the attribution notices contained
+ within such NOTICE file, excluding those notices that do not
+ pertain to any part of the Derivative Works, in at least one
+ of the following places: within a NOTICE text file distributed
+ as part of the Derivative Works; within the Source form or
+ documentation, if provided along with the Derivative Works; or,
+ within a display generated by the Derivative Works, if and
+ wherever such third-party notices normally appear. The contents
+ of the NOTICE file are for informational purposes only and
+ do not modify the License. You may add Your own attribution
+ notices within Derivative Works that You distribute, alongside
+ or as an addendum to the NOTICE text from the Work, provided
+ that such additional attribution notices cannot be construed
+ as modifying the License.
+
+ You may add Your own copyright statement to Your modifications and
+ may provide additional or different license terms and conditions
+ for use, reproduction, or distribution of Your modifications, or
+ for any such Derivative Works as a whole, provided Your use,
+ reproduction, and distribution of the Work otherwise complies with
+ the conditions stated in this License.
+
+ 5. Submission of Contributions. Unless You explicitly state otherwise,
+ any Contribution intentionally submitted for inclusion in the Work
+ by You to the Licensor shall be under the terms and conditions of
+ this License, without any additional terms or conditions.
+ Notwithstanding the above, nothing herein shall supersede or modify
+ the terms of any separate license agreement you may have executed
+ with Licensor regarding such Contributions.
+
+ 6. Trademarks. This License does not grant permission to use the trade
+ names, trademarks, service marks, or product names of the Licensor,
+ except as required for reasonable and customary use in describing the
+ origin of the Work and reproducing the content of the NOTICE file.
+
+ 7. Disclaimer of Warranty. Unless required by applicable law or
+ agreed to in writing, Licensor provides the Work (and each
+ Contributor provides its Contributions) on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ implied, including, without limitation, any warranties or conditions
+ of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
+ PARTICULAR PURPOSE. You are solely responsible for determining the
+ appropriateness of using or redistributing the Work and assume any
+ risks associated with Your exercise of permissions under this License.
+
+ 8. Limitation of Liability. In no event and under no legal theory,
+ whether in tort (including negligence), contract, or otherwise,
+ unless required by applicable law (such as deliberate and grossly
+ negligent acts) or agreed to in writing, shall any Contributor be
+ liable to You for damages, including any direct, indirect, special,
+ incidental, or consequential damages of any character arising as a
+ result of this License or out of the use or inability to use the
+ Work (including but not limited to damages for loss of goodwill,
+ work stoppage, computer failure or malfunction, or any and all
+ other commercial damages or losses), even if such Contributor
+ has been advised of the possibility of such damages.
+
+ 9. Accepting Warranty or Additional Liability. While redistributing
+ the Work or Derivative Works thereof, You may choose to offer,
+ and charge a fee for, acceptance of support, warranty, indemnity,
+ or other liability obligations and/or rights consistent with this
+ License. However, in accepting such obligations, You may act only
+ on Your own behalf and on Your sole responsibility, not on behalf
+ of any other Contributor, and only if You agree to indemnify,
+ defend, and hold each Contributor harmless for any liability
+ incurred by, or claims asserted against, such Contributor by reason
+ of your accepting any such warranty or additional liability.
+
+ END OF TERMS AND CONDITIONS
+
+ APPENDIX: How to apply the Apache License to your work.
+
+ To apply the Apache License to your work, attach the following
+ boilerplate notice, with the fields enclosed by brackets "[]"
+ replaced with your own identifying information. (Don't include
+ the brackets!) The text should be enclosed in the appropriate
+ comment syntax for the file format. We also recommend that a
+ file or class name and description of purpose be included on the
+ same "printed page" as the copyright notice for easier
+ identification within third-party archives.
+
+ Copyright [yyyy] [name of copyright owner]
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+
+APACHE NIFI SUBCOMPONENTS:
+
+The Apache NiFi project contains subcomponents with separate copyright
+notices and license terms. Your use of the source code for the these
+subcomponents is subject to the terms and conditions of the following
+licenses.
+
+This product bundles 'ParaNamer' which is available under a BSD style license.
+For details see http://asm.ow2.org/asmdex-license.html
+
+ Copyright (c) 2006 Paul Hammant & ThoughtWorks Inc
+ All rights reserved.
+
+ Redistribution and use in source and binary forms, with or without
+ modification, are permitted provided that the following conditions
+ are met:
+ 1. Redistributions of source code must retain the above copyright
+ notice, this list of conditions and the following disclaimer.
+ 2. Redistributions in binary form must reproduce the above copyright
+ notice, this list of conditions and the following disclaimer in the
+ documentation and/or other materials provided with the distribution.
+ 3. Neither the name of the copyright holders nor the names of its
+ contributors may be used to endorse or promote products derived from
+ this software without specific prior written permission.
+
+ THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+ LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
+ THE POSSIBILITY OF SUCH DAMAGE.
+
diff --git a/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-nar/src/main/resources/META-INF/NOTICE b/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-nar/src/main/resources/META-INF/NOTICE
new file mode 100644
index 0000000000..66204cfa7f
--- /dev/null
+++ b/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-nar/src/main/resources/META-INF/NOTICE
@@ -0,0 +1,78 @@
+nifi-kite-nar
+Copyright 2014-2015 The Apache Software Foundation
+
+This product includes software developed at
+The Apache Software Foundation (http://www.apache.org/).
+
+******************
+Apache Software License v2
+******************
+
+The following binary components are provided under the Apache Software License v2
+
+ (ASLv2) Apache Avro
+ The following NOTICE information applies:
+ Apache Avro
+ Copyright 2009-2013 The Apache Software Foundation
+
+ (ASLv2) Apache Commons Codec
+ The following NOTICE information applies:
+ Apache Commons Codec
+ Copyright 2002-2014 The Apache Software Foundation
+
+ src/test/org/apache/commons/codec/language/DoubleMetaphoneTest.java
+ contains test data from http://aspell.net/test/orig/batch0.tab.
+ Copyright (C) 2002 Kevin Atkinson (kevina@gnu.org)
+
+ ===============================================================================
+
+ The content of package org.apache.commons.codec.language.bm has been translated
+ from the original php source code available at http://stevemorse.org/phoneticinfo.htm
+ with permission from the original authors.
+ Original source copyright:
+ Copyright (c) 2008 Alexander Beider & Stephen P. Morse.
+
+ (ASLv2) Apache Commons JEXL
+ The following NOTICE information applies:
+ Apache Commons JEXL
+ Copyright 2001-2011 The Apache Software Foundation
+
+ (ASLv2) Apache Commons Logging
+ The following NOTICE information applies:
+ Apache Commons Logging
+ Copyright 2003-2013 The Apache Software Foundation
+
+ (ASLv2) Kite SDK
+ The following NOTICE information applies:
+ This product includes software developed by Cloudera, Inc.
+ (http://www.cloudera.com/).
+
+ This product includes software developed at
+ The Apache Software Foundation (http://www.apache.org/).
+
+ This product includes software developed by
+ Saxonica (http://www.saxonica.com/).
+
+ (ASLv2) Snappy Java
+ The following NOTICE information applies:
+ This product includes software developed by Google
+ Snappy: http://code.google.com/p/snappy/ (New BSD License)
+
+ This product includes software developed by Apache
+ PureJavaCrc32C from apache-hadoop-common http://hadoop.apache.org/
+ (Apache 2.0 license)
+
+ This library contains statically linked libstdc++. This inclusion is allowed by
+ "GCC RUntime Library Exception"
+ http://gcc.gnu.org/onlinedocs/libstdc++/manual/license.html
+
+ (ASLv2) Parquet MR
+ The following NOTICE information applies:
+ Parquet MR
+ Copyright 2012 Twitter, Inc.
+
+ This project includes code from https://github.com/lemire/JavaFastPFOR
+ parquet-column/src/main/java/parquet/column/values/bitpacking/LemireBitPacking.java
+ Apache License Version 2.0 http://www.apache.org/licenses/.
+ (c) Daniel Lemire, http://lemire.me/en/
+
diff --git a/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/pom.xml b/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/pom.xml
index 337298874e..03e621fc7b 100644
--- a/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/pom.xml
+++ b/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/pom.xml
@@ -1,148 +1,147 @@
-
- 4.0.0
+
+ 4.0.0
-
- org.apache.nifi
- nifi-kite-bundle
- 0.0.2-incubating-SNAPSHOT
-
+
+ org.apache.nifi
+ nifi-kite-bundle
+ 0.0.2-incubating-SNAPSHOT
+
- nifi-kite-processors
- jar
- Kite Hadoop Processors
+ nifi-kite-processors
+ jar
-
- 0.18.0
- 11.0.2
- 4.10
- 1.3.9-1
-
+
+ 0.18.0
+ 11.0.2
+ 4.10
+ 1.3.9-1
+
-
-
+
+
-
- org.apache.nifi
- nifi-api
-
-
- org.apache.nifi
- nifi-utils
-
-
- org.apache.nifi
- nifi-processor-utils
-
-
- org.apache.nifi
- nifi-flowfile-packager
-
+
+ org.apache.nifi
+ nifi-api
+
+
+ org.apache.nifi
+ nifi-utils
+
+
+ org.apache.nifi
+ nifi-processor-utils
+
+
+ org.apache.nifi
+ nifi-flowfile-packager
+
-
+
-
- org.kitesdk
- kite-data-core
- ${kite.version}
-
-
-
- com.google.code.findbugs
- jsr305
-
-
-
+
+ org.kitesdk
+ kite-data-core
+ ${kite.version}
+
+
+
+ com.google.code.findbugs
+ jsr305
+
+
+
-
- org.kitesdk
- kite-hadoop-dependencies
- pom
- ${kite.version}
-
-
- org.apache.hadoop
- hadoop-mapreduce-client-app
-
-
- org.apache.hadoop
- hadoop-mapreduce-client-jobclient
-
-
-
+
+ org.kitesdk
+ kite-hadoop-dependencies
+ pom
+ ${kite.version}
+
+
+ org.apache.hadoop
+ hadoop-mapreduce-client-app
+
+
+ org.apache.hadoop
+ hadoop-mapreduce-client-jobclient
+
+
+
-
- com.google.guava
- guava
- ${guava.version}
- compile
-
+
+ com.google.guava
+ guava
+ ${guava.version}
+ compile
+
-
-
- com.github.stephenc.findbugs
- findbugs-annotations
- compile
- ${findbugs-annotations.version}
-
+
+
+ com.github.stephenc.findbugs
+ findbugs-annotations
+ compile
+ ${findbugs-annotations.version}
+
-
+
-
- junit
- junit
- test
- ${junit.version}
-
+
+ junit
+ junit
+ test
+ ${junit.version}
+
-
- org.apache.nifi
- nifi-mock
- test
-
+
+ org.apache.nifi
+ nifi-mock
+ test
+
-
- org.kitesdk
- kite-minicluster
- ${kite.version}
- test
-
+
+ org.kitesdk
+ kite-minicluster
+ ${kite.version}
+ test
+
-
- com.sun.jersey
- jersey-servlet
- 1.14
- test
-
+
+ com.sun.jersey
+ jersey-servlet
+ 1.14
+ test
+
-
- org.kitesdk
- kite-data-core
- ${kite.version}
- test-jar
- test
-
+
+ org.kitesdk
+ kite-data-core
+ ${kite.version}
+ test-jar
+ test
+
-
- org.kitesdk
- kite-hadoop-test-dependencies
- pom
- test
- ${kite.version}
-
+
+ org.kitesdk
+ kite-hadoop-test-dependencies
+ pom
+ test
+ ${kite.version}
+
-
+
diff --git a/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/AbstractKiteProcessor.java b/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/AbstractKiteProcessor.java
index c5104893b4..98329ff4ea 100644
--- a/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/AbstractKiteProcessor.java
+++ b/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/AbstractKiteProcessor.java
@@ -16,7 +16,6 @@
* specific language governing permissions and limitations
* under the License.
*/
-
package org.apache.nifi.processors.kite;
import com.google.common.base.Splitter;
@@ -47,171 +46,171 @@ import org.kitesdk.data.spi.DefaultConfiguration;
abstract class AbstractKiteProcessor extends AbstractProcessor {
- private static final Splitter COMMA = Splitter.on(',').trimResults();
- protected static final Validator FILES_EXIST = new Validator() {
- @Override
- public ValidationResult validate(String subject, String configFiles,
- ValidationContext context) {
- if (configFiles != null && !configFiles.isEmpty()) {
- for (String file : COMMA.split(configFiles)) {
- ValidationResult result = StandardValidators.FILE_EXISTS_VALIDATOR
- .validate(subject, file, context);
- if (!result.isValid()) {
- return result;
- }
+ private static final Splitter COMMA = Splitter.on(',').trimResults();
+ protected static final Validator FILES_EXIST = new Validator() {
+ @Override
+ public ValidationResult validate(String subject, String configFiles,
+ ValidationContext context) {
+ if (configFiles != null && !configFiles.isEmpty()) {
+ for (String file : COMMA.split(configFiles)) {
+ ValidationResult result = StandardValidators.FILE_EXISTS_VALIDATOR
+ .validate(subject, file, context);
+ if (!result.isValid()) {
+ return result;
+ }
+ }
+ }
+ return new ValidationResult.Builder()
+ .subject(subject)
+ .input(configFiles)
+ .explanation("Files exist")
+ .valid(true)
+ .build();
}
- }
- return new ValidationResult.Builder()
- .subject(subject)
- .input(configFiles)
- .explanation("Files exist")
- .valid(true)
- .build();
- }
- };
+ };
- protected static final PropertyDescriptor CONF_XML_FILES =
- new PropertyDescriptor.Builder()
- .name("Hadoop configuration files")
- .description("A comma-separated list of Hadoop configuration files")
- .addValidator(FILES_EXIST)
- .build();
+ protected static final PropertyDescriptor CONF_XML_FILES
+ = new PropertyDescriptor.Builder()
+ .name("Hadoop configuration files")
+ .description("A comma-separated list of Hadoop configuration files")
+ .addValidator(FILES_EXIST)
+ .build();
- protected static final Validator RECOGNIZED_URI = new Validator() {
- @Override
- public ValidationResult validate(String subject, String uri,
- ValidationContext context) {
- String message = "not set";
- boolean isValid = true;
- if (uri == null || uri.isEmpty()) {
- isValid = false;
- } else {
+ protected static final Validator RECOGNIZED_URI = new Validator() {
+ @Override
+ public ValidationResult validate(String subject, String uri,
+ ValidationContext context) {
+ String message = "not set";
+ boolean isValid = true;
+ if (uri == null || uri.isEmpty()) {
+ isValid = false;
+ } else {
+ try {
+ new URIBuilder(URI.create(uri)).build();
+ } catch (RuntimeException e) {
+ message = e.getMessage();
+ isValid = false;
+ }
+ }
+ return new ValidationResult.Builder()
+ .subject(subject)
+ .input(uri)
+ .explanation("Dataset URI is invalid: " + message)
+ .valid(isValid)
+ .build();
+ }
+ };
+
+ /**
+ * Resolves a {@link Schema} for the given string, either a URI or a JSON
+ * literal.
+ */
+ protected static Schema getSchema(String uriOrLiteral, Configuration conf) {
+ URI uri;
try {
- new URIBuilder(URI.create(uri)).build();
- } catch (RuntimeException e) {
- message = e.getMessage();
- isValid = false;
+ uri = new URI(uriOrLiteral);
+ } catch (URISyntaxException e) {
+ // try to parse the schema as a literal
+ return parseSchema(uriOrLiteral);
}
- }
- return new ValidationResult.Builder()
- .subject(subject)
- .input(uri)
- .explanation("Dataset URI is invalid: " + message)
- .valid(isValid)
- .build();
- }
- };
- /**
- * Resolves a {@link Schema} for the given string, either a URI or a JSON
- * literal.
- */
- protected static Schema getSchema(String uriOrLiteral, Configuration conf) {
- URI uri;
- try {
- uri = new URI(uriOrLiteral);
- } catch (URISyntaxException e) {
- // try to parse the schema as a literal
- return parseSchema(uriOrLiteral);
+ try {
+ if ("dataset".equals(uri.getScheme()) || "view".equals(uri.getScheme())) {
+ return Datasets.load(uri).getDataset().getDescriptor().getSchema();
+ } else if ("resource".equals(uri.getScheme())) {
+ InputStream in = Resources.getResource(uri.getSchemeSpecificPart())
+ .openStream();
+ return parseSchema(uri, in);
+ } else {
+ // try to open the file
+ Path schemaPath = new Path(uri);
+ FileSystem fs = schemaPath.getFileSystem(conf);
+ return parseSchema(uri, fs.open(schemaPath));
+ }
+
+ } catch (DatasetNotFoundException e) {
+ throw new SchemaNotFoundException(
+ "Cannot read schema of missing dataset: " + uri, e);
+ } catch (IOException e) {
+ throw new SchemaNotFoundException(
+ "Failed while reading " + uri + ": " + e.getMessage(), e);
+ }
}
- try {
- if ("dataset".equals(uri.getScheme()) || "view".equals(uri.getScheme())) {
- return Datasets.load(uri).getDataset().getDescriptor().getSchema();
- } else if ("resource".equals(uri.getScheme())) {
- InputStream in = Resources.getResource(uri.getSchemeSpecificPart())
- .openStream();
- return parseSchema(uri, in);
- } else {
- // try to open the file
- Path schemaPath = new Path(uri);
- FileSystem fs = schemaPath.getFileSystem(conf);
- return parseSchema(uri, fs.open(schemaPath));
- }
-
- } catch (DatasetNotFoundException e) {
- throw new SchemaNotFoundException(
- "Cannot read schema of missing dataset: " + uri, e);
- } catch (IOException e) {
- throw new SchemaNotFoundException(
- "Failed while reading " + uri + ": " + e.getMessage(), e);
+ private static Schema parseSchema(String literal) {
+ try {
+ return new Schema.Parser().parse(literal);
+ } catch (RuntimeException e) {
+ throw new SchemaNotFoundException(
+ "Failed to parse schema: " + literal, e);
+ }
}
- }
- private static Schema parseSchema(String literal) {
- try {
- return new Schema.Parser().parse(literal);
- } catch (RuntimeException e) {
- throw new SchemaNotFoundException(
- "Failed to parse schema: " + literal, e);
+ private static Schema parseSchema(URI uri, InputStream in) throws IOException {
+ try {
+ return new Schema.Parser().parse(in);
+ } catch (RuntimeException e) {
+ throw new SchemaNotFoundException("Failed to parse schema at " + uri, e);
+ }
}
- }
- private static Schema parseSchema(URI uri, InputStream in) throws IOException {
- try {
- return new Schema.Parser().parse(in);
- } catch (RuntimeException e) {
- throw new SchemaNotFoundException("Failed to parse schema at " + uri, e);
+ protected static final Validator SCHEMA_VALIDATOR = new Validator() {
+ @Override
+ public ValidationResult validate(String subject, String uri, ValidationContext context) {
+ Configuration conf = getConfiguration(
+ context.getProperty(CONF_XML_FILES).getValue());
+
+ String error = null;
+ try {
+ getSchema(uri, conf);
+ } catch (SchemaNotFoundException e) {
+ error = e.getMessage();
+ }
+ return new ValidationResult.Builder()
+ .subject(subject)
+ .input(uri)
+ .explanation(error)
+ .valid(error == null)
+ .build();
+ }
+ };
+
+ protected static final List ABSTRACT_KITE_PROPS
+ = ImmutableList.builder()
+ .add(CONF_XML_FILES)
+ .build();
+
+ static List getProperties() {
+ return ABSTRACT_KITE_PROPS;
+ }
+
+ @OnScheduled
+ protected void setDefaultConfiguration(ProcessContext context)
+ throws IOException {
+ DefaultConfiguration.set(getConfiguration(
+ context.getProperty(CONF_XML_FILES).getValue()));
+ }
+
+ protected static Configuration getConfiguration(String configFiles) {
+ Configuration conf = DefaultConfiguration.get();
+
+ if (configFiles == null || configFiles.isEmpty()) {
+ return conf;
+ }
+
+ for (String file : COMMA.split(configFiles)) {
+ // process each resource only once
+ if (conf.getResource(file) == null) {
+ // use Path instead of String to get the file from the FS
+ conf.addResource(new Path(file));
+ }
+ }
+
+ return conf;
}
- }
- protected static final Validator SCHEMA_VALIDATOR = new Validator() {
@Override
- public ValidationResult validate(String subject, String uri, ValidationContext context) {
- Configuration conf = getConfiguration(
- context.getProperty(CONF_XML_FILES).getValue());
-
- String error = null;
- try {
- getSchema(uri, conf);
- } catch (SchemaNotFoundException e) {
- error = e.getMessage();
- }
- return new ValidationResult.Builder()
- .subject(subject)
- .input(uri)
- .explanation(error)
- .valid(error == null)
- .build();
+ protected List getSupportedPropertyDescriptors() {
+ return ABSTRACT_KITE_PROPS;
}
- };
-
- protected static final List ABSTRACT_KITE_PROPS =
- ImmutableList.builder()
- .add(CONF_XML_FILES)
- .build();
-
- static List getProperties() {
- return ABSTRACT_KITE_PROPS;
- }
-
- @OnScheduled
- protected void setDefaultConfiguration(ProcessContext context)
- throws IOException {
- DefaultConfiguration.set(getConfiguration(
- context.getProperty(CONF_XML_FILES).getValue()));
- }
-
- protected static Configuration getConfiguration(String configFiles) {
- Configuration conf = DefaultConfiguration.get();
-
- if (configFiles == null || configFiles.isEmpty()) {
- return conf;
- }
-
- for (String file : COMMA.split(configFiles)) {
- // process each resource only once
- if (conf.getResource(file) == null) {
- // use Path instead of String to get the file from the FS
- conf.addResource(new Path(file));
- }
- }
-
- return conf;
- }
-
- @Override
- protected List getSupportedPropertyDescriptors() {
- return ABSTRACT_KITE_PROPS;
- }
}
diff --git a/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/AvroUtil.java b/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/AvroUtil.java
index 0d0b04d65f..9ff0f73f9f 100644
--- a/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/AvroUtil.java
+++ b/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/AvroUtil.java
@@ -20,13 +20,10 @@
package org.apache.nifi.processors.kite;
import org.apache.avro.Schema;
-import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericData;
-import org.apache.avro.generic.IndexedRecord;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DatumWriter;
-import static org.apache.avro.generic.GenericData.StringType;
class AvroUtil {
diff --git a/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/ConvertCSVToAvro.java b/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/ConvertCSVToAvro.java
index 18240c39d5..5181bbe187 100644
--- a/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/ConvertCSVToAvro.java
+++ b/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/ConvertCSVToAvro.java
@@ -42,7 +42,6 @@ import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
-import org.apache.nifi.processor.exception.FlowFileAccessException;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.StreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
@@ -59,9 +58,9 @@ import static org.apache.nifi.processor.util.StandardValidators.createLongValida
@CapabilityDescription(
"Converts CSV files to Avro according to an Avro Schema")
public class ConvertCSVToAvro extends AbstractKiteProcessor {
- private static CSVProperties DEFAULTS = new CSVProperties.Builder().build();
+ private static final CSVProperties DEFAULTS = new CSVProperties.Builder().build();
- private static Validator CHAR_VALIDATOR = new Validator() {
+ private static final Validator CHAR_VALIDATOR = new Validator() {
@Override
public ValidationResult validate(String subject, String input,
ValidationContext context) {
@@ -74,12 +73,12 @@ public class ConvertCSVToAvro extends AbstractKiteProcessor {
}
};
- private static Relationship SUCCESS = new Relationship.Builder()
+ private static final Relationship SUCCESS = new Relationship.Builder()
.name("success")
.description("FlowFile content has been successfully saved")
.build();
- private static Relationship FAILURE = new Relationship.Builder()
+ private static final Relationship FAILURE = new Relationship.Builder()
.name("failure")
.description("FlowFile content could not be processed")
.build();
diff --git a/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/ConvertJSONToAvro.java b/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/ConvertJSONToAvro.java
index 5200f19d22..8eeea1a9c4 100644
--- a/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/ConvertJSONToAvro.java
+++ b/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/ConvertJSONToAvro.java
@@ -16,7 +16,6 @@
* specific language governing permissions and limitations
* under the License.
*/
-
package org.apache.nifi.processors.kite;
import com.google.common.annotations.VisibleForTesting;
@@ -38,7 +37,6 @@ import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
-import org.apache.nifi.processor.exception.FlowFileAccessException;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.StreamCallback;
import org.kitesdk.data.DatasetException;
@@ -48,110 +46,110 @@ import org.kitesdk.data.spi.DefaultConfiguration;
@Tags({"kite", "json", "avro"})
@CapabilityDescription(
- "Converts JSON files to Avro according to an Avro Schema")
+ "Converts JSON files to Avro according to an Avro Schema")
public class ConvertJSONToAvro extends AbstractKiteProcessor {
- private static Relationship SUCCESS = new Relationship.Builder()
- .name("success")
- .description("FlowFile content has been successfully saved")
- .build();
+ private static final Relationship SUCCESS = new Relationship.Builder()
+ .name("success")
+ .description("FlowFile content has been successfully saved")
+ .build();
- private static Relationship FAILURE = new Relationship.Builder()
- .name("failure")
- .description("FlowFile content could not be processed")
- .build();
+ private static final Relationship FAILURE = new Relationship.Builder()
+ .name("failure")
+ .description("FlowFile content could not be processed")
+ .build();
- @VisibleForTesting
- static final PropertyDescriptor SCHEMA =
- new PropertyDescriptor.Builder()
- .name("Record schema")
- .description(
- "Outgoing Avro schema for each record created from a JSON object")
- .addValidator(SCHEMA_VALIDATOR)
- .required(true)
- .build();
+ @VisibleForTesting
+ static final PropertyDescriptor SCHEMA
+ = new PropertyDescriptor.Builder()
+ .name("Record schema")
+ .description(
+ "Outgoing Avro schema for each record created from a JSON object")
+ .addValidator(SCHEMA_VALIDATOR)
+ .required(true)
+ .build();
- private static final List PROPERTIES =
- ImmutableList.builder()
- .addAll(AbstractKiteProcessor.getProperties())
- .add(SCHEMA)
- .build();
+ private static final List PROPERTIES
+ = ImmutableList.builder()
+ .addAll(AbstractKiteProcessor.getProperties())
+ .add(SCHEMA)
+ .build();
- private static final Set RELATIONSHIPS =
- ImmutableSet.builder()
- .add(SUCCESS)
- .add(FAILURE)
- .build();
+ private static final Set RELATIONSHIPS
+ = ImmutableSet.builder()
+ .add(SUCCESS)
+ .add(FAILURE)
+ .build();
- public ConvertJSONToAvro() {
- }
-
- @Override
- protected List getSupportedPropertyDescriptors() {
- return PROPERTIES;
- }
-
- @Override
- public Set getRelationships() {
- return RELATIONSHIPS;
- }
-
- @Override
- public void onTrigger(ProcessContext context, final ProcessSession session)
- throws ProcessException {
- FlowFile flowFile = session.get();
- if (flowFile == null) {
- return;
+ public ConvertJSONToAvro() {
}
- final Schema schema = getSchema(
- context.getProperty(SCHEMA).getValue(),
- DefaultConfiguration.get());
+ @Override
+ protected List getSupportedPropertyDescriptors() {
+ return PROPERTIES;
+ }
- final DataFileWriter writer = new DataFileWriter<>(
- AvroUtil.newDatumWriter(schema, Record.class));
- writer.setCodec(CodecFactory.snappyCodec());
+ @Override
+ public Set getRelationships() {
+ return RELATIONSHIPS;
+ }
- try {
- flowFile = session.write(flowFile, new StreamCallback() {
- @Override
- public void process(InputStream in, OutputStream out) throws IOException {
- long written = 0L;
- long errors = 0L;
- try (JSONFileReader reader = new JSONFileReader<>(
- in, schema, Record.class)) {
- reader.initialize();
- try (DataFileWriter w = writer.create(schema, out)) {
- while (reader.hasNext()) {
- try {
- Record record = reader.next();
- w.append(record);
- written += 1;
- } catch (DatasetRecordException e) {
- errors += 1;
- }
- }
- }
- session.adjustCounter("Converted records", written,
- false /* update only if file transfer is successful */);
- session.adjustCounter("Conversion errors", errors,
- false /* update only if file transfer is successful */);
- }
+ @Override
+ public void onTrigger(ProcessContext context, final ProcessSession session)
+ throws ProcessException {
+ FlowFile flowFile = session.get();
+ if (flowFile == null) {
+ return;
}
- });
- session.transfer(flowFile, SUCCESS);
+ final Schema schema = getSchema(
+ context.getProperty(SCHEMA).getValue(),
+ DefaultConfiguration.get());
- //session.getProvenanceReporter().send(flowFile, target.getUri().toString());
- } catch (ProcessException | DatasetIOException e) {
- getLogger().error("Failed reading or writing", e);
- session.transfer(flowFile, FAILURE);
+ final DataFileWriter writer = new DataFileWriter<>(
+ AvroUtil.newDatumWriter(schema, Record.class));
+ writer.setCodec(CodecFactory.snappyCodec());
- } catch (DatasetException e) {
- getLogger().error("Failed to read FlowFile", e);
- session.transfer(flowFile, FAILURE);
+ try {
+ flowFile = session.write(flowFile, new StreamCallback() {
+ @Override
+ public void process(InputStream in, OutputStream out) throws IOException {
+ long written = 0L;
+ long errors = 0L;
+ try (JSONFileReader reader = new JSONFileReader<>(
+ in, schema, Record.class)) {
+ reader.initialize();
+ try (DataFileWriter w = writer.create(schema, out)) {
+ while (reader.hasNext()) {
+ try {
+ Record record = reader.next();
+ w.append(record);
+ written += 1;
+ } catch (DatasetRecordException e) {
+ errors += 1;
+ }
+ }
+ }
+ session.adjustCounter("Converted records", written,
+ false /* update only if file transfer is successful */);
+ session.adjustCounter("Conversion errors", errors,
+ false /* update only if file transfer is successful */);
+ }
+ }
+ });
+ session.transfer(flowFile, SUCCESS);
+
+ //session.getProvenanceReporter().send(flowFile, target.getUri().toString());
+ } catch (ProcessException | DatasetIOException e) {
+ getLogger().error("Failed reading or writing", e);
+ session.transfer(flowFile, FAILURE);
+
+ } catch (DatasetException e) {
+ getLogger().error("Failed to read FlowFile", e);
+ session.transfer(flowFile, FAILURE);
+
+ }
}
- }
}
diff --git a/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/StoreInKiteDataset.java b/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/StoreInKiteDataset.java
index 223caccbed..314f0083e6 100644
--- a/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/StoreInKiteDataset.java
+++ b/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/StoreInKiteDataset.java
@@ -50,17 +50,17 @@ import org.kitesdk.data.spi.SchemaValidationUtil;
@Tags({"kite", "avro", "parquet", "hive", "hdfs", "hbase"})
@CapabilityDescription("Stores Avro records in a Kite dataset")
public class StoreInKiteDataset extends AbstractKiteProcessor {
- private static Relationship SUCCESS = new Relationship.Builder()
+ private static final Relationship SUCCESS = new Relationship.Builder()
.name("success")
.description("FlowFile content has been successfully saved")
.build();
- private static Relationship INCOMPATIBLE = new Relationship.Builder()
+ private static final Relationship INCOMPATIBLE = new Relationship.Builder()
.name("incompatible")
.description("FlowFile content is not compatible with the target dataset")
.build();
- private static Relationship FAILURE = new Relationship.Builder()
+ private static final Relationship FAILURE = new Relationship.Builder()
.name("failure")
.description("FlowFile content could not be processed")
.build();
diff --git a/nifi/nifi-nar-bundles/nifi-kite-bundle/pom.xml b/nifi/nifi-nar-bundles/nifi-kite-bundle/pom.xml
index 8984c18c4c..067efc54e1 100644
--- a/nifi/nifi-nar-bundles/nifi-kite-bundle/pom.xml
+++ b/nifi/nifi-nar-bundles/nifi-kite-bundle/pom.xml
@@ -1,59 +1,58 @@
-
- 4.0.0
+
+ 4.0.0
-
- org.apache.nifi
- nifi-nar-bundles
- 0.0.2-incubating-SNAPSHOT
-
-
- nifi-kite-bundle
- pom
-
- Kite Bundle
- A bundle of processors that use Kite to store data in Hadoop
-
-
- nifi-kite-processors
- nifi-kite-nar
-
-
-
-
-
-
- org.apache.maven.plugins
- maven-surefire-plugin
-
- true
-
-
-
-
-
-
-
-
-
+ org.apache.nifi
- nifi-kite-processors
+ nifi-nar-bundles0.0.2-incubating-SNAPSHOT
-
-
-
+
+
+ nifi-kite-bundle
+ pom
+
+ A bundle of processors that use Kite to store data in Hadoop
+
+
+ nifi-kite-processors
+ nifi-kite-nar
+
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-surefire-plugin
+
+ true
+
+
+
+
+
+
+
+
+
+ org.apache.nifi
+ nifi-kite-processors
+ 0.0.2-incubating-SNAPSHOT
+
+
+
From 1de36927c69ce871c74c290e1f679b05e484fd54 Mon Sep 17 00:00:00 2001
From: joewitt
Date: Mon, 9 Mar 2015 16:55:00 -0400
Subject: [PATCH 09/13] NIFI-238 This closes #24
From 0f68280c296d185e6dac0674902b96c45eece5a2 Mon Sep 17 00:00:00 2001
From: joewitt
Date: Mon, 9 Mar 2015 16:59:47 -0400
Subject: [PATCH 10/13] NIFI-282 added missing license header to site to site
client pom
---
.../nifi-site-to-site-client/pom.xml | 17 ++++++++++++++---
1 file changed, 14 insertions(+), 3 deletions(-)
diff --git a/nifi/nifi-commons/nifi-site-to-site-client/pom.xml b/nifi/nifi-commons/nifi-site-to-site-client/pom.xml
index 0d21a3d3e2..c440de20c3 100644
--- a/nifi/nifi-commons/nifi-site-to-site-client/pom.xml
+++ b/nifi/nifi-commons/nifi-site-to-site-client/pom.xml
@@ -1,4 +1,18 @@
+
@@ -21,9 +35,6 @@
org.apache.nifinifi-utils
-
org.codehaus.jacksonjackson-mapper-asl
From 96e0f31ccae9c046bdc1927145aba90ecae71fe6 Mon Sep 17 00:00:00 2001
From: joewitt
Date: Mon, 9 Mar 2015 20:45:17 -0400
Subject: [PATCH 11/13] NIFI-402 fixed license issues
---
.../org/apache/tika/mime/custom-mimetypes.xml | 14 ++++++++++++++
.../nifi-http-context-map-api/pom.xml | 14 ++++++++++++++
.../src/main/resources/META-INF/NOTICE | 19 +++++++++++++++++++
...g.apache.nifi.controller.ControllerService | 16 +++++++++++++++-
4 files changed, 62 insertions(+), 1 deletion(-)
create mode 100644 nifi/nifi-nar-bundles/nifi-standard-services/nifi-http-context-map-bundle/nifi-http-context-map-nar/src/main/resources/META-INF/NOTICE
diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/org/apache/tika/mime/custom-mimetypes.xml b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/org/apache/tika/mime/custom-mimetypes.xml
index 657b4b5467..7a357d5673 100644
--- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/org/apache/tika/mime/custom-mimetypes.xml
+++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/org/apache/tika/mime/custom-mimetypes.xml
@@ -1,4 +1,18 @@
+
diff --git a/nifi/nifi-nar-bundles/nifi-standard-services/nifi-http-context-map-api/pom.xml b/nifi/nifi-nar-bundles/nifi-standard-services/nifi-http-context-map-api/pom.xml
index 262a41b448..f8bcfd8181 100644
--- a/nifi/nifi-nar-bundles/nifi-standard-services/nifi-http-context-map-api/pom.xml
+++ b/nifi/nifi-nar-bundles/nifi-standard-services/nifi-http-context-map-api/pom.xml
@@ -1,4 +1,18 @@
+
4.0.0
diff --git a/nifi/nifi-nar-bundles/nifi-standard-services/nifi-http-context-map-bundle/nifi-http-context-map-nar/src/main/resources/META-INF/NOTICE b/nifi/nifi-nar-bundles/nifi-standard-services/nifi-http-context-map-bundle/nifi-http-context-map-nar/src/main/resources/META-INF/NOTICE
new file mode 100644
index 0000000000..8f8b4a5c31
--- /dev/null
+++ b/nifi/nifi-nar-bundles/nifi-standard-services/nifi-http-context-map-bundle/nifi-http-context-map-nar/src/main/resources/META-INF/NOTICE
@@ -0,0 +1,19 @@
+nifi-http-context-map-nar
+Copyright 2014-2015 The Apache Software Foundation
+
+This product includes software developed at
+The Apache Software Foundation (http://www.apache.org/).
+
+===========================================
+Apache Software License v2
+===========================================
+
+The following binary components are provided under the Apache Software License v2
+
+ (ASLv2) Apache Commons Lang
+ The following NOTICE information applies:
+ Apache Commons Lang
+ Copyright 2001-2014 The Apache Software Foundation
+
+ This product includes software from the Spring Framework,
+ under the Apache License 2.0 (see: StringUtils.containsWhitespace())
diff --git a/nifi/nifi-nar-bundles/nifi-standard-services/nifi-http-context-map-bundle/nifi-http-context-map/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService b/nifi/nifi-nar-bundles/nifi-standard-services/nifi-http-context-map-bundle/nifi-http-context-map/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
index 8c5721c4d4..9e1e941471 100644
--- a/nifi/nifi-nar-bundles/nifi-standard-services/nifi-http-context-map-bundle/nifi-http-context-map/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
+++ b/nifi/nifi-nar-bundles/nifi-standard-services/nifi-http-context-map-bundle/nifi-http-context-map/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
@@ -1 +1,15 @@
-org.apache.nifi.http.StandardHttpContextMap
\ No newline at end of file
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+org.apache.nifi.http.StandardHttpContextMap
From 56cc1860f16d080791772a5b1a6e546f732b41c1 Mon Sep 17 00:00:00 2001
From: Matt Gilman
Date: Wed, 11 Mar 2015 10:17:12 -0400
Subject: [PATCH 12/13] NIFI-411: - Upgrading jersey to address zero byte files
in the temp directory.
---
nifi/pom.xml | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/nifi/pom.xml b/nifi/pom.xml
index bd2aa4d0c4..c444c5700a 100644
--- a/nifi/pom.xml
+++ b/nifi/pom.xml
@@ -88,7 +88,7 @@
4.10.34.1.4.RELEASE3.2.5.RELEASE
- 1.18.3
+ 1.192.6.02014
From 5450a5a2f00626b40b199c2ad551ac0c8cc610e5 Mon Sep 17 00:00:00 2001
From: James Owen
Date: Wed, 11 Mar 2015 18:29:08 -0400
Subject: [PATCH 13/13] Added username/password to PutEmail Processor
---
.../nifi/processors/standard/PutEmail.java | 28 +++++++++++++++----
1 file changed, 22 insertions(+), 6 deletions(-)
diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutEmail.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutEmail.java
index eb6b1cca63..d83b100420 100644
--- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutEmail.java
+++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutEmail.java
@@ -18,7 +18,6 @@ package org.apache.nifi.processors.standard;
import java.io.IOException;
import java.io.InputStream;
-import java.net.Socket;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -43,6 +42,10 @@ import javax.mail.internet.MimeMultipart;
import javax.mail.internet.PreencodedMimeBodyPart;
import javax.mail.util.ByteArrayDataSource;
+import org.apache.commons.codec.binary.Base64;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
@@ -54,13 +57,9 @@ import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
-import org.apache.nifi.annotation.documentation.CapabilityDescription;
-import org.apache.nifi.annotation.behavior.SupportsBatching;
-import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
-import org.apache.commons.codec.binary.Base64;
import com.sun.mail.smtp.SMTPTransport;
@@ -82,6 +81,19 @@ public class PutEmail extends AbstractProcessor {
.defaultValue("25")
.addValidator(StandardValidators.PORT_VALIDATOR)
.build();
+ public static final PropertyDescriptor SMTP_USERNAME = new PropertyDescriptor.Builder()
+ .name("SMTP Username")
+ .description("Username for the SMTP account")
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .required(false)
+ .build();
+ public static final PropertyDescriptor SMTP_PASSWORD = new PropertyDescriptor.Builder()
+ .name("SMTP Password")
+ .description("Password for the SMTP account")
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .required(false)
+ .sensitive(true)
+ .build();
public static final PropertyDescriptor FROM = new PropertyDescriptor.Builder()
.name("From")
.description("Specifies the Email address to use as the sender")
@@ -152,6 +164,8 @@ public class PutEmail extends AbstractProcessor {
final List properties = new ArrayList<>();
properties.add(SMTP_HOSTNAME);
properties.add(SMTP_PORT);
+ properties.add(SMTP_USERNAME);
+ properties.add(SMTP_PASSWORD);
properties.add(FROM);
properties.add(TO);
properties.add(CC);
@@ -255,7 +269,9 @@ public class PutEmail extends AbstractProcessor {
final SMTPTransport transport = new SMTPTransport(mailSession, new URLName(smtpHost));
try {
final int smtpPort = context.getProperty(SMTP_PORT).asInteger();
- transport.connect(new Socket(smtpHost, smtpPort));
+ final String smtpUsername = context.getProperty(SMTP_USERNAME).getValue();
+ final String smtpPassword = context.getProperty(SMTP_PASSWORD).getValue();
+ transport.connect(smtpHost, smtpPort, smtpUsername, smtpPassword);
transport.sendMessage(message, message.getAllRecipients());
} finally {
transport.close();