From f47af1ce8336c9305916f00738976f3505b01b0b Mon Sep 17 00:00:00 2001 From: jpercivall Date: Wed, 25 May 2016 16:37:48 -0400 Subject: [PATCH] NIFI-1808 Adding proper license information to Nar and provenance reporting to PublishMQTT NIFI-1808 changing ConsumeMQTT messageArrived() logging This closes #484 --- nifi-assembly/NOTICE | 1 + .../src/main/resources/META-INF/NOTICE | 29 +++++++ .../nifi-mqtt-processors/pom.xml | 75 +++++++++---------- .../nifi/processors/mqtt/ConsumeMQTT.java | 11 ++- .../nifi/processors/mqtt/PublishMQTT.java | 5 ++ .../mqtt/common/TestConsumeMqttCommon.java | 20 +++++ .../mqtt/common/TestPublishMqttCommon.java | 18 +++++ 7 files changed, 116 insertions(+), 43 deletions(-) create mode 100644 nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-nar/src/main/resources/META-INF/NOTICE diff --git a/nifi-assembly/NOTICE b/nifi-assembly/NOTICE index 65b7b293ba..bfdb0bdf58 100644 --- a/nifi-assembly/NOTICE +++ b/nifi-assembly/NOTICE @@ -877,6 +877,7 @@ The following binary components are provided under the Eclipse Public License 1. The following NOTICE information applies: Copyright (c) 2007-2015 The JRuby project + (EPL 1.0) Eclipse Paho MQTT Client (org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.0.2 - https://github.com/eclipse/paho.mqtt.java) ***************** Mozilla Public License v2.0 diff --git a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-nar/src/main/resources/META-INF/NOTICE b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-nar/src/main/resources/META-INF/NOTICE new file mode 100644 index 0000000000..fe68c308a4 --- /dev/null +++ b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-nar/src/main/resources/META-INF/NOTICE @@ -0,0 +1,29 @@ +nifi-mqtt-nar +Copyright 2014-2016 The Apache Software Foundation + +This product includes software developed at +The Apache Software Foundation (http://www.apache.org/). + +****************** +Apache Software License v2 +****************** + +The following binary components are provided under the Apache Software License v2 + + (ASLv2) Apache Commons IO + The following NOTICE information applies: + Apache Commons IO + Copyright 2002-2016 The Apache Software Foundation + + This product includes software developed at + The Apache Software Foundation (http://www.apache.org/). + + + +************************ +Eclipse Public License 1.0 +************************ + +The following binary components are provided under the Eclipse Public License 1.0. See project link for details. + + (EPL 1.0) Eclipse Paho MQTT Client (org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.0.2 - https://github.com/eclipse/paho.mqtt.java) \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/pom.xml b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/pom.xml index 9561517f36..db1bd71429 100644 --- a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/pom.xml +++ b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/pom.xml @@ -13,21 +13,20 @@ See the License for the specific language governing permissions and limitations under the License. --> + org.apache.nifi nifi-mqtt-bundle 1.0.0-SNAPSHOT + 4.0.0 nifi-mqtt-processors jar - - - Eclipse Paho Repo - https://repo.eclipse.org/content/repositories/paho-releases/ - - + + + org.apache.nifi nifi-api @@ -36,6 +35,35 @@ org.apache.nifi nifi-processor-utils + + org.apache.nifi + nifi-ssl-context-service-api + + + + + org.apache.commons + commons-io + 1.3.2 + + + org.eclipse.paho + org.eclipse.paho.client.mqttv3 + 1.0.2 + + + + + io.moquette + moquette-broker + 0.8.1 + test + + + org.apache.nifi + nifi-ssl-context-service + test + org.apache.nifi nifi-mock @@ -52,41 +80,6 @@ 4.11 test - - org.apache.commons - commons-io - 1.3.2 - - - org.bytedeco - javacv - 1.1 - - - org.eclipse.paho - org.eclipse.paho.client.mqttv3 - 1.0.2 - - - javax.websocket - javax.websocket-api - 1.1 - - - org.apache.nifi - nifi-ssl-context-service-api - - - io.moquette - moquette-broker - 0.8.1 - test - - - org.apache.nifi - nifi-ssl-context-service - test - diff --git a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/ConsumeMQTT.java b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/ConsumeMQTT.java index aa87381cd8..ff70f7fc0b 100644 --- a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/ConsumeMQTT.java +++ b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/ConsumeMQTT.java @@ -17,6 +17,7 @@ package org.apache.nifi.processors.mqtt; +import org.apache.commons.lang3.StringUtils; import org.apache.nifi.annotation.behavior.InputRequirement; import org.apache.nifi.annotation.behavior.TriggerSerially; import org.apache.nifi.annotation.lifecycle.OnStopped; @@ -307,8 +308,14 @@ public class ConsumeMQTT extends AbstractMQTTProcessor { @Override public void messageArrived(String topic, MqttMessage message) throws Exception { - if (logger.isInfoEnabled()) { - logger.info("MQTT message arrived on topic:" + topic); + if (logger.isDebugEnabled()) { + byte[] payload = message.getPayload(); + String text = new String(payload, "UTF-8"); + if (StringUtils.isAsciiPrintable(text)) { + logger.debug("Message arrived from topic {}. Payload: {}", new Object[] {topic, text}); + } else { + logger.debug("Message arrived from topic {}. Binary value of size {}", new Object[] {topic, payload.length}); + } } if (mqttQueue.size() >= maxQueueSize){ diff --git a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/PublishMQTT.java b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/PublishMQTT.java index 95bbde4d4f..a6085c4ced 100644 --- a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/PublishMQTT.java +++ b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/PublishMQTT.java @@ -38,6 +38,7 @@ import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.processors.mqtt.common.AbstractMQTTProcessor; import org.apache.nifi.stream.io.StreamUtils; +import org.apache.nifi.util.StopWatch; import org.eclipse.paho.client.mqttv3.MqttCallback; import org.eclipse.paho.client.mqttv3.MqttException; import org.eclipse.paho.client.mqttv3.MqttMessage; @@ -50,6 +51,7 @@ import java.util.List; import java.util.Set; import java.io.InputStream; import java.io.IOException; +import java.util.concurrent.TimeUnit; @SupportsBatching @InputRequirement(Requirement.INPUT_REQUIRED) @@ -189,6 +191,7 @@ public class PublishMQTT extends AbstractMQTTProcessor { try { mqttClientConnectLock.readLock().lock(); + final StopWatch stopWatch = new StopWatch(true); try { /* * Underlying method waits for the message to publish (according to set QoS), so it executes synchronously: @@ -198,6 +201,8 @@ public class PublishMQTT extends AbstractMQTTProcessor { } finally { mqttClientConnectLock.readLock().unlock(); } + + session.getProvenanceReporter().send(flowfile, broker, stopWatch.getElapsed(TimeUnit.MILLISECONDS)); session.transfer(flowfile, REL_SUCCESS); } catch(MqttException me) { logger.error("Failed to publish message.", me); diff --git a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/common/TestConsumeMqttCommon.java b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/common/TestConsumeMqttCommon.java index c3f1b3d775..d010d1d7c8 100644 --- a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/common/TestConsumeMqttCommon.java +++ b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/common/TestConsumeMqttCommon.java @@ -21,6 +21,8 @@ import io.moquette.proto.messages.AbstractMessage; import io.moquette.proto.messages.PublishMessage; import io.moquette.server.Server; import org.apache.nifi.processors.mqtt.ConsumeMQTT; +import org.apache.nifi.provenance.ProvenanceEventRecord; +import org.apache.nifi.provenance.ProvenanceEventType; import org.apache.nifi.util.MockFlowFile; import org.apache.nifi.util.TestRunner; import org.eclipse.paho.client.mqttv3.IMqttClient; @@ -40,6 +42,8 @@ import static org.apache.nifi.processors.mqtt.ConsumeMQTT.IS_RETAINED_ATTRIBUTE_ import static org.apache.nifi.processors.mqtt.ConsumeMQTT.QOS_ATTRIBUTE_KEY; import static org.apache.nifi.processors.mqtt.ConsumeMQTT.TOPIC_ATTRIBUTE_KEY; import static org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_CLEAN_SESSION_FALSE; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; public abstract class TestConsumeMqttCommon { @@ -93,6 +97,7 @@ public abstract class TestConsumeMqttCommon { testRunner.run(1, false, false); testRunner.assertTransferCount(ConsumeMQTT.REL_MESSAGE, 1); + assertProvenanceEvents(1); List flowFiles = testRunner.getFlowFilesForRelationship(ConsumeMQTT.REL_MESSAGE); MockFlowFile flowFile = flowFiles.get(0); @@ -141,6 +146,7 @@ public abstract class TestConsumeMqttCommon { testRunner.run(1, false, false); testRunner.assertTransferCount(ConsumeMQTT.REL_MESSAGE, 1); + assertProvenanceEvents(1); List flowFiles = testRunner.getFlowFilesForRelationship(ConsumeMQTT.REL_MESSAGE); MockFlowFile flowFile = flowFiles.get(0); @@ -183,6 +189,7 @@ public abstract class TestConsumeMqttCommon { List flowFiles = testRunner.getFlowFilesForRelationship(ConsumeMQTT.REL_MESSAGE); assertTrue(flowFiles.size() > 0); + assertProvenanceEvents(flowFiles.size()); MockFlowFile flowFile = flowFiles.get(0); flowFile.assertContentEquals("testMessage"); @@ -232,6 +239,7 @@ public abstract class TestConsumeMqttCommon { List flowFiles = testRunner.getFlowFilesForRelationship(ConsumeMQTT.REL_MESSAGE); assertTrue(flowFiles.size() > 0); + assertProvenanceEvents(flowFiles.size()); MockFlowFile flowFile = flowFiles.get(0); flowFile.assertContentEquals("testMessage"); @@ -271,6 +279,7 @@ public abstract class TestConsumeMqttCommon { List flowFiles = testRunner.getFlowFilesForRelationship(ConsumeMQTT.REL_MESSAGE); assertTrue(flowFiles.size() < 2); + assertProvenanceEvents(flowFiles.size()); if(flowFiles.size() == 1) { MockFlowFile flowFile = flowFiles.get(0); @@ -314,6 +323,7 @@ public abstract class TestConsumeMqttCommon { consumeMQTT.onStopped(testRunner.getProcessContext()); testRunner.assertTransferCount(ConsumeMQTT.REL_MESSAGE, 1); + assertProvenanceEvents(1); List flowFiles = testRunner.getFlowFilesForRelationship(ConsumeMQTT.REL_MESSAGE); MockFlowFile flowFile = flowFiles.get(0); @@ -363,6 +373,7 @@ public abstract class TestConsumeMqttCommon { testRunner.run(1); testRunner.assertTransferCount(ConsumeMQTT.REL_MESSAGE, 2); + assertProvenanceEvents(2); List flowFiles = testRunner.getFlowFilesForRelationship(ConsumeMQTT.REL_MESSAGE); MockFlowFile flowFile = flowFiles.get(0); @@ -388,4 +399,13 @@ public abstract class TestConsumeMqttCommon { method.setAccessible(true); method.invoke(processor); } + + private void assertProvenanceEvents(int count){ + List provenanceEvents = testRunner.getProvenanceEvents(); + assertNotNull(provenanceEvents); + assertEquals(count, provenanceEvents.size()); + if (count > 0) { + assertEquals(ProvenanceEventType.RECEIVE, provenanceEvents.get(0).getEventType()); + } + } } diff --git a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/common/TestPublishMqttCommon.java b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/common/TestPublishMqttCommon.java index 75df6f3cb6..fc560af300 100644 --- a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/common/TestPublishMqttCommon.java +++ b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/common/TestPublishMqttCommon.java @@ -19,11 +19,17 @@ package org.apache.nifi.processors.mqtt.common; import io.moquette.server.Server; import org.apache.nifi.processors.mqtt.PublishMQTT; +import org.apache.nifi.provenance.ProvenanceEventRecord; +import org.apache.nifi.provenance.ProvenanceEventType; import org.apache.nifi.util.TestRunner; import org.junit.Test; +import java.util.List; + import static org.apache.nifi.processors.mqtt.PublishMQTT.REL_SUCCESS; import static org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_CLEAN_SESSION_FALSE; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; public abstract class TestPublishMqttCommon { @@ -47,6 +53,7 @@ public abstract class TestPublishMqttCommon { testRunner.assertAllFlowFilesTransferred(REL_SUCCESS); testRunner.assertTransferCount(REL_SUCCESS, 1); + assertProvenanceEvents(); verifyPublishedMessage(testMessage.getBytes(), 0, false); } @@ -63,6 +70,7 @@ public abstract class TestPublishMqttCommon { testRunner.run(); testRunner.assertAllFlowFilesTransferred(REL_SUCCESS); + assertProvenanceEvents(); testRunner.assertTransferCount(REL_SUCCESS, 1); verifyPublishedMessage(testMessage.getBytes(), 1, false); @@ -82,6 +90,7 @@ public abstract class TestPublishMqttCommon { testRunner.run(); testRunner.assertAllFlowFilesTransferred(REL_SUCCESS); + assertProvenanceEvents(); testRunner.assertTransferCount(REL_SUCCESS, 1); verifyPublishedMessage(testMessage.getBytes(), 2, false); @@ -99,6 +108,7 @@ public abstract class TestPublishMqttCommon { testRunner.run(); testRunner.assertAllFlowFilesTransferred(REL_SUCCESS); + assertProvenanceEvents(); testRunner.assertTransferCount(REL_SUCCESS, 1); verifyPublishedMessage(testMessage.getBytes(), 2, false); @@ -117,8 +127,16 @@ public abstract class TestPublishMqttCommon { testRunner.run(); testRunner.assertAllFlowFilesTransferred(REL_SUCCESS); + assertProvenanceEvents(); testRunner.assertTransferCount(REL_SUCCESS, 1); verifyPublishedMessage(testMessage.getBytes(), 2, true); } + + private void assertProvenanceEvents(){ + List provenanceEvents = testRunner.getProvenanceEvents(); + assertNotNull(provenanceEvents); + assertEquals(1, provenanceEvents.size()); + assertEquals(ProvenanceEventType.SEND, provenanceEvents.get(0).getEventType()); + } }