NIFI-1808 Adding proper license information to Nar and provenance reporting to PublishMQTT

NIFI-1808 changing ConsumeMQTT messageArrived() logging

This closes #484
This commit is contained in:
jpercivall 2016-05-25 16:37:48 -04:00 committed by Oleg Zhurakousky
parent 893daf567d
commit f47af1ce83
7 changed files with 116 additions and 43 deletions

View File

@ -877,6 +877,7 @@ The following binary components are provided under the Eclipse Public License 1.
The following NOTICE information applies:
Copyright (c) 2007-2015 The JRuby project
(EPL 1.0) Eclipse Paho MQTT Client (org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.0.2 - https://github.com/eclipse/paho.mqtt.java)
*****************
Mozilla Public License v2.0

View File

@ -0,0 +1,29 @@
nifi-mqtt-nar
Copyright 2014-2016 The Apache Software Foundation
This product includes software developed at
The Apache Software Foundation (http://www.apache.org/).
******************
Apache Software License v2
******************
The following binary components are provided under the Apache Software License v2
(ASLv2) Apache Commons IO
The following NOTICE information applies:
Apache Commons IO
Copyright 2002-2016 The Apache Software Foundation
This product includes software developed at
The Apache Software Foundation (http://www.apache.org/).
************************
Eclipse Public License 1.0
************************
The following binary components are provided under the Eclipse Public License 1.0. See project link for details.
(EPL 1.0) Eclipse Paho MQTT Client (org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.0.2 - https://github.com/eclipse/paho.mqtt.java)

View File

@ -13,21 +13,20 @@
See the License for the specific language governing permissions and
limitations under the License.
-->
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-mqtt-bundle</artifactId>
<version>1.0.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>nifi-mqtt-processors</artifactId>
<packaging>jar</packaging>
<repositories>
<repository>
<id>Eclipse Paho Repo</id>
<url>https://repo.eclipse.org/content/repositories/paho-releases/</url>
</repository>
</repositories>
<dependencies>
<!-- NiFi dependencies -->
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-api</artifactId>
@ -36,6 +35,35 @@
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-processor-utils</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-ssl-context-service-api</artifactId>
</dependency>
<!-- External dependencies -->
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-io</artifactId>
<version>1.3.2</version>
</dependency>
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>1.0.2</version>
</dependency>
<!-- Test dependencies -->
<dependency>
<groupId>io.moquette</groupId>
<artifactId>moquette-broker</artifactId>
<version>0.8.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-ssl-context-service</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-mock</artifactId>
@ -52,41 +80,6 @@
<version>4.11</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-io</artifactId>
<version>1.3.2</version>
</dependency>
<dependency>
<groupId>org.bytedeco</groupId>
<artifactId>javacv</artifactId>
<version>1.1</version>
</dependency>
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>1.0.2</version>
</dependency>
<dependency>
<groupId>javax.websocket</groupId>
<artifactId>javax.websocket-api</artifactId>
<version>1.1</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-ssl-context-service-api</artifactId>
</dependency>
<dependency>
<groupId>io.moquette</groupId>
<artifactId>moquette-broker</artifactId>
<version>0.8.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-ssl-context-service</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

View File

@ -17,6 +17,7 @@
package org.apache.nifi.processors.mqtt;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.TriggerSerially;
import org.apache.nifi.annotation.lifecycle.OnStopped;
@ -307,8 +308,14 @@ public class ConsumeMQTT extends AbstractMQTTProcessor {
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
if (logger.isInfoEnabled()) {
logger.info("MQTT message arrived on topic:" + topic);
if (logger.isDebugEnabled()) {
byte[] payload = message.getPayload();
String text = new String(payload, "UTF-8");
if (StringUtils.isAsciiPrintable(text)) {
logger.debug("Message arrived from topic {}. Payload: {}", new Object[] {topic, text});
} else {
logger.debug("Message arrived from topic {}. Binary value of size {}", new Object[] {topic, payload.length});
}
}
if (mqttQueue.size() >= maxQueueSize){

View File

@ -38,6 +38,7 @@ import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.mqtt.common.AbstractMQTTProcessor;
import org.apache.nifi.stream.io.StreamUtils;
import org.apache.nifi.util.StopWatch;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
@ -50,6 +51,7 @@ import java.util.List;
import java.util.Set;
import java.io.InputStream;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
@SupportsBatching
@InputRequirement(Requirement.INPUT_REQUIRED)
@ -189,6 +191,7 @@ public class PublishMQTT extends AbstractMQTTProcessor {
try {
mqttClientConnectLock.readLock().lock();
final StopWatch stopWatch = new StopWatch(true);
try {
/*
* Underlying method waits for the message to publish (according to set QoS), so it executes synchronously:
@ -198,6 +201,8 @@ public class PublishMQTT extends AbstractMQTTProcessor {
} finally {
mqttClientConnectLock.readLock().unlock();
}
session.getProvenanceReporter().send(flowfile, broker, stopWatch.getElapsed(TimeUnit.MILLISECONDS));
session.transfer(flowfile, REL_SUCCESS);
} catch(MqttException me) {
logger.error("Failed to publish message.", me);

View File

@ -21,6 +21,8 @@ import io.moquette.proto.messages.AbstractMessage;
import io.moquette.proto.messages.PublishMessage;
import io.moquette.server.Server;
import org.apache.nifi.processors.mqtt.ConsumeMQTT;
import org.apache.nifi.provenance.ProvenanceEventRecord;
import org.apache.nifi.provenance.ProvenanceEventType;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.eclipse.paho.client.mqttv3.IMqttClient;
@ -40,6 +42,8 @@ import static org.apache.nifi.processors.mqtt.ConsumeMQTT.IS_RETAINED_ATTRIBUTE_
import static org.apache.nifi.processors.mqtt.ConsumeMQTT.QOS_ATTRIBUTE_KEY;
import static org.apache.nifi.processors.mqtt.ConsumeMQTT.TOPIC_ATTRIBUTE_KEY;
import static org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_CLEAN_SESSION_FALSE;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
public abstract class TestConsumeMqttCommon {
@ -93,6 +97,7 @@ public abstract class TestConsumeMqttCommon {
testRunner.run(1, false, false);
testRunner.assertTransferCount(ConsumeMQTT.REL_MESSAGE, 1);
assertProvenanceEvents(1);
List<MockFlowFile> flowFiles = testRunner.getFlowFilesForRelationship(ConsumeMQTT.REL_MESSAGE);
MockFlowFile flowFile = flowFiles.get(0);
@ -141,6 +146,7 @@ public abstract class TestConsumeMqttCommon {
testRunner.run(1, false, false);
testRunner.assertTransferCount(ConsumeMQTT.REL_MESSAGE, 1);
assertProvenanceEvents(1);
List<MockFlowFile> flowFiles = testRunner.getFlowFilesForRelationship(ConsumeMQTT.REL_MESSAGE);
MockFlowFile flowFile = flowFiles.get(0);
@ -183,6 +189,7 @@ public abstract class TestConsumeMqttCommon {
List<MockFlowFile> flowFiles = testRunner.getFlowFilesForRelationship(ConsumeMQTT.REL_MESSAGE);
assertTrue(flowFiles.size() > 0);
assertProvenanceEvents(flowFiles.size());
MockFlowFile flowFile = flowFiles.get(0);
flowFile.assertContentEquals("testMessage");
@ -232,6 +239,7 @@ public abstract class TestConsumeMqttCommon {
List<MockFlowFile> flowFiles = testRunner.getFlowFilesForRelationship(ConsumeMQTT.REL_MESSAGE);
assertTrue(flowFiles.size() > 0);
assertProvenanceEvents(flowFiles.size());
MockFlowFile flowFile = flowFiles.get(0);
flowFile.assertContentEquals("testMessage");
@ -271,6 +279,7 @@ public abstract class TestConsumeMqttCommon {
List<MockFlowFile> flowFiles = testRunner.getFlowFilesForRelationship(ConsumeMQTT.REL_MESSAGE);
assertTrue(flowFiles.size() < 2);
assertProvenanceEvents(flowFiles.size());
if(flowFiles.size() == 1) {
MockFlowFile flowFile = flowFiles.get(0);
@ -314,6 +323,7 @@ public abstract class TestConsumeMqttCommon {
consumeMQTT.onStopped(testRunner.getProcessContext());
testRunner.assertTransferCount(ConsumeMQTT.REL_MESSAGE, 1);
assertProvenanceEvents(1);
List<MockFlowFile> flowFiles = testRunner.getFlowFilesForRelationship(ConsumeMQTT.REL_MESSAGE);
MockFlowFile flowFile = flowFiles.get(0);
@ -363,6 +373,7 @@ public abstract class TestConsumeMqttCommon {
testRunner.run(1);
testRunner.assertTransferCount(ConsumeMQTT.REL_MESSAGE, 2);
assertProvenanceEvents(2);
List<MockFlowFile> flowFiles = testRunner.getFlowFilesForRelationship(ConsumeMQTT.REL_MESSAGE);
MockFlowFile flowFile = flowFiles.get(0);
@ -388,4 +399,13 @@ public abstract class TestConsumeMqttCommon {
method.setAccessible(true);
method.invoke(processor);
}
private void assertProvenanceEvents(int count){
List<ProvenanceEventRecord> provenanceEvents = testRunner.getProvenanceEvents();
assertNotNull(provenanceEvents);
assertEquals(count, provenanceEvents.size());
if (count > 0) {
assertEquals(ProvenanceEventType.RECEIVE, provenanceEvents.get(0).getEventType());
}
}
}

View File

@ -19,11 +19,17 @@ package org.apache.nifi.processors.mqtt.common;
import io.moquette.server.Server;
import org.apache.nifi.processors.mqtt.PublishMQTT;
import org.apache.nifi.provenance.ProvenanceEventRecord;
import org.apache.nifi.provenance.ProvenanceEventType;
import org.apache.nifi.util.TestRunner;
import org.junit.Test;
import java.util.List;
import static org.apache.nifi.processors.mqtt.PublishMQTT.REL_SUCCESS;
import static org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_CLEAN_SESSION_FALSE;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
public abstract class TestPublishMqttCommon {
@ -47,6 +53,7 @@ public abstract class TestPublishMqttCommon {
testRunner.assertAllFlowFilesTransferred(REL_SUCCESS);
testRunner.assertTransferCount(REL_SUCCESS, 1);
assertProvenanceEvents();
verifyPublishedMessage(testMessage.getBytes(), 0, false);
}
@ -63,6 +70,7 @@ public abstract class TestPublishMqttCommon {
testRunner.run();
testRunner.assertAllFlowFilesTransferred(REL_SUCCESS);
assertProvenanceEvents();
testRunner.assertTransferCount(REL_SUCCESS, 1);
verifyPublishedMessage(testMessage.getBytes(), 1, false);
@ -82,6 +90,7 @@ public abstract class TestPublishMqttCommon {
testRunner.run();
testRunner.assertAllFlowFilesTransferred(REL_SUCCESS);
assertProvenanceEvents();
testRunner.assertTransferCount(REL_SUCCESS, 1);
verifyPublishedMessage(testMessage.getBytes(), 2, false);
@ -99,6 +108,7 @@ public abstract class TestPublishMqttCommon {
testRunner.run();
testRunner.assertAllFlowFilesTransferred(REL_SUCCESS);
assertProvenanceEvents();
testRunner.assertTransferCount(REL_SUCCESS, 1);
verifyPublishedMessage(testMessage.getBytes(), 2, false);
@ -117,8 +127,16 @@ public abstract class TestPublishMqttCommon {
testRunner.run();
testRunner.assertAllFlowFilesTransferred(REL_SUCCESS);
assertProvenanceEvents();
testRunner.assertTransferCount(REL_SUCCESS, 1);
verifyPublishedMessage(testMessage.getBytes(), 2, true);
}
private void assertProvenanceEvents(){
List<ProvenanceEventRecord> provenanceEvents = testRunner.getProvenanceEvents();
assertNotNull(provenanceEvents);
assertEquals(1, provenanceEvents.size());
assertEquals(ProvenanceEventType.SEND, provenanceEvents.get(0).getEventType());
}
}