mirror of https://github.com/apache/nifi.git
parent
e1b654655a
commit
7a4c71fec7
|
@ -317,11 +317,16 @@ language governing permissions and limitations under the License. -->
|
||||||
<artifactId>nifi-hive-nar</artifactId>
|
<artifactId>nifi-hive-nar</artifactId>
|
||||||
<type>nar</type>
|
<type>nar</type>
|
||||||
</dependency>
|
</dependency>
|
||||||
<dependency>
|
<dependency>
|
||||||
<groupId>org.apache.nifi</groupId>
|
<groupId>org.apache.nifi</groupId>
|
||||||
<artifactId>nifi-site-to-site-reporting-nar</artifactId>
|
<artifactId>nifi-site-to-site-reporting-nar</artifactId>
|
||||||
<type>nar</type>
|
<type>nar</type>
|
||||||
</dependency>
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.apache.nifi</groupId>
|
||||||
|
<artifactId>nifi-mqtt-nar</artifactId>
|
||||||
|
<type>nar</type>
|
||||||
|
</dependency>
|
||||||
</dependencies>
|
</dependencies>
|
||||||
|
|
||||||
<properties>
|
<properties>
|
||||||
|
|
|
@ -0,0 +1,35 @@
|
||||||
|
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
|
||||||
|
<!--
|
||||||
|
Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
contributor license agreements. See the NOTICE file distributed with
|
||||||
|
this work for additional information regarding copyright ownership.
|
||||||
|
The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
(the "License"); you may not use this file except in compliance with
|
||||||
|
the License. You may obtain a copy of the License at
|
||||||
|
http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
Unless required by applicable law or agreed to in writing, software
|
||||||
|
distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
See the License for the specific language governing permissions and
|
||||||
|
limitations under the License.
|
||||||
|
-->
|
||||||
|
<modelVersion>4.0.0</modelVersion>
|
||||||
|
<parent>
|
||||||
|
<groupId>org.apache.nifi</groupId>
|
||||||
|
<artifactId>nifi-mqtt-bundle</artifactId>
|
||||||
|
<version>1.0.0-SNAPSHOT</version>
|
||||||
|
</parent>
|
||||||
|
<artifactId>nifi-mqtt-nar</artifactId>
|
||||||
|
<packaging>nar</packaging>
|
||||||
|
<description>NiFi NAR for interacting with MQTT brokers</description>
|
||||||
|
<properties>
|
||||||
|
<maven.javadoc.skip>true</maven.javadoc.skip>
|
||||||
|
<source.skip>true</source.skip>
|
||||||
|
</properties>
|
||||||
|
<dependencies>
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.apache.nifi</groupId>
|
||||||
|
<artifactId>nifi-mqtt-processors</artifactId>
|
||||||
|
</dependency>
|
||||||
|
</dependencies>
|
||||||
|
</project>
|
|
@ -0,0 +1,86 @@
|
||||||
|
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||||
|
<!--
|
||||||
|
Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
contributor license agreements. See the NOTICE file distributed with
|
||||||
|
this work for additional information regarding copyright ownership.
|
||||||
|
The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
(the "License"); you may not use this file except in compliance with
|
||||||
|
the License. You may obtain a copy of the License at
|
||||||
|
http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
Unless required by applicable law or agreed to in writing, software
|
||||||
|
distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
See the License for the specific language governing permissions and
|
||||||
|
limitations under the License.
|
||||||
|
-->
|
||||||
|
<parent>
|
||||||
|
<groupId>org.apache.nifi</groupId>
|
||||||
|
<artifactId>nifi-mqtt-bundle</artifactId>
|
||||||
|
<version>1.0.0-SNAPSHOT</version>
|
||||||
|
</parent>
|
||||||
|
<modelVersion>4.0.0</modelVersion>
|
||||||
|
<artifactId>nifi-mqtt-processors</artifactId>
|
||||||
|
<packaging>jar</packaging>
|
||||||
|
<repositories>
|
||||||
|
<repository>
|
||||||
|
<id>Eclipse Paho Repo</id>
|
||||||
|
<url>https://repo.eclipse.org/content/repositories/paho-releases/</url>
|
||||||
|
</repository>
|
||||||
|
</repositories>
|
||||||
|
<dependencies>
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.apache.nifi</groupId>
|
||||||
|
<artifactId>nifi-api</artifactId>
|
||||||
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.apache.nifi</groupId>
|
||||||
|
<artifactId>nifi-processor-utils</artifactId>
|
||||||
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.apache.nifi</groupId>
|
||||||
|
<artifactId>nifi-mock</artifactId>
|
||||||
|
<scope>test</scope>
|
||||||
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.slf4j</groupId>
|
||||||
|
<artifactId>slf4j-simple</artifactId>
|
||||||
|
<scope>test</scope>
|
||||||
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>junit</groupId>
|
||||||
|
<artifactId>junit</artifactId>
|
||||||
|
<version>4.11</version>
|
||||||
|
<scope>test</scope>
|
||||||
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.apache.commons</groupId>
|
||||||
|
<artifactId>commons-io</artifactId>
|
||||||
|
<version>1.3.2</version>
|
||||||
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.bytedeco</groupId>
|
||||||
|
<artifactId>javacv</artifactId>
|
||||||
|
<version>1.1</version>
|
||||||
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.eclipse.paho</groupId>
|
||||||
|
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
|
||||||
|
<version>1.0.2</version>
|
||||||
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>javax.websocket</groupId>
|
||||||
|
<artifactId>javax.websocket-api</artifactId>
|
||||||
|
<version>1.1</version>
|
||||||
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.glassfish.tyrus.bundles</groupId>
|
||||||
|
<artifactId>tyrus-standalone-client-jdk</artifactId>
|
||||||
|
<version>1.12</version>
|
||||||
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.glassfish.tyrus</groupId>
|
||||||
|
<artifactId>tyrus-container-grizzly-client</artifactId>
|
||||||
|
<version>1.12</version>
|
||||||
|
</dependency>
|
||||||
|
</dependencies>
|
||||||
|
</project>
|
|
@ -0,0 +1,191 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.nifi.processors.mqtt;
|
||||||
|
|
||||||
|
import org.apache.nifi.components.PropertyDescriptor;
|
||||||
|
import org.apache.nifi.flowfile.FlowFile;
|
||||||
|
import org.apache.nifi.processor.*;
|
||||||
|
import org.apache.nifi.annotation.behavior.ReadsAttribute;
|
||||||
|
import org.apache.nifi.annotation.behavior.ReadsAttributes;
|
||||||
|
import org.apache.nifi.annotation.behavior.WritesAttribute;
|
||||||
|
import org.apache.nifi.annotation.behavior.WritesAttributes;
|
||||||
|
import org.apache.nifi.annotation.lifecycle.OnScheduled;
|
||||||
|
import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
|
||||||
|
import org.apache.nifi.annotation.documentation.CapabilityDescription;
|
||||||
|
import org.apache.nifi.annotation.documentation.SeeAlso;
|
||||||
|
import org.apache.nifi.annotation.documentation.Tags;
|
||||||
|
import org.apache.nifi.processor.exception.ProcessException;
|
||||||
|
import org.apache.nifi.processor.util.StandardValidators;
|
||||||
|
import org.apache.nifi.processor.ProcessSession;
|
||||||
|
import org.apache.nifi.processor.io.OutputStreamCallback;
|
||||||
|
|
||||||
|
import org.eclipse.paho.client.mqttv3.MqttClient;
|
||||||
|
import org.eclipse.paho.client.mqttv3.MqttCallback;
|
||||||
|
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
|
||||||
|
import org.eclipse.paho.client.mqttv3.MqttException;
|
||||||
|
import org.eclipse.paho.client.mqttv3.MqttMessage;
|
||||||
|
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
|
||||||
|
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
|
||||||
|
|
||||||
|
import java.util.*;
|
||||||
|
import java.util.concurrent.LinkedBlockingQueue;
|
||||||
|
import java.io.OutputStream;
|
||||||
|
import java.io.IOException;
|
||||||
|
|
||||||
|
@Tags({"GetMQTT"})
|
||||||
|
@CapabilityDescription("Gets messages from an MQTT broker")
|
||||||
|
@SeeAlso({})
|
||||||
|
@ReadsAttributes({@ReadsAttribute(attribute="", description="")})
|
||||||
|
@WritesAttributes({@WritesAttribute(attribute="broker", description="MQTT broker that was the message source"),
|
||||||
|
@WritesAttribute(attribute="topic", description="MQTT topic on which message was received")})
|
||||||
|
public class GetMQTT extends AbstractProcessor implements MqttCallback {
|
||||||
|
|
||||||
|
String topic;
|
||||||
|
String broker;
|
||||||
|
String clientID;
|
||||||
|
double lastTime;
|
||||||
|
boolean firstTime = true;
|
||||||
|
|
||||||
|
MemoryPersistence persistence = new MemoryPersistence();
|
||||||
|
MqttClient mqttClient;
|
||||||
|
|
||||||
|
LinkedBlockingQueue<MQTTQueueMessage> mqttQueue = new LinkedBlockingQueue<>();
|
||||||
|
|
||||||
|
public static final PropertyDescriptor PROPERTY_BROKER_ADDRESS = new PropertyDescriptor
|
||||||
|
.Builder().name("Broker address")
|
||||||
|
.description("MQTT broker address (e.g. tcp://localhost:1883)")
|
||||||
|
.required(true)
|
||||||
|
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
||||||
|
.build();
|
||||||
|
|
||||||
|
public static final PropertyDescriptor PROPERTY_MQTT_TOPIC = new PropertyDescriptor
|
||||||
|
.Builder().name("MQTT topic")
|
||||||
|
.description("MQTT topic to subscribe to")
|
||||||
|
.required(true)
|
||||||
|
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
||||||
|
.build();
|
||||||
|
|
||||||
|
public static final PropertyDescriptor PROPERTY_MQTT_CLIENTID = new PropertyDescriptor
|
||||||
|
.Builder().name("MQTT client ID")
|
||||||
|
.description("MQTT client ID to use")
|
||||||
|
.required(true)
|
||||||
|
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
||||||
|
.build();
|
||||||
|
|
||||||
|
public static final Relationship RELATIONSHIP_MQTTMESSAGE = new Relationship.Builder()
|
||||||
|
.name("MQTTMessage")
|
||||||
|
.description("MQTT message output")
|
||||||
|
.build();
|
||||||
|
|
||||||
|
private List<PropertyDescriptor> descriptors;
|
||||||
|
|
||||||
|
private Set<Relationship> relationships;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void connectionLost(Throwable t) {
|
||||||
|
getLogger().info("Connection to " + broker + " lost");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void deliveryComplete(IMqttDeliveryToken token) {
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void messageArrived(String topic, MqttMessage message) throws Exception {
|
||||||
|
mqttQueue.add(new MQTTQueueMessage(topic, message.getPayload()));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void init(final ProcessorInitializationContext context) {
|
||||||
|
final List<PropertyDescriptor> descriptors = new ArrayList<PropertyDescriptor>();
|
||||||
|
descriptors.add(PROPERTY_BROKER_ADDRESS);
|
||||||
|
descriptors.add(PROPERTY_MQTT_TOPIC);
|
||||||
|
descriptors.add(PROPERTY_MQTT_CLIENTID);
|
||||||
|
|
||||||
|
this.descriptors = Collections.unmodifiableList(descriptors);
|
||||||
|
|
||||||
|
final Set<Relationship> relationships = new HashSet<Relationship>();
|
||||||
|
relationships.add(RELATIONSHIP_MQTTMESSAGE);
|
||||||
|
this.relationships = Collections.unmodifiableSet(relationships);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Set<Relationship> getRelationships() {
|
||||||
|
return this.relationships;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
|
||||||
|
return descriptors;
|
||||||
|
}
|
||||||
|
|
||||||
|
@OnScheduled
|
||||||
|
public void onScheduled(final ProcessContext context) {
|
||||||
|
try {
|
||||||
|
broker = context.getProperty(PROPERTY_BROKER_ADDRESS).getValue();
|
||||||
|
topic = context.getProperty(PROPERTY_MQTT_TOPIC).getValue();
|
||||||
|
clientID = context.getProperty(PROPERTY_MQTT_CLIENTID).getValue();
|
||||||
|
mqttClient = new MqttClient(broker, clientID, persistence);
|
||||||
|
MqttConnectOptions connOpts = new MqttConnectOptions();
|
||||||
|
mqttClient.setCallback(this);
|
||||||
|
connOpts.setCleanSession(true);
|
||||||
|
getLogger().info("Connecting to broker: " + broker);
|
||||||
|
mqttClient.connect(connOpts);
|
||||||
|
mqttClient.subscribe(topic, 0);
|
||||||
|
} catch(MqttException me) {
|
||||||
|
getLogger().error("msg "+me.getMessage());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@OnUnscheduled
|
||||||
|
public void onUnscheduled(final ProcessContext context) {
|
||||||
|
try {
|
||||||
|
mqttClient.disconnect();
|
||||||
|
} catch(MqttException me) {
|
||||||
|
|
||||||
|
}
|
||||||
|
getLogger().error("Disconnected");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
|
||||||
|
final List messageList = new LinkedList();
|
||||||
|
|
||||||
|
mqttQueue.drainTo(messageList);
|
||||||
|
if (messageList.isEmpty())
|
||||||
|
return;
|
||||||
|
|
||||||
|
Iterator iterator = messageList.iterator();
|
||||||
|
while (iterator.hasNext()) {
|
||||||
|
FlowFile messageFlowfile = session.create();
|
||||||
|
final MQTTQueueMessage m = (MQTTQueueMessage)iterator.next();
|
||||||
|
|
||||||
|
messageFlowfile = session.putAttribute(messageFlowfile, "broker", broker);
|
||||||
|
messageFlowfile = session.putAttribute(messageFlowfile, "topic", topic);
|
||||||
|
messageFlowfile = session.write(messageFlowfile, new OutputStreamCallback() {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void process(final OutputStream out) throws IOException {
|
||||||
|
out.write(m.message);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
session.transfer(messageFlowfile, RELATIONSHIP_MQTTMESSAGE);
|
||||||
|
session.commit();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,29 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.nifi.processors.mqtt;
|
||||||
|
|
||||||
|
public class MQTTQueueMessage
|
||||||
|
{
|
||||||
|
public String topic;
|
||||||
|
public byte[] message;
|
||||||
|
|
||||||
|
public MQTTQueueMessage(String topic, byte[] message) {
|
||||||
|
this.topic = topic;
|
||||||
|
this.message = message;
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,193 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.nifi.processors.mqtt;
|
||||||
|
|
||||||
|
import org.apache.nifi.components.PropertyDescriptor;
|
||||||
|
import org.apache.nifi.flowfile.FlowFile;
|
||||||
|
import org.apache.nifi.processor.*;
|
||||||
|
import org.apache.nifi.annotation.behavior.ReadsAttribute;
|
||||||
|
import org.apache.nifi.annotation.behavior.ReadsAttributes;
|
||||||
|
import org.apache.nifi.annotation.behavior.WritesAttribute;
|
||||||
|
import org.apache.nifi.annotation.behavior.WritesAttributes;
|
||||||
|
import org.apache.nifi.annotation.lifecycle.OnScheduled;
|
||||||
|
import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
|
||||||
|
import org.apache.nifi.annotation.documentation.CapabilityDescription;
|
||||||
|
import org.apache.nifi.annotation.documentation.SeeAlso;
|
||||||
|
import org.apache.nifi.annotation.documentation.Tags;
|
||||||
|
import org.apache.nifi.processor.exception.ProcessException;
|
||||||
|
import org.apache.nifi.processor.util.StandardValidators;
|
||||||
|
import org.apache.nifi.processor.ProcessSession;
|
||||||
|
import org.apache.nifi.processor.io.InputStreamCallback;
|
||||||
|
import org.apache.nifi.annotation.behavior.InputRequirement;
|
||||||
|
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
|
||||||
|
import org.apache.commons.io.IOUtils;
|
||||||
|
|
||||||
|
import org.eclipse.paho.client.mqttv3.MqttClient;
|
||||||
|
import org.eclipse.paho.client.mqttv3.MqttCallback;
|
||||||
|
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
|
||||||
|
import org.eclipse.paho.client.mqttv3.MqttException;
|
||||||
|
import org.eclipse.paho.client.mqttv3.MqttMessage;
|
||||||
|
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
|
||||||
|
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
|
||||||
|
|
||||||
|
import java.util.*;
|
||||||
|
import java.util.concurrent.atomic.AtomicReference;
|
||||||
|
import java.io.InputStream;
|
||||||
|
import java.io.IOException;
|
||||||
|
|
||||||
|
@InputRequirement(Requirement.INPUT_REQUIRED)
|
||||||
|
@Tags({"PutMQTT"})
|
||||||
|
@CapabilityDescription("Publishes message to an MQTT topic")
|
||||||
|
@SeeAlso({})
|
||||||
|
@ReadsAttributes({@ReadsAttribute(attribute="topic", description="Topic to publish message to")})
|
||||||
|
@WritesAttributes({@WritesAttribute(attribute="", description="")})
|
||||||
|
public class PutMQTT extends AbstractProcessor implements MqttCallback {
|
||||||
|
|
||||||
|
String broker;
|
||||||
|
String clientID;
|
||||||
|
|
||||||
|
MemoryPersistence persistence = new MemoryPersistence();
|
||||||
|
MqttClient mqttClient;
|
||||||
|
|
||||||
|
public static final PropertyDescriptor PROPERTY_BROKER_ADDRESS = new PropertyDescriptor
|
||||||
|
.Builder().name("Broker address")
|
||||||
|
.description("MQTT broker address (e.g. tcp://localhost:1883)")
|
||||||
|
.required(true)
|
||||||
|
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
||||||
|
.build();
|
||||||
|
|
||||||
|
public static final PropertyDescriptor PROPERTY_MQTT_CLIENTID = new PropertyDescriptor
|
||||||
|
.Builder().name("MQTT client ID")
|
||||||
|
.description("MQTT client ID to use")
|
||||||
|
.required(true)
|
||||||
|
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
||||||
|
.build();
|
||||||
|
|
||||||
|
private List<PropertyDescriptor> descriptors;
|
||||||
|
|
||||||
|
private Set<Relationship> relationships;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void connectionLost(Throwable t) {
|
||||||
|
getLogger().info("Connection to " + broker + " lost");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void deliveryComplete(IMqttDeliveryToken token) {
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void messageArrived(String topic, MqttMessage message) throws Exception {
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void init(final ProcessorInitializationContext context) {
|
||||||
|
final List<PropertyDescriptor> descriptors = new ArrayList<PropertyDescriptor>();
|
||||||
|
descriptors.add(PROPERTY_BROKER_ADDRESS);
|
||||||
|
descriptors.add(PROPERTY_MQTT_CLIENTID);
|
||||||
|
this.descriptors = Collections.unmodifiableList(descriptors);
|
||||||
|
|
||||||
|
final Set<Relationship> relationships = new HashSet<Relationship>();
|
||||||
|
this.relationships = Collections.unmodifiableSet(relationships);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Set<Relationship> getRelationships() {
|
||||||
|
return this.relationships;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
|
||||||
|
return descriptors;
|
||||||
|
}
|
||||||
|
|
||||||
|
@OnScheduled
|
||||||
|
public void onScheduled(final ProcessContext context) {
|
||||||
|
try {
|
||||||
|
broker = context.getProperty(PROPERTY_BROKER_ADDRESS).getValue();
|
||||||
|
clientID = context.getProperty(PROPERTY_MQTT_CLIENTID).getValue();
|
||||||
|
mqttClient = new MqttClient(broker, clientID, persistence);
|
||||||
|
MqttConnectOptions connOpts = new MqttConnectOptions();
|
||||||
|
mqttClient.setCallback(this);
|
||||||
|
connOpts.setCleanSession(true);
|
||||||
|
getLogger().info("Connecting to broker: " + broker);
|
||||||
|
mqttClient.connect(connOpts);
|
||||||
|
} catch(MqttException me) {
|
||||||
|
getLogger().error("msg "+me.getMessage());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@OnUnscheduled
|
||||||
|
public void onUnscheduled(final ProcessContext context) {
|
||||||
|
try {
|
||||||
|
mqttClient.disconnect();
|
||||||
|
} catch(MqttException me) {
|
||||||
|
|
||||||
|
}
|
||||||
|
getLogger().error("Disconnected");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
|
||||||
|
final AtomicReference<String> message = new AtomicReference<>();
|
||||||
|
|
||||||
|
FlowFile flowfile = session.get();
|
||||||
|
message.set("");
|
||||||
|
|
||||||
|
// get the MQTT topic
|
||||||
|
|
||||||
|
String topic = flowfile.getAttribute("topic");
|
||||||
|
|
||||||
|
if (topic == null) {
|
||||||
|
getLogger().error("No topic attribute on flowfile");
|
||||||
|
session.remove(flowfile);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
// do the read
|
||||||
|
|
||||||
|
session.read(flowfile, new InputStreamCallback() {
|
||||||
|
@Override
|
||||||
|
public void process(InputStream in) throws IOException {
|
||||||
|
try{
|
||||||
|
message.set(IOUtils.toString(in));
|
||||||
|
}catch(Exception e){
|
||||||
|
getLogger().error("Failed to read flowfile " + e.getMessage());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
try {
|
||||||
|
session.remove(flowfile);
|
||||||
|
} catch (Exception e) {
|
||||||
|
getLogger().error("Failed to remove flowfile " + e.getMessage());
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
String output = message.get();
|
||||||
|
|
||||||
|
if ((output == null) || output.isEmpty()) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
|
mqttClient.publish(topic, output.getBytes(), 0, false);
|
||||||
|
} catch(MqttException me) {
|
||||||
|
getLogger().error("msg "+me.getMessage());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,16 @@
|
||||||
|
# Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
# contributor license agreements. See the NOTICE file distributed with
|
||||||
|
# this work for additional information regarding copyright ownership.
|
||||||
|
# The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
# (the "License"); you may not use this file except in compliance with
|
||||||
|
# the License. You may obtain a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
# See the License for the specific language governing permissions and
|
||||||
|
# limitations under the License.
|
||||||
|
org.apache.nifi.processors.mqtt.GetMQTT
|
||||||
|
org.apache.nifi.processors.mqtt.PutMQTT
|
|
@ -0,0 +1,41 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.nifi.processors.mqtt;
|
||||||
|
|
||||||
|
import org.apache.nifi.processors.mqtt.GetMQTT;
|
||||||
|
import org.apache.nifi.util.TestRunner;
|
||||||
|
import org.apache.nifi.util.TestRunners;
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
|
||||||
|
public class TestGetMQTT {
|
||||||
|
|
||||||
|
private TestRunner testRunner;
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public void init() {
|
||||||
|
testRunner = TestRunners.newTestRunner(GetMQTT.class);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testProcessor() {
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -0,0 +1,41 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.nifi.processors.mqtt;
|
||||||
|
|
||||||
|
import org.apache.nifi.processors.mqtt.PutMQTT;
|
||||||
|
import org.apache.nifi.util.TestRunner;
|
||||||
|
import org.apache.nifi.util.TestRunners;
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
|
||||||
|
public class TestPutMQTT {
|
||||||
|
|
||||||
|
private TestRunner testRunner;
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public void init() {
|
||||||
|
testRunner = TestRunners.newTestRunner(PutMQTT.class);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testProcessor() {
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -0,0 +1,37 @@
|
||||||
|
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||||
|
<!--
|
||||||
|
Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
contributor license agreements. See the NOTICE file distributed with
|
||||||
|
this work for additional information regarding copyright ownership.
|
||||||
|
The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
(the "License"); you may not use this file except in compliance with
|
||||||
|
the License. You may obtain a copy of the License at
|
||||||
|
http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
Unless required by applicable law or agreed to in writing, software
|
||||||
|
distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
See the License for the specific language governing permissions and
|
||||||
|
limitations under the License.
|
||||||
|
-->
|
||||||
|
<modelVersion>4.0.0</modelVersion>
|
||||||
|
<parent>
|
||||||
|
<groupId>org.apache.nifi</groupId>
|
||||||
|
<artifactId>nifi-nar-bundles</artifactId>
|
||||||
|
<version>1.0.0-SNAPSHOT</version>
|
||||||
|
</parent>
|
||||||
|
<artifactId>nifi-mqtt-bundle</artifactId>
|
||||||
|
<packaging>pom</packaging>
|
||||||
|
<modules>
|
||||||
|
<module>nifi-mqtt-processors</module>
|
||||||
|
<module>nifi-mqtt-nar</module>
|
||||||
|
</modules>
|
||||||
|
<dependencyManagement>
|
||||||
|
<dependencies>
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.apache.nifi</groupId>
|
||||||
|
<artifactId>nifi-mqtt-processors</artifactId>
|
||||||
|
<version>1.0.0-SNAPSHOT</version>
|
||||||
|
</dependency>
|
||||||
|
</dependencies>
|
||||||
|
</dependencyManagement>
|
||||||
|
</project>
|
|
@ -60,8 +60,10 @@
|
||||||
<module>nifi-cassandra-bundle</module>
|
<module>nifi-cassandra-bundle</module>
|
||||||
<module>nifi-spring-bundle</module>
|
<module>nifi-spring-bundle</module>
|
||||||
<module>nifi-hive-bundle</module>
|
<module>nifi-hive-bundle</module>
|
||||||
<module>nifi-site-to-site-reporting-bundle</module>
|
<module>nifi-site-to-site-reporting-bundle</module>
|
||||||
</modules>
|
<module>nifi-mqtt-bundle</module>
|
||||||
|
</modules>
|
||||||
|
|
||||||
<dependencyManagement>
|
<dependencyManagement>
|
||||||
<dependencies>
|
<dependencies>
|
||||||
<dependency>
|
<dependency>
|
||||||
|
|
6
pom.xml
6
pom.xml
|
@ -1072,6 +1072,12 @@ language governing permissions and limitations under the License. -->
|
||||||
<version>1.0.0-SNAPSHOT</version>
|
<version>1.0.0-SNAPSHOT</version>
|
||||||
<type>nar</type>
|
<type>nar</type>
|
||||||
</dependency>
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.apache.nifi</groupId>
|
||||||
|
<artifactId>nifi-mqtt-nar</artifactId>
|
||||||
|
<version>1.0.0-SNAPSHOT</version>
|
||||||
|
<type>nar</type>
|
||||||
|
</dependency>
|
||||||
<dependency>
|
<dependency>
|
||||||
<groupId>org.apache.nifi</groupId>
|
<groupId>org.apache.nifi</groupId>
|
||||||
<artifactId>nifi-hbase_1_1_2-client-service-nar</artifactId>
|
<artifactId>nifi-hbase_1_1_2-client-service-nar</artifactId>
|
||||||
|
|
Loading…
Reference in New Issue