This commit is contained in:
Matt Gilman 2015-05-19 22:07:15 -04:00
commit a66848b263
143 changed files with 1700 additions and 443 deletions

View File

@ -24,9 +24,11 @@ Apache NiFi is an easy to use, powerful, and reliable system to process and dist
## Getting Started
- Read through the [quickstart guide for development](http://nifi.incubator.apache.org/development/quickstart.html).
- Read through the [quickstart guide for development](http://nifi.incubator.apache.org/quickstart.html).
It will include information on getting a local copy of the source, give pointers on issue
tracking, and provide some warnings about common problems with development environments.
- For a more comprehensive guide to development and information about contributing to the project
read through the [NiFi Developer's Guide](http://nifi.incubator.apache.org/developer-guide.html).
- Optional: Build supporting modules. This should only be needed if the current 'nifi' module is in
the process of updating to a new version of either the 'nifi-parent' or 'nifi-nar-maven-plugin'
artifacts.

View File

@ -18,11 +18,11 @@
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-parent</artifactId>
<version>1.0.0-incubating-SNAPSHOT</version>
<version>1.0.0-incubating</version>
<relativePath />
</parent>
<artifactId>nifi-nar-maven-plugin</artifactId>
<version>1.0.1-incubating-SNAPSHOT</version>
<version>1.0.2-incubating-SNAPSHOT</version>
<packaging>maven-plugin</packaging>
<description>Apache NiFi Nar Plugin. It is currently a part of the Apache Incubator.</description>
<build>

View File

@ -23,7 +23,7 @@
</parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-parent</artifactId>
<version>1.0.0-incubating-SNAPSHOT</version>
<version>1.0.1-incubating-SNAPSHOT</version>
<packaging>pom</packaging>
<description>The nifi-parent enables each apache nifi project to ensure consistent approaches and DRY</description>
<url>http://nifi.incubator.apache.org</url>

View File

@ -18,7 +18,7 @@
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi</artifactId>
<version>0.1.0-incubating-SNAPSHOT</version>
<version>0.1.1-incubating-SNAPSHOT</version>
</parent>
<artifactId>nifi-api</artifactId>
<packaging>jar</packaging>

View File

@ -491,6 +491,10 @@ The following binary components are provided under the Apache Software License v
This product includes software developed by
Saxonica (http://www.saxonica.com/).
(ASLv2) MongoDB Java Driver
The following NOTICE information applies:
Copyright (C) 2008-2013 10gen, Inc.
(ASLv2) Parquet MR
The following NOTICE information applies:
Parquet MR

View File

@ -9,13 +9,12 @@ by applicable law or agreed to in writing, software distributed under the
License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS
OF ANY KIND, either express or implied. See the License for the specific
language governing permissions and limitations under the License. -->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi</artifactId>
<version>0.1.0-incubating-SNAPSHOT</version>
<version>0.1.1-incubating-SNAPSHOT</version>
</parent>
<artifactId>nifi-assembly</artifactId>
<packaging>pom</packaging>
@ -163,6 +162,11 @@ language governing permissions and limitations under the License. -->
<artifactId>nifi-kite-nar</artifactId>
<type>nar</type>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-mongodb-nar</artifactId>
<type>nar</type>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-solr-nar</artifactId>
@ -171,25 +175,25 @@ language governing permissions and limitations under the License. -->
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-social-media-nar</artifactId>
<version>0.1.0-incubating-SNAPSHOT</version>
<version>0.1.1-incubating-SNAPSHOT</version>
<type>nar</type>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-hl7-nar</artifactId>
<version>0.1.0-incubating-SNAPSHOT</version>
<version>0.1.1-incubating-SNAPSHOT</version>
<type>nar</type>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-language-translation-nar</artifactId>
<version>0.1.0-incubating-SNAPSHOT</version>
<version>0.1.1-incubating-SNAPSHOT</version>
<type>nar</type>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-geo-nar</artifactId>
<version>0.1.0-incubating-SNAPSHOT</version>
<version>0.1.1-incubating-SNAPSHOT</version>
<type>nar</type>
</dependency>
</dependencies>

View File

@ -17,7 +17,7 @@
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi</artifactId>
<version>0.1.0-incubating-SNAPSHOT</version>
<version>0.1.1-incubating-SNAPSHOT</version>
</parent>
<artifactId>nifi-bootstrap</artifactId>
<packaging>jar</packaging>

View File

@ -17,7 +17,7 @@
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-commons</artifactId>
<version>0.1.0-incubating-SNAPSHOT</version>
<version>0.1.1-incubating-SNAPSHOT</version>
</parent>
<artifactId>nifi-data-provenance-utils</artifactId>
<packaging>jar</packaging>

View File

@ -17,7 +17,7 @@
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-commons</artifactId>
<version>0.1.0-incubating-SNAPSHOT</version>
<version>0.1.1-incubating-SNAPSHOT</version>
</parent>
<artifactId>nifi-expression-language</artifactId>
<packaging>jar</packaging>

View File

@ -17,7 +17,7 @@
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-commons</artifactId>
<version>0.1.0-incubating-SNAPSHOT</version>
<version>0.1.1-incubating-SNAPSHOT</version>
</parent>
<artifactId>nifi-flowfile-packager</artifactId>
<packaging>jar</packaging>

View File

@ -13,14 +13,13 @@
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-commons</artifactId>
<version>0.1.0-incubating-SNAPSHOT</version>
<version>0.1.1-incubating-SNAPSHOT</version>
</parent>
<artifactId>nifi-hl7-query-language</artifactId>

View File

@ -18,7 +18,7 @@
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-commons</artifactId>
<version>0.1.0-incubating-SNAPSHOT</version>
<version>0.1.1-incubating-SNAPSHOT</version>
</parent>
<artifactId>nifi-logging-utils</artifactId>
<description>Utilities for logging</description>

View File

@ -18,7 +18,7 @@
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-commons</artifactId>
<version>0.1.0-incubating-SNAPSHOT</version>
<version>0.1.1-incubating-SNAPSHOT</version>
</parent>
<artifactId>nifi-processor-utils</artifactId>
<packaging>jar</packaging>

View File

@ -18,7 +18,7 @@
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-commons</artifactId>
<version>0.1.0-incubating-SNAPSHOT</version>
<version>0.1.1-incubating-SNAPSHOT</version>
</parent>
<artifactId>nifi-properties</artifactId>
</project>

View File

@ -17,7 +17,7 @@
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-commons</artifactId>
<version>0.1.0-incubating-SNAPSHOT</version>
<version>0.1.1-incubating-SNAPSHOT</version>
</parent>
<artifactId>nifi-security-utils</artifactId>
<description>Contains security functionality.</description>

View File

@ -19,7 +19,7 @@
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-commons</artifactId>
<version>0.1.0-incubating-SNAPSHOT</version>
<version>0.1.1-incubating-SNAPSHOT</version>
</parent>
<artifactId>nifi-site-to-site-client</artifactId>
@ -42,7 +42,7 @@
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-client-dto</artifactId>
<version>0.1.0-incubating-SNAPSHOT</version>
<version>0.1.1-incubating-SNAPSHOT</version>
</dependency>
<dependency>

View File

@ -18,7 +18,7 @@
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-commons</artifactId>
<version>0.1.0-incubating-SNAPSHOT</version>
<version>0.1.1-incubating-SNAPSHOT</version>
</parent>
<artifactId>nifi-socket-utils</artifactId>
<description>Utilities for socket communication</description>

View File

@ -45,17 +45,19 @@ public final class ChannelDispatcher implements Runnable {
private final StreamConsumerFactory factory;
private final AtomicLong channelReaderFrequencyMilliseconds = new AtomicLong(DEFAULT_CHANNEL_READER_PERIOD_MILLISECONDS);
private final long timeout;
private final boolean readSingleDatagram;
private volatile boolean stop = false;
public static final long DEFAULT_CHANNEL_READER_PERIOD_MILLISECONDS = 100L;
public ChannelDispatcher(final Selector serverSocketSelector, final Selector socketChannelSelector, final ScheduledExecutorService service,
final StreamConsumerFactory factory, final BufferPool buffers, final long timeout, final TimeUnit unit) {
final StreamConsumerFactory factory, final BufferPool buffers, final long timeout, final TimeUnit unit, final boolean readSingleDatagram) {
this.serverSocketSelector = serverSocketSelector;
this.socketChannelSelector = socketChannelSelector;
this.executor = service;
this.factory = factory;
emptyBuffers = buffers;
this.timeout = TimeUnit.MILLISECONDS.convert(timeout, unit);
this.readSingleDatagram = readSingleDatagram;
}
public void setChannelReaderFrequency(final long period, final TimeUnit timeUnit) {
@ -136,7 +138,7 @@ public final class ChannelDispatcher implements Runnable {
// for a DatagramChannel we don't want to create a new reader unless it is a new DatagramChannel. The only
// way to tell if it's new is the lack of an attachment.
if (channel instanceof DatagramChannel && socketChannelKey.attachment() == null) {
reader = new DatagramChannelReader(UUID.randomUUID().toString(), socketChannelKey, emptyBuffers, factory);
reader = new DatagramChannelReader(UUID.randomUUID().toString(), socketChannelKey, emptyBuffers, factory, readSingleDatagram);
socketChannelKey.attach(reader);
final ScheduledFuture<?> readerFuture = executor.scheduleWithFixedDelay(reader, 10L, channelReaderFrequencyMilliseconds.get(),
TimeUnit.MILLISECONDS);

View File

@ -75,14 +75,14 @@ public final class ChannelListener {
private volatile long channelReaderFrequencyMSecs = 50;
public ChannelListener(final int threadPoolSize, final StreamConsumerFactory consumerFactory, final BufferPool bufferPool, int timeout,
TimeUnit unit) throws IOException {
TimeUnit unit, final boolean readSingleDatagram) throws IOException {
this.executor = Executors.newScheduledThreadPool(threadPoolSize + 1); // need to allow for long running ChannelDispatcher thread
this.serverSocketSelector = Selector.open();
this.socketChannelSelector = Selector.open();
this.bufferPool = bufferPool;
this.initialBufferPoolSize = bufferPool.size();
channelDispatcher = new ChannelDispatcher(serverSocketSelector, socketChannelSelector, executor, consumerFactory, bufferPool,
timeout, unit);
timeout, unit, readSingleDatagram);
executor.schedule(channelDispatcher, 50, TimeUnit.MILLISECONDS);
}

View File

@ -27,8 +27,12 @@ public final class DatagramChannelReader extends AbstractChannelReader {
public static final int MAX_UDP_PACKET_SIZE = 65507;
public DatagramChannelReader(final String id, final SelectionKey key, final BufferPool empties, final StreamConsumerFactory consumerFactory) {
private final boolean readSingleDatagram;
public DatagramChannelReader(final String id, final SelectionKey key, final BufferPool empties, final StreamConsumerFactory consumerFactory,
final boolean readSingleDatagram) {
super(id, key, empties, consumerFactory);
this.readSingleDatagram = readSingleDatagram;
}
/**
@ -45,7 +49,7 @@ public final class DatagramChannelReader extends AbstractChannelReader {
final DatagramChannel dChannel = (DatagramChannel) key.channel();
final int initialBufferPosition = buffer.position();
while (buffer.remaining() > MAX_UDP_PACKET_SIZE && key.isValid() && key.isReadable()) {
if (dChannel.receive(buffer) == null) {
if (dChannel.receive(buffer) == null || readSingleDatagram) {
break;
}
}

View File

@ -52,7 +52,7 @@ public final class ServerMain {
ChannelListener listener = null;
try {
executor.scheduleWithFixedDelay(bufferPool, 0L, 5L, TimeUnit.SECONDS);
listener = new ChannelListener(5, new ExampleStreamConsumerFactory(executor, consumerMap), bufferPool, 5, TimeUnit.MILLISECONDS);
listener = new ChannelListener(5, new ExampleStreamConsumerFactory(executor, consumerMap), bufferPool, 5, TimeUnit.MILLISECONDS, false);
listener.setChannelReaderSchedulingPeriod(50L, TimeUnit.MILLISECONDS);
listener.addDatagramChannel(null, 20000, 32 << 20);
LOGGER.info("Listening for UDP data on port 20000");

View File

@ -18,10 +18,10 @@
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-commons</artifactId>
<version>0.1.0-incubating-SNAPSHOT</version>
<version>0.1.1-incubating-SNAPSHOT</version>
</parent>
<artifactId>nifi-utils</artifactId>
<version>0.1.0-incubating-SNAPSHOT</version>
<version>0.1.1-incubating-SNAPSHOT</version>
<packaging>jar</packaging>
<!--
This project intentionally has no additional dependencies beyond that pulled in by the parent. It is a general purpose utility library

View File

@ -18,7 +18,7 @@
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-commons</artifactId>
<version>0.1.0-incubating-SNAPSHOT</version>
<version>0.1.1-incubating-SNAPSHOT</version>
</parent>
<artifactId>nifi-web-utils</artifactId>
<dependencies>

View File

@ -18,7 +18,7 @@
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-commons</artifactId>
<version>0.1.0-incubating-SNAPSHOT</version>
<version>0.1.1-incubating-SNAPSHOT</version>
</parent>
<artifactId>nifi-write-ahead-log</artifactId>
<packaging>jar</packaging>

View File

@ -18,7 +18,7 @@
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi</artifactId>
<version>0.1.0-incubating-SNAPSHOT</version>
<version>0.1.1-incubating-SNAPSHOT</version>
</parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-commons</artifactId>

View File

@ -4,7 +4,7 @@
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi</artifactId>
<version>0.1.0-incubating-SNAPSHOT</version>
<version>0.1.1-incubating-SNAPSHOT</version>
</parent>
<packaging>pom</packaging>
<artifactId>nifi-docs</artifactId>

View File

@ -18,7 +18,7 @@
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-external</artifactId>
<version>0.1.0-incubating-SNAPSHOT</version>
<version>0.1.1-incubating-SNAPSHOT</version>
</parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-spark-receiver</artifactId>

View File

@ -18,7 +18,7 @@
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi</artifactId>
<version>0.1.0-incubating-SNAPSHOT</version>
<version>0.1.1-incubating-SNAPSHOT</version>
</parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-external</artifactId>

View File

@ -18,7 +18,7 @@
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-maven-archetypes</artifactId>
<version>0.1.0-incubating-SNAPSHOT</version>
<version>0.1.1-incubating-SNAPSHOT</version>
</parent>
<artifactId>nifi-processor-bundle-archetype</artifactId>

View File

@ -18,7 +18,7 @@
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi</artifactId>
<version>0.1.0-incubating-SNAPSHOT</version>
<version>0.1.1-incubating-SNAPSHOT</version>
</parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-maven-archetypes</artifactId>

View File

@ -18,7 +18,7 @@
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi</artifactId>
<version>0.1.0-incubating-SNAPSHOT</version>
<version>0.1.1-incubating-SNAPSHOT</version>
</parent>
<artifactId>nifi-mock</artifactId>
<dependencies>

View File

@ -1,36 +1,36 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-aws-bundle</artifactId>
<version>0.1.0-incubating-SNAPSHOT</version>
</parent>
<artifactId>nifi-aws-nar</artifactId>
<packaging>nar</packaging>
<dependencies>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-aws-processors</artifactId>
<version>0.1.0-incubating-SNAPSHOT</version>
</dependency>
</dependencies>
</project>
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-aws-bundle</artifactId>
<version>0.1.1-incubating-SNAPSHOT</version>
</parent>
<artifactId>nifi-aws-nar</artifactId>
<packaging>nar</packaging>
<dependencies>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-aws-processors</artifactId>
<version>0.1.1-incubating-SNAPSHOT</version>
</dependency>
</dependencies>
</project>

View File

@ -1,71 +1,71 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-aws-bundle</artifactId>
<version>0.1.0-incubating-SNAPSHOT</version>
</parent>
<artifactId>nifi-aws-processors</artifactId>
<packaging>jar</packaging>
<dependencies>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-api</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-processor-utils</artifactId>
</dependency>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-mock</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.rat</groupId>
<artifactId>apache-rat-plugin</artifactId>
<configuration>
<excludes>
<exclude>src/test/resources/hello.txt</exclude>
</excludes>
</configuration>
</plugin>
</plugins>
</build>
</project>
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-aws-bundle</artifactId>
<version>0.1.1-incubating-SNAPSHOT</version>
</parent>
<artifactId>nifi-aws-processors</artifactId>
<packaging>jar</packaging>
<dependencies>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-api</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-processor-utils</artifactId>
</dependency>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-mock</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.rat</groupId>
<artifactId>apache-rat-plugin</artifactId>
<configuration>
<excludes>
<exclude>src/test/resources/hello.txt</exclude>
</excludes>
</configuration>
</plugin>
</plugins>
</build>
</project>

View File

@ -1,43 +1,43 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-nar-bundles</artifactId>
<version>0.1.0-incubating-SNAPSHOT</version>
</parent>
<artifactId>nifi-aws-bundle</artifactId>
<packaging>pom</packaging>
<modules>
<module>nifi-aws-processors</module>
<module>nifi-aws-nar</module>
</modules>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk</artifactId>
<version>1.9.24</version>
</dependency>
</dependencies>
</dependencyManagement>
</project>
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-nar-bundles</artifactId>
<version>0.1.1-incubating-SNAPSHOT</version>
</parent>
<artifactId>nifi-aws-bundle</artifactId>
<packaging>pom</packaging>
<modules>
<module>nifi-aws-processors</module>
<module>nifi-aws-nar</module>
</modules>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk</artifactId>
<version>1.9.24</version>
</dependency>
</dependencies>
</dependencyManagement>
</project>

View File

@ -17,7 +17,7 @@
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-framework-bundle</artifactId>
<version>0.1.0-incubating-SNAPSHOT</version>
<version>0.1.1-incubating-SNAPSHOT</version>
</parent>
<artifactId>nifi-framework-nar</artifactId>
<packaging>nar</packaging>

View File

@ -18,7 +18,7 @@
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-framework</artifactId>
<version>0.1.0-incubating-SNAPSHOT</version>
<version>0.1.1-incubating-SNAPSHOT</version>
</parent>
<artifactId>nifi-administration</artifactId>
<build>

View File

@ -18,7 +18,7 @@
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-framework</artifactId>
<version>0.1.0-incubating-SNAPSHOT</version>
<version>0.1.1-incubating-SNAPSHOT</version>
</parent>
<artifactId>nifi-client-dto</artifactId>
<dependencies>

View File

@ -18,7 +18,7 @@
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-framework</artifactId>
<version>0.1.0-incubating-SNAPSHOT</version>
<version>0.1.1-incubating-SNAPSHOT</version>
</parent>
<artifactId>nifi-cluster-authorization-provider</artifactId>
<dependencies>

View File

@ -1,41 +1,39 @@
<?xml version="1.0"?>
<project
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"
xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
<!-- Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with this
work for additional information regarding copyright ownership. The ASF licenses
this file to You under the Apache License, Version 2.0 (the "License"); you
may not use this file except in compliance with the License. You may obtain
a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless
required by applicable law or agreed to in writing, software distributed
under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES
OR CONDITIONS OF ANY KIND, either express or implied. See the License for
the specific language governing permissions and limitations under the License. -->
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-framework</artifactId>
<version>0.1.0-incubating-SNAPSHOT</version>
</parent>
<artifactId>nifi-documentation</artifactId>
<dependencies>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-nar-utils</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-api</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-properties</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-processor-utils</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>
<?xml version="1.0"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<!-- Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with this
work for additional information regarding copyright ownership. The ASF licenses
this file to You under the Apache License, Version 2.0 (the "License"); you
may not use this file except in compliance with the License. You may obtain
a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless
required by applicable law or agreed to in writing, software distributed
under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES
OR CONDITIONS OF ANY KIND, either express or implied. See the License for
the specific language governing permissions and limitations under the License. -->
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-framework</artifactId>
<version>0.1.1-incubating-SNAPSHOT</version>
</parent>
<artifactId>nifi-documentation</artifactId>
<dependencies>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-nar-utils</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-api</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-properties</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-processor-utils</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>

View File

@ -18,7 +18,7 @@
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-framework</artifactId>
<version>0.1.0-incubating-SNAPSHOT</version>
<version>0.1.1-incubating-SNAPSHOT</version>
</parent>
<artifactId>nifi-file-authorization-provider</artifactId>
<build>

View File

@ -18,7 +18,7 @@
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-framework</artifactId>
<version>0.1.0-incubating-SNAPSHOT</version>
<version>0.1.1-incubating-SNAPSHOT</version>
</parent>
<artifactId>nifi-framework-cluster-protocol</artifactId>
<packaging>jar</packaging>

View File

@ -18,7 +18,7 @@
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-framework</artifactId>
<version>0.1.0-incubating-SNAPSHOT</version>
<version>0.1.1-incubating-SNAPSHOT</version>
</parent>
<artifactId>nifi-framework-cluster-web</artifactId>
<packaging>jar</packaging>

View File

@ -18,7 +18,7 @@
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-framework</artifactId>
<version>0.1.0-incubating-SNAPSHOT</version>
<version>0.1.1-incubating-SNAPSHOT</version>
</parent>
<artifactId>nifi-framework-cluster</artifactId>
<packaging>jar</packaging>

View File

@ -18,7 +18,7 @@
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-framework</artifactId>
<version>0.1.0-incubating-SNAPSHOT</version>
<version>0.1.1-incubating-SNAPSHOT</version>
</parent>
<artifactId>nifi-framework-core-api</artifactId>
<dependencies>

View File

@ -18,7 +18,7 @@
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-framework</artifactId>
<version>0.1.0-incubating-SNAPSHOT</version>
<version>0.1.1-incubating-SNAPSHOT</version>
</parent>
<artifactId>nifi-framework-core</artifactId>
<packaging>jar</packaging>

View File

@ -17,7 +17,7 @@
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-framework</artifactId>
<version>0.1.0-incubating-SNAPSHOT</version>
<version>0.1.1-incubating-SNAPSHOT</version>
</parent>
<artifactId>nifi-nar-utils</artifactId>
<packaging>jar</packaging>

View File

@ -18,7 +18,7 @@
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-framework</artifactId>
<version>0.1.0-incubating-SNAPSHOT</version>
<version>0.1.1-incubating-SNAPSHOT</version>
</parent>
<artifactId>nifi-resources</artifactId>
<packaging>pom</packaging>

View File

@ -17,7 +17,7 @@
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-framework</artifactId>
<version>0.1.0-incubating-SNAPSHOT</version>
<version>0.1.1-incubating-SNAPSHOT</version>
</parent>
<artifactId>nifi-runtime</artifactId>
<packaging>jar</packaging>

View File

@ -17,7 +17,7 @@
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-framework</artifactId>
<version>0.1.0-incubating-SNAPSHOT</version>
<version>0.1.1-incubating-SNAPSHOT</version>
</parent>
<artifactId>nifi-security</artifactId>
<description>Contains security functionality common to NiFi.</description>

View File

@ -18,7 +18,7 @@
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-framework</artifactId>
<version>0.1.0-incubating-SNAPSHOT</version>
<version>0.1.1-incubating-SNAPSHOT</version>
</parent>
<artifactId>nifi-site-to-site</artifactId>
<dependencies>

View File

@ -18,7 +18,7 @@
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-framework</artifactId>
<version>0.1.0-incubating-SNAPSHOT</version>
<version>0.1.1-incubating-SNAPSHOT</version>
</parent>
<artifactId>nifi-user-actions</artifactId>
</project>

View File

@ -18,7 +18,7 @@
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-web</artifactId>
<version>0.1.0-incubating-SNAPSHOT</version>
<version>0.1.1-incubating-SNAPSHOT</version>
</parent>
<artifactId>nifi-custom-ui-utilities</artifactId>
<dependencies>

View File

@ -18,7 +18,7 @@
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-web</artifactId>
<version>0.1.0-incubating-SNAPSHOT</version>
<version>0.1.1-incubating-SNAPSHOT</version>
</parent>
<artifactId>nifi-jetty</artifactId>
<packaging>jar</packaging>

View File

@ -18,7 +18,7 @@
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-web</artifactId>
<version>0.1.0-incubating-SNAPSHOT</version>
<version>0.1.1-incubating-SNAPSHOT</version>
</parent>
<artifactId>nifi-ui-extension</artifactId>
<packaging>jar</packaging>

View File

@ -18,7 +18,7 @@
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-web</artifactId>
<version>0.1.0-incubating-SNAPSHOT</version>
<version>0.1.1-incubating-SNAPSHOT</version>
</parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-web-api</artifactId>

View File

@ -18,7 +18,7 @@
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-web</artifactId>
<version>0.1.0-incubating-SNAPSHOT</version>
<version>0.1.1-incubating-SNAPSHOT</version>
</parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-web-content-access</artifactId>

View File

@ -17,7 +17,7 @@
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-web</artifactId>
<version>0.1.0-incubating-SNAPSHOT</version>
<version>0.1.1-incubating-SNAPSHOT</version>
</parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-web-content-viewer</artifactId>

View File

@ -17,7 +17,7 @@
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-web</artifactId>
<version>0.1.0-incubating-SNAPSHOT</version>
<version>0.1.1-incubating-SNAPSHOT</version>
</parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-web-docs</artifactId>

View File

@ -17,7 +17,7 @@
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-web</artifactId>
<version>0.1.0-incubating-SNAPSHOT</version>
<version>0.1.1-incubating-SNAPSHOT</version>
</parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-web-error</artifactId>

View File

@ -18,7 +18,7 @@
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-web</artifactId>
<version>0.1.0-incubating-SNAPSHOT</version>
<version>0.1.1-incubating-SNAPSHOT</version>
</parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-web-optimistic-locking</artifactId>

View File

@ -18,7 +18,7 @@
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-web</artifactId>
<version>0.1.0-incubating-SNAPSHOT</version>
<version>0.1.1-incubating-SNAPSHOT</version>
</parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-web-security</artifactId>

View File

@ -18,7 +18,7 @@
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-web</artifactId>
<version>0.1.0-incubating-SNAPSHOT</version>
<version>0.1.1-incubating-SNAPSHOT</version>
</parent>
<artifactId>nifi-web-ui</artifactId>
<packaging>war</packaging>

View File

@ -17,7 +17,7 @@
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-framework</artifactId>
<version>0.1.0-incubating-SNAPSHOT</version>
<version>0.1.1-incubating-SNAPSHOT</version>
</parent>
<artifactId>nifi-web</artifactId>
<packaging>pom</packaging>
@ -40,31 +40,31 @@
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-web-api</artifactId>
<type>war</type>
<version>0.1.0-incubating-SNAPSHOT</version>
<version>0.1.1-incubating-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-web-error</artifactId>
<type>war</type>
<version>0.1.0-incubating-SNAPSHOT</version>
<version>0.1.1-incubating-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-web-docs</artifactId>
<type>war</type>
<version>0.1.0-incubating-SNAPSHOT</version>
<version>0.1.1-incubating-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-web-content-viewer</artifactId>
<type>war</type>
<version>0.1.0-incubating-SNAPSHOT</version>
<version>0.1.1-incubating-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-web-ui</artifactId>
<type>war</type>
<version>0.1.0-incubating-SNAPSHOT</version>
<version>0.1.1-incubating-SNAPSHOT</version>
</dependency>
</dependencies>
</dependencyManagement>

View File

@ -18,7 +18,7 @@
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-framework-bundle</artifactId>
<version>0.1.0-incubating-SNAPSHOT</version>
<version>0.1.1-incubating-SNAPSHOT</version>
</parent>
<artifactId>nifi-framework</artifactId>
<packaging>pom</packaging>

View File

@ -17,7 +17,7 @@
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-nar-bundles</artifactId>
<version>0.1.0-incubating-SNAPSHOT</version>
<version>0.1.1-incubating-SNAPSHOT</version>
</parent>
<artifactId>nifi-framework-bundle</artifactId>
<packaging>pom</packaging>
@ -31,92 +31,92 @@
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-framework-cluster-protocol</artifactId>
<version>0.1.0-incubating-SNAPSHOT</version>
<version>0.1.1-incubating-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-framework-cluster-web</artifactId>
<version>0.1.0-incubating-SNAPSHOT</version>
<version>0.1.1-incubating-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-file-authorization-provider</artifactId>
<version>0.1.0-incubating-SNAPSHOT</version>
<version>0.1.1-incubating-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-cluster-authorization-provider</artifactId>
<version>0.1.0-incubating-SNAPSHOT</version>
<version>0.1.1-incubating-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-framework-cluster</artifactId>
<version>0.1.0-incubating-SNAPSHOT</version>
<version>0.1.1-incubating-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-runtime</artifactId>
<version>0.1.0-incubating-SNAPSHOT</version>
<version>0.1.1-incubating-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-client-dto</artifactId>
<version>0.1.0-incubating-SNAPSHOT</version>
<version>0.1.1-incubating-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-web-content-access</artifactId>
<version>0.1.0-incubating-SNAPSHOT</version>
<version>0.1.1-incubating-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-security</artifactId>
<version>0.1.0-incubating-SNAPSHOT</version>
<version>0.1.1-incubating-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-framework-core-api</artifactId>
<version>0.1.0-incubating-SNAPSHOT</version>
<version>0.1.1-incubating-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-site-to-site</artifactId>
<version>0.1.0-incubating-SNAPSHOT</version>
<version>0.1.1-incubating-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-framework-core</artifactId>
<version>0.1.0-incubating-SNAPSHOT</version>
<version>0.1.1-incubating-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-user-actions</artifactId>
<version>0.1.0-incubating-SNAPSHOT</version>
<version>0.1.1-incubating-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-administration</artifactId>
<version>0.1.0-incubating-SNAPSHOT</version>
<version>0.1.1-incubating-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-jetty</artifactId>
<version>0.1.0-incubating-SNAPSHOT</version>
<version>0.1.1-incubating-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-web-optimistic-locking</artifactId>
<version>0.1.0-incubating-SNAPSHOT</version>
<version>0.1.1-incubating-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-web-security</artifactId>
<version>0.1.0-incubating-SNAPSHOT</version>
<version>0.1.1-incubating-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-documentation</artifactId>
<version>0.1.0-incubating-SNAPSHOT</version>
<version>0.1.1-incubating-SNAPSHOT</version>
</dependency>
</dependencies>
</dependencyManagement>

View File

@ -18,7 +18,7 @@
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-geo-bundle</artifactId>
<version>0.1.0-incubating-SNAPSHOT</version>
<version>0.1.1-incubating-SNAPSHOT</version>
</parent>
<artifactId>nifi-geo-nar</artifactId>
<packaging>nar</packaging>

View File

@ -18,7 +18,7 @@
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-geo-bundle</artifactId>
<version>0.1.0-incubating-SNAPSHOT</version>
<version>0.1.1-incubating-SNAPSHOT</version>
</parent>
<artifactId>nifi-geo-processors</artifactId>
<dependencies>

View File

@ -18,7 +18,7 @@
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-nar-bundles</artifactId>
<version>0.1.0-incubating-SNAPSHOT</version>
<version>0.1.1-incubating-SNAPSHOT</version>
</parent>
<artifactId>nifi-geo-bundle</artifactId>
@ -35,7 +35,7 @@
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-geo-processors</artifactId>
<version>0.1.0-incubating-SNAPSHOT</version>
<version>0.1.1-incubating-SNAPSHOT</version>
</dependency>
</dependencies>
</dependencyManagement>

View File

@ -17,7 +17,7 @@
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-hadoop-bundle</artifactId>
<version>0.1.0-incubating-SNAPSHOT</version>
<version>0.1.1-incubating-SNAPSHOT</version>
</parent>
<artifactId>nifi-hadoop-nar</artifactId>
<packaging>nar</packaging>

View File

@ -17,7 +17,7 @@
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-hadoop-bundle</artifactId>
<version>0.1.0-incubating-SNAPSHOT</version>
<version>0.1.1-incubating-SNAPSHOT</version>
</parent>
<artifactId>nifi-hdfs-processors</artifactId>
<packaging>jar</packaging>

View File

@ -42,6 +42,13 @@ import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.compress.BZip2Codec;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.io.compress.DefaultCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.io.compress.Lz4Codec;
import org.apache.hadoop.io.compress.SnappyCodec;
import org.apache.hadoop.net.NetUtils;
/**
@ -60,6 +67,13 @@ public abstract class AbstractHadoopProcessor extends AbstractProcessor {
public static final String DIRECTORY_PROP_NAME = "Directory";
public static final PropertyDescriptor COMPRESSION_CODEC = new PropertyDescriptor.Builder()
.name("Compression codec")
.required(false)
.allowableValues(BZip2Codec.class.getName(), DefaultCodec.class.getName(),
GzipCodec.class.getName(), Lz4Codec.class.getName(), SnappyCodec.class.getName())
.build();
protected static final List<PropertyDescriptor> properties;
static {
@ -228,6 +242,23 @@ public abstract class AbstractHadoopProcessor extends AbstractProcessor {
};
}
/**
* Returns the configured CompressionCodec, or null if none is configured.
*
* @param context the ProcessContext
* @param configuration the Hadoop Configuration
* @return CompressionCodec or null
*/
protected CompressionCodec getCompressionCodec(ProcessContext context, Configuration configuration) {
CompressionCodec codec = null;
if (context.getProperty(COMPRESSION_CODEC).isSet()) {
String compressionClassname = context.getProperty(COMPRESSION_CODEC).getValue();
CompressionCodecFactory ccf = new CompressionCodecFactory(configuration);
codec = ccf.getCodecByClassName(compressionClassname);
}
return codec;
}
/**
* Returns the relative path of the child that does not include the filename

View File

@ -17,6 +17,7 @@
package org.apache.nifi.processors.hadoop;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@ -33,11 +34,11 @@ import java.util.regex.Pattern;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
@ -192,6 +193,7 @@ public class GetHDFS extends AbstractHadoopProcessor {
props.add(POLLING_INTERVAL);
props.add(BATCH_SIZE);
props.add(BUFFER_SIZE);
props.add(COMPRESSION_CODEC);
localProperties = Collections.unmodifiableList(props);
}
@ -329,7 +331,7 @@ public class GetHDFS extends AbstractHadoopProcessor {
protected void processBatchOfFiles(final List<Path> files, final ProcessContext context, final ProcessSession session) {
// process the batch of files
FSDataInputStream stream = null;
InputStream stream = null;
Configuration conf = getConfiguration();
FileSystem hdfs = getFileSystem();
final boolean keepSourceFiles = context.getProperty(KEEP_SOURCE_FILE).asBoolean();
@ -337,6 +339,7 @@ public class GetHDFS extends AbstractHadoopProcessor {
int bufferSize = bufferSizeProp != null ? bufferSizeProp.intValue() : conf.getInt(BUFFER_SIZE_KEY,
BUFFER_SIZE_DEFAULT);
final Path rootDir = new Path(context.getProperty(DIRECTORY).getValue());
final CompressionCodec codec = getCompressionCodec(context, conf);
for (final Path file : files) {
try {
if (!hdfs.exists(file)) {
@ -346,6 +349,9 @@ public class GetHDFS extends AbstractHadoopProcessor {
final String relativePath = getPathDifference(rootDir, file);
stream = hdfs.open(file, bufferSize);
if (codec != null) {
stream = codec.createInputStream(stream);
}
FlowFile flowFile = session.create();
final StopWatch stopWatch = new StopWatch(true);

View File

@ -19,6 +19,7 @@ package org.apache.nifi.processors.hadoop;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
@ -27,10 +28,10 @@ import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
@ -157,6 +158,7 @@ public class PutHDFS extends AbstractHadoopProcessor {
props.add(UMASK);
props.add(REMOTE_OWNER);
props.add(REMOTE_GROUP);
props.add(COMPRESSION_CODEC);
localProperties = Collections.unmodifiableList(props);
}
@ -215,6 +217,8 @@ public class PutHDFS extends AbstractHadoopProcessor {
final short replication = replicationProp != null ? replicationProp.shortValue() : hdfs
.getDefaultReplication(configuredRootDirPath);
final CompressionCodec codec = getCompressionCodec(context, configuration);
Path tempDotCopyFile = null;
try {
final Path tempCopyFile;
@ -266,10 +270,13 @@ public class PutHDFS extends AbstractHadoopProcessor {
@Override
public void process(InputStream in) throws IOException {
FSDataOutputStream fos = null;
OutputStream fos = null;
Path createdFile = null;
try {
fos = hdfs.create(tempCopyFile, true, bufferSize, replication, blockSize);
if (codec != null) {
fos = codec.createOutputStream(fos);
}
createdFile = tempCopyFile;
BufferedInputStream bis = new BufferedInputStream(in);
StreamUtils.copy(bis, fos);

View File

@ -19,6 +19,8 @@ package org.apache.nifi.processors.hadoop;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.io.InputStream;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
@ -30,8 +32,9 @@ import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.MockProcessContext;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.junit.Assert;
import org.junit.Test;
@ -105,6 +108,19 @@ public class GetHDFSTest {
for (ValidationResult vr : results) {
Assert.assertTrue(vr.toString().contains("is invalid because Minimum File Age cannot be greater than Maximum File Age"));
}
results = new HashSet<>();
runner.setProperty(GetHDFS.DIRECTORY, "/target");
runner.setProperty(GetHDFS.COMPRESSION_CODEC, CompressionCodec.class.getName());
runner.enqueue(new byte[0]);
pc = runner.getProcessContext();
if (pc instanceof MockProcessContext) {
results = ((MockProcessContext) pc).validate();
}
Assert.assertEquals(1, results.size());
for (ValidationResult vr : results) {
Assert.assertTrue(vr.toString().contains("is invalid because Given value not found in allowed set"));
}
}
@Test
@ -115,9 +131,25 @@ public class GetHDFSTest {
runner.setProperty(GetHDFS.KEEP_SOURCE_FILE, "true");
runner.run();
List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(GetHDFS.REL_SUCCESS);
assertEquals(3, flowFiles.size());
assertEquals(4, flowFiles.size());
for (MockFlowFile flowFile : flowFiles) {
assertTrue(flowFile.getAttribute(CoreAttributes.FILENAME.key()).startsWith("random"));
}
}
@Test
public void testGetFilesWithCompression() throws IOException {
TestRunner runner = TestRunners.newTestRunner(GetHDFS.class);
runner.setProperty(PutHDFS.DIRECTORY, "src/test/resources/testdata");
runner.setProperty(GetHDFS.FILE_FILTER_REGEX, "random.*.gz");
runner.setProperty(GetHDFS.COMPRESSION_CODEC, GzipCodec.class.getName());
runner.setProperty(GetHDFS.KEEP_SOURCE_FILE, "true");
runner.run();
List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(GetHDFS.REL_SUCCESS);
assertEquals(1, flowFiles.size());
MockFlowFile flowFile = flowFiles.get(0);
assertTrue(flowFile.getAttribute(CoreAttributes.FILENAME.key()).startsWith("randombytes-1.gz"));
InputStream expected = getClass().getResourceAsStream("/testdata/randombytes-1");
flowFile.assertContentEquals(expected);
}
}

View File

@ -17,6 +17,7 @@
package org.apache.nifi.processors.hadoop;
import org.apache.nifi.processors.hadoop.PutHDFS;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
@ -33,10 +34,10 @@ import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.util.MockProcessContext;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Test;
@ -136,6 +137,20 @@ public class PutHDFSTest {
for (ValidationResult vr : results) {
assertTrue(vr.toString().contains("is invalid because octal umask [2000] is not a valid umask"));
}
results = new HashSet<>();
runner = TestRunners.newTestRunner(PutHDFS.class);
runner.setProperty(PutHDFS.DIRECTORY, "/target");
runner.setProperty(PutHDFS.COMPRESSION_CODEC, CompressionCodec.class.getName());
runner.enqueue(new byte[0]);
pc = runner.getProcessContext();
if (pc instanceof MockProcessContext) {
results = ((MockProcessContext) pc).validate();
}
Assert.assertEquals(1, results.size());
for (ValidationResult vr : results) {
Assert.assertTrue(vr.toString().contains("is invalid because Given value not found in allowed set"));
}
}
// The following only seems to work from cygwin...something about not finding the 'chmod' command.

View File

@ -17,7 +17,7 @@
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-nar-bundles</artifactId>
<version>0.1.0-incubating-SNAPSHOT</version>
<version>0.1.1-incubating-SNAPSHOT</version>
</parent>
<artifactId>nifi-hadoop-bundle</artifactId>
<packaging>pom</packaging>
@ -31,7 +31,7 @@
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-hdfs-processors</artifactId>
<version>0.1.0-incubating-SNAPSHOT</version>
<version>0.1.1-incubating-SNAPSHOT</version>
</dependency>
</dependencies>
</dependencyManagement>

View File

@ -13,7 +13,7 @@
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-hadoop-libraries-bundle</artifactId>
<version>0.1.0-incubating-SNAPSHOT</version>
<version>0.1.1-incubating-SNAPSHOT</version>
</parent>
<artifactId>nifi-hadoop-libraries-nar</artifactId>
<packaging>nar</packaging>

View File

@ -17,7 +17,7 @@
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-nar-bundles</artifactId>
<version>0.1.0-incubating-SNAPSHOT</version>
<version>0.1.1-incubating-SNAPSHOT</version>
</parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-hadoop-libraries-bundle</artifactId>

View File

@ -19,7 +19,7 @@
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-hl7-bundle</artifactId>
<version>0.1.0-incubating-SNAPSHOT</version>
<version>0.1.1-incubating-SNAPSHOT</version>
</parent>
<artifactId>nifi-hl7-nar</artifactId>
@ -29,7 +29,7 @@
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-hl7-processors</artifactId>
<version>0.1.0-incubating-SNAPSHOT</version>
<version>0.1.1-incubating-SNAPSHOT</version>
</dependency>
</dependencies>

View File

@ -19,7 +19,7 @@
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-hl7-bundle</artifactId>
<version>0.1.0-incubating-SNAPSHOT</version>
<version>0.1.1-incubating-SNAPSHOT</version>
</parent>
<artifactId>nifi-hl7-processors</artifactId>
@ -52,7 +52,7 @@
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-hl7-query-language</artifactId>
<version>0.1.0-incubating-SNAPSHOT</version>
<version>0.1.1-incubating-SNAPSHOT</version>
</dependency>
<dependency>

View File

@ -19,7 +19,7 @@
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-nar-bundles</artifactId>
<version>0.1.0-incubating-SNAPSHOT</version>
<version>0.1.1-incubating-SNAPSHOT</version>
</parent>
<artifactId>nifi-hl7-bundle</artifactId>

View File

@ -17,7 +17,7 @@
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-nar-bundles</artifactId>
<version>0.1.0-incubating-SNAPSHOT</version>
<version>0.1.1-incubating-SNAPSHOT</version>
</parent>
<artifactId>nifi-jetty-bundle</artifactId>
<packaging>nar</packaging>

View File

@ -17,7 +17,7 @@
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-kafka-bundle</artifactId>
<version>0.1.0-incubating-SNAPSHOT</version>
<version>0.1.1-incubating-SNAPSHOT</version>
</parent>
<artifactId>nifi-kafka-nar</artifactId>
<packaging>nar</packaging>

View File

@ -16,7 +16,7 @@
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-kafka-bundle</artifactId>
<version>0.1.0-incubating-SNAPSHOT</version>
<version>0.1.1-incubating-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>nifi-kafka-processors</artifactId>

View File

@ -17,7 +17,7 @@
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-nar-bundles</artifactId>
<version>0.1.0-incubating-SNAPSHOT</version>
<version>0.1.1-incubating-SNAPSHOT</version>
</parent>
<artifactId>nifi-kafka-bundle</artifactId>
<packaging>pom</packaging>
@ -30,7 +30,7 @@
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-kafka-processors</artifactId>
<version>0.1.0-incubating-SNAPSHOT</version>
<version>0.1.1-incubating-SNAPSHOT</version>
</dependency>
</dependencies>
</dependencyManagement>

View File

@ -18,7 +18,7 @@
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-kite-bundle</artifactId>
<version>0.1.0-incubating-SNAPSHOT</version>
<version>0.1.1-incubating-SNAPSHOT</version>
</parent>
<artifactId>nifi-kite-nar</artifactId>

View File

@ -18,7 +18,7 @@
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-kite-bundle</artifactId>
<version>0.1.0-incubating-SNAPSHOT</version>
<version>0.1.1-incubating-SNAPSHOT</version>
</parent>
<artifactId>nifi-kite-processors</artifactId>

View File

@ -18,7 +18,7 @@
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-nar-bundles</artifactId>
<version>0.1.0-incubating-SNAPSHOT</version>
<version>0.1.1-incubating-SNAPSHOT</version>
</parent>
<artifactId>nifi-kite-bundle</artifactId>
@ -36,7 +36,7 @@
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-kite-processors</artifactId>
<version>0.1.0-incubating-SNAPSHOT</version>
<version>0.1.1-incubating-SNAPSHOT</version>
</dependency>
</dependencies>
</dependencyManagement>

View File

@ -19,7 +19,7 @@
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-language-translation-bundle</artifactId>
<version>0.1.0-incubating-SNAPSHOT</version>
<version>0.1.1-incubating-SNAPSHOT</version>
</parent>
<artifactId>nifi-language-translation-nar</artifactId>
@ -29,7 +29,7 @@
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-yandex-processors</artifactId>
<version>0.1.0-incubating-SNAPSHOT</version>
<version>0.1.1-incubating-SNAPSHOT</version>
</dependency>
</dependencies>

View File

@ -19,7 +19,7 @@
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-language-translation-bundle</artifactId>
<version>0.1.0-incubating-SNAPSHOT</version>
<version>0.1.1-incubating-SNAPSHOT</version>
</parent>
<artifactId>nifi-yandex-processors</artifactId>

View File

@ -19,7 +19,7 @@
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-nar-bundles</artifactId>
<version>0.1.0-incubating-SNAPSHOT</version>
<version>0.1.1-incubating-SNAPSHOT</version>
</parent>
<artifactId>nifi-language-translation-bundle</artifactId>

View File

@ -0,0 +1,37 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-mongodb-bundle</artifactId>
<version>0.1.1-incubating-SNAPSHOT</version>
</parent>
<artifactId>nifi-mongodb-nar</artifactId>
<version>0.1.1-incubating-SNAPSHOT</version>
<packaging>nar</packaging>
<dependencies>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-mongodb-processors</artifactId>
<version>0.1.1-incubating-SNAPSHOT</version>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,98 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-mongodb-bundle</artifactId>
<version>0.1.1-incubating-SNAPSHOT</version>
</parent>
<artifactId>nifi-mongodb-processors</artifactId>
<packaging>jar</packaging>
<dependencies>
<dependency>
<groupId>org.mongodb</groupId>
<artifactId>mongo-java-driver</artifactId>
<version>3.0.1</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-api</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-processor-utils</artifactId>
</dependency>
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-mock</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>com.github.joelittlejohn.embedmongo</groupId>
<artifactId>embedmongo-maven-plugin</artifactId>
<version>0.1.12</version>
<executions>
<execution>
<id>start</id>
<goals>
<goal>start</goal>
</goals>
<phase>test-compile</phase>
<configuration>
<databaseDirectory>${project.build.directory}/embedmongo/db</databaseDirectory>
<logging>file</logging>
<logFile>${project.build.directory}/embedmongo.log</logFile>
</configuration>
</execution>
<execution>
<id>stop</id>
<goals>
<goal>stop</goal>
</goals>
<phase>prepare-package</phase>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>

View File

@ -0,0 +1,93 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.nifi.processors.mongodb;
import java.io.IOException;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.util.StandardValidators;
import org.bson.Document;
import com.mongodb.MongoClient;
import com.mongodb.MongoClientURI;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoDatabase;
public abstract class AbstractMongoProcessor extends AbstractProcessor {
protected static final PropertyDescriptor URI = new PropertyDescriptor.Builder()
.name("Mongo URI")
.description("MongoURI, typically of the form: mongodb://host1[:port1][,host2[:port2],...]")
.required(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
protected static final PropertyDescriptor DATABASE_NAME = new PropertyDescriptor.Builder()
.name("Mongo Database Name")
.description("The name of the database to use")
.required(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
protected static final PropertyDescriptor COLLECTION_NAME = new PropertyDescriptor.Builder()
.name("Mongo Collection Name")
.description("The name of the collection to use")
.required(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
protected MongoClient mongoClient;
@OnScheduled
public final void createClient(ProcessContext context) throws IOException {
if (mongoClient != null) {
closeClient();
}
getLogger().info("Creating MongoClient");
try {
final String uri = context.getProperty(URI).getValue();
mongoClient = new MongoClient(new MongoClientURI(uri));
} catch (Exception e) {
getLogger().error("Failed to schedule PutMongo due to {}", new Object[] { e }, e);
throw e;
}
}
@OnStopped
public final void closeClient() {
if (mongoClient != null) {
getLogger().info("Closing MongoClient");
mongoClient.close();
mongoClient = null;
}
}
protected MongoDatabase getDatabase(final ProcessContext context) {
final String databaseName = context.getProperty(DATABASE_NAME).getValue();
return mongoClient.getDatabase(databaseName);
}
protected MongoCollection<Document> getCollection(final ProcessContext context) {
final String collectionName = context.getProperty(COLLECTION_NAME).getValue();
return getDatabase(context).getCollection(collectionName);
}
}

View File

@ -0,0 +1,184 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.nifi.processors.mongodb;
import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.apache.commons.io.IOUtils;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.Validator;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ProcessorLog;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.OutputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
import org.bson.Document;
import com.mongodb.client.FindIterable;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoCursor;
@Tags({ "mongodb", "read", "get" })
@CapabilityDescription("Creates FlowFiles from documents in MongoDB")
public class GetMongo extends AbstractMongoProcessor {
public static final Validator DOCUMENT_VALIDATOR = new Validator() {
@Override
public ValidationResult validate(final String subject, final String value, final ValidationContext context) {
String reason = null;
try {
Document.parse(value);
} catch (final RuntimeException e) {
reason = e.getClass().getName();
}
return new ValidationResult.Builder().subject(subject).input(value).explanation(reason).valid(reason == null).build();
}
};
static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").description("All files are routed to success").build();
static final PropertyDescriptor QUERY = new PropertyDescriptor.Builder()
.name("Query")
.description("The selection criteria; must be a valid BSON document; if omitted the entire collection will be queried")
.required(false)
.addValidator(DOCUMENT_VALIDATOR)
.build();
static final PropertyDescriptor PROJECTION = new PropertyDescriptor.Builder()
.name("Projection")
.description("The fields to be returned from the documents in the result set; must be a valid BSON document")
.required(false)
.addValidator(DOCUMENT_VALIDATOR)
.build();
static final PropertyDescriptor SORT = new PropertyDescriptor.Builder()
.name("Sort")
.description("The fields by which to sort; must be a valid BSON document")
.required(false)
.addValidator(DOCUMENT_VALIDATOR)
.build();
static final PropertyDescriptor LIMIT = new PropertyDescriptor.Builder()
.name("Limit")
.description("The maximum number of elements to return")
.required(false)
.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
.build();
static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
.name("Batch Size")
.description("The number of elements returned from the server in one batch")
.required(false)
.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
.build();
private final List<PropertyDescriptor> descriptors;
private final Set<Relationship> relationships;
public GetMongo() {
final List<PropertyDescriptor> descriptors = new ArrayList<>();
descriptors.add(URI);
descriptors.add(DATABASE_NAME);
descriptors.add(COLLECTION_NAME);
descriptors.add(QUERY);
descriptors.add(PROJECTION);
descriptors.add(SORT);
descriptors.add(LIMIT);
descriptors.add(BATCH_SIZE);
this.descriptors = Collections.unmodifiableList(descriptors);
final Set<Relationship> relationships = new HashSet<>();
relationships.add(REL_SUCCESS);
this.relationships = Collections.unmodifiableSet(relationships);
}
@Override
public Set<Relationship> getRelationships() {
return this.relationships;
}
@Override
public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return descriptors;
}
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
final ProcessorLog logger = getLogger();
final Document query = context.getProperty(QUERY).isSet() ? Document.parse(context.getProperty(QUERY).getValue()) : null;
final Document projection = context.getProperty(PROJECTION).isSet() ? Document.parse(context.getProperty(PROJECTION).getValue()) : null;
final Document sort = context.getProperty(SORT).isSet() ? Document.parse(context.getProperty(SORT).getValue()) : null;
final MongoCollection<Document> collection = getCollection(context);
try {
final FindIterable<Document> it = query != null ? collection.find(query) : collection.find();
if (projection != null) {
it.projection(projection);
}
if (sort != null) {
it.sort(sort);
}
if (context.getProperty(LIMIT).isSet()) {
it.limit(context.getProperty(LIMIT).asInteger());
}
if (context.getProperty(BATCH_SIZE).isSet()) {
it.batchSize(context.getProperty(BATCH_SIZE).asInteger());
}
final MongoCursor<Document> cursor = it.iterator();
try {
FlowFile flowFile = null;
while (cursor.hasNext()) {
flowFile = session.create();
flowFile = session.write(flowFile, new OutputStreamCallback() {
@Override
public void process(OutputStream out) throws IOException {
IOUtils.write(cursor.next().toJson(), out);
}
});
session.getProvenanceReporter().receive(flowFile, context.getProperty(URI).getValue());
session.transfer(flowFile, REL_SUCCESS);
}
session.commit();
} finally {
cursor.close();
}
} catch (final RuntimeException e) {
context.yield();
session.rollback();
logger.error("Failed to execute query {} due to {}", new Object[] { query, e }, e);
}
}
}

View File

@ -0,0 +1,215 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.mongodb;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ProcessorLog;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.stream.io.StreamUtils;
import org.bson.Document;
import com.mongodb.WriteConcern;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.model.UpdateOptions;
@EventDriven
@Tags({ "mongodb", "insert", "update", "write", "put" })
@CapabilityDescription("Writes the contents of a FlowFile to MongoDB")
public class PutMongo extends AbstractMongoProcessor {
static final Relationship REL_SUCCESS = new Relationship.Builder().name("success")
.description("All FlowFiles that are written to MongoDB are routed to this relationship").build();
static final Relationship REL_FAILURE = new Relationship.Builder().name("failure")
.description("All FlowFiles that cannot be written to MongoDB are routed to this relationship").build();
static final String MODE_INSERT = "insert";
static final String MODE_UPDATE = "update";
static final String WRITE_CONCERN_ACKNOWLEDGED = "ACKNOWLEDGED";
static final String WRITE_CONCERN_UNACKNOWLEDGED = "UNACKNOWLEDGED";
static final String WRITE_CONCERN_FSYNCED = "FSYNCED";
static final String WRITE_CONCERN_JOURNALED = "JOURNALED";
static final String WRITE_CONCERN_REPLICA_ACKNOWLEDGED = "REPLICA_ACKNOWLEDGED";
static final String WRITE_CONCERN_MAJORITY = "MAJORITY";
static final PropertyDescriptor MODE = new PropertyDescriptor.Builder()
.name("Mode")
.description("Indicates whether the processor should insert or update content")
.required(true)
.allowableValues(MODE_INSERT, MODE_UPDATE)
.defaultValue(MODE_INSERT)
.build();
static final PropertyDescriptor UPSERT = new PropertyDescriptor.Builder()
.name("Upsert")
.description("When true, inserts a document if no document matches the update query criteria; this property is valid only when using update mode, "
+ "otherwise it is ignored")
.required(true)
.allowableValues("true", "false")
.addValidator(StandardValidators.BOOLEAN_VALIDATOR)
.defaultValue("false")
.build();
static final PropertyDescriptor UPDATE_QUERY_KEY = new PropertyDescriptor.Builder()
.name("Update Query Key")
.description("Key name used to build the update query criteria; this property is valid only when using update mode, "
+ "otherwise it is ignored")
.required(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.defaultValue("_id")
.build();
static final PropertyDescriptor WRITE_CONCERN = new PropertyDescriptor.Builder()
.name("Write Concern")
.description("The write concern to use")
.required(true)
.allowableValues(WRITE_CONCERN_ACKNOWLEDGED, WRITE_CONCERN_UNACKNOWLEDGED, WRITE_CONCERN_FSYNCED, WRITE_CONCERN_JOURNALED,
WRITE_CONCERN_REPLICA_ACKNOWLEDGED, WRITE_CONCERN_MAJORITY)
.defaultValue(WRITE_CONCERN_ACKNOWLEDGED)
.build();
static final PropertyDescriptor CHARACTER_SET = new PropertyDescriptor.Builder()
.name("Character Set")
.description("The Character Set in which the data is encoded")
.required(true)
.addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
.defaultValue("UTF-8")
.build();
private final List<PropertyDescriptor> descriptors;
private final Set<Relationship> relationships;
public PutMongo() {
final List<PropertyDescriptor> descriptors = new ArrayList<>();
descriptors.add(URI);
descriptors.add(DATABASE_NAME);
descriptors.add(COLLECTION_NAME);
descriptors.add(MODE);
descriptors.add(UPSERT);
descriptors.add(UPDATE_QUERY_KEY);
descriptors.add(WRITE_CONCERN);
descriptors.add(CHARACTER_SET);
this.descriptors = Collections.unmodifiableList(descriptors);
final Set<Relationship> relationships = new HashSet<>();
relationships.add(REL_SUCCESS);
relationships.add(REL_FAILURE);
this.relationships = Collections.unmodifiableSet(relationships);
}
@Override
public Set<Relationship> getRelationships() {
return this.relationships;
}
@Override
public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return descriptors;
}
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
final FlowFile flowFile = session.get();
if (flowFile == null) {
return;
}
final ProcessorLog logger = getLogger();
final Charset charset = Charset.forName(context.getProperty(CHARACTER_SET).getValue());
final String mode = context.getProperty(MODE).getValue();
final WriteConcern writeConcern = getWriteConcern(context);
final MongoCollection<Document> collection = getCollection(context).withWriteConcern(writeConcern);
try {
// Read the contents of the FlowFile into a byte array
final byte[] content = new byte[(int) flowFile.getSize()];
session.read(flowFile, new InputStreamCallback() {
@Override
public void process(final InputStream in) throws IOException {
StreamUtils.fillBuffer(in, content, true);
}
});
// parse
final Document doc = Document.parse(new String(content, charset));
if (MODE_INSERT.equalsIgnoreCase(mode)) {
collection.insertOne(doc);
logger.info("inserted {} into MongoDB", new Object[] { flowFile });
} else {
// update
final boolean upsert = context.getProperty(UPSERT).asBoolean();
final String updateKey = context.getProperty(UPDATE_QUERY_KEY).getValue();
final Document query = new Document(updateKey, doc.get(updateKey));
collection.replaceOne(query, doc, new UpdateOptions().upsert(upsert));
logger.info("updated {} into MongoDB", new Object[] { flowFile });
}
session.getProvenanceReporter().send(flowFile, context.getProperty(URI).getValue());
session.transfer(flowFile, REL_SUCCESS);
} catch (Exception e) {
logger.error("Failed to insert {} into MongoDB due to {}", new Object[] {flowFile, e}, e);
session.transfer(flowFile, REL_FAILURE);
context.yield();
}
}
protected WriteConcern getWriteConcern(final ProcessContext context) {
final String writeConcernProperty = context.getProperty(WRITE_CONCERN).getValue();
WriteConcern writeConcern = null;
switch (writeConcernProperty) {
case WRITE_CONCERN_ACKNOWLEDGED:
writeConcern = WriteConcern.ACKNOWLEDGED;
break;
case WRITE_CONCERN_UNACKNOWLEDGED:
writeConcern = WriteConcern.UNACKNOWLEDGED;
break;
case WRITE_CONCERN_FSYNCED:
writeConcern = WriteConcern.FSYNCED;
break;
case WRITE_CONCERN_JOURNALED:
writeConcern = WriteConcern.JOURNALED;
break;
case WRITE_CONCERN_REPLICA_ACKNOWLEDGED:
writeConcern = WriteConcern.REPLICA_ACKNOWLEDGED;
break;
case WRITE_CONCERN_MAJORITY:
writeConcern = WriteConcern.MAJORITY;
break;
default:
writeConcern = WriteConcern.ACKNOWLEDGED;
}
return writeConcern;
}
}

View File

@ -0,0 +1,16 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
org.apache.nifi.processors.mongodb.GetMongo
org.apache.nifi.processors.mongodb.PutMongo

View File

@ -0,0 +1,201 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.nifi.processors.mongodb;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.MockProcessContext;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.bson.Document;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import com.google.common.collect.Lists;
import com.mongodb.MongoClient;
import com.mongodb.MongoClientURI;
import com.mongodb.client.MongoCollection;
public class GetMongoTest {
private static final String MONGO_URI = "mongodb://localhost";
private static final String DB_NAME = GetMongoTest.class.getSimpleName().toLowerCase();
private static final String COLLECTION_NAME = "test";
private static final List<Document> DOCUMENTS = Lists.newArrayList(
new Document("_id", "doc_1").append("a", 1).append("b", 2).append("c", 3),
new Document("_id", "doc_2").append("a", 1).append("b", 2).append("c", 4),
new Document("_id", "doc_3").append("a", 1).append("b", 3)
);
private TestRunner runner;
private MongoClient mongoClient;
@Before
public void setup() {
runner = TestRunners.newTestRunner(GetMongo.class);
runner.setProperty(AbstractMongoProcessor.URI, MONGO_URI);
runner.setProperty(AbstractMongoProcessor.DATABASE_NAME, DB_NAME);
runner.setProperty(AbstractMongoProcessor.COLLECTION_NAME, COLLECTION_NAME);
mongoClient = new MongoClient(new MongoClientURI(MONGO_URI));
MongoCollection<Document> collection = mongoClient.getDatabase(DB_NAME).getCollection(COLLECTION_NAME);
collection.insertMany(DOCUMENTS);
}
@After
public void teardown() {
runner = null;
mongoClient.getDatabase(DB_NAME).drop();
}
@Test
public void testValidators() {
TestRunner runner = TestRunners.newTestRunner(GetMongo.class);
Collection<ValidationResult> results;
ProcessContext pc;
// missing uri, db, collection
runner.enqueue(new byte[0]);
pc = runner.getProcessContext();
results = new HashSet<>();
if (pc instanceof MockProcessContext) {
results = ((MockProcessContext) pc).validate();
}
Assert.assertEquals(3, results.size());
Iterator<ValidationResult> it = results.iterator();
Assert.assertTrue(it.next().toString().contains("is invalid because Mongo URI is required"));
Assert.assertTrue(it.next().toString().contains("is invalid because Mongo Database Name is required"));
Assert.assertTrue(it.next().toString().contains("is invalid because Mongo Collection Name is required"));
// missing query - is ok
runner.setProperty(AbstractMongoProcessor.URI, MONGO_URI);
runner.setProperty(AbstractMongoProcessor.DATABASE_NAME, DB_NAME);
runner.setProperty(AbstractMongoProcessor.COLLECTION_NAME, COLLECTION_NAME);
runner.enqueue(new byte[0]);
pc = runner.getProcessContext();
results = new HashSet<>();
if (pc instanceof MockProcessContext) {
results = ((MockProcessContext) pc).validate();
}
Assert.assertEquals(0, results.size());
// invalid query
runner.setProperty(GetMongo.QUERY, "{a: x,y,z}");
runner.enqueue(new byte[0]);
pc = runner.getProcessContext();
results = new HashSet<>();
if (pc instanceof MockProcessContext) {
results = ((MockProcessContext) pc).validate();
}
Assert.assertEquals(1, results.size());
Assert.assertTrue(results.iterator().next().toString().matches("'Query' .* is invalid because org.bson.json.JsonParseException"));
// invalid projection
runner.setProperty(GetMongo.QUERY, "{a: 1}");
runner.setProperty(GetMongo.PROJECTION, "{a: x,y,z}");
runner.enqueue(new byte[0]);
pc = runner.getProcessContext();
results = new HashSet<>();
if (pc instanceof MockProcessContext) {
results = ((MockProcessContext) pc).validate();
}
Assert.assertEquals(1, results.size());
Assert.assertTrue(results.iterator().next().toString().matches("'Projection' .* is invalid because org.bson.json.JsonParseException"));
// invalid sort
runner.removeProperty(GetMongo.PROJECTION);
runner.setProperty(GetMongo.SORT, "{a: x,y,z}");
runner.enqueue(new byte[0]);
pc = runner.getProcessContext();
results = new HashSet<>();
if (pc instanceof MockProcessContext) {
results = ((MockProcessContext) pc).validate();
}
Assert.assertEquals(1, results.size());
Assert.assertTrue(results.iterator().next().toString().matches("'Sort' .* is invalid because org.bson.json.JsonParseException"));
}
@Test
public void testReadOneDocument() throws Exception {
runner.setProperty(GetMongo.QUERY, "{a: 1, b: 3}");
runner.run();
runner.assertAllFlowFilesTransferred(GetMongo.REL_SUCCESS, 1);
List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(GetMongo.REL_SUCCESS);
flowFiles.get(0).assertContentEquals(DOCUMENTS.get(2).toJson());
}
@Test
public void testReadMultipleDocuments() throws Exception {
runner.setProperty(GetMongo.QUERY, "{a: {$exists: true}}");
runner.run();
runner.assertAllFlowFilesTransferred(GetMongo.REL_SUCCESS, 3);
List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(GetMongo.REL_SUCCESS);
for (int i=0; i < flowFiles.size(); i++) {
flowFiles.get(i).assertContentEquals(DOCUMENTS.get(i).toJson());
}
}
@Test
public void testProjection() throws Exception {
runner.setProperty(GetMongo.QUERY, "{a: 1, b: 3}");
runner.setProperty(GetMongo.PROJECTION, "{_id: 0, a: 1}");
runner.run();
runner.assertAllFlowFilesTransferred(GetMongo.REL_SUCCESS, 1);
List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(GetMongo.REL_SUCCESS);
Document expected = new Document("a", 1);
flowFiles.get(0).assertContentEquals(expected.toJson());
}
@Test
public void testSort() throws Exception {
runner.setProperty(GetMongo.QUERY, "{a: {$exists: true}}");
runner.setProperty(GetMongo.SORT, "{a: -1, b: -1, c: 1}");
runner.run();
runner.assertAllFlowFilesTransferred(GetMongo.REL_SUCCESS, 3);
List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(GetMongo.REL_SUCCESS);
flowFiles.get(0).assertContentEquals(DOCUMENTS.get(2).toJson());
flowFiles.get(1).assertContentEquals(DOCUMENTS.get(0).toJson());
flowFiles.get(2).assertContentEquals(DOCUMENTS.get(1).toJson());
}
@Test
public void testLimit() throws Exception {
runner.setProperty(GetMongo.QUERY, "{a: {$exists: true}}");
runner.setProperty(GetMongo.LIMIT, "1");
runner.run();
runner.assertAllFlowFilesTransferred(GetMongo.REL_SUCCESS, 1);
List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(GetMongo.REL_SUCCESS);
flowFiles.get(0).assertContentEquals(DOCUMENTS.get(0).toJson());
}
}

View File

@ -0,0 +1,254 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.mongodb;
import static com.google.common.base.Charsets.UTF_8;
import static org.junit.Assert.assertEquals;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.MockProcessContext;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.bson.Document;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import com.google.common.collect.Lists;
import com.mongodb.MongoClient;
import com.mongodb.MongoClientURI;
import com.mongodb.client.MongoCollection;
public class PutMongoTest {
private static final String MONGO_URI = "mongodb://localhost";
private static final String DATABASE_NAME = PutMongoTest.class.getSimpleName().toLowerCase();
private static final String COLLECTION_NAME = "test";
private static final List<Document> DOCUMENTS = Lists.newArrayList(
new Document("_id", "doc_1").append("a", 1).append("b", 2).append("c", 3),
new Document("_id", "doc_2").append("a", 1).append("b", 2).append("c", 4),
new Document("_id", "doc_3").append("a", 1).append("b", 3)
);
private TestRunner runner;
private MongoClient mongoClient;
private MongoCollection<Document> collection;
@Before
public void setup() {
runner = TestRunners.newTestRunner(PutMongo.class);
runner.setProperty(AbstractMongoProcessor.URI, MONGO_URI);
runner.setProperty(AbstractMongoProcessor.DATABASE_NAME, DATABASE_NAME);
runner.setProperty(AbstractMongoProcessor.COLLECTION_NAME, COLLECTION_NAME);
mongoClient = new MongoClient(new MongoClientURI(MONGO_URI));
collection = mongoClient.getDatabase(DATABASE_NAME).getCollection(COLLECTION_NAME);
}
@After
public void teardown() {
runner = null;
mongoClient.getDatabase(DATABASE_NAME).drop();
}
private byte[] documentToByteArray(Document doc) {
return doc.toJson().getBytes(UTF_8);
}
@Test
public void testValidators() {
TestRunner runner = TestRunners.newTestRunner(PutMongo.class);
Collection<ValidationResult> results;
ProcessContext pc;
// missing uri, db, collection
runner.enqueue(new byte[0]);
pc = runner.getProcessContext();
results = new HashSet<>();
if (pc instanceof MockProcessContext) {
results = ((MockProcessContext) pc).validate();
}
Assert.assertEquals(3, results.size());
Iterator<ValidationResult> it = results.iterator();
Assert.assertTrue(it.next().toString().contains("is invalid because Mongo URI is required"));
Assert.assertTrue(it.next().toString().contains("is invalid because Mongo Database Name is required"));
Assert.assertTrue(it.next().toString().contains("is invalid because Mongo Collection Name is required"));
// invalid write concern
runner.setProperty(AbstractMongoProcessor.URI, MONGO_URI);
runner.setProperty(AbstractMongoProcessor.DATABASE_NAME, DATABASE_NAME);
runner.setProperty(AbstractMongoProcessor.COLLECTION_NAME, COLLECTION_NAME);
runner.setProperty(PutMongo.WRITE_CONCERN, "xyz");
runner.enqueue(new byte[0]);
pc = runner.getProcessContext();
results = new HashSet<>();
if (pc instanceof MockProcessContext) {
results = ((MockProcessContext) pc).validate();
}
Assert.assertEquals(1, results.size());
Assert.assertTrue(results.iterator().next().toString().matches("'Write Concern' .* is invalid because Given value not found in allowed set .*"));
// valid write concern
runner.setProperty(PutMongo.WRITE_CONCERN, PutMongo.WRITE_CONCERN_UNACKNOWLEDGED);
runner.enqueue(new byte[0]);
pc = runner.getProcessContext();
results = new HashSet<>();
if (pc instanceof MockProcessContext) {
results = ((MockProcessContext) pc).validate();
}
Assert.assertEquals(0, results.size());
}
@Test
public void testInsertOne() throws Exception {
Document doc = DOCUMENTS.get(0);
byte[] bytes = documentToByteArray(doc);
runner.enqueue(bytes);
runner.run();
runner.assertAllFlowFilesTransferred(PutMongo.REL_SUCCESS, 1);
MockFlowFile out = runner.getFlowFilesForRelationship(PutMongo.REL_SUCCESS).get(0);
out.assertContentEquals(bytes);
// verify 1 doc inserted into the collection
assertEquals(1, collection.count());
assertEquals(doc, collection.find().first());
}
@Test
public void testInsertMany() throws Exception {
for (Document doc : DOCUMENTS) {
runner.enqueue(documentToByteArray(doc));
}
runner.run(3);
runner.assertAllFlowFilesTransferred(PutMongo.REL_SUCCESS, 3);
List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(PutMongo.REL_SUCCESS);
for (int i=0; i < flowFiles.size(); i++) {
flowFiles.get(i).assertContentEquals(DOCUMENTS.get(i).toJson());
}
// verify 3 docs inserted into the collection
assertEquals(3, collection.count());
}
@Test
public void testInsertWithDuplicateKey() throws Exception {
// pre-insert one document
collection.insertOne(DOCUMENTS.get(0));
for (Document doc : DOCUMENTS) {
runner.enqueue(documentToByteArray(doc));
}
runner.run(3);
// first doc failed, other 2 succeeded
runner.assertTransferCount(PutMongo.REL_FAILURE, 1);
MockFlowFile out = runner.getFlowFilesForRelationship(PutMongo.REL_FAILURE).get(0);
out.assertContentEquals(documentToByteArray(DOCUMENTS.get(0)));
runner.assertTransferCount(PutMongo.REL_SUCCESS, 2);
List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(PutMongo.REL_SUCCESS);
for (int i=0; i < flowFiles.size(); i++) {
flowFiles.get(i).assertContentEquals(DOCUMENTS.get(i+1).toJson());
}
// verify 2 docs inserted into the collection for a total of 3
assertEquals(3, collection.count());
}
/**
* Verifies that 'update' does not insert if 'upsert' if false.
* @see #testUpsert()
*/
@Test
public void testUpdateDoesNotInsert() throws Exception {
Document doc = DOCUMENTS.get(0);
byte[] bytes = documentToByteArray(doc);
runner.setProperty(PutMongo.MODE, "update");
runner.enqueue(bytes);
runner.run();
runner.assertAllFlowFilesTransferred(PutMongo.REL_SUCCESS, 1);
MockFlowFile out = runner.getFlowFilesForRelationship(PutMongo.REL_SUCCESS).get(0);
out.assertContentEquals(bytes);
// nothing was in collection, so nothing to update since upsert defaults to false
assertEquals(0, collection.count());
}
/**
* Verifies that 'update' does insert if 'upsert' is true.
* @see #testUpdateDoesNotInsert()
*/
@Test
public void testUpsert() throws Exception {
Document doc = DOCUMENTS.get(0);
byte[] bytes = documentToByteArray(doc);
runner.setProperty(PutMongo.MODE, "update");
runner.setProperty(PutMongo.UPSERT, "true");
runner.enqueue(bytes);
runner.run();
runner.assertAllFlowFilesTransferred(PutMongo.REL_SUCCESS, 1);
MockFlowFile out = runner.getFlowFilesForRelationship(PutMongo.REL_SUCCESS).get(0);
out.assertContentEquals(bytes);
// verify 1 doc inserted into the collection
assertEquals(1, collection.count());
assertEquals(doc, collection.find().first());
}
@Test
public void testUpdate() throws Exception {
Document doc = DOCUMENTS.get(0);
// pre-insert document
collection.insertOne(doc);
// modify the object
doc.put("abc", "123");
doc.put("xyz", "456");
doc.remove("c");
byte[] bytes = documentToByteArray(doc);
runner.setProperty(PutMongo.MODE, "update");
runner.enqueue(bytes);
runner.run();
runner.assertAllFlowFilesTransferred(PutMongo.REL_SUCCESS, 1);
MockFlowFile out = runner.getFlowFilesForRelationship(PutMongo.REL_SUCCESS).get(0);
out.assertContentEquals(bytes);
assertEquals(1, collection.count());
assertEquals(doc, collection.find().first());
}
}

Some files were not shown because too many files have changed in this diff Show More