Merge branch 'develop' into NIFI-413-PutKafka-Compression-and-Batching

This commit is contained in:
Brian Ghigiarelli 2015-06-19 17:46:53 -04:00
commit 5b0648cf3f
322 changed files with 8080 additions and 2494 deletions

View File

@ -24,9 +24,11 @@ Apache NiFi is an easy to use, powerful, and reliable system to process and dist
## Getting Started
- Read through the [quickstart guide for development](http://nifi.incubator.apache.org/development/quickstart.html).
- Read through the [quickstart guide for development](http://nifi.incubator.apache.org/quickstart.html).
It will include information on getting a local copy of the source, give pointers on issue
tracking, and provide some warnings about common problems with development environments.
- For a more comprehensive guide to development and information about contributing to the project
read through the [NiFi Developer's Guide](http://nifi.incubator.apache.org/developer-guide.html).
- Optional: Build supporting modules. This should only be needed if the current 'nifi' module is in
the process of updating to a new version of either the 'nifi-parent' or 'nifi-nar-maven-plugin'
artifacts.

View File

@ -18,11 +18,11 @@
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-parent</artifactId>
<version>1.0.0-incubating-SNAPSHOT</version>
<version>1.0.0-incubating</version>
<relativePath />
</parent>
<artifactId>nifi-nar-maven-plugin</artifactId>
<version>1.0.1-incubating-SNAPSHOT</version>
<version>1.0.2-incubating-SNAPSHOT</version>
<packaging>maven-plugin</packaging>
<description>Apache NiFi Nar Plugin. It is currently a part of the Apache Incubator.</description>
<build>

View File

@ -23,7 +23,7 @@
</parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-parent</artifactId>
<version>1.0.0-incubating-SNAPSHOT</version>
<version>1.0.1-incubating-SNAPSHOT</version>
<packaging>pom</packaging>
<description>The nifi-parent enables each apache nifi project to ensure consistent approaches and DRY</description>
<url>http://nifi.incubator.apache.org</url>

View File

@ -13,7 +13,7 @@ module.exports = function (grunt) {
options: {
force: true
},
js: ['dist/js/'],
js: ['dist/js/*'],
css: ['dist/css/'],
assets: ['dist/assets/*'],
generated: ['dist/docs'],
@ -72,28 +72,28 @@ module.exports = function (grunt) {
}
},
copy: {
generated: {
files: [{
expand: true,
cwd: '../nifi/nifi-docs/target/generated-docs',
src: ['*.html', 'images/*'],
dest: 'dist/docs/'
}, {
expand: true,
cwd: '../nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api',
src: ['target/nifi-web-api-*/docs/rest-api/index.html', 'target/nifi-web-api-*/docs/rest-api/images/*'],
dest: 'dist/docs/',
rename: function (dest, src) {
var path = require('path');
if (src.indexOf('images') > 0) {
return path.join(dest, 'rest-api/images', path.basename(src));
} else {
return path.join(dest, 'rest-api', path.basename(src));
}
}
}]
},
// generated: {
// files: [{
// expand: true,
// cwd: '../nifi/nifi-docs/target/generated-docs',
// src: ['*.html', 'images/*'],
// dest: 'dist/docs/'
// }, {
// expand: true,
// cwd: '../nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api',
// src: ['target/nifi-web-api-*/docs/rest-api/index.html', 'target/nifi-web-api-*/docs/rest-api/images/*'],
// dest: 'dist/docs/',
// rename: function (dest, src) {
// var path = require('path');
//
// if (src.indexOf('images') > 0) {
// return path.join(dest, 'rest-api/images', path.basename(src));
// } else {
// return path.join(dest, 'rest-api', path.basename(src));
// }
// }
// }]
// },
dist: {
files: [{
expand: true,
@ -163,6 +163,7 @@ module.exports = function (grunt) {
message: 'SVN password (if different from configured):'
}],
then: function () {
grunt.task.run('exec:add');
grunt.task.run('exec:commit');
}
}
@ -206,6 +207,12 @@ module.exports = function (grunt) {
stdout: true,
stderr: true
},
add: {
cwd: 'dist',
command: 'svn add --force .',
stdout: true,
stderr: true
},
commit: {
cwd: 'dist',
command: function() {
@ -254,6 +261,9 @@ module.exports = function (grunt) {
replacements: [{
from: /<div class="sub-title">.*<\/div>/g,
to: '<div class="sub-title">NiFi Rest Api</div>'
}, {
from: /<title>.*<\/title>/g,
to: '<title>NiFi Rest Api</title>'
}]
}
},
@ -295,9 +305,10 @@ module.exports = function (grunt) {
grunt.registerTask('img', ['newer:copy']);
grunt.registerTask('css', ['clean:css', 'compass']);
grunt.registerTask('js', ['clean:js', 'concat']);
grunt.registerTask('generate-docs', ['clean:generated', 'exec:generateDocs', 'exec:generateRestApiDocs', 'copy:generated', 'replace:addGoogleAnalytics', 'replace:moveTearDrop', 'replace:removeVersion']);
// grunt.registerTask('generate-docs', ['clean:generated', 'exec:generateDocs', 'exec:generateRestApiDocs', 'copy:generated', 'replace:addGoogleAnalytics', 'replace:moveTearDrop', 'replace:removeVersion']);
grunt.registerTask('build', ['assemble', 'css', 'js', 'img', 'generate-docs', 'copy:dist']);
grunt.registerTask('build', ['assemble', 'css', 'js', 'img', 'copy:dist']);
// grunt.registerTask('build', ['assemble', 'css', 'js', 'img', 'generate-docs', 'copy:dist']);
grunt.registerTask('deploy', ['clean:all', 'prompt:username', 'exec:checkout', 'build', 'exec:status', 'prompt:commit']);
grunt.registerTask('dev', ['default', 'watch']);

Binary file not shown.

After

Width:  |  Height:  |  Size: 141 KiB

View File

@ -28,11 +28,8 @@
<ul class="dropdown">
<li><a href="faq.html">FAQ</a></li>
<li><a href="screencasts.html">Screencasts</a></li>
<li><a href="overview.html">NiFi Overview</a></li>
<li><a href="user-guide.html">User Guide</a></li>
<li><a href="developer-guide.html">Developer Guide</a></li>
<li><a href="administrator-guide.html">Admin Guide</a></li>
<li><a href="rest-api.html">Rest Api</a></li>
<li><a href="docs.html">NiFi Docs</a></li>
<li><a href="https://cwiki.apache.org/confluence/display/NIFI"><i class="fa fa-external-link external-link"></i>Wiki</a></li>
</ul>
</li>
<li class="has-dropdown">

View File

@ -1,7 +0,0 @@
---
title: Apache NiFi Administrator Guide
---
<div class="external-guide">
<iframe src="docs/administration-guide.html"></iframe>
</div>

View File

@ -3,5 +3,5 @@ title: Apache NiFi Developer Guide
---
<div class="external-guide">
<iframe src="docs/developer-guide.html"></iframe>
<iframe src="docs/nifi-docs/html/developer-guide.html"></iframe>
</div>

View File

@ -0,0 +1,7 @@
---
title: Apache NiFi Documentation
---
<div class="external-guide">
<iframe src="docs/nifi-docs/index.html"></iframe>
</div>

View File

@ -23,6 +23,25 @@ title: Apache NiFi Downloads
<div class="large-12 columns">
<h2>Releases</h2>
<ul>
<li>0.1.0-incubating
<ul>
<li><a href="https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12316020&version=12329276">Release Notes</a></li>
<li>
Sources:
<ul>
<li><a href="https://www.apache.org/dyn/closer.cgi?path=/incubator/nifi/0.1.0-incubating/nifi-0.1.0-incubating-source-release.zip">nifi-0.1.0-incubating-source-release.zip</a> ( <a href="https://dist.apache.org/repos/dist/release/incubator/nifi/0.1.0-incubating/nifi-0.1.0-incubating-source-release.zip.asc">asc</a>, <a href="https://dist.apache.org/repos/dist/release/incubator/nifi/0.1.0-incubating/nifi-0.1.0-incubating-source-release.zip.md5">md5</a>, <a href="https://dist.apache.org/repos/dist/release/incubator/nifi/0.1.0-incubating/nifi-0.1.0-incubating-source-release.zip.sha1">sha1</a> )</li>
</ul>
</li>
<li>
Binaries
<ul>
<li><a href="https://www.apache.org/dyn/closer.cgi?path=/incubator/nifi/0.1.0-incubating/nifi-0.1.0-incubating-bin.tar.gz">nifi-0.1.0-incubating-bin.tar.gz</a> ( <a href="https://dist.apache.org/repos/dist/release/incubator/nifi/0.1.0-incubating/nifi-0.1.0-incubating-bin.tar.gz.asc">asc</a>, <a href="https://dist.apache.org/repos/dist/release/incubator/nifi/0.1.0-incubating/nifi-0.1.0-incubating-bin.tar.gz.md5">md5</a>, <a href="https://dist.apache.org/repos/dist/release/incubator/nifi/0.1.0-incubating/nifi-0.1.0-incubating-bin.tar.gz.sha1">sha1</a> )</li>
<li><a href="https://www.apache.org/dyn/closer.cgi?path=/incubator/nifi/0.1.0-incubating/nifi-0.1.0-incubating-bin.zip">nifi-0.1.0-incubating-bin.zip</a> ( <a href="https://dist.apache.org/repos/dist/release/incubator/nifi/0.1.0-incubating/nifi-0.1.0-incubating-bin.zip.asc">asc</a>, <a href="https://dist.apache.org/repos/dist/release/incubator/nifi/0.1.0-incubating/nifi-0.1.0-incubating-bin.zip.md5">md5</a>, <a href="https://dist.apache.org/repos/dist/release/incubator/nifi/0.1.0-incubating/nifi-0.1.0-incubating-bin.zip.sha1">sha1</a> )</li>
</ul>
</li>
</ul>
</li>
<li>0.0.2-incubating
<ul>
<li><a href="https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12316020&version=12329373">Release Notes</a></li>

View File

@ -22,9 +22,9 @@ title: Apache NiFi FAQs
<li>
<p class="description"><b>Where can I find documentation on how to understand and configure NiFi?</b></p>
<ul>
<li>Our draft/preview of the User Guide is now available under the docs dropdown.</li>
<li>We're working on completing that and having it fully integrated to the application.</li>
<li>The developer guide is also in the works.</li>
<li>Documentation is available under the <a href="docs.html">NiFi Docs</a> link within the Documentation dropdown.</li>
<li>A <a href="developer-guide.html">Developer Guide</a> is also available under the Development dropdown.</li>
<li>View the <a href="https://cwiki.apache.org/confluence/display/NIFI">Apache NiFi Wiki</a> for additional information related to the project as well as how to contribute.</li>
<li>For more information on presently outstanding documentation work <a href="https://issues.apache.org/jira/browse/NIFI-162?jql=project%20%3D%20NIFI%20AND%20resolution%20%3D%20Unresolved%20AND%20component%20%3D%20%22Documentation%20%26%20Website%22%20ORDER%20BY%20priority%20DESC">please click here</a>.</li>
</ul>
</li>

View File

@ -23,7 +23,9 @@ title: Apache NiFi
</div>
</div>
<div class="medium-6 large-7 columns">
<img id="flow" src="images/flow.png" alt="NiFi flow"/>
<a class="th" href="images/flow.png">
<img id="flow" src="images/flow-th.png" alt="NiFi flow">
</a>
</div>
<div class="clear"></div>
</div>

View File

@ -1,7 +0,0 @@
---
title: Apache NiFi Overview
---
<div class="external-guide">
<iframe src="docs/overview.html"></iframe>
</div>

View File

@ -115,6 +115,16 @@ title: Apache NiFi Team
<td>wikier</td>
<td>Sergio Fernandez </td>
<td>Mentor</td>
</tr>
<tr>
<td>danbress</td>
<td>Dan Bress</td>
<td></td>
</tr>
<tr>
<td>bbende</td>
<td>Bryan Bende</td>
<td></td>
</tr>
</table>
</div>

View File

@ -1,7 +0,0 @@
---
title: Apache NiFi Rest Api
---
<div class="external-guide">
<iframe src="docs/rest-api/index.html"></iframe>
</div>

View File

@ -1,7 +0,0 @@
---
title: Apache NiFi User Guide
---
<div class="external-guide">
<iframe src="docs/user-guide.html"></iframe>
</div>

View File

@ -18,7 +18,7 @@
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi</artifactId>
<version>0.1.0-incubating-SNAPSHOT</version>
<version>0.2.0-incubating-SNAPSHOT</version>
</parent>
<artifactId>nifi-api</artifactId>
<packaging>jar</packaging>

View File

@ -100,6 +100,15 @@ public interface FlowFileQueue {
QueueSize getActiveQueueSize();
/**
* Returns a QueueSize that represents all FlowFiles that are 'unacknowledged'. A FlowFile
* is considered to be unacknowledged if it has been pulled from the queue by some component
* but the session that pulled the FlowFile has not yet been committed or rolled back.
*
* @return a QueueSize that represents all FlowFiles that are 'unacknowledged'.
*/
QueueSize getUnacknowledgedQueueSize();
void acknowledge(FlowFileRecord flowFile);
void acknowledge(Collection<FlowFileRecord> flowFiles);

View File

@ -675,7 +675,7 @@ This product bundles HexViewJS available under an MIT License
The binary distribution of this product bundles 'JCraft Jsch' which is available
under a BSD style license.
Copyright (c) 2002-2014 Atsuhiko Yamanaka, JCraft,Inc.
Copyright (c) 2002-2015 Atsuhiko Yamanaka, JCraft,Inc.
All rights reserved.
Redistribution and use in source and binary forms, with or without
@ -851,3 +851,65 @@ For details see http://asm.ow2.org/asmdex-license.html
ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
THE POSSIBILITY OF SUCH DAMAGE.
The binary distribution of this product bundles 'Hamcrest' which is available
under a BSD license. More details found here: http://hamcrest.org.
Copyright (c) 2000-2006, www.hamcrest.org
All rights reserved.
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are met:
Redistributions of source code must retain the above copyright notice, this list of
conditions and the following disclaimer. Redistributions in binary form must reproduce
the above copyright notice, this list of conditions and the following disclaimer in
the documentation and/or other materials provided with the distribution.
Neither the name of Hamcrest nor the names of its contributors may be used to endorse
or promote products derived from this software without specific prior written
permission.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY
EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT
SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED
TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY
WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH
DAMAGE.
The binary distribution of this product bundles 'leveldbjni-all-1.8.jar' which is available
under a BSD style license
Copyright (c) 2011 FuseSource Corp. All rights reserved.
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are
met:
* Redistributions of source code must retain the above copyright
notice, this list of conditions and the following disclaimer.
* Redistributions in binary form must reproduce the above
copyright notice, this list of conditions and the following disclaimer
in the documentation and/or other materials provided with the
distribution.
* Neither the name of FuseSource Corp. nor the names of its
contributors may be used to endorse or promote products derived from
this software without specific prior written permission.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
The binary distribution of this product bundles 'Woodstox StAX 2 API' which is
"licensed under standard BSD license"

View File

@ -64,10 +64,10 @@ The following binary components are provided under the Apache Software License v
(ASLv2) Apache HttpComponents
The following NOTICE information applies:
Apache HttpClient
Copyright 1999-2014 The Apache Software Foundation
Copyright 1999-2015 The Apache Software Foundation
Apache HttpCore
Copyright 2005-2014 The Apache Software Foundation
Copyright 2005-2015 The Apache Software Foundation
Apache HttpMime
Copyright 1999-2013 The Apache Software Foundation
@ -83,16 +83,21 @@ The following binary components are provided under the Apache Software License v
(ASLv2) Apache Commons Logging
The following NOTICE information applies:
Apache Commons Logging
Copyright 2003-2013 The Apache Software Foundation
Copyright 2003-2014 The Apache Software Foundation
(ASLv2) Apache Commons Lang
The following NOTICE information applies:
Apache Commons Lang
Copyright 2001-2014 The Apache Software Foundation
Copyright 2001-2015 The Apache Software Foundation
This product includes software from the Spring Framework,
under the Apache License 2.0 (see: StringUtils.containsWhitespace())
(ASLv2) Apache Commons Configuration
The following NOTICE information applies:
Apache Commons Configuration
Copyright 2001-2008 The Apache Software Foundation
(ASLv2) Apache Commons JEXL
The following NOTICE information applies:
Apache Commons JEXL
@ -101,7 +106,7 @@ The following binary components are provided under the Apache Software License v
(ASLv2) Spring Framework
The following NOTICE information applies:
Spring Framework 4.1.4.RELEASE
Copyright (c) 2002-2014 Pivotal, Inc.
Copyright (c) 2002-2015 Pivotal, Inc.
(ASLv2) Apache Commons CLI
The following NOTICE information applies:
@ -189,11 +194,6 @@ The following binary components are provided under the Apache Software License v
Apache Tika Core
Copyright 2007-2015 The Apache Software Foundation
(ASLv2) Apache Commons Configuration
The following NOTICE information applies:
Apache Commons Configuration
Copyright 2001-2008 The Apache Software Foundation
(ASLv2) Apache Jakarta Commons Digester
The following NOTICE information applies:
Apache Jakarta Commons Digester
@ -460,7 +460,7 @@ The following binary components are provided under the Apache Software License v
(ASLv2) Apache ActiveMQ
The following NOTICE information applies:
ActiveMQ :: Client
Copyright 2005-2014 The Apache Software Foundation
Copyright 2005-2015 The Apache Software Foundation
(ASLv2) Apache Geronimo
The following NOTICE information applies:
@ -491,6 +491,10 @@ The following binary components are provided under the Apache Software License v
This product includes software developed by
Saxonica (http://www.saxonica.com/).
(ASLv2) MongoDB Java Driver
The following NOTICE information applies:
Copyright (C) 2008-2013 10gen, Inc.
(ASLv2) Parquet MR
The following NOTICE information applies:
Parquet MR
@ -545,19 +549,88 @@ The following binary components are provided under the Apache Software License v
- JSON parsing and utility functions from JSON.org - Copyright 2002 JSON.org.
- PKCS#1 PEM encoded private key parsing and utility functions from oauth.googlecode.com - Copyright 1998-2010 AOL Inc.
(ASLv2) Apache Commons DBCP
The following NOTICE information applies:
Apache Commons DBCP
Copyright 2001-2015 The Apache Software Foundation.
(ASLv2) Apache Commons Pool
The following NOTICE information applies:
Apache Commons Pool
Copyright 1999-2009 The Apache Software Foundation.
(ASLv2) Apache Derby
The following NOTICE information applies:
Apache Derby
Copyright 2004-2014 Apache, Apache DB, Apache Derby, Apache Torque, Apache JDO, Apache DDLUtils,
the Derby hat logo, the Apache JDO logo, and the Apache feather logo are trademarks of The Apache Software Foundation.
(ASLv2) Apache Directory Server
The following NOTICE information applies:
ApacheDS Protocol Kerberos Codec
Copyright 2003-2013 The Apache Software Foundation
ApacheDS I18n
Copyright 2003-2013 The Apache Software Foundation
Apache Directory API ASN.1 API
Copyright 2003-2013 The Apache Software Foundation
Apache Directory LDAP API Utilities
Copyright 2003-2013 The Apache Software Foundation
(ASLv2) Apache Curator
The following NOTICE information applies:
Curator Framework
Copyright 2011-2014 The Apache Software Foundation
Curator Client
Copyright 2011-2014 The Apache Software Foundation
Curator Recipes
Copyright 2011-2014 The Apache Software Foundation
(ASLv2) The Netty Project
The following NOTICE information applies:
The Netty Project
Copyright 2011 The Netty Project
(ASLv2) Apache Xerces Java
The following NOTICE information applies:
Apache Xerces Java
Copyright 1999-2007 The Apache Software Foundation
This product includes software developed at
The Apache Software Foundation (http://www.apache.org/).
Portions of this software were originally based on the following:
- software copyright (c) 1999, IBM Corporation., http://www.ibm.com.
- software copyright (c) 1999, Sun Microsystems., http://www.sun.com.
- voluntary contributions made by Paul Eng on behalf of the
Apache Software Foundation that were originally developed at iClick, Inc.,
software copyright (c) 1999.
(ASLv2) Google Guice
The following NOTICE information applies:
Google Guice - Core Library
Copyright 2006-2011 Google, Inc.
Google Guice - Extensions - Servlet
Copyright 2006-2011 Google, Inc.
************************
Common Development and Distribution License 1.1
************************
The following binary components are provided under the Common Development and Distribution License 1.1. See project link for details.
(CDDL 1.1) (GPL2 w/ CPE) jersey-client (com.sun.jersey:jersey-client:jar:1.18.3 - https://jersey.java.net/jersey-client/)
(CDDL 1.1) (GPL2 w/ CPE) jersey-core (com.sun.jersey:jersey-core:jar:1.18.3 - https://jersey.java.net/jersey-core/)
(CDDL 1.1) (GPL2 w/ CPE) jersey-spring (com.sun.jersey:jersey-spring:jar:1.18.3 - https://jersey.java.net/jersey-spring/)
(CDDL 1.1) (GPL2 w/ CPE) jersey-servlet (com.sun.jersey:jersey-servlet:jar:1.18.3 - https://jersey.java.net/jersey-servlet/)
(CDDL 1.1) (GPL2 w/ CPE) jersey-multipart (com.sun.jersey:jersey-multipart:jar:1.18.3 - https://jersey.java.net/jersey-multipart/)
(CDDL 1.1) (GPL2 w/ CPE) jersey-server (com.sun.jersey:jersey-server:jar:1.18.3 - https://jersey.java.net/jersey-server/)
(CDDL 1.1) (GPL2 w/ CPE) jersey-json (com.sun.jersey:jersey-json:jar:1.18.3 - https://jersey.java.net/jersey-json/)
(CDDL 1.1) (GPL2 w/ CPE) jersey-client (com.sun.jersey:jersey-client:jar:1.19 - https://jersey.java.net/jersey-client/)
(CDDL 1.1) (GPL2 w/ CPE) jersey-core (com.sun.jersey:jersey-core:jar:1.19 - https://jersey.java.net/jersey-core/)
(CDDL 1.1) (GPL2 w/ CPE) jersey-spring (com.sun.jersey:jersey-spring:jar:1.19 - https://jersey.java.net/jersey-spring/)
(CDDL 1.1) (GPL2 w/ CPE) jersey-servlet (com.sun.jersey:jersey-servlet:jar:1.19 - https://jersey.java.net/jersey-servlet/)
(CDDL 1.1) (GPL2 w/ CPE) jersey-multipart (com.sun.jersey:jersey-multipart:jar:1.19 - https://jersey.java.net/jersey-multipart/)
(CDDL 1.1) (GPL2 w/ CPE) jersey-server (com.sun.jersey:jersey-server:jar:1.19 - https://jersey.java.net/jersey-server/)
(CDDL 1.1) (GPL2 w/ CPE) jersey-json (com.sun.jersey:jersey-json:jar:1.19 - https://jersey.java.net/jersey-json/)
(CDDL 1.1) (GPL2 w/ CPE) Old JAXB Runtime (com.sun.xml.bind:jaxb-impl:jar:2.2.3-1 - http://jaxb.java.net/)
(CDDL 1.1) (GPL2 w/ CPE) Java Architecture For XML Binding (javax.xml.bind:jaxb-api:jar:2.2.2 - https://jaxb.dev.java.net/)
(CDDL 1.1) (GPL2 w/ CPE) MIME Streaming Extension (org.jvnet.mimepull:mimepull:jar:1.9.3 - http://mimepull.java.net)
@ -569,7 +642,7 @@ The following binary components are provided under the Common Development and Di
(CDDL 1.1) (GPL2 w/ CPE) Expression Language 3.0 API (javax.el:javax.el-api:jar:3.0.0 - http://uel-spec.java.net)
(CDDL 1.1) (GPL2 w/ CPE) JavaServer Pages(TM) Standard Tag Library API (javax.servlet.jsp.jstl:javax.servlet.jsp.jstl-api:jar:1.2.1 - http://jcp.org/en/jsr/detail?id=52)
(CDDL 1.1) (GPL2 w/ CPE) Java Servlet API (javax.servlet:javax.servlet-api:jar:3.1.0 - http://servlet-spec.java.net)
(CDDL 1.1) (GPL2 w/ CPE) Javax JMS Api (javax.jms:javax.jms-api:jar:2.0 - http://java.net/projects/jms-spec/pages/Home)
(CDDL 1.1) (GPL2 w/ CPE) Javax JMS Api (javax.jms:javax.jms-api:jar:2.0.1 - http://java.net/projects/jms-spec/pages/Home)
************************
Common Development and Distribution License 1.0
@ -581,6 +654,7 @@ The following binary components are provided under the Common Development and Di
(CDDL 1.0) (GPL3) Streaming API For XML (javax.xml.stream:stax-api:jar:1.0-2 - no url provided)
(CDDL 1.0) JavaBeans Activation Framework (JAF) (javax.activation:activation:jar:1.1 - http://java.sun.com/products/javabeans/jaf/index.jsp)
(CDDL 1.0) JavaServer Pages(TM) API (javax.servlet.jsp:jsp-api:jar:2.1 - http://jsp.java.net)
(CDDL 1.0) JSR311 API (javax.ws.rs:jsr311-api:jar:1.1.1 - https://jsr311.dev.java.net)
************************
Creative Commons Attribution-ShareAlike 3.0
@ -596,10 +670,10 @@ Eclipse Public License 1.0
The following binary components are provided under the Eclipse Public License 1.0. See project link for details.
(EPL 1.0) AspectJ Weaver (org.aspectj:aspectjweaver:jar:1.8.4 - http://www.aspectj.org)
(EPL 1.0) AspectJ Weaver (org.aspectj:aspectjweaver:jar:1.8.5 - http://www.aspectj.org)
(EPL 1.0)(MPL 2.0) H2 Database (com.h2database:h2:jar:1.3.176 - http://www.h2database.com/html/license.html)
(EPL 1.0)(LGPL 2.1) Logback Classic (ch.qos.logback:logback-classic:jar:1.1.2 - http://logback.qos.ch/)
(EPL 1.0)(LGPL 2.1) Logback Core (ch.qos.logback:logback-core:jar:1.1.2 - http://logback.qos.ch/)
(EPL 1.0)(LGPL 2.1) Logback Classic (ch.qos.logback:logback-classic:jar:1.1.3 - http://logback.qos.ch/)
(EPL 1.0)(LGPL 2.1) Logback Core (ch.qos.logback:logback-core:jar:1.1.3 - http://logback.qos.ch/)
*****************
Mozilla Public License v2.0
@ -607,7 +681,7 @@ Mozilla Public License v2.0
The following binary components are provided under the Mozilla Public License v2.0. See project link for details.
(MPL 2.0) Saxon HE (net.sf.saxon:Saxon-HE:jar:9.6.0-4 - http://www.saxonica.com/)
(MPL 2.0) Saxon HE (net.sf.saxon:Saxon-HE:jar:9.6.0-5 - http://www.saxonica.com/)
*****************
Mozilla Public License v1.1
@ -624,5 +698,5 @@ Public Domain
The following binary components are provided to the 'Public Domain'. See project link for details.
(Public Domain) XZ for Java (org.tukaani:xz:jar:1.2 - http://tukaani.org/xz/java.html
(Public Domain) XZ for Java (org.tukaani:xz:jar:1.5 - http://tukaani.org/xz/java.html
(Public Domain) AOP Alliance 1.0 (http://aopalliance.sourceforge.net/)

View File

@ -9,13 +9,12 @@ by applicable law or agreed to in writing, software distributed under the
License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS
OF ANY KIND, either express or implied. See the License for the specific
language governing permissions and limitations under the License. -->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi</artifactId>
<version>0.1.0-incubating-SNAPSHOT</version>
<version>0.2.0-incubating-SNAPSHOT</version>
</parent>
<artifactId>nifi-assembly</artifactId>
<packaging>pom</packaging>
@ -163,6 +162,17 @@ language governing permissions and limitations under the License. -->
<artifactId>nifi-kite-nar</artifactId>
<type>nar</type>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-dbcp-service-nar</artifactId>
<version>0.2.0-incubating-SNAPSHOT</version>
<type>nar</type>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-mongodb-nar</artifactId>
<type>nar</type>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-solr-nar</artifactId>
@ -171,25 +181,25 @@ language governing permissions and limitations under the License. -->
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-social-media-nar</artifactId>
<version>0.1.0-incubating-SNAPSHOT</version>
<version>0.2.0-incubating-SNAPSHOT</version>
<type>nar</type>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-hl7-nar</artifactId>
<version>0.1.0-incubating-SNAPSHOT</version>
<version>0.2.0-incubating-SNAPSHOT</version>
<type>nar</type>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-language-translation-nar</artifactId>
<version>0.1.0-incubating-SNAPSHOT</version>
<version>0.2.0-incubating-SNAPSHOT</version>
<type>nar</type>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-geo-nar</artifactId>
<version>0.1.0-incubating-SNAPSHOT</version>
<version>0.2.0-incubating-SNAPSHOT</version>
<type>nar</type>
</dependency>
</dependencies>
@ -233,11 +243,11 @@ language governing permissions and limitations under the License. -->
<nifi.content.claim.max.appendable.size>10 MB</nifi.content.claim.max.appendable.size>
<nifi.content.claim.max.flow.files>100</nifi.content.claim.max.flow.files>
<nifi.content.repository.directory.default>./content_repository</nifi.content.repository.directory.default>
<nifi.content.repository.archive.max.retention.period />
<nifi.content.repository.archive.max.usage.percentage />
<nifi.content.repository.archive.enabled>false</nifi.content.repository.archive.enabled>
<nifi.content.repository.archive.max.retention.period>12 hours</nifi.content.repository.archive.max.retention.period>
<nifi.content.repository.archive.max.usage.percentage>50%</nifi.content.repository.archive.max.usage.percentage>
<nifi.content.repository.archive.enabled>true</nifi.content.repository.archive.enabled>
<nifi.content.repository.always.sync>false</nifi.content.repository.always.sync>
<nifi.content.viewer.url />
<nifi.content.viewer.url>/nifi-content-viewer/</nifi.content.viewer.url>
<nifi.restore.directory />
<nifi.ui.banner.text />
@ -257,11 +267,11 @@ language governing permissions and limitations under the License. -->
<nifi.provenance.repository.directory.default>./provenance_repository</nifi.provenance.repository.directory.default>
<nifi.provenance.repository.max.storage.time>24 hours</nifi.provenance.repository.max.storage.time>
<nifi.provenance.repository.max.storage.size>1 GB</nifi.provenance.repository.max.storage.size>
<nifi.provenance.repository.rollover.time>5 mins</nifi.provenance.repository.rollover.time>
<nifi.provenance.repository.rollover.time>30 secs</nifi.provenance.repository.rollover.time>
<nifi.provenance.repository.rollover.size>100 MB</nifi.provenance.repository.rollover.size>
<nifi.provenance.repository.query.threads>2</nifi.provenance.repository.query.threads>
<nifi.provenance.repository.compress.on.rollover>true</nifi.provenance.repository.compress.on.rollover>
<nifi.provenance.repository.indexed.fields>EventType, FlowFileUUID, Filename, ProcessorID</nifi.provenance.repository.indexed.fields>
<nifi.provenance.repository.indexed.fields>EventType, FlowFileUUID, Filename, ProcessorID, Relationship</nifi.provenance.repository.indexed.fields>
<nifi.provenance.repository.indexed.attributes />
<nifi.provenance.repository.index.shard.size>500 MB</nifi.provenance.repository.index.shard.size>
<nifi.provenance.repository.always.sync>false</nifi.provenance.repository.always.sync>
@ -272,8 +282,8 @@ language governing permissions and limitations under the License. -->
<!-- Component status repository properties -->
<nifi.components.status.repository.implementation>org.apache.nifi.controller.status.history.VolatileComponentStatusRepository</nifi.components.status.repository.implementation>
<nifi.components.status.repository.buffer.size>288</nifi.components.status.repository.buffer.size>
<nifi.components.status.snapshot.frequency>5 mins</nifi.components.status.snapshot.frequency>
<nifi.components.status.repository.buffer.size>1440</nifi.components.status.repository.buffer.size>
<nifi.components.status.snapshot.frequency>1 min</nifi.components.status.snapshot.frequency>
<!-- nifi.properties: web properties -->
<nifi.web.war.directory>./lib</nifi.web.war.directory>

View File

@ -49,6 +49,8 @@
<useTransitiveFiltering>true</useTransitiveFiltering>
<includes>
<include>nifi-bootstrap</include>
<include>slf4j-api</include>
<include>logback-classic</include>
</includes>
</dependencySet>

View File

@ -17,8 +17,15 @@
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi</artifactId>
<version>0.1.0-incubating-SNAPSHOT</version>
<version>0.2.0-incubating-SNAPSHOT</version>
</parent>
<artifactId>nifi-bootstrap</artifactId>
<packaging>jar</packaging>
<dependencies>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>
</dependencies>
</project>

View File

@ -23,6 +23,7 @@ import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import org.apache.nifi.bootstrap.util.LimitingInputStream;
@ -40,6 +41,7 @@ public class NiFiListener {
listener = new Listener(serverSocket, runner);
final Thread listenThread = new Thread(listener);
listenThread.setName("Listen to NiFi");
listenThread.setDaemon(true);
listenThread.start();
return localPort;
}
@ -62,7 +64,16 @@ public class NiFiListener {
public Listener(final ServerSocket serverSocket, final RunNiFi runner) {
this.serverSocket = serverSocket;
this.executor = Executors.newFixedThreadPool(2);
this.executor = Executors.newFixedThreadPool(2, new ThreadFactory() {
@Override
public Thread newThread(final Runnable runnable) {
final Thread t = Executors.defaultThreadFactory().newThread(runnable);
t.setDaemon(true);
t.setName("NiFi Bootstrap Command Listener");
return t;
}
});
this.runner = runner;
}

View File

@ -41,13 +41,17 @@ import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.logging.ConsoleHandler;
import java.util.logging.Handler;
import java.util.logging.Level;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
*
@ -94,19 +98,27 @@ public class RunNiFi {
private final File bootstrapConfigFile;
private final java.util.logging.Logger logger;
// used for logging initial info; these will be logged to console by default when the app is started
private final Logger cmdLogger = LoggerFactory.getLogger("org.apache.nifi.bootstrap.Command");
// used for logging all info. These by default will be written to the log file
private final Logger defaultLogger = LoggerFactory.getLogger(RunNiFi.class);
private final ExecutorService loggingExecutor;
private volatile Set<Future<?>> loggingFutures = new HashSet<>(2);
public RunNiFi(final File bootstrapConfigFile, final boolean verbose) {
this.bootstrapConfigFile = bootstrapConfigFile;
logger = java.util.logging.Logger.getLogger("Bootstrap");
if (verbose) {
logger.info("Enabling Verbose Output");
logger.setLevel(Level.FINE);
final Handler handler = new ConsoleHandler();
handler.setLevel(Level.FINE);
logger.addHandler(handler);
loggingExecutor = Executors.newFixedThreadPool(2, new ThreadFactory() {
@Override
public Thread newThread(final Runnable runnable) {
final Thread t = Executors.defaultThreadFactory().newThread(runnable);
t.setDaemon(true);
t.setName("NiFi logging handler");
return t;
}
});
}
private static void printUsage() {
@ -185,10 +197,10 @@ public class RunNiFi {
switch (cmd.toLowerCase()) {
case "start":
runNiFi.start(false);
runNiFi.start();
break;
case "run":
runNiFi.start(true);
runNiFi.start();
break;
case "stop":
runNiFi.stop();
@ -198,7 +210,7 @@ public class RunNiFi {
break;
case "restart":
runNiFi.stop();
runNiFi.start(false);
runNiFi.start();
break;
case "dump":
runNiFi.dump(dumpFile);
@ -206,37 +218,44 @@ public class RunNiFi {
}
}
public File getStatusFile() {
File getStatusFile() {
return getStatusFile(defaultLogger);
}
public File getStatusFile(final Logger logger) {
final File confDir = bootstrapConfigFile.getParentFile();
final File nifiHome = confDir.getParentFile();
final File bin = new File(nifiHome, "bin");
final File statusFile = new File(bin, "nifi.pid");
logger.log(Level.FINE, "Status File: {0}", statusFile);
logger.debug("Status File: {}", statusFile);
return statusFile;
}
private Properties loadProperties() throws IOException {
private Properties loadProperties(final Logger logger) throws IOException {
final Properties props = new Properties();
final File statusFile = getStatusFile();
final File statusFile = getStatusFile(logger);
if (statusFile == null || !statusFile.exists()) {
logger.fine("No status file to load properties from");
logger.debug("No status file to load properties from");
return props;
}
try (final FileInputStream fis = new FileInputStream(getStatusFile())) {
try (final FileInputStream fis = new FileInputStream(getStatusFile(logger))) {
props.load(fis);
}
logger.log(Level.FINE, "Properties: {0}", props);
final Map<Object, Object> modified = new HashMap<>(props);
modified.remove("secret.key");
logger.debug("Properties: {}", modified);
return props;
}
private synchronized void saveProperties(final Properties nifiProps) throws IOException {
final File statusFile = getStatusFile();
private synchronized void saveProperties(final Properties nifiProps, final Logger logger) throws IOException {
final File statusFile = getStatusFile(logger);
if (statusFile.exists() && !statusFile.delete()) {
logger.log(Level.WARNING, "Failed to delete {0}", statusFile);
logger.warn("Failed to delete {}", statusFile);
}
if (!statusFile.createNewFile()) {
@ -249,7 +268,7 @@ public class RunNiFi {
perms.add(PosixFilePermission.OWNER_WRITE);
Files.setPosixFilePermissions(statusFile.toPath(), perms);
} catch (final Exception e) {
logger.log(Level.WARNING, "Failed to set permissions so that only the owner can read status file {0}; "
logger.warn("Failed to set permissions so that only the owner can read status file {}; "
+ "this may allows others to have access to the key needed to communicate with NiFi. "
+ "Permissions should be changed so that only the owner can read this file", statusFile);
}
@ -259,23 +278,23 @@ public class RunNiFi {
fos.getFD().sync();
}
logger.log(Level.FINE, "Saved Properties {0} to {1}", new Object[]{nifiProps, statusFile});
logger.debug("Saved Properties {} to {}", new Object[]{nifiProps, statusFile});
}
private boolean isPingSuccessful(final int port, final String secretKey) {
logger.log(Level.FINE, "Pinging {0}", port);
private boolean isPingSuccessful(final int port, final String secretKey, final Logger logger) {
logger.debug("Pinging {}", port);
try (final Socket socket = new Socket("localhost", port)) {
final OutputStream out = socket.getOutputStream();
out.write((PING_CMD + " " + secretKey + "\n").getBytes(StandardCharsets.UTF_8));
out.flush();
logger.fine("Sent PING command");
logger.debug("Sent PING command");
socket.setSoTimeout(5000);
final InputStream in = socket.getInputStream();
final BufferedReader reader = new BufferedReader(new InputStreamReader(in));
final String response = reader.readLine();
logger.log(Level.FINE, "PING response: {0}", response);
logger.debug("PING response: {}", response);
out.close();
reader.close();
@ -285,27 +304,27 @@ public class RunNiFi {
}
}
private Integer getCurrentPort() throws IOException {
final Properties props = loadProperties();
private Integer getCurrentPort(final Logger logger) throws IOException {
final Properties props = loadProperties(logger);
final String portVal = props.getProperty("port");
if (portVal == null) {
logger.fine("No Port found in status file");
logger.debug("No Port found in status file");
return null;
} else {
logger.log(Level.FINE, "Port defined in status file: {0}", portVal);
logger.debug("Port defined in status file: {}", portVal);
}
final int port = Integer.parseInt(portVal);
final boolean success = isPingSuccessful(port, props.getProperty("secret.key"));
final boolean success = isPingSuccessful(port, props.getProperty("secret.key"), logger);
if (success) {
logger.log(Level.FINE, "Successful PING on port {0}", port);
logger.debug("Successful PING on port {}", port);
return port;
}
final String pid = props.getProperty("pid");
logger.log(Level.FINE, "PID in status file is {0}", pid);
logger.debug("PID in status file is {}", pid);
if (pid != null) {
final boolean procRunning = isProcessRunning(pid);
final boolean procRunning = isProcessRunning(pid, logger);
if (procRunning) {
return port;
} else {
@ -316,7 +335,7 @@ public class RunNiFi {
return null;
}
private boolean isProcessRunning(final String pid) {
private boolean isProcessRunning(final String pid, final Logger logger) {
try {
// We use the "ps" command to check if the process is still running.
final ProcessBuilder builder = new ProcessBuilder();
@ -340,9 +359,9 @@ public class RunNiFi {
// If output of the ps command had our PID, the process is running.
if (running) {
logger.log(Level.FINE, "Process with PID {0} is running", pid);
logger.debug("Process with PID {} is running", pid);
} else {
logger.log(Level.FINE, "Process with PID {0} is not running", pid);
logger.debug("Process with PID {} is not running", pid);
}
return running;
@ -352,10 +371,10 @@ public class RunNiFi {
}
}
private Status getStatus() {
private Status getStatus(final Logger logger) {
final Properties props;
try {
props = loadProperties();
props = loadProperties(logger);
} catch (final IOException ioe) {
return new Status(null, null, false, false);
}
@ -377,7 +396,7 @@ public class RunNiFi {
if (portValue != null) {
try {
port = Integer.parseInt(portValue);
pingSuccess = isPingSuccessful(port, secretKey);
pingSuccess = isPingSuccessful(port, secretKey, logger);
} catch (final NumberFormatException nfe) {
return new Status(null, null, false, false);
}
@ -387,20 +406,21 @@ public class RunNiFi {
return new Status(port, pid, true, true);
}
final boolean alive = (pid == null) ? false : isProcessRunning(pid);
final boolean alive = (pid == null) ? false : isProcessRunning(pid, logger);
return new Status(port, pid, pingSuccess, alive);
}
public void status() throws IOException {
final Status status = getStatus();
final Logger logger = cmdLogger;
final Status status = getStatus(logger);
if (status.isRespondingToPing()) {
logger.log(Level.INFO, "Apache NiFi is currently running, listening to Bootstrap on port {0}, PID={1}",
logger.info("Apache NiFi is currently running, listening to Bootstrap on port {}, PID={}",
new Object[]{status.getPort(), status.getPid() == null ? "unknkown" : status.getPid()});
return;
}
if (status.isProcessRunning()) {
logger.log(Level.INFO, "Apache NiFi is running at PID {0} but is not responding to ping requests", status.getPid());
logger.info("Apache NiFi is running at PID {} but is not responding to ping requests", status.getPid());
return;
}
@ -424,36 +444,36 @@ public class RunNiFi {
* @throws IOException if any issues occur while writing the dump file
*/
public void dump(final File dumpFile) throws IOException {
final Integer port = getCurrentPort();
final Logger logger = defaultLogger; // dump to bootstrap log file by default
final Integer port = getCurrentPort(logger);
if (port == null) {
System.out.println("Apache NiFi is not currently running");
logger.info("Apache NiFi is not currently running");
return;
}
final Properties nifiProps = loadProperties();
final Properties nifiProps = loadProperties(logger);
final String secretKey = nifiProps.getProperty("secret.key");
final StringBuilder sb = new StringBuilder();
try (final Socket socket = new Socket()) {
logger.fine("Connecting to NiFi instance");
logger.debug("Connecting to NiFi instance");
socket.setSoTimeout(60000);
socket.connect(new InetSocketAddress("localhost", port));
logger.fine("Established connection to NiFi instance.");
logger.debug("Established connection to NiFi instance.");
socket.setSoTimeout(60000);
logger.log(Level.FINE, "Sending DUMP Command to port {0}", port);
logger.debug("Sending DUMP Command to port {}", port);
final OutputStream out = socket.getOutputStream();
out.write((DUMP_CMD + " " + secretKey + "\n").getBytes(StandardCharsets.UTF_8));
out.flush();
out.close();
final InputStream in = socket.getInputStream();
final BufferedReader reader = new BufferedReader(new InputStreamReader(in));
try (final BufferedReader reader = new BufferedReader(new InputStreamReader(in))) {
String line;
while ((line = reader.readLine()) != null) {
sb.append(line).append("\n");
}
reader.close();
}
}
final String dump = sb.toString();
@ -463,39 +483,49 @@ public class RunNiFi {
try (final FileOutputStream fos = new FileOutputStream(dumpFile)) {
fos.write(dump.getBytes(StandardCharsets.UTF_8));
}
logger.log(Level.INFO, "Successfully wrote thread dump to {0}", dumpFile.getAbsolutePath());
// we want to log to the console (by default) that we wrote the thread dump to the specified file
cmdLogger.info("Successfully wrote thread dump to {}", dumpFile.getAbsolutePath());
}
}
public void stop() throws IOException {
final Integer port = getCurrentPort();
final Logger logger = cmdLogger;
final Integer port = getCurrentPort(logger);
if (port == null) {
System.out.println("Apache NiFi is not currently running");
logger.info("Apache NiFi is not currently running");
return;
}
final Properties nifiProps = loadProperties();
final Properties nifiProps = loadProperties(logger);
final String secretKey = nifiProps.getProperty("secret.key");
final File statusFile = getStatusFile(logger);
if (statusFile.exists() && !statusFile.delete()) {
logger.error("Failed to delete status file {}; this file should be cleaned up manually", statusFile);
}
try (final Socket socket = new Socket()) {
logger.fine("Connecting to NiFi instance");
logger.debug("Connecting to NiFi instance");
socket.setSoTimeout(60000);
socket.connect(new InetSocketAddress("localhost", port));
logger.fine("Established connection to NiFi instance.");
logger.debug("Established connection to NiFi instance.");
socket.setSoTimeout(60000);
logger.log(Level.FINE, "Sending SHUTDOWN Command to port {0}", port);
logger.debug("Sending SHUTDOWN Command to port {}", port);
final OutputStream out = socket.getOutputStream();
out.write((SHUTDOWN_CMD + " " + secretKey + "\n").getBytes(StandardCharsets.UTF_8));
out.flush();
out.close();
socket.shutdownOutput();
final InputStream in = socket.getInputStream();
final BufferedReader reader = new BufferedReader(new InputStreamReader(in));
final String response = reader.readLine();
reader.close();
int lastChar;
final StringBuilder sb = new StringBuilder();
while ((lastChar = in.read()) > -1) {
sb.append((char) lastChar);
}
final String response = sb.toString().trim();
logger.log(Level.FINE, "Received response to SHUTDOWN command: {0}", response);
logger.debug("Received response to SHUTDOWN command: {}", response);
if (SHUTDOWN_CMD.equals(response)) {
logger.info("Apache NiFi has accepted the Shutdown Command and is shutting down now");
@ -516,17 +546,17 @@ public class RunNiFi {
}
final long startWait = System.nanoTime();
while (isProcessRunning(pid)) {
while (isProcessRunning(pid, logger)) {
logger.info("Waiting for Apache NiFi to finish shutting down...");
final long waitNanos = System.nanoTime() - startWait;
final long waitSeconds = TimeUnit.NANOSECONDS.toSeconds(waitNanos);
if (waitSeconds >= gracefulShutdownSeconds && gracefulShutdownSeconds > 0) {
if (isProcessRunning(pid)) {
logger.log(Level.WARNING, "NiFi has not finished shutting down after {0} seconds. Killing process.", gracefulShutdownSeconds);
if (isProcessRunning(pid, logger)) {
logger.warn("NiFi has not finished shutting down after {} seconds. Killing process.", gracefulShutdownSeconds);
try {
killProcessTree(pid);
killProcessTree(pid, logger);
} catch (final IOException ioe) {
logger.log(Level.SEVERE, "Failed to kill Process with PID {0}", pid);
logger.error("Failed to kill Process with PID {}", pid);
}
}
break;
@ -540,16 +570,11 @@ public class RunNiFi {
logger.info("NiFi has finished shutting down.");
}
final File statusFile = getStatusFile();
if (!statusFile.delete()) {
logger.log(Level.SEVERE, "Failed to delete status file {0}; this file should be cleaned up manually", statusFile);
}
} else {
logger.log(Level.SEVERE, "When sending SHUTDOWN command to NiFi, got unexpected response {0}", response);
logger.error("When sending SHUTDOWN command to NiFi, got unexpected response {}", response);
}
} catch (final IOException ioe) {
logger.log(Level.SEVERE, "Failed to send shutdown command to port {0} due to {1}", new Object[]{port, ioe});
logger.error("Failed to send shutdown command to port {} due to {}", new Object[]{port, ioe.toString(), ioe});
}
}
@ -568,14 +593,14 @@ public class RunNiFi {
return childPids;
}
private void killProcessTree(final String pid) throws IOException {
logger.log(Level.FINE, "Killing Process Tree for PID {0}", pid);
private void killProcessTree(final String pid, final Logger logger) throws IOException {
logger.debug("Killing Process Tree for PID {}", pid);
final List<String> children = getChildProcesses(pid);
logger.log(Level.FINE, "Children of PID {0}: {1}", new Object[]{pid, children});
logger.debug("Children of PID {}: {}", new Object[]{pid, children});
for (final String childPid : children) {
killProcessTree(childPid);
killProcessTree(childPid, logger);
}
Runtime.getRuntime().exec(new String[]{"kill", "-9", pid});
@ -591,10 +616,10 @@ public class RunNiFi {
}
@SuppressWarnings({"rawtypes", "unchecked"})
public void start(final boolean monitor) throws IOException, InterruptedException {
final Integer port = getCurrentPort();
public void start() throws IOException, InterruptedException {
final Integer port = getCurrentPort(cmdLogger);
if (port != null) {
System.out.println("Apache NiFi is already running, listening to Bootstrap on port " + port);
cmdLogger.info("Apache NiFi is already running, listening to Bootstrap on port " + port);
return;
}
@ -733,11 +758,10 @@ public class RunNiFi {
cmdBuilder.append(s).append(" ");
}
logger.info("Starting Apache NiFi...");
logger.log(Level.INFO, "Working Directory: {0}", workingDir.getAbsolutePath());
logger.log(Level.INFO, "Command: {0}", cmdBuilder.toString());
cmdLogger.info("Starting Apache NiFi...");
cmdLogger.info("Working Directory: {}", workingDir.getAbsolutePath());
cmdLogger.info("Command: {}", cmdBuilder.toString());
if (monitor) {
String gracefulShutdown = props.get(GRACEFUL_SHUTDOWN_PROP);
if (gracefulShutdown == null) {
gracefulShutdown = DEFAULT_GRACEFUL_SHUTDOWN_VALUE;
@ -757,15 +781,16 @@ public class RunNiFi {
}
Process process = builder.start();
Long pid = getPid(process);
handleLogging(process);
Long pid = getPid(process, cmdLogger);
if (pid != null) {
nifiPid = pid;
final Properties nifiProps = new Properties();
nifiProps.setProperty("pid", String.valueOf(nifiPid));
saveProperties(nifiProps);
saveProperties(nifiProps, cmdLogger);
}
shutdownHook = new ShutdownHook(process, this, secretKey, gracefulShutdownSeconds);
shutdownHook = new ShutdownHook(process, this, secretKey, gracefulShutdownSeconds, loggingExecutor);
final Runtime runtime = Runtime.getRuntime();
runtime.addShutdownHook(shutdownHook);
@ -785,70 +810,102 @@ public class RunNiFi {
}
if (autoRestartNiFi) {
logger.warning("Apache NiFi appears to have died. Restarting...");
process = builder.start();
final File statusFile = getStatusFile(defaultLogger);
if (!statusFile.exists()) {
defaultLogger.debug("Status File no longer exists. Will not restart NiFi");
return;
}
pid = getPid(process);
defaultLogger.warn("Apache NiFi appears to have died. Restarting...");
process = builder.start();
handleLogging(process);
pid = getPid(process, defaultLogger);
if (pid != null) {
nifiPid = pid;
final Properties nifiProps = new Properties();
nifiProps.setProperty("pid", String.valueOf(nifiPid));
saveProperties(nifiProps);
saveProperties(nifiProps, defaultLogger);
}
shutdownHook = new ShutdownHook(process, this, secretKey, gracefulShutdownSeconds);
shutdownHook = new ShutdownHook(process, this, secretKey, gracefulShutdownSeconds, loggingExecutor);
runtime.addShutdownHook(shutdownHook);
final boolean started = waitForStart();
if (started) {
logger.log(Level.INFO, "Successfully started Apache NiFi{0}", (pid == null ? "" : " with PID " + pid));
defaultLogger.info("Successfully started Apache NiFi{}", (pid == null ? "" : " with PID " + pid));
} else {
logger.severe("Apache NiFi does not appear to have started");
defaultLogger.error("Apache NiFi does not appear to have started");
}
} else {
return;
}
}
}
} else {
final Process process = builder.start();
final Long pid = getPid(process);
if (pid != null) {
nifiPid = pid;
final Properties nifiProps = new Properties();
nifiProps.setProperty("pid", String.valueOf(nifiPid));
saveProperties(nifiProps);
}
boolean started = waitForStart();
if (started) {
logger.log(Level.INFO, "Successfully started Apache NiFi{0}", (pid == null ? "" : " with PID " + pid));
} else {
logger.severe("Apache NiFi does not appear to have started");
}
listener.stop();
private void handleLogging(final Process process) {
final Set<Future<?>> existingFutures = loggingFutures;
if (existingFutures != null) {
for (final Future<?> future : existingFutures) {
future.cancel(false);
}
}
private Long getPid(final Process process) {
final Future<?> stdOutFuture = loggingExecutor.submit(new Runnable() {
@Override
public void run() {
final Logger stdOutLogger = LoggerFactory.getLogger("org.apache.nifi.StdOut");
final InputStream in = process.getInputStream();
try (final BufferedReader reader = new BufferedReader(new InputStreamReader(in))) {
String line;
while ((line = reader.readLine()) != null) {
stdOutLogger.info(line);
}
} catch (IOException e) {
defaultLogger.error("Failed to read from NiFi's Standard Out stream", e);
}
}
});
final Future<?> stdErrFuture = loggingExecutor.submit(new Runnable() {
@Override
public void run() {
final Logger stdErrLogger = LoggerFactory.getLogger("org.apache.nifi.StdErr");
final InputStream in = process.getErrorStream();
try (final BufferedReader reader = new BufferedReader(new InputStreamReader(in))) {
String line;
while ((line = reader.readLine()) != null) {
stdErrLogger.error(line);
}
} catch (IOException e) {
defaultLogger.error("Failed to read from NiFi's Standard Error stream", e);
}
}
});
final Set<Future<?>> futures = new HashSet<>();
futures.add(stdOutFuture);
futures.add(stdErrFuture);
this.loggingFutures = futures;
}
private Long getPid(final Process process, final Logger logger) {
try {
final Class<?> procClass = process.getClass();
final Field pidField = procClass.getDeclaredField("pid");
pidField.setAccessible(true);
final Object pidObject = pidField.get(process);
logger.log(Level.FINE, "PID Object = {0}", pidObject);
logger.debug("PID Object = {}", pidObject);
if (pidObject instanceof Number) {
return ((Number) pidObject).longValue();
}
return null;
} catch (final IllegalAccessException | NoSuchFieldException nsfe) {
logger.log(Level.FINE, "Could not find PID for child process due to {0}", nsfe);
logger.debug("Could not find PID for child process due to {}", nsfe);
return null;
}
}
@ -907,7 +964,7 @@ public class RunNiFi {
shutdownHook.setSecretKey(secretKey);
}
final File statusFile = getStatusFile();
final File statusFile = getStatusFile(defaultLogger);
final Properties nifiProps = new Properties();
if (nifiPid != -1) {
@ -917,12 +974,12 @@ public class RunNiFi {
nifiProps.setProperty("secret.key", secretKey);
try {
saveProperties(nifiProps);
saveProperties(nifiProps, defaultLogger);
} catch (final IOException ioe) {
logger.log(Level.WARNING, "Apache NiFi has started but failed to persist NiFi Port information to {0} due to {1}", new Object[]{statusFile.getAbsolutePath(), ioe});
defaultLogger.warn("Apache NiFi has started but failed to persist NiFi Port information to {} due to {}", new Object[]{statusFile.getAbsolutePath(), ioe});
}
logger.log(Level.INFO, "Apache NiFi now running and listening for Bootstrap requests on port {0}", port);
defaultLogger.info("Apache NiFi now running and listening for Bootstrap requests on port {}", port);
}
int getNiFiCommandControlPort() {

View File

@ -21,6 +21,7 @@ import java.io.IOException;
import java.io.OutputStream;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
public class ShutdownHook extends Thread {
@ -28,14 +29,16 @@ public class ShutdownHook extends Thread {
private final Process nifiProcess;
private final RunNiFi runner;
private final int gracefulShutdownSeconds;
private final ExecutorService executor;
private volatile String secretKey;
public ShutdownHook(final Process nifiProcess, final RunNiFi runner, final String secretKey, final int gracefulShutdownSeconds) {
public ShutdownHook(final Process nifiProcess, final RunNiFi runner, final String secretKey, final int gracefulShutdownSeconds, final ExecutorService executor) {
this.nifiProcess = nifiProcess;
this.runner = runner;
this.secretKey = secretKey;
this.gracefulShutdownSeconds = gracefulShutdownSeconds;
this.executor = executor;
}
void setSecretKey(final String secretKey) {
@ -44,6 +47,7 @@ public class ShutdownHook extends Thread {
@Override
public void run() {
executor.shutdown();
runner.setAutoRestartNiFi(false);
final int ccPort = runner.getNiFiCommandControlPort();
if (ccPort > 0) {

View File

@ -17,7 +17,7 @@
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-commons</artifactId>
<version>0.1.0-incubating-SNAPSHOT</version>
<version>0.2.0-incubating-SNAPSHOT</version>
</parent>
<artifactId>nifi-data-provenance-utils</artifactId>
<packaging>jar</packaging>

View File

@ -17,7 +17,7 @@
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-commons</artifactId>
<version>0.1.0-incubating-SNAPSHOT</version>
<version>0.2.0-incubating-SNAPSHOT</version>
</parent>
<artifactId>nifi-expression-language</artifactId>
<packaging>jar</packaging>

View File

@ -264,7 +264,14 @@ public class Query {
}
}
// Keep track of the number of opening curly braces that we are embedded within,
// if we are within an Expression. If we are outside of an Expression, we can just ignore
// curly braces. This allows us to ignore the first character if the value is something
// like: { ${abc} }
// However, we will count the curly braces if we have something like: ${ $${abc} }
if (expressionStart > -1) {
embeddedCount++;
}
} else if (c == '}') {
if (embeddedCount <= 0) {
continue;
@ -965,7 +972,7 @@ public class Query {
case NUMBER:
return (NumberEvaluator) evaluator;
case STRING:
return new NumberCastEvaluator((StringEvaluator) evaluator);
return new NumberCastEvaluator(evaluator);
case DATE:
return new DateToNumberEvaluator((DateEvaluator) evaluator);
default:

View File

@ -361,6 +361,8 @@ public class TestQuery {
@Test
public void testExtractExpressionRanges() {
assertEquals(29, Query.extractExpressionRanges("${hello:equals( $${goodbye} )}").get(0).getEnd());
List<Range> ranges = Query.extractExpressionRanges("hello");
assertTrue(ranges.isEmpty());
@ -1098,6 +1100,18 @@ public class TestQuery {
verifyEquals(query, attributes, true);
}
@Test
public void testEvaluateWithinCurlyBraces() {
final Map<String, String> attributes = new HashMap<>();
attributes.put("abc", "xyz");
final String query = "{ ${abc} }";
final List<String> expressions = Query.extractExpressions(query);
assertEquals(1, expressions.size());
assertEquals("${abc}", expressions.get(0));
assertEquals("{ xyz }", Query.evaluateExpressions(query, attributes));
}
private void verifyEquals(final String expression, final Map<String, String> attributes, final Object expectedResult) {
Query.validateExpression(expression, false);
assertEquals(String.valueOf(expectedResult), Query.evaluateExpressions(expression, attributes, null));

View File

@ -17,7 +17,7 @@
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-commons</artifactId>
<version>0.1.0-incubating-SNAPSHOT</version>
<version>0.2.0-incubating-SNAPSHOT</version>
</parent>
<artifactId>nifi-flowfile-packager</artifactId>
<packaging>jar</packaging>

View File

@ -13,14 +13,13 @@
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-commons</artifactId>
<version>0.1.0-incubating-SNAPSHOT</version>
<version>0.2.0-incubating-SNAPSHOT</version>
</parent>
<artifactId>nifi-hl7-query-language</artifactId>
@ -49,7 +48,7 @@
<groupId>org.apache.rat</groupId>
<artifactId>apache-rat-plugin</artifactId>
<configuration>
<excludes>
<excludes combine.children="append">
<exclude>src/test/resources/hypoglycemia</exclude>
<exclude>src/test/resources/hyperglycemia</exclude>
</excludes>

View File

@ -18,7 +18,7 @@
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-commons</artifactId>
<version>0.1.0-incubating-SNAPSHOT</version>
<version>0.2.0-incubating-SNAPSHOT</version>
</parent>
<artifactId>nifi-logging-utils</artifactId>
<description>Utilities for logging</description>

View File

@ -18,7 +18,7 @@
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-commons</artifactId>
<version>0.1.0-incubating-SNAPSHOT</version>
<version>0.2.0-incubating-SNAPSHOT</version>
</parent>
<artifactId>nifi-processor-utils</artifactId>
<packaging>jar</packaging>

View File

@ -163,6 +163,8 @@ public class SSLProperties {
KEYSTORE, TRUSTSTORE
}
private static final String DEFAULT_SSL_PROTOCOL_ALGORITHM = "TLS";
public static List<PropertyDescriptor> getKeystoreDescriptors(final boolean required) {
final List<PropertyDescriptor> descriptors = new ArrayList<>();
for (final PropertyDescriptor descriptor : KEYSTORE_DESCRIPTORS) {
@ -196,14 +198,15 @@ public class SSLProperties {
return SslContextFactory.createTrustSslContext(
context.getProperty(TRUSTSTORE).getValue(),
context.getProperty(TRUSTSTORE_PASSWORD).getValue().toCharArray(),
context.getProperty(TRUSTSTORE_TYPE).getValue());
context.getProperty(TRUSTSTORE_TYPE).getValue(),
DEFAULT_SSL_PROTOCOL_ALGORITHM);
} else {
final String truststoreFile = context.getProperty(TRUSTSTORE).getValue();
if (truststoreFile == null) {
return SslContextFactory.createSslContext(
context.getProperty(KEYSTORE).getValue(),
context.getProperty(KEYSTORE_PASSWORD).getValue().toCharArray(),
context.getProperty(KEYSTORE_TYPE).getValue());
context.getProperty(KEYSTORE_TYPE).getValue(), DEFAULT_SSL_PROTOCOL_ALGORITHM);
} else {
return SslContextFactory.createSslContext(
context.getProperty(KEYSTORE).getValue(),
@ -212,7 +215,8 @@ public class SSLProperties {
context.getProperty(TRUSTSTORE).getValue(),
context.getProperty(TRUSTSTORE_PASSWORD).getValue().toCharArray(),
context.getProperty(TRUSTSTORE_TYPE).getValue(),
clientAuth);
clientAuth,
DEFAULT_SSL_PROTOCOL_ALGORITHM);
}
}
}

View File

@ -18,7 +18,7 @@
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-commons</artifactId>
<version>0.1.0-incubating-SNAPSHOT</version>
<version>0.2.0-incubating-SNAPSHOT</version>
</parent>
<artifactId>nifi-properties</artifactId>
</project>

View File

@ -23,6 +23,7 @@ import java.io.BufferedInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.InputStream;
import java.net.URISyntaxException;
import java.nio.file.Path;
import java.util.HashSet;
import java.util.List;
@ -84,9 +85,15 @@ public class NiFiPropertiesTest {
private NiFiProperties loadSpecifiedProperties(String propertiesFile) {
String file = NiFiPropertiesTest.class.getResource(propertiesFile).getFile();
String filePath;
try {
filePath = NiFiPropertiesTest.class.getResource(propertiesFile).toURI().getPath();
} catch (URISyntaxException ex) {
throw new RuntimeException("Cannot load properties file due to "
+ ex.getLocalizedMessage(), ex);
}
System.setProperty(NiFiProperties.PROPERTIES_FILE_PATH, file);
System.setProperty(NiFiProperties.PROPERTIES_FILE_PATH, filePath);
NiFiProperties properties = NiFiProperties.getInstance();
@ -97,7 +104,7 @@ public class NiFiPropertiesTest {
InputStream inStream = null;
try {
inStream = new BufferedInputStream(new FileInputStream(file));
inStream = new BufferedInputStream(new FileInputStream(filePath));
properties.load(inStream);
} catch (final Exception ex) {
throw new RuntimeException("Cannot load properties file due to "

View File

@ -17,7 +17,7 @@
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-commons</artifactId>
<version>0.1.0-incubating-SNAPSHOT</version>
<version>0.2.0-incubating-SNAPSHOT</version>
</parent>
<artifactId>nifi-security-utils</artifactId>
<description>Contains security functionality.</description>

View File

@ -57,6 +57,7 @@ public final class SslContextFactory {
* @param truststorePasswd the truststore password
* @param truststoreType the type of truststore (e.g., PKCS12, JKS)
* @param clientAuth the type of client authentication
* @param protocol the protocol to use for the SSL connection
*
* @return a SSLContext instance
* @throws java.security.KeyStoreException if any issues accessing the keystore
@ -69,7 +70,7 @@ public final class SslContextFactory {
public static SSLContext createSslContext(
final String keystore, final char[] keystorePasswd, final String keystoreType,
final String truststore, final char[] truststorePasswd, final String truststoreType,
final ClientAuth clientAuth)
final ClientAuth clientAuth, final String protocol)
throws KeyStoreException, IOException, NoSuchAlgorithmException, CertificateException,
UnrecoverableKeyException, KeyManagementException {
@ -90,7 +91,7 @@ public final class SslContextFactory {
trustManagerFactory.init(trustStore);
// initialize the ssl context
final SSLContext sslContext = SSLContext.getInstance("TLS");
final SSLContext sslContext = SSLContext.getInstance(protocol);
sslContext.init(keyManagerFactory.getKeyManagers(), trustManagerFactory.getTrustManagers(), new SecureRandom());
if (ClientAuth.REQUIRED == clientAuth) {
sslContext.getDefaultSSLParameters().setNeedClientAuth(true);
@ -110,6 +111,7 @@ public final class SslContextFactory {
* @param keystore the full path to the keystore
* @param keystorePasswd the keystore password
* @param keystoreType the type of keystore (e.g., PKCS12, JKS)
* @param protocol the protocol to use for the SSL connection
*
* @return a SSLContext instance
* @throws java.security.KeyStoreException if any issues accessing the keystore
@ -120,7 +122,7 @@ public final class SslContextFactory {
* @throws java.security.KeyManagementException if unable to manage the key
*/
public static SSLContext createSslContext(
final String keystore, final char[] keystorePasswd, final String keystoreType)
final String keystore, final char[] keystorePasswd, final String keystoreType, final String protocol)
throws KeyStoreException, IOException, NoSuchAlgorithmException, CertificateException,
UnrecoverableKeyException, KeyManagementException {
@ -133,7 +135,7 @@ public final class SslContextFactory {
keyManagerFactory.init(keyStore, keystorePasswd);
// initialize the ssl context
final SSLContext ctx = SSLContext.getInstance("TLS");
final SSLContext ctx = SSLContext.getInstance(protocol);
ctx.init(keyManagerFactory.getKeyManagers(), new TrustManager[0], new SecureRandom());
return ctx;
@ -146,6 +148,7 @@ public final class SslContextFactory {
* @param truststore the full path to the truststore
* @param truststorePasswd the truststore password
* @param truststoreType the type of truststore (e.g., PKCS12, JKS)
* @param protocol the protocol to use for the SSL connection
*
* @return a SSLContext instance
* @throws java.security.KeyStoreException if any issues accessing the keystore
@ -156,7 +159,7 @@ public final class SslContextFactory {
* @throws java.security.KeyManagementException if unable to manage the key
*/
public static SSLContext createTrustSslContext(
final String truststore, final char[] truststorePasswd, final String truststoreType)
final String truststore, final char[] truststorePasswd, final String truststoreType, final String protocol)
throws KeyStoreException, IOException, NoSuchAlgorithmException, CertificateException,
UnrecoverableKeyException, KeyManagementException {
@ -169,7 +172,7 @@ public final class SslContextFactory {
trustManagerFactory.init(trustStore);
// initialize the ssl context
final SSLContext ctx = SSLContext.getInstance("TLS");
final SSLContext ctx = SSLContext.getInstance(protocol);
ctx.init(new KeyManager[0], trustManagerFactory.getTrustManagers(), new SecureRandom());
return ctx;

View File

@ -19,7 +19,7 @@
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-commons</artifactId>
<version>0.1.0-incubating-SNAPSHOT</version>
<version>0.2.0-incubating-SNAPSHOT</version>
</parent>
<artifactId>nifi-site-to-site-client</artifactId>
@ -42,7 +42,7 @@
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-client-dto</artifactId>
<version>0.1.0-incubating-SNAPSHOT</version>
<version>0.2.0-incubating-SNAPSHOT</version>
</dependency>
<dependency>

View File

@ -18,7 +18,7 @@
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-commons</artifactId>
<version>0.1.0-incubating-SNAPSHOT</version>
<version>0.2.0-incubating-SNAPSHOT</version>
</parent>
<artifactId>nifi-socket-utils</artifactId>
<description>Utilities for socket communication</description>

View File

@ -45,17 +45,19 @@ public final class ChannelDispatcher implements Runnable {
private final StreamConsumerFactory factory;
private final AtomicLong channelReaderFrequencyMilliseconds = new AtomicLong(DEFAULT_CHANNEL_READER_PERIOD_MILLISECONDS);
private final long timeout;
private final boolean readSingleDatagram;
private volatile boolean stop = false;
public static final long DEFAULT_CHANNEL_READER_PERIOD_MILLISECONDS = 100L;
public ChannelDispatcher(final Selector serverSocketSelector, final Selector socketChannelSelector, final ScheduledExecutorService service,
final StreamConsumerFactory factory, final BufferPool buffers, final long timeout, final TimeUnit unit) {
final StreamConsumerFactory factory, final BufferPool buffers, final long timeout, final TimeUnit unit, final boolean readSingleDatagram) {
this.serverSocketSelector = serverSocketSelector;
this.socketChannelSelector = socketChannelSelector;
this.executor = service;
this.factory = factory;
emptyBuffers = buffers;
this.timeout = TimeUnit.MILLISECONDS.convert(timeout, unit);
this.readSingleDatagram = readSingleDatagram;
}
public void setChannelReaderFrequency(final long period, final TimeUnit timeUnit) {
@ -136,7 +138,7 @@ public final class ChannelDispatcher implements Runnable {
// for a DatagramChannel we don't want to create a new reader unless it is a new DatagramChannel. The only
// way to tell if it's new is the lack of an attachment.
if (channel instanceof DatagramChannel && socketChannelKey.attachment() == null) {
reader = new DatagramChannelReader(UUID.randomUUID().toString(), socketChannelKey, emptyBuffers, factory);
reader = new DatagramChannelReader(UUID.randomUUID().toString(), socketChannelKey, emptyBuffers, factory, readSingleDatagram);
socketChannelKey.attach(reader);
final ScheduledFuture<?> readerFuture = executor.scheduleWithFixedDelay(reader, 10L, channelReaderFrequencyMilliseconds.get(),
TimeUnit.MILLISECONDS);

View File

@ -75,14 +75,14 @@ public final class ChannelListener {
private volatile long channelReaderFrequencyMSecs = 50;
public ChannelListener(final int threadPoolSize, final StreamConsumerFactory consumerFactory, final BufferPool bufferPool, int timeout,
TimeUnit unit) throws IOException {
TimeUnit unit, final boolean readSingleDatagram) throws IOException {
this.executor = Executors.newScheduledThreadPool(threadPoolSize + 1); // need to allow for long running ChannelDispatcher thread
this.serverSocketSelector = Selector.open();
this.socketChannelSelector = Selector.open();
this.bufferPool = bufferPool;
this.initialBufferPoolSize = bufferPool.size();
channelDispatcher = new ChannelDispatcher(serverSocketSelector, socketChannelSelector, executor, consumerFactory, bufferPool,
timeout, unit);
timeout, unit, readSingleDatagram);
executor.schedule(channelDispatcher, 50, TimeUnit.MILLISECONDS);
}

View File

@ -27,8 +27,12 @@ public final class DatagramChannelReader extends AbstractChannelReader {
public static final int MAX_UDP_PACKET_SIZE = 65507;
public DatagramChannelReader(final String id, final SelectionKey key, final BufferPool empties, final StreamConsumerFactory consumerFactory) {
private final boolean readSingleDatagram;
public DatagramChannelReader(final String id, final SelectionKey key, final BufferPool empties, final StreamConsumerFactory consumerFactory,
final boolean readSingleDatagram) {
super(id, key, empties, consumerFactory);
this.readSingleDatagram = readSingleDatagram;
}
/**
@ -45,7 +49,7 @@ public final class DatagramChannelReader extends AbstractChannelReader {
final DatagramChannel dChannel = (DatagramChannel) key.channel();
final int initialBufferPosition = buffer.position();
while (buffer.remaining() > MAX_UDP_PACKET_SIZE && key.isValid() && key.isReadable()) {
if (dChannel.receive(buffer) == null) {
if (dChannel.receive(buffer) == null || readSingleDatagram) {
break;
}
}

View File

@ -52,7 +52,7 @@ public final class ServerMain {
ChannelListener listener = null;
try {
executor.scheduleWithFixedDelay(bufferPool, 0L, 5L, TimeUnit.SECONDS);
listener = new ChannelListener(5, new ExampleStreamConsumerFactory(executor, consumerMap), bufferPool, 5, TimeUnit.MILLISECONDS);
listener = new ChannelListener(5, new ExampleStreamConsumerFactory(executor, consumerMap), bufferPool, 5, TimeUnit.MILLISECONDS, false);
listener.setChannelReaderSchedulingPeriod(50L, TimeUnit.MILLISECONDS);
listener.addDatagramChannel(null, 20000, 32 << 20);
LOGGER.info("Listening for UDP data on port 20000");

View File

@ -18,10 +18,10 @@
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-commons</artifactId>
<version>0.1.0-incubating-SNAPSHOT</version>
<version>0.2.0-incubating-SNAPSHOT</version>
</parent>
<artifactId>nifi-utils</artifactId>
<version>0.1.0-incubating-SNAPSHOT</version>
<version>0.2.0-incubating-SNAPSHOT</version>
<packaging>jar</packaging>
<!--
This project intentionally has no additional dependencies beyond that pulled in by the parent. It is a general purpose utility library

View File

@ -18,7 +18,7 @@
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-commons</artifactId>
<version>0.1.0-incubating-SNAPSHOT</version>
<version>0.2.0-incubating-SNAPSHOT</version>
</parent>
<artifactId>nifi-web-utils</artifactId>
<dependencies>

View File

@ -18,7 +18,7 @@
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-commons</artifactId>
<version>0.1.0-incubating-SNAPSHOT</version>
<version>0.2.0-incubating-SNAPSHOT</version>
</parent>
<artifactId>nifi-write-ahead-log</artifactId>
<packaging>jar</packaging>

View File

@ -18,7 +18,7 @@
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi</artifactId>
<version>0.1.0-incubating-SNAPSHOT</version>
<version>0.2.0-incubating-SNAPSHOT</version>
</parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-commons</artifactId>

View File

@ -4,7 +4,7 @@
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi</artifactId>
<version>0.1.0-incubating-SNAPSHOT</version>
<version>0.2.0-incubating-SNAPSHOT</version>
</parent>
<packaging>pom</packaging>
<artifactId>nifi-docs</artifactId>
@ -107,7 +107,7 @@
<groupId>org.apache.rat</groupId>
<artifactId>apache-rat-plugin</artifactId>
<configuration>
<excludes>
<excludes combine.children="append">
<exclude>src/main/asciidoc/asciidoc-mod.css</exclude> <!-- MIT license confirmed. Excluding due to parse error-->
</excludes>
</configuration>

View File

@ -18,7 +18,7 @@
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-external</artifactId>
<version>0.1.0-incubating-SNAPSHOT</version>
<version>0.2.0-incubating-SNAPSHOT</version>
</parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-spark-receiver</artifactId>
@ -27,7 +27,6 @@
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.10</artifactId>
<version>1.2.0</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>

View File

@ -0,0 +1,38 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-external</artifactId>
<version>0.2.0-incubating-SNAPSHOT</version>
</parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-storm-spout</artifactId>
<dependencies>
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-core</artifactId>
<version>0.9.4</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-site-to-site-client</artifactId>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.storm;
import java.util.Map;
/**
* <p>
* The NiFiDataPacket provides a packaging around a NiFi FlowFile. It wraps both
* a FlowFile's content and its attributes so that they can be processed by
* Storm
* </p>
*/
public interface NiFiDataPacket {
/**
* @return the contents of a NiFi FlowFile
*/
byte[] getContent();
/**
* @return a Map of attributes that are associated with the NiFi FlowFile
*/
Map<String, String> getAttributes();
}

View File

@ -0,0 +1,232 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.storm;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;
import org.apache.nifi.remote.Transaction;
import org.apache.nifi.remote.TransferDirection;
import org.apache.nifi.remote.client.SiteToSiteClient;
import org.apache.nifi.remote.client.SiteToSiteClientConfig;
import org.apache.nifi.remote.protocol.DataPacket;
import org.apache.nifi.stream.io.StreamUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;
/**
* <p>
* The <code>NiFiSpout</code> provides a way to pull data from Apache NiFi so
* that it can be processed by Apache Storm. The NiFi Spout connects to a NiFi
* instance provided in the config and requests data from the OutputPort that
* is named. In NiFi, when an OutputPort is added to the root process group,
* it acts as a queue of data for remote clients. This spout is then able to
* pull that data from NiFi reliably.
* </p>
*
* <p>
* It is important to note that if pulling data from a NiFi cluster, the URL
* that should be used is that of the NiFi Cluster Manager. The Receiver will
* automatically handle determining the nodes in that cluster and pull from
* those nodes as appropriate.
* </p>
*
* <p>
* In order to use the NiFiSpout, you will need to first build a
* {@link SiteToSiteClientConfig} to provide to the constructor. This can be
* achieved by using the {@link SiteToSiteClient.Builder}. Below is an example
* snippet of driver code to pull data from NiFi that is running on
* localhost:8080. This example assumes that NiFi exposes an OutputPort on the
* root group named "Data For Storm". Additionally, it assumes that the data
* that it will receive from this OutputPort is text data, as it will map the
* byte array received from NiFi to a UTF-8 Encoded string.
* </p>
*
* <code>
* <pre>
* {@code
*
* // Build a Site-To-Site client config
* SiteToSiteClientConfig clientConfig = new SiteToSiteClient.Builder()
* .url("http://localhost:8080/nifi")
* .portName("Data for Storm")
* .buildConfig();
*
* // Build a topology starting with a NiFiSpout
* TopologyBuilder builder = new TopologyBuilder();
* builder.setSpout("nifi", new NiFiSpout(clientConfig));
*
* // Add a bolt that prints the attributes and content
* builder.setBolt("print", new BaseBasicBolt() {
* @Override
* public void execute(Tuple tuple, BasicOutputCollector collector) {
* NiFiDataPacket dp = (NiFiDataPacket) tuple.getValueByField("nifiDataPacket");
* System.out.println("Attributes: " + dp.getAttributes());
* System.out.println("Content: " + new String(dp.getContent()));
* }
*
* @Override
* public void declareOutputFields(OutputFieldsDeclarer declarer) {}
*
* }).shuffleGrouping("nifi");
*
* // Submit the topology running in local mode
* Config conf = new Config();
* LocalCluster cluster = new LocalCluster();
* cluster.submitTopology("test", conf, builder.createTopology());
*
* Utils.sleep(90000);
* cluster.shutdown();
* }
* </pre>
* </code>
*/
public class NiFiSpout extends BaseRichSpout {
private static final long serialVersionUID = 3067274587595578836L;
public static final Logger LOGGER = LoggerFactory.getLogger(NiFiSpout.class);
private NiFiSpoutReceiver spoutReceiver;
private LinkedBlockingQueue<NiFiDataPacket> queue;
private SpoutOutputCollector spoutOutputCollector;
private final SiteToSiteClientConfig clientConfig;
public NiFiSpout(SiteToSiteClientConfig clientConfig) {
this.clientConfig = clientConfig;
}
@Override
public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
this.spoutOutputCollector = spoutOutputCollector;
this.queue = new LinkedBlockingQueue<>(1000);
this.spoutReceiver = new NiFiSpoutReceiver();
this.spoutReceiver.setDaemon(true);
this.spoutReceiver.setName("NiFi Spout Receiver");
this.spoutReceiver.start();
}
@Override
public void nextTuple() {
NiFiDataPacket data = queue.poll();
if (data == null) {
Utils.sleep(50);
} else {
spoutOutputCollector.emit(new Values(data));
}
}
@Override
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
outputFieldsDeclarer.declare(new Fields("nifiDataPacket"));
}
@Override
public void close() {
super.close();
spoutReceiver.shutdown();
}
class NiFiSpoutReceiver extends Thread {
private boolean shutdown = false;
public synchronized void shutdown() {
this.shutdown = true;
}
@Override
public void run() {
try {
final SiteToSiteClient client = new SiteToSiteClient.Builder().fromConfig(clientConfig).build();
try {
while (!shutdown) {
final Transaction transaction = client.createTransaction(TransferDirection.RECEIVE);
DataPacket dataPacket = transaction.receive();
if (dataPacket == null) {
transaction.confirm();
transaction.complete();
// no data available. Wait a bit and try again
try {
Thread.sleep(1000L);
} catch (InterruptedException e) {
}
continue;
}
final List<NiFiDataPacket> dataPackets = new ArrayList<>();
do {
// Read the data into a byte array and wrap it along with the attributes
// into a NiFiDataPacket.
final InputStream inStream = dataPacket.getData();
final byte[] data = new byte[(int) dataPacket.getSize()];
StreamUtils.fillBuffer(inStream, data);
final Map<String, String> attributes = dataPacket.getAttributes();
final NiFiDataPacket niFiDataPacket = new NiFiDataPacket() {
@Override
public byte[] getContent() {
return data;
}
@Override
public Map<String, String> getAttributes() {
return attributes;
}
};
dataPackets.add(niFiDataPacket);
dataPacket = transaction.receive();
} while (dataPacket != null);
// Confirm transaction to verify the data
transaction.confirm();
for (NiFiDataPacket dp : dataPackets) {
queue.offer(dp);
}
transaction.complete();
}
} finally {
try {
client.close();
} catch (final IOException ioe) {
LOGGER.error("Failed to close client", ioe);
}
}
} catch (final IOException ioe) {
LOGGER.error("Failed to receive data from NiFi", ioe);
}
}
}
}

View File

@ -18,12 +18,13 @@
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi</artifactId>
<version>0.1.0-incubating-SNAPSHOT</version>
<version>0.2.0-incubating-SNAPSHOT</version>
</parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-external</artifactId>
<packaging>pom</packaging>
<modules>
<module>nifi-spark-receiver</module>
<module>nifi-storm-spout</module>
</modules>
</project>

View File

@ -18,7 +18,7 @@
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-maven-archetypes</artifactId>
<version>0.1.0-incubating-SNAPSHOT</version>
<version>0.2.0-incubating-SNAPSHOT</version>
</parent>
<artifactId>nifi-processor-bundle-archetype</artifactId>

View File

@ -18,7 +18,7 @@
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi</artifactId>
<version>0.1.0-incubating-SNAPSHOT</version>
<version>0.2.0-incubating-SNAPSHOT</version>
</parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-maven-archetypes</artifactId>

View File

@ -18,7 +18,7 @@
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi</artifactId>
<version>0.1.0-incubating-SNAPSHOT</version>
<version>0.2.0-incubating-SNAPSHOT</version>
</parent>
<artifactId>nifi-mock</artifactId>
<dependencies>

View File

@ -40,11 +40,11 @@ import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.regex.Pattern;
import org.junit.Assert;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.processor.FlowFileFilter;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Processor;
import org.apache.nifi.processor.QueueSize;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.FlowFileAccessException;
@ -54,6 +54,7 @@ import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.processor.io.OutputStreamCallback;
import org.apache.nifi.processor.io.StreamCallback;
import org.apache.nifi.provenance.ProvenanceReporter;
import org.junit.Assert;
public class MockProcessSession implements ProcessSession {
@ -65,14 +66,16 @@ public class MockProcessSession implements ProcessSession {
private final Map<Long, MockFlowFile> originalVersions = new HashMap<>();
private final SharedSessionState sharedState;
private final Map<String, Long> counterMap = new HashMap<>();
private final ProvenanceReporter provenanceReporter;
private boolean committed = false;
private boolean rolledback = false;
private int removedCount = 0;
public MockProcessSession(final SharedSessionState sharedState) {
public MockProcessSession(final SharedSessionState sharedState, final Processor processor) {
this.sharedState = sharedState;
this.processorQueue = sharedState.getFlowFileQueue();
provenanceReporter = new MockProvenanceReporter(this, sharedState, processor.getIdentifier(), processor.getClass().getSimpleName());
}
@Override
@ -194,7 +197,7 @@ public class MockProcessSession implements ProcessSession {
try {
out.write(mock.getData());
} catch (IOException e) {
} catch (final IOException e) {
throw new FlowFileAccessException(e.toString(), e);
}
}
@ -409,7 +412,7 @@ public class MockProcessSession implements ProcessSession {
final ByteArrayInputStream bais = new ByteArrayInputStream(mock.getData());
try {
callback.process(bais);
} catch (IOException e) {
} catch (final IOException e) {
throw new ProcessException(e.toString(), e);
}
}
@ -766,7 +769,7 @@ public class MockProcessSession implements ProcessSession {
if (source == null || destination == null || source == destination) {
return destination; //don't need to inherit from ourselves
}
FlowFile updated = putAllAttributes(destination, source.getAttributes());
final FlowFile updated = putAllAttributes(destination, source.getAttributes());
getProvenanceReporter().fork(source, Collections.singletonList(updated));
return updated;
}
@ -803,7 +806,7 @@ public class MockProcessSession implements ProcessSession {
}
}
FlowFile updated = putAllAttributes(destination, intersectAttributes(sources));
final FlowFile updated = putAllAttributes(destination, intersectAttributes(sources));
getProvenanceReporter().join(sources, updated);
return updated;
}
@ -982,7 +985,7 @@ public class MockProcessSession implements ProcessSession {
@Override
public ProvenanceReporter getProvenanceReporter() {
return sharedState.getProvenanceReporter();
return provenanceReporter;
}
@Override
@ -997,4 +1000,27 @@ public class MockProcessSession implements ProcessSession {
validateState(flowFile);
return flowFile.getData();
}
/**
* Checks if a FlowFile is known in this session.
*
* @param flowFile
* the FlowFile to check
* @return <code>true</code> if the FlowFile is known in this session,
* <code>false</code> otherwise.
*/
boolean isFlowFileKnown(final FlowFile flowFile) {
final FlowFile curFlowFile = currentVersions.get(flowFile.getId());
if (curFlowFile == null) {
return false;
}
final String curUuid = curFlowFile.getAttribute(CoreAttributes.UUID.key());
final String providedUuid = curFlowFile.getAttribute(CoreAttributes.UUID.key());
if (!curUuid.equals(providedUuid)) {
return false;
}
return true;
}
}

View File

@ -17,186 +17,437 @@
package org.apache.nifi.util;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.Set;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.FlowFileHandlingException;
import org.apache.nifi.provenance.ProvenanceEventBuilder;
import org.apache.nifi.provenance.ProvenanceEventRecord;
import org.apache.nifi.provenance.ProvenanceEventType;
import org.apache.nifi.provenance.ProvenanceReporter;
import org.apache.nifi.provenance.StandardProvenanceEventRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class MockProvenanceReporter implements ProvenanceReporter {
private static final Logger logger = LoggerFactory.getLogger(MockProvenanceReporter.class);
private final MockProcessSession session;
private final String processorId;
private final String processorType;
private final SharedSessionState sharedSessionState;
private final Set<ProvenanceEventRecord> events = new LinkedHashSet<>();
@Override
public void receive(FlowFile flowFile, String sourceSystemUri) {
public MockProvenanceReporter(final MockProcessSession session, final SharedSessionState sharedState, final String processorId, final String processorType) {
this.session = session;
this.sharedSessionState = sharedState;
this.processorId = processorId;
this.processorType = processorType;
}
private void verifyFlowFileKnown(final FlowFile flowFile) {
if (session != null && !session.isFlowFileKnown(flowFile)) {
throw new FlowFileHandlingException(flowFile + " is not known to " + session);
}
}
Set<ProvenanceEventRecord> getEvents() {
return Collections.unmodifiableSet(events);
}
/**
* Removes the given event from the reporter
*
* @param event
* event
*/
void remove(final ProvenanceEventRecord event) {
events.remove(event);
}
void clear() {
events.clear();
}
/**
* Generates a Fork event for the given child and parents but does not
* register the event. This is useful so that a ProcessSession has the
* ability to de-dupe events, since one or more events may be created by the
* session itself, as well as by the Processor
*
* @param parents
* parents
* @param child
* child
* @return record
*/
ProvenanceEventRecord generateJoinEvent(final Collection<FlowFile> parents, final FlowFile child) {
final ProvenanceEventBuilder eventBuilder = build(child, ProvenanceEventType.JOIN);
eventBuilder.addChildFlowFile(child);
for (final FlowFile parent : parents) {
eventBuilder.addParentFlowFile(parent);
}
return eventBuilder.build();
}
ProvenanceEventRecord generateDropEvent(final FlowFile flowFile, final String details) {
return build(flowFile, ProvenanceEventType.DROP).setDetails(details).build();
}
@Override
public void send(FlowFile flowFile, String destinationSystemUri) {
public void receive(final FlowFile flowFile, final String transitUri) {
receive(flowFile, transitUri, -1L);
}
@Override
public void send(FlowFile flowFile, String destinationSystemUri, boolean force) {
public void receive(FlowFile flowFile, String transitUri, String sourceSystemFlowFileIdentifier) {
receive(flowFile, transitUri, sourceSystemFlowFileIdentifier, -1L);
}
@Override
public void receive(FlowFile flowFile, String sourceSystemUri, long transmissionMillis) {
public void receive(final FlowFile flowFile, final String transitUri, final long transmissionMillis) {
receive(flowFile, transitUri, null, transmissionMillis);
}
@Override
public void receive(FlowFile flowFile, String sourceSystemUri, String sourceSystemFlowFileIdentifier) {
public void receive(final FlowFile flowFile, final String transitUri, final String sourceSystemFlowFileIdentifier, final long transmissionMillis) {
receive(flowFile, transitUri, sourceSystemFlowFileIdentifier, null, transmissionMillis);
}
@Override
public void receive(FlowFile flowFile, String sourceSystemUri, String sourceSystemFlowFileIdentifier, long transmissionMillis) {
public void receive(final FlowFile flowFile, final String transitUri, final String sourceSystemFlowFileIdentifier, final String details, final long transmissionMillis) {
verifyFlowFileKnown(flowFile);
try {
final ProvenanceEventRecord record = build(flowFile, ProvenanceEventType.RECEIVE)
.setTransitUri(transitUri).setSourceSystemFlowFileIdentifier(sourceSystemFlowFileIdentifier).setEventDuration(transmissionMillis).setDetails(details).build();
events.add(record);
} catch (final Exception e) {
logger.error("Failed to generate Provenance Event due to " + e);
if (logger.isDebugEnabled()) {
logger.error("", e);
}
}
}
@Override
public void send(FlowFile flowFile, String destinationSystemUri, long transmissionMillis) {
public void send(final FlowFile flowFile, final String transitUri, final long transmissionMillis) {
send(flowFile, transitUri, transmissionMillis, true);
}
@Override
public void send(FlowFile flowFile, String destinationSystemUri, long transmissionMillis, boolean force) {
public void send(final FlowFile flowFile, final String transitUri) {
send(flowFile, transitUri, null, -1L, true);
}
@Override
public void associate(FlowFile flowFile, String alternateIdentifierNamespace, String alternateIdentifier) {
public void send(final FlowFile flowFile, final String transitUri, final String details) {
send(flowFile, transitUri, details, -1L, true);
}
@Override
public void fork(FlowFile parent, Collection<FlowFile> children) {
public void send(final FlowFile flowFile, final String transitUri, final long transmissionMillis, final boolean force) {
send(flowFile, transitUri, null, transmissionMillis, force);
}
@Override
public void fork(FlowFile parent, Collection<FlowFile> children, long forkDuration) {
public void send(final FlowFile flowFile, final String transitUri, final String details, final boolean force) {
send(flowFile, transitUri, details, -1L, force);
}
@Override
public void fork(FlowFile parent, Collection<FlowFile> children, String details) {
public void send(final FlowFile flowFile, final String transitUri, final String details, final long transmissionMillis) {
send(flowFile, transitUri, details, transmissionMillis, true);
}
@Override
public void fork(FlowFile parent, java.util.Collection<FlowFile> children, String details, long forkDuration) {
public void send(final FlowFile flowFile, final String transitUri, final String details, final long transmissionMillis, final boolean force) {
try {
final ProvenanceEventRecord record = build(flowFile, ProvenanceEventType.SEND).setTransitUri(transitUri).setEventDuration(transmissionMillis).setDetails(details).build();
if (force) {
sharedSessionState.addProvenanceEvents(Collections.singleton(record));
} else {
events.add(record);
}
} catch (final Exception e) {
logger.error("Failed to generate Provenance Event due to " + e);
if (logger.isDebugEnabled()) {
logger.error("", e);
}
}
}
@Override
public void join(Collection<FlowFile> parents, FlowFile child) {
public void send(final FlowFile flowFile, final String transitUri, final boolean force) {
send(flowFile, transitUri, -1L, true);
}
@Override
public void join(Collection<FlowFile> parents, FlowFile child, long joinDuration) {
public void associate(final FlowFile flowFile, final String alternateIdentifierNamespace, final String alternateIdentifier) {
try {
String trimmedNamespace = alternateIdentifierNamespace.trim();
if (trimmedNamespace.endsWith(":")) {
trimmedNamespace = trimmedNamespace.substring(0, trimmedNamespace.length() - 1);
}
String trimmedIdentifier = alternateIdentifier.trim();
if (trimmedIdentifier.startsWith(":")) {
if (trimmedIdentifier.length() == 1) {
throw new IllegalArgumentException("Illegal alternateIdentifier: " + alternateIdentifier);
}
trimmedIdentifier = trimmedIdentifier.substring(1);
}
final String alternateIdentifierUri = trimmedNamespace + ":" + trimmedIdentifier;
final ProvenanceEventRecord record = build(flowFile, ProvenanceEventType.ADDINFO).setAlternateIdentifierUri(alternateIdentifierUri).build();
events.add(record);
} catch (final Exception e) {
logger.error("Failed to generate Provenance Event due to " + e);
if (logger.isDebugEnabled()) {
logger.error("", e);
}
}
}
ProvenanceEventRecord drop(final FlowFile flowFile, final String reason) {
try {
final ProvenanceEventBuilder builder = build(flowFile, ProvenanceEventType.DROP);
if (reason != null) {
builder.setDetails("Discard reason: " + reason);
}
final ProvenanceEventRecord record = builder.build();
events.add(record);
return record;
} catch (final Exception e) {
logger.error("Failed to generate Provenance Event due to " + e);
if (logger.isDebugEnabled()) {
logger.error("", e);
}
return null;
}
}
void expire(final FlowFile flowFile, final String details) {
try {
final ProvenanceEventRecord record = build(flowFile, ProvenanceEventType.EXPIRE).setDetails(details).build();
events.add(record);
} catch (final Exception e) {
logger.error("Failed to generate Provenance Event due to " + e);
if (logger.isDebugEnabled()) {
logger.error("", e);
}
}
}
@Override
public void join(Collection<FlowFile> parents, FlowFile child, String details) {
public void fork(final FlowFile parent, final Collection<FlowFile> children) {
fork(parent, children, null, -1L);
}
@Override
public void join(java.util.Collection<FlowFile> parents, FlowFile child, String details, long joinDuration) {
public void fork(final FlowFile parent, final Collection<FlowFile> children, final long forkDuration) {
fork(parent, children, null, forkDuration);
}
@Override
public void clone(FlowFile parent, FlowFile child) {
public void fork(final FlowFile parent, final Collection<FlowFile> children, final String details) {
fork(parent, children, details, -1L);
}
@Override
public void modifyContent(FlowFile flowFile) {
public void fork(final FlowFile parent, final Collection<FlowFile> children, final String details, final long forkDuration) {
verifyFlowFileKnown(parent);
try {
final ProvenanceEventBuilder eventBuilder = build(parent, ProvenanceEventType.FORK);
eventBuilder.addParentFlowFile(parent);
for (final FlowFile child : children) {
eventBuilder.addChildFlowFile(child);
}
if (forkDuration > -1L) {
eventBuilder.setEventDuration(forkDuration);
}
if (details != null) {
eventBuilder.setDetails(details);
}
events.add(eventBuilder.build());
} catch (final Exception e) {
logger.error("Failed to generate Provenance Event due to " + e);
if (logger.isDebugEnabled()) {
logger.error("", e);
}
}
}
@Override
public void modifyContent(FlowFile flowFile, String details) {
public void join(final Collection<FlowFile> parents, final FlowFile child) {
join(parents, child, null, -1L);
}
@Override
public void modifyContent(FlowFile flowFile, long processingMillis) {
public void join(final Collection<FlowFile> parents, final FlowFile child, final long joinDuration) {
join(parents, child, null, joinDuration);
}
@Override
public void modifyContent(FlowFile flowFile, String details, long processingMillis) {
public void join(final Collection<FlowFile> parents, final FlowFile child, final String details) {
join(parents, child, details, -1L);
}
@Override
public void modifyAttributes(FlowFile flowFile) {
public void join(final Collection<FlowFile> parents, final FlowFile child, final String details, final long joinDuration) {
verifyFlowFileKnown(child);
try {
final ProvenanceEventBuilder eventBuilder = build(child, ProvenanceEventType.JOIN);
eventBuilder.addChildFlowFile(child);
eventBuilder.setDetails(details);
for (final FlowFile parent : parents) {
eventBuilder.addParentFlowFile(parent);
}
events.add(eventBuilder.build());
} catch (final Exception e) {
logger.error("Failed to generate Provenance Event due to " + e);
if (logger.isDebugEnabled()) {
logger.error("", e);
}
}
}
@Override
public void modifyAttributes(FlowFile flowFile, String details) {
public void clone(final FlowFile parent, final FlowFile child) {
verifyFlowFileKnown(child);
try {
final ProvenanceEventBuilder eventBuilder = build(parent, ProvenanceEventType.CLONE);
eventBuilder.addChildFlowFile(child);
eventBuilder.addParentFlowFile(parent);
events.add(eventBuilder.build());
} catch (final Exception e) {
logger.error("Failed to generate Provenance Event due to " + e);
if (logger.isDebugEnabled()) {
logger.error("", e);
}
}
}
@Override
public void route(FlowFile flowFile, Relationship relationship) {
public void modifyContent(final FlowFile flowFile) {
modifyContent(flowFile, null, -1L);
}
@Override
public void route(FlowFile flowFile, Relationship relationship, String details) {
public void modifyContent(final FlowFile flowFile, final String details) {
modifyContent(flowFile, details, -1L);
}
@Override
public void route(FlowFile flowFile, Relationship relationship, long processingDuration) {
public void modifyContent(final FlowFile flowFile, final long processingMillis) {
modifyContent(flowFile, null, processingMillis);
}
@Override
public void route(FlowFile flowFile, Relationship relationship, String details, long processingDuration) {
public void modifyContent(final FlowFile flowFile, final String details, final long processingMillis) {
verifyFlowFileKnown(flowFile);
try {
final ProvenanceEventRecord record = build(flowFile, ProvenanceEventType.CONTENT_MODIFIED).setEventDuration(processingMillis).setDetails(details).build();
events.add(record);
} catch (final Exception e) {
logger.error("Failed to generate Provenance Event due to " + e);
if (logger.isDebugEnabled()) {
logger.error("", e);
}
}
}
@Override
public void create(FlowFile flowFile) {
public void modifyAttributes(final FlowFile flowFile) {
modifyAttributes(flowFile, null);
}
@Override
public void create(FlowFile flowFile, String details) {
public void modifyAttributes(final FlowFile flowFile, final String details) {
verifyFlowFileKnown(flowFile);
try {
final ProvenanceEventRecord record = build(flowFile, ProvenanceEventType.ATTRIBUTES_MODIFIED).setDetails(details).build();
events.add(record);
} catch (final Exception e) {
logger.error("Failed to generate Provenance Event due to " + e);
if (logger.isDebugEnabled()) {
logger.error("", e);
}
}
}
@Override
public void receive(FlowFile flowFile, String sourceSystemUri, String sourceSystemFlowFileIdentifier, String details, long transmissionMillis) {
public void route(final FlowFile flowFile, final Relationship relationship) {
route(flowFile, relationship, null);
}
@Override
public void send(FlowFile flowFile, String destinationSystemUri, String details) {
public void route(final FlowFile flowFile, final Relationship relationship, final long processingDuration) {
route(flowFile, relationship, null, processingDuration);
}
@Override
public void send(FlowFile flowFile, String destinationSystemUri, String details, long transmissionMillis) {
public void route(final FlowFile flowFile, final Relationship relationship, final String details) {
route(flowFile, relationship, details, -1L);
}
@Override
public void send(FlowFile flowFile, String destinationSystemUri, String details, boolean force) {
public void route(final FlowFile flowFile, final Relationship relationship, final String details, final long processingDuration) {
verifyFlowFileKnown(flowFile);
try {
final ProvenanceEventRecord record = build(flowFile, ProvenanceEventType.ROUTE).setRelationship(relationship).setDetails(details).setEventDuration(processingDuration).build();
events.add(record);
} catch (final Exception e) {
logger.error("Failed to generate Provenance Event due to " + e);
if (logger.isDebugEnabled()) {
logger.error("", e);
}
}
}
@Override
public void send(FlowFile flowFile, String destinationSystemUri, String details, long transmissionMillis, boolean force) {
public void create(final FlowFile flowFile) {
create(flowFile, null);
}
@Override
public void create(final FlowFile flowFile, final String details) {
verifyFlowFileKnown(flowFile);
try {
final ProvenanceEventRecord record = build(flowFile, ProvenanceEventType.CREATE).setDetails(details).build();
events.add(record);
} catch (final Exception e) {
logger.error("Failed to generate Provenance Event due to " + e);
if (logger.isDebugEnabled()) {
logger.error("", e);
}
}
}
ProvenanceEventBuilder build(final FlowFile flowFile, final ProvenanceEventType eventType) {
final ProvenanceEventBuilder builder = new StandardProvenanceEventRecord.Builder();
builder.setEventType(eventType);
builder.fromFlowFile(flowFile);
builder.setLineageStartDate(flowFile.getLineageStartDate());
builder.setComponentId(processorId);
builder.setComponentType(processorType);
return builder;
}
}

View File

@ -22,20 +22,22 @@ import java.util.concurrent.CopyOnWriteArraySet;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessSessionFactory;
import org.apache.nifi.processor.Processor;
public class MockSessionFactory implements ProcessSessionFactory {
private final Processor processor;
private final SharedSessionState sharedState;
private final Set<MockProcessSession> createdSessions = new CopyOnWriteArraySet<>();
MockSessionFactory(final SharedSessionState sharedState) {
MockSessionFactory(final SharedSessionState sharedState, final Processor processor) {
this.sharedState = sharedState;
this.processor = processor;
}
@Override
public ProcessSession createSession() {
final MockProcessSession session = new MockProcessSession(sharedState);
final MockProcessSession session = new MockProcessSession(sharedState, processor);
createdSessions.add(session);
return session;
}

View File

@ -16,11 +16,18 @@
*/
package org.apache.nifi.util;
import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.nifi.processor.Processor;
import org.apache.nifi.provenance.ProvenanceEventRecord;
import org.apache.nifi.provenance.ProvenanceReporter;
public class SharedSessionState {
@ -31,14 +38,27 @@ public class SharedSessionState {
private final Processor processor;
private final AtomicLong flowFileIdGenerator;
private final ConcurrentMap<String, AtomicLong> counterMap = new ConcurrentHashMap<>();
private final Set<ProvenanceEventRecord> events = new LinkedHashSet<>();
public SharedSessionState(final Processor processor, final AtomicLong flowFileIdGenerator) {
flowFileQueue = new MockFlowFileQueue();
provenanceReporter = new MockProvenanceReporter();
provenanceReporter = new MockProvenanceReporter(null, this, UUID.randomUUID().toString(), "N/A");
this.flowFileIdGenerator = flowFileIdGenerator;
this.processor = processor;
}
void addProvenanceEvents(final Collection<ProvenanceEventRecord> events) {
this.events.addAll(events);
}
void clearProvenanceEvents() {
this.events.clear();
}
public List<ProvenanceEventRecord> getProvenanceEvents() {
return new ArrayList<>(this.events);
}
public MockFlowFileQueue getFlowFileQueue() {
return flowFileQueue;
}
@ -55,7 +75,7 @@ public class SharedSessionState {
AtomicLong counter = counterMap.get(name);
if (counter == null) {
counter = new AtomicLong(0L);
AtomicLong existingCounter = counterMap.putIfAbsent(name, counter);
final AtomicLong existingCounter = counterMap.putIfAbsent(name, counter);
if (existingCounter != null) {
counter = existingCounter;
}
@ -66,6 +86,6 @@ public class SharedSessionState {
public Long getCounterValue(final String name) {
final AtomicLong counterValue = counterMap.get(name);
return (counterValue == null) ? null : counterValue.get();
return counterValue == null ? null : counterValue.get();
}
}

View File

@ -64,11 +64,10 @@ import org.apache.nifi.processor.ProcessSessionFactory;
import org.apache.nifi.processor.Processor;
import org.apache.nifi.processor.QueueSize;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.provenance.ProvenanceEventRecord;
import org.apache.nifi.provenance.ProvenanceReporter;
import org.apache.nifi.reporting.InitializationException;
import org.junit.Assert;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class StandardProcessorTestRunner implements TestRunner {
@ -83,7 +82,6 @@ public class StandardProcessorTestRunner implements TestRunner {
private int numThreads = 1;
private final AtomicInteger invocations = new AtomicInteger(0);
private static final Logger logger = LoggerFactory.getLogger(StandardProcessorTestRunner.class);
private static final Set<Class<? extends Annotation>> deprecatedTypeAnnotations = new HashSet<>();
private static final Set<Class<? extends Annotation>> deprecatedMethodAnnotations = new HashSet<>();
@ -99,7 +97,7 @@ public class StandardProcessorTestRunner implements TestRunner {
this.idGenerator = new AtomicLong(0L);
this.sharedState = new SharedSessionState(processor, idGenerator);
this.flowFileQueue = sharedState.getFlowFileQueue();
this.sessionFactory = new MockSessionFactory(sharedState);
this.sessionFactory = new MockSessionFactory(sharedState, processor);
this.context = new MockProcessContext(processor);
detectDeprecatedAnnotations(processor);
@ -109,7 +107,7 @@ public class StandardProcessorTestRunner implements TestRunner {
try {
ReflectionUtils.invokeMethodsWithAnnotation(OnAdded.class, processor);
} catch (Exception e) {
} catch (final Exception e) {
Assert.fail("Could not invoke methods annotated with @OnAdded annotation due to: " + e);
}
@ -194,7 +192,7 @@ public class StandardProcessorTestRunner implements TestRunner {
if (initialize) {
try {
ReflectionUtils.invokeMethodsWithAnnotation(OnScheduled.class, processor, context);
} catch (Exception e) {
} catch (final Exception e) {
e.printStackTrace();
Assert.fail("Could not invoke methods annotated with @OnScheduled annotation due to: " + e);
}
@ -223,7 +221,7 @@ public class StandardProcessorTestRunner implements TestRunner {
unscheduledRun = true;
try {
ReflectionUtils.invokeMethodsWithAnnotation(OnUnscheduled.class, processor, context);
} catch (Exception e) {
} catch (final Exception e) {
Assert.fail("Could not invoke methods annotated with @OnUnscheduled annotation due to: " + e);
}
}
@ -234,7 +232,7 @@ public class StandardProcessorTestRunner implements TestRunner {
if (!unscheduledRun) {
try {
ReflectionUtils.invokeMethodsWithAnnotation(OnUnscheduled.class, processor, context);
} catch (Exception e) {
} catch (final Exception e) {
Assert.fail("Could not invoke methods annotated with @OnUnscheduled annotation due to: " + e);
}
}
@ -242,7 +240,7 @@ public class StandardProcessorTestRunner implements TestRunner {
if (stopOnFinish) {
try {
ReflectionUtils.invokeMethodsWithAnnotation(OnStopped.class, processor);
} catch (Exception e) {
} catch (final Exception e) {
Assert.fail("Could not invoke methods annotated with @OnStopped annotation due to: " + e);
}
}
@ -255,7 +253,7 @@ public class StandardProcessorTestRunner implements TestRunner {
public void shutdown() {
try {
ReflectionUtils.invokeMethodsWithAnnotation(OnShutdown.class, processor);
} catch (Exception e) {
} catch (final Exception e) {
Assert.fail("Could not invoke methods annotated with @OnShutdown annotation due to: " + e);
}
}
@ -388,7 +386,7 @@ public class StandardProcessorTestRunner implements TestRunner {
@Override
public void enqueue(final InputStream data, final Map<String, String> attributes) {
final MockProcessSession session = new MockProcessSession(new SharedSessionState(processor, idGenerator));
final MockProcessSession session = new MockProcessSession(new SharedSessionState(processor, idGenerator), processor);
MockFlowFile flowFile = session.create();
flowFile = session.importFrom(data, flowFile);
flowFile = session.putAllAttributes(flowFile, attributes);
@ -423,7 +421,11 @@ public class StandardProcessorTestRunner implements TestRunner {
return flowFiles;
}
/**
* @deprecated The ProvenanceReporter should not be accessed through the test runner, as it does not expose the events that were emitted.
*/
@Override
@Deprecated
public ProvenanceReporter getProvenanceReporter() {
return sharedState.getProvenanceReporter();
}
@ -703,4 +705,14 @@ public class StandardProcessorTestRunner implements TestRunner {
return context.removeProperty(descriptor);
}
@Override
public List<ProvenanceEventRecord> getProvenanceEvents() {
return sharedState.getProvenanceEvents();
}
@Override
public void clearProvenanceEvents() {
sharedState.clearProvenanceEvents();
}
}

View File

@ -32,6 +32,7 @@ import org.apache.nifi.processor.ProcessSessionFactory;
import org.apache.nifi.processor.Processor;
import org.apache.nifi.processor.QueueSize;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.provenance.ProvenanceEventRecord;
import org.apache.nifi.provenance.ProvenanceReporter;
import org.apache.nifi.reporting.InitializationException;
@ -702,4 +703,18 @@ public interface TestRunner {
* @return true if removed
*/
boolean removeProperty(PropertyDescriptor descriptor);
/**
* Returns a {@link List} of all {@link ProvenanceEventRecord}s that were
* emitted by the Processor
*
* @return a List of all Provenance Events that were emitted by the
* Processor
*/
List<ProvenanceEventRecord> getProvenanceEvents();
/**
* Clears the Provenance Events that have been emitted by the Processor
*/
void clearProvenanceEvents();
}

View File

@ -19,7 +19,7 @@
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-aws-bundle</artifactId>
<version>0.1.0-incubating-SNAPSHOT</version>
<version>0.2.0-incubating-SNAPSHOT</version>
</parent>
<artifactId>nifi-aws-nar</artifactId>
@ -29,7 +29,7 @@
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-aws-processors</artifactId>
<version>0.1.0-incubating-SNAPSHOT</version>
<version>0.2.0-incubating-SNAPSHOT</version>
</dependency>
</dependencies>

View File

@ -19,7 +19,7 @@
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-aws-bundle</artifactId>
<version>0.1.0-incubating-SNAPSHOT</version>
<version>0.2.0-incubating-SNAPSHOT</version>
</parent>
<artifactId>nifi-aws-processors</artifactId>
@ -61,7 +61,7 @@
<groupId>org.apache.rat</groupId>
<artifactId>apache-rat-plugin</artifactId>
<configuration>
<excludes>
<excludes combine.children="append">
<exclude>src/test/resources/hello.txt</exclude>
</excludes>
</configuration>

View File

@ -19,7 +19,7 @@
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-nar-bundles</artifactId>
<version>0.1.0-incubating-SNAPSHOT</version>
<version>0.2.0-incubating-SNAPSHOT</version>
</parent>
<artifactId>nifi-aws-bundle</artifactId>

View File

@ -17,7 +17,7 @@
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-framework-bundle</artifactId>
<version>0.1.0-incubating-SNAPSHOT</version>
<version>0.2.0-incubating-SNAPSHOT</version>
</parent>
<artifactId>nifi-framework-nar</artifactId>
<packaging>nar</packaging>

View File

@ -80,15 +80,15 @@ The following binary components are provided under the Apache Software License v
(ASLv2) Apache Commons Lang
The following NOTICE information applies:
Apache Commons Lang
Copyright 2001-2014 The Apache Software Foundation
Copyright 2001-2015 The Apache Software Foundation
This product includes software from the Spring Framework,
under the Apache License 2.0 (see: StringUtils.containsWhitespace())
(ASLv2) Spring Framework
The following NOTICE information applies:
Spring Framework 4.1.4.RELEASE
Copyright (c) 2002-2014 Pivotal, Inc.
Spring Framework 4.1.6.RELEASE
Copyright (c) 2002-2015 Pivotal, Inc.
(ASLv2) Swagger Core
The following NOTICE information applies:
@ -101,13 +101,13 @@ Common Development and Distribution License 1.1
The following binary components are provided under the Common Development and Distribution License 1.1. See project link for details.
(CDDL 1.1) (GPL2 w/ CPE) jersey-client (com.sun.jersey:jersey-client:jar:1.18.3 - https://jersey.java.net/jersey-client/)
(CDDL 1.1) (GPL2 w/ CPE) jersey-core (com.sun.jersey:jersey-core:jar:1.18.3 - https://jersey.java.net/jersey-core/)
(CDDL 1.1) (GPL2 w/ CPE) jersey-spring (com.sun.jersey:jersey-spring:jar:1.18.3 - https://jersey.java.net/jersey-spring/)
(CDDL 1.1) (GPL2 w/ CPE) jersey-servlet (com.sun.jersey:jersey-servlet:jar:1.18.3 - https://jersey.java.net/jersey-servlet/)
(CDDL 1.1) (GPL2 w/ CPE) jersey-multipart (com.sun.jersey:jersey-multipart:jar:1.18.3 - https://jersey.java.net/jersey-multipart/)
(CDDL 1.1) (GPL2 w/ CPE) jersey-server (com.sun.jersey:jersey-server:jar:1.18.3 - https://jersey.java.net/jersey-server/)
(CDDL 1.1) (GPL2 w/ CPE) jersey-json (com.sun.jersey:jersey-json:jar:1.18.3 - https://jersey.java.net/jersey-json/)
(CDDL 1.1) (GPL2 w/ CPE) jersey-client (com.sun.jersey:jersey-client:jar:1.19 - https://jersey.java.net/jersey-client/)
(CDDL 1.1) (GPL2 w/ CPE) jersey-core (com.sun.jersey:jersey-core:jar:1.19 - https://jersey.java.net/jersey-core/)
(CDDL 1.1) (GPL2 w/ CPE) jersey-spring (com.sun.jersey:jersey-spring:jar:1.19 - https://jersey.java.net/jersey-spring/)
(CDDL 1.1) (GPL2 w/ CPE) jersey-servlet (com.sun.jersey:jersey-servlet:jar:1.19 - https://jersey.java.net/jersey-servlet/)
(CDDL 1.1) (GPL2 w/ CPE) jersey-multipart (com.sun.jersey:jersey-multipart:jar:1.19 - https://jersey.java.net/jersey-multipart/)
(CDDL 1.1) (GPL2 w/ CPE) jersey-server (com.sun.jersey:jersey-server:jar:1.19 - https://jersey.java.net/jersey-server/)
(CDDL 1.1) (GPL2 w/ CPE) jersey-json (com.sun.jersey:jersey-json:jar:1.19 - https://jersey.java.net/jersey-json/)
(CDDL 1.1) (GPL2 w/ CPE) Old JAXB Runtime (com.sun.xml.bind:jaxb-impl:jar:2.2.3-1 - http://jaxb.java.net/)
(CDDL 1.1) (GPL2 w/ CPE) Java Architecture For XML Binding (javax.xml.bind:jaxb-api:jar:2.2.2 - https://jaxb.dev.java.net/)
(CDDL 1.1) (GPL2 w/ CPE) MIME Streaming Extension (org.jvnet.mimepull:mimepull:jar:1.9.3 - http://mimepull.java.net)
@ -121,6 +121,7 @@ The following binary components are provided under the Common Development and Di
(CDDL 1.0) (GPL3) Streaming API For XML (javax.xml.stream:stax-api:jar:1.0-2 - no url provided)
(CDDL 1.0) JavaBeans Activation Framework (JAF) (javax.activation:activation:jar:1.1 - http://java.sun.com/products/javabeans/jaf/index.jsp)
(CDDL 1.0) JSR311 API (javax.ws.rs:jsr311-api:jar:1.1.1 - https://jsr311.dev.java.net)
************************
Eclipse Public License 1.0
@ -128,9 +129,9 @@ Eclipse Public License 1.0
The following binary components are provided under the Eclipse Public License 1.0. See project link for details.
(EPL 1.0) AspectJ Weaver (org.aspectj:aspectjweaver:jar:1.8.4 - http://www.aspectj.org)
(EPL 1.0) AspectJ Weaver (org.aspectj:aspectjweaver:jar:1.8.5 - http://www.aspectj.org)
(EPL 1.0)(MPL 2.0) H2 Database (com.h2database:h2:jar:1.3.176 - http://www.h2database.com/html/license.html)
(EPL 1.0)(LGPL 2.1) Logback Classic (ch.qos.logback:logback-classic:jar:1.1.2 - http://logback.qos.ch/)
(EPL 1.0)(LGPL 2.1) Logback Core (ch.qos.logback:logback-core:jar:1.1.2 - http://logback.qos.ch/)
(EPL 1.0)(LGPL 2.1) Logback Classic (ch.qos.logback:logback-classic:jar:1.1.3 - http://logback.qos.ch/)
(EPL 1.0)(LGPL 2.1) Logback Core (ch.qos.logback:logback-core:jar:1.1.3 - http://logback.qos.ch/)

View File

@ -18,7 +18,7 @@
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-framework</artifactId>
<version>0.1.0-incubating-SNAPSHOT</version>
<version>0.2.0-incubating-SNAPSHOT</version>
</parent>
<artifactId>nifi-administration</artifactId>
<build>

View File

@ -18,7 +18,7 @@
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-framework</artifactId>
<version>0.1.0-incubating-SNAPSHOT</version>
<version>0.2.0-incubating-SNAPSHOT</version>
</parent>
<artifactId>nifi-client-dto</artifactId>
<dependencies>

View File

@ -18,7 +18,7 @@
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-framework</artifactId>
<version>0.1.0-incubating-SNAPSHOT</version>
<version>0.2.0-incubating-SNAPSHOT</version>
</parent>
<artifactId>nifi-cluster-authorization-provider</artifactId>
<dependencies>

View File

@ -1,7 +1,5 @@
<?xml version="1.0"?>
<project
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"
xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<!-- Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with this
work for additional information regarding copyright ownership. The ASF licenses
@ -16,7 +14,7 @@
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-framework</artifactId>
<version>0.1.0-incubating-SNAPSHOT</version>
<version>0.2.0-incubating-SNAPSHOT</version>
</parent>
<artifactId>nifi-documentation</artifactId>
<dependencies>

View File

@ -30,7 +30,7 @@ public class MockControllerServiceInitializationContext implements ControllerSer
@Override
public String getIdentifier() {
return "";
return "mock-controller-service";
}
@Override
@ -40,7 +40,7 @@ public class MockControllerServiceInitializationContext implements ControllerSer
@Override
public ComponentLog getLogger() {
return null;
return new MockProcessorLogger();
}
}

View File

@ -30,12 +30,12 @@ public class MockProcessorInitializationContext implements ProcessorInitializati
@Override
public String getIdentifier() {
return "";
return "mock-processor";
}
@Override
public ProcessorLog getLogger() {
return null;
return new MockProcessorLogger();
}
@Override

View File

@ -0,0 +1,169 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.documentation.mock;
import org.apache.nifi.logging.ProcessorLog;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Stubs out the functionality of a ProcessorLog/ComponentLog so that it can
* be used during initialization of a component.
*
*/
public class MockProcessorLogger implements ProcessorLog {
private static final Logger logger = LoggerFactory
.getLogger(MockProcessorLogger.class);
@Override
public void warn(String msg, Throwable t) {
logger.warn(msg, t);
}
@Override
public void warn(String msg, Object[] os) {
logger.warn(msg, os);
}
@Override
public void warn(String msg, Object[] os, Throwable t) {
logger.warn(msg, os);
logger.warn("", t);
}
@Override
public void warn(String msg) {
logger.warn(msg);
}
@Override
public void trace(String msg, Throwable t) {
logger.trace(msg, t);
}
@Override
public void trace(String msg, Object[] os) {
logger.trace(msg, os);
}
@Override
public void trace(String msg) {
logger.trace(msg);
}
@Override
public void trace(String msg, Object[] os, Throwable t) {
logger.trace(msg, os);
logger.trace("", t);
}
@Override
public boolean isWarnEnabled() {
return logger.isWarnEnabled();
}
@Override
public boolean isTraceEnabled() {
return logger.isTraceEnabled();
}
@Override
public boolean isInfoEnabled() {
return logger.isInfoEnabled();
}
@Override
public boolean isErrorEnabled() {
return logger.isErrorEnabled();
}
@Override
public boolean isDebugEnabled() {
return logger.isDebugEnabled();
}
@Override
public void info(String msg, Throwable t) {
logger.info(msg, t);
}
@Override
public void info(String msg, Object[] os) {
logger.info(msg, os);
}
@Override
public void info(String msg) {
logger.info(msg);
}
@Override
public void info(String msg, Object[] os, Throwable t) {
logger.trace(msg, os);
logger.trace("", t);
}
@Override
public String getName() {
return logger.getName();
}
@Override
public void error(String msg, Throwable t) {
logger.error(msg, t);
}
@Override
public void error(String msg, Object[] os) {
logger.error(msg, os);
}
@Override
public void error(String msg) {
logger.error(msg);
}
@Override
public void error(String msg, Object[] os, Throwable t) {
logger.error(msg, os);
logger.error("", t);
}
@Override
public void debug(String msg, Throwable t) {
logger.debug(msg, t);
}
@Override
public void debug(String msg, Object[] os) {
logger.debug(msg, os);
}
@Override
public void debug(String msg, Object[] os, Throwable t) {
logger.debug(msg, os);
logger.debug("", t);
}
@Override
public void debug(String msg) {
logger.debug(msg);
}
}

View File

@ -32,7 +32,7 @@ public class MockReportingInitializationContext implements ReportingInitializati
@Override
public String getIdentifier() {
return "";
return "mock-reporting-task";
}
@Override
@ -62,6 +62,6 @@ public class MockReportingInitializationContext implements ReportingInitializati
@Override
public ComponentLog getLogger() {
return null;
return new MockProcessorLogger();
}
}

View File

@ -0,0 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.documentation.example;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.controller.ControllerServiceInitializationContext;
import org.apache.nifi.reporting.InitializationException;
public class ControllerServiceWithLogger extends AbstractControllerService {
@Override
public void init(ControllerServiceInitializationContext context)
throws InitializationException {
context.getLogger().info("initializing...");
}
}

View File

@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.documentation.example;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.exception.ProcessException;
public class ProcessorWithLogger extends AbstractProcessor {
@Override
protected void init(ProcessorInitializationContext context) {
context.getLogger().info("Initializing...");
}
@Override
public void onTrigger(ProcessContext context, ProcessSession session)
throws ProcessException {
}
}

View File

@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.documentation.example;
import org.apache.nifi.reporting.AbstractReportingTask;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.reporting.ReportingContext;
import org.apache.nifi.reporting.ReportingInitializationContext;
public class ReportingTaskWithLogger extends AbstractReportingTask {
@Override
public void init(ReportingInitializationContext config)
throws InitializationException {
config.getLogger().info("Initializing...");
}
@Override
public void onTrigger(ReportingContext context) {
}
}

View File

@ -16,22 +16,24 @@
*/
package org.apache.nifi.documentation.html;
import static org.apache.nifi.documentation.html.XmlValidator.assertContains;
import static org.junit.Assert.assertEquals;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import org.apache.nifi.controller.ControllerService;
import org.apache.nifi.documentation.DocumentationWriter;
import org.apache.nifi.documentation.example.ControllerServiceWithLogger;
import org.apache.nifi.documentation.example.FullyDocumentedControllerService;
import org.apache.nifi.documentation.example.FullyDocumentedReportingTask;
import org.apache.nifi.documentation.example.ReportingTaskWithLogger;
import org.apache.nifi.documentation.mock.MockControllerServiceInitializationContext;
import org.apache.nifi.documentation.mock.MockReportingInitializationContext;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.reporting.ReportingTask;
import org.junit.Test;
import static org.apache.nifi.documentation.html.XmlValidator.assertContains;
import static org.junit.Assert.assertEquals;
public class HtmlDocumentationWriterTest {
@Test
@ -98,4 +100,36 @@ public class HtmlDocumentationWriterTest {
assertContains(results, "true");
assertContains(results, "false");
}
@Test
public void testControllerServiceWithLogger() throws InitializationException, IOException {
ControllerService controllerService = new ControllerServiceWithLogger();
controllerService.initialize(new MockControllerServiceInitializationContext());
DocumentationWriter writer = new HtmlDocumentationWriter();
ByteArrayOutputStream baos = new ByteArrayOutputStream();
writer.write(controllerService, baos, false);
String results = new String(baos.toByteArray());
XmlValidator.assertXmlValid(results);
}
@Test
public void testReportingTaskWithLogger() throws InitializationException, IOException {
ReportingTask controllerService = new ReportingTaskWithLogger();
controllerService.initialize(new MockReportingInitializationContext());
DocumentationWriter writer = new HtmlDocumentationWriter();
ByteArrayOutputStream baos = new ByteArrayOutputStream();
writer.write(controllerService, baos, false);
String results = new String(baos.toByteArray());
XmlValidator.assertXmlValid(results);
}
}

View File

@ -26,6 +26,7 @@ import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.documentation.DocumentationWriter;
import org.apache.nifi.documentation.example.FullyDocumentedProcessor;
import org.apache.nifi.documentation.example.NakedProcessor;
import org.apache.nifi.documentation.example.ProcessorWithLogger;
import org.apache.nifi.documentation.mock.MockProcessorInitializationContext;
import org.junit.Test;
@ -99,4 +100,19 @@ public class ProcessorDocumentationWriterTest {
}
@Test
public void testProcessorWithLoggerInitialization() throws IOException {
ProcessorWithLogger processor = new ProcessorWithLogger();
processor.initialize(new MockProcessorInitializationContext());
DocumentationWriter writer = new HtmlProcessorDocumentationWriter();
ByteArrayOutputStream baos = new ByteArrayOutputStream();
writer.write(processor, baos, false);
String results = new String(baos.toByteArray());
XmlValidator.assertXmlValid(results);
}
}

View File

@ -35,7 +35,9 @@ public class XmlValidator {
public static void assertXmlValid(String xml) {
try {
DocumentBuilderFactory.newInstance().newDocumentBuilder().parse(new InputSource(new StringReader(xml)));
final DocumentBuilderFactory dbf = DocumentBuilderFactory.newInstance();
dbf.setNamespaceAware(true);
dbf.newDocumentBuilder().parse(new InputSource(new StringReader(xml)));
} catch (SAXException | IOException | ParserConfigurationException e) {
Assert.fail(e.getMessage());
}

View File

@ -18,7 +18,7 @@
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-framework</artifactId>
<version>0.1.0-incubating-SNAPSHOT</version>
<version>0.2.0-incubating-SNAPSHOT</version>
</parent>
<artifactId>nifi-file-authorization-provider</artifactId>
<build>

View File

@ -18,7 +18,7 @@
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-framework</artifactId>
<version>0.1.0-incubating-SNAPSHOT</version>
<version>0.2.0-incubating-SNAPSHOT</version>
</parent>
<artifactId>nifi-framework-cluster-protocol</artifactId>
<packaging>jar</packaging>

View File

@ -16,7 +16,6 @@
*/
package org.apache.nifi.cluster.protocol;
import org.apache.nifi.cluster.protocol.DataFlow;
import java.io.Serializable;
import java.util.Arrays;
@ -41,12 +40,15 @@ public class StandardDataFlow implements Serializable, DataFlow {
* Constructs an instance.
*
* @param flow a valid flow as bytes, which cannot be null
* @param templateBytes an XML representation of templates
* @param snippetBytes an XML representation of snippets
* @param templateBytes an XML representation of templates. May be null.
* @param snippetBytes an XML representation of snippets. May be null.
*
* @throws NullPointerException if any argument is null
* @throws NullPointerException if flow is null
*/
public StandardDataFlow(final byte[] flow, final byte[] templateBytes, final byte[] snippetBytes) {
if(flow == null){
throw new NullPointerException("Flow cannot be null");
}
this.flow = flow;
this.templateBytes = templateBytes;
this.snippetBytes = snippetBytes;
@ -63,31 +65,22 @@ public class StandardDataFlow implements Serializable, DataFlow {
return bytes == null ? null : Arrays.copyOf(bytes, bytes.length);
}
/**
* @return the raw byte array of the flow
*/
@Override
public byte[] getFlow() {
return flow;
}
/**
* @return the raw byte array of the templates
*/
@Override
public byte[] getTemplates() {
return templateBytes;
}
/**
* @return the raw byte array of the snippets
*/
@Override
public byte[] getSnippets() {
return snippetBytes;
}
/**
* @return true if processors should be automatically started at application
* startup; false otherwise
*/
@Override
public boolean isAutoStartProcessors() {
return autoStartProcessors;
}

View File

@ -18,7 +18,7 @@
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-framework</artifactId>
<version>0.1.0-incubating-SNAPSHOT</version>
<version>0.2.0-incubating-SNAPSHOT</version>
</parent>
<artifactId>nifi-framework-cluster-web</artifactId>
<packaging>jar</packaging>

View File

@ -18,7 +18,7 @@
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-framework</artifactId>
<version>0.1.0-incubating-SNAPSHOT</version>
<version>0.2.0-incubating-SNAPSHOT</version>
</parent>
<artifactId>nifi-framework-cluster</artifactId>
<packaging>jar</packaging>
@ -41,6 +41,10 @@
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-utils</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-nar-utils</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-client-dto</artifactId>
@ -137,7 +141,7 @@
<groupId>org.apache.rat</groupId>
<artifactId>apache-rat-plugin</artifactId>
<configuration>
<excludes>
<excludes combine.children="append">
<exclude>src/test/resources/org/apache/nifi/cluster/firewall/impl/empty.txt</exclude>
<exclude>src/test/resources/org/apache/nifi/cluster/firewall/impl/ips.txt</exclude>
</excludes>

View File

@ -406,7 +406,9 @@ public class DataFlowDaoImpl implements DataFlowDao {
private byte[] getEmptyFlowBytes() throws IOException {
try {
final DocumentBuilder docBuilder = DocumentBuilderFactory.newInstance().newDocumentBuilder();
final DocumentBuilderFactory docBuilderFactory = DocumentBuilderFactory.newInstance();
docBuilderFactory.setNamespaceAware(true);
final DocumentBuilder docBuilder = docBuilderFactory.newDocumentBuilder();
final Document document = docBuilder.newDocument();
final Element controller = document.createElement("flowController");

View File

@ -1085,8 +1085,9 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
private Document parse(final byte[] serialized) throws SAXException, ParserConfigurationException, IOException {
final DocumentBuilderFactory docFactory = DocumentBuilderFactory.newInstance();
final DocumentBuilder builder = docFactory.newDocumentBuilder();
docFactory.setNamespaceAware(true);
final DocumentBuilder builder = docFactory.newDocumentBuilder();
builder.setErrorHandler(new org.xml.sax.ErrorHandler() {
@Override
public void fatalError(final SAXParseException err) throws SAXException {
@ -1483,6 +1484,8 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
private byte[] serializeControllerServices() throws ParserConfigurationException, TransformerException {
final DocumentBuilderFactory docFactory = DocumentBuilderFactory.newInstance();
docFactory.setNamespaceAware(true);
final DocumentBuilder docBuilder = docFactory.newDocumentBuilder();
final Document document = docBuilder.newDocument();
final Element rootElement = document.createElement("controllerServices");
@ -1497,6 +1500,8 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
private byte[] serializeReportingTasks() throws ParserConfigurationException, TransformerException {
final DocumentBuilderFactory docFactory = DocumentBuilderFactory.newInstance();
docFactory.setNamespaceAware(true);
final DocumentBuilder docBuilder = docFactory.newDocumentBuilder();
final Document document = docBuilder.newDocument();
final Element rootElement = document.createElement("reportingTasks");

View File

@ -131,7 +131,10 @@ public class DataFlowManagementServiceImplTest {
private void verifyFlow() throws ParserConfigurationException, SAXException, IOException {
final byte[] flowBytes = service.loadDataFlow().getDataFlow().getFlow();
final DocumentBuilder docBuilder = DocumentBuilderFactory.newInstance().newDocumentBuilder();
final DocumentBuilderFactory docFactory = DocumentBuilderFactory.newInstance();
docFactory.setNamespaceAware(true);
final DocumentBuilder docBuilder = docFactory.newDocumentBuilder();
final Document doc = docBuilder.parse(new ByteArrayInputStream(flowBytes));
final Element controller = (Element) doc.getElementsByTagName("flowController").item(0);
final Element rootGroup = (Element) controller.getElementsByTagName("rootGroup").item(0);

View File

@ -18,7 +18,7 @@
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-framework</artifactId>
<version>0.1.0-incubating-SNAPSHOT</version>
<version>0.2.0-incubating-SNAPSHOT</version>
</parent>
<artifactId>nifi-framework-core-api</artifactId>
<dependencies>
@ -30,10 +30,6 @@
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-utils</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-runtime</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-client-dto</artifactId>

View File

@ -363,7 +363,7 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
return true;
}
if (maxBytes > 0 && (queueSize.getByteCount() >= maxBytes)) {
if (maxBytes > 0 && queueSize.getByteCount() >= maxBytes) {
return true;
}
@ -437,7 +437,7 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
final List<FlowFileRecord> swapRecords = new ArrayList<>(Math.min(SWAP_RECORD_POLL_SIZE, swapQueue.size()));
final Iterator<FlowFileRecord> itr = swapQueue.iterator();
while (itr.hasNext() && swapRecords.size() < SWAP_RECORD_POLL_SIZE) {
FlowFileRecord record = itr.next();
final FlowFileRecord record = itr.next();
swapRecords.add(record);
itr.remove();
}
@ -606,7 +606,7 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
boolean isExpired;
migrateSwapToActive();
boolean queueFullAtStart = queueFullRef.get();
final boolean queueFullAtStart = queueFullRef.get();
do {
flowFile = this.activeQueue.poll();
@ -794,9 +794,6 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
writeLock.lock();
try {
migrateSwapToActive();
if (activeQueue.isEmpty()) {
return Collections.emptyList();
}
final long expirationMillis = this.flowFileExpirationMillis.get();
final boolean queueFullAtStart = queueFullRef.get();
@ -804,6 +801,13 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
final List<FlowFileRecord> selectedFlowFiles = new ArrayList<>();
final List<FlowFileRecord> unselected = new ArrayList<>();
// the prefetch doesn't allow us to add records back. So when this method is used,
// if there are prefetched records, we have to requeue them into the active queue first.
final PreFetch prefetch = preFetchRef.get();
if (prefetch != null) {
requeueExpiredPrefetch(prefetch);
}
while (true) {
FlowFileRecord flowFile = this.activeQueue.poll();
if (flowFile == null) {
@ -961,11 +965,16 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
writeLock.unlock("external unlock");
}
@Override
public QueueSize getUnacknowledgedQueueSize() {
return unacknowledgedSizeRef.get();
}
private void updateUnacknowledgedSize(final int addToCount, final long addToSize) {
boolean updated = false;
do {
QueueSize queueSize = unacknowledgedSizeRef.get();
final QueueSize queueSize = unacknowledgedSizeRef.get();
final QueueSize newSize = new QueueSize(queueSize.getObjectCount() + addToCount, queueSize.getByteCount() + addToSize);
updated = unacknowledgedSizeRef.compareAndSet(queueSize, newSize);
} while (!updated);

View File

@ -641,6 +641,17 @@ public interface ProcessGroup {
void verifyCanDelete();
/**
* Ensures that the ProcessGroup is eligible to be deleted.
*
* @param ignorePortConnections if true, the Connections that are currently connected to Ports
* will be ignored. Otherwise, the ProcessGroup is not eligible for deletion if its input ports
* or output ports have any connections
*
* @throws IllegalStateException if the ProcessGroup is not eligible for deletion
*/
void verifyCanDelete(boolean ignorePortConnections);
void verifyCanStart();
void verifyCanStop();

View File

@ -18,7 +18,7 @@
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-framework</artifactId>
<version>0.1.0-incubating-SNAPSHOT</version>
<version>0.2.0-incubating-SNAPSHOT</version>
</parent>
<artifactId>nifi-framework-core</artifactId>
<packaging>jar</packaging>
@ -39,6 +39,10 @@
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-utils</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-nar-utils</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-properties</artifactId>
@ -127,7 +131,7 @@
<groupId>org.apache.rat</groupId>
<artifactId>apache-rat-plugin</artifactId>
<configuration>
<excludes>
<excludes combine.children="append">
<exclude>src/test/resources/conf/0bytes.xml</exclude>
<exclude>src/test/resources/conf/termination-only.xml</exclude>
<exclude>src/test/resources/hello.txt</exclude>

View File

@ -181,6 +181,10 @@ public final class StandardConnection implements Connection {
throw new IllegalStateException("Cannot change destination of Connection because the current destination is running");
}
if (getFlowFileQueue().getUnacknowledgedQueueSize().getObjectCount() > 0) {
throw new IllegalStateException("Cannot change destination of Connection because FlowFiles from this Connection are currently held by " + previousDestination);
}
try {
previousDestination.removeConnection(this);
this.destination.set(newDestination);

View File

@ -54,8 +54,9 @@ public class FlowUnmarshaller {
}
final DocumentBuilderFactory dbf = DocumentBuilderFactory.newInstance();
final DocumentBuilder docBuilder = dbf.newDocumentBuilder();
dbf.setNamespaceAware(true);
final DocumentBuilder docBuilder = dbf.newDocumentBuilder();
final Document document = docBuilder.parse(new ByteArrayInputStream(flowContents));
final FlowSnippetDTO flowDto = new FlowSnippetDTO();

View File

@ -71,6 +71,8 @@ public class StandardFlowSerializer implements FlowSerializer {
try {
// create a new, empty document
final DocumentBuilderFactory docFactory = DocumentBuilderFactory.newInstance();
docFactory.setNamespaceAware(true);
final DocumentBuilder docBuilder = docFactory.newDocumentBuilder();
final Document doc = docBuilder.newDocument();

View File

@ -345,7 +345,9 @@ public class StandardFlowSynchronizer implements FlowSynchronizer {
final SchemaFactory schemaFactory = SchemaFactory.newInstance(XMLConstants.W3C_XML_SCHEMA_NS_URI);
final Schema schema = schemaFactory.newSchema(FLOW_XSD_RESOURCE);
final DocumentBuilderFactory docFactory = DocumentBuilderFactory.newInstance();
docFactory.setNamespaceAware(true);
docFactory.setSchema(schema);
final DocumentBuilder docBuilder = docFactory.newDocumentBuilder();
// parse flow

View File

@ -178,7 +178,8 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
throw new AssertionError("Connectable type is " + connectable.getConnectableType());
}
this.provenanceReporter = new StandardProvenanceReporter(connectable.getIdentifier(), componentType, context.getProvenanceRepository(), this);
this.provenanceReporter = new StandardProvenanceReporter(this, connectable.getIdentifier(), componentType,
context.getProvenanceRepository(), this);
this.sessionId = idGenerator.getAndIncrement();
this.connectableDescription = description;
@ -324,7 +325,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
}
final long flowFileLife = System.currentTimeMillis() - flowFile.getEntryDate();
final Connectable connectable = context.getConnectable();
final Object terminator = (connectable instanceof ProcessorNode) ? ((ProcessorNode) connectable).getProcessor() : connectable;
final Object terminator = connectable instanceof ProcessorNode ? ((ProcessorNode) connectable).getProcessor() : connectable;
LOG.info("{} terminated by {}; life of FlowFile = {} ms", new Object[]{flowFile, terminator, flowFileLife});
} else if (record.isWorking() && record.getWorkingClaim() != record.getOriginalClaim()) {
//records which have been updated - remove original if exists
@ -519,9 +520,13 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
updateEventContentClaims(builder, flowFile, checkpoint.records.get(flowFile));
final ProvenanceEventRecord event = builder.build();
if (!event.getChildUuids().isEmpty() && !isSpuriousForkEvent(event, checkpoint.removedFlowFiles) && !processorGenerated.contains(event)) {
if (!event.getChildUuids().isEmpty() && !isSpuriousForkEvent(event, checkpoint.removedFlowFiles)) {
// If framework generated the event, add it to the 'recordsToSubmit' Set.
if (!processorGenerated.contains(event)) {
recordsToSubmit.add(event);
}
// Register the FORK event for each child and each parent.
for (final String childUuid : event.getChildUuids()) {
addEventType(eventTypesPerFlowFileId, childUuid, event.getEventType());
}
@ -536,13 +541,12 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
if (isSpuriousForkEvent(event, checkpoint.removedFlowFiles)) {
continue;
}
if (isSpuriousRouteEvent(event, checkpoint.records)) {
continue;
}
// Check if the event indicates that the FlowFile was routed to the same
// connection from which it was pulled (and only this connection). If so, discard the event.
isSpuriousRouteEvent(event, checkpoint.records);
if (isSpuriousRouteEvent(event, checkpoint.records)) {
continue;
}
recordsToSubmit.add(event);
addEventType(eventTypesPerFlowFileId, event.getFlowFileUuid(), event.getEventType());
@ -648,7 +652,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
return new Iterator<ProvenanceEventRecord>() {
@Override
public boolean hasNext() {
return recordsToSubmitIterator.hasNext() || (autoTermIterator != null && autoTermIterator.hasNext());
return recordsToSubmitIterator.hasNext() || autoTermIterator != null && autoTermIterator.hasNext();
}
@Override
@ -1053,8 +1057,8 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
}
private void formatNanos(final long nanos, final StringBuilder sb) {
final long seconds = (nanos > 1000000000L) ? (nanos / 1000000000L) : 0L;
long millis = (nanos > 1000000L) ? (nanos / 1000000L) : 0L;;
final long seconds = nanos > 1000000000L ? nanos / 1000000000L : 0L;
long millis = nanos > 1000000L ? nanos / 1000000L : 0L;;
final long nanosLeft = nanos % 1000000L;
if (seconds > 0) {
@ -1606,7 +1610,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
processorType = connectable.getClass().getSimpleName();
}
final StandardProvenanceReporter expiredReporter = new StandardProvenanceReporter(connectable.getIdentifier(),
final StandardProvenanceReporter expiredReporter = new StandardProvenanceReporter(this, connectable.getIdentifier(),
processorType, context.getProvenanceRepository(), this);
final Map<String, FlowFileRecord> recordIdMap = new HashMap<>();
@ -1620,7 +1624,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
removeContent(flowFile.getContentClaim());
final long flowFileLife = System.currentTimeMillis() - flowFile.getEntryDate();
final Object terminator = (connectable instanceof ProcessorNode) ? ((ProcessorNode) connectable).getProcessor() : connectable;
final Object terminator = connectable instanceof ProcessorNode ? ((ProcessorNode) connectable).getProcessor() : connectable;
LOG.info("{} terminated by {} due to FlowFile expiration; life of FlowFile = {} ms", new Object[]{flowFile, terminator, flowFileLife});
}
@ -1825,7 +1829,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
readCount += copied;
// don't add demarcator after the last claim
if (useDemarcator && (++objectIndex < numSources)) {
if (useDemarcator && ++objectIndex < numSources) {
out.write(demarcator);
writtenCount += demarcator.length;
}
@ -2485,6 +2489,16 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
}
}
/**
* Checks if a FlowFile is known in this session.
*
* @param flowFile the FlowFile to check
* @return <code>true</code> if the FlowFile is known in this session, <code>false</code> otherwise.
*/
boolean isFlowFileKnown(final FlowFile flowFile) {
return records.containsKey(flowFile);
}
@Override
public FlowFile create(final FlowFile parent) {
final Map<String, String> newAttributes = new HashMap<>(3);

Some files were not shown because too many files have changed in this diff Show More