mirror of https://github.com/apache/nifi.git
NIFI-11187 Removed ActiveMQ from Standard Processors
Signed-off-by: Matthew Burgess <mattyb149@apache.org> This closes #6961
This commit is contained in:
parent
0fa1060297
commit
57a1144f34
|
@ -269,10 +269,6 @@
|
|||
<groupId>javax.jms</groupId>
|
||||
<artifactId>javax.jms-api</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.activemq</groupId>
|
||||
<artifactId>activemq-client</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.jayway.jsonpath</groupId>
|
||||
<artifactId>json-path</artifactId>
|
||||
|
@ -345,11 +341,6 @@
|
|||
<version>2.0.0-SNAPSHOT</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.activemq</groupId>
|
||||
<artifactId>activemq-broker</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.nifi</groupId>
|
||||
<artifactId>nifi-ssl-context-service</artifactId>
|
||||
|
|
|
@ -1,240 +0,0 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.processors.standard;
|
||||
|
||||
import org.apache.nifi.components.PropertyDescriptor;
|
||||
import org.apache.nifi.flowfile.FlowFile;
|
||||
import org.apache.nifi.logging.ComponentLog;
|
||||
import org.apache.nifi.processor.AbstractProcessor;
|
||||
import org.apache.nifi.processor.ProcessContext;
|
||||
import org.apache.nifi.processor.ProcessSession;
|
||||
import org.apache.nifi.processor.Relationship;
|
||||
import org.apache.nifi.processor.exception.ProcessException;
|
||||
import org.apache.nifi.processor.io.OutputStreamCallback;
|
||||
import org.apache.nifi.processors.standard.util.JmsFactory;
|
||||
import org.apache.nifi.processors.standard.util.JmsProcessingSummary;
|
||||
import org.apache.nifi.processors.standard.util.WrappedMessageConsumer;
|
||||
import org.apache.nifi.util.StopWatch;
|
||||
|
||||
import javax.jms.JMSException;
|
||||
import javax.jms.MapMessage;
|
||||
import javax.jms.Message;
|
||||
import javax.jms.MessageConsumer;
|
||||
import java.io.BufferedOutputStream;
|
||||
import java.io.IOException;
|
||||
import java.io.OutputStream;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.Enumeration;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import static org.apache.nifi.processors.standard.util.JmsProperties.ACKNOWLEDGEMENT_MODE;
|
||||
import static org.apache.nifi.processors.standard.util.JmsProperties.ACK_MODE_CLIENT;
|
||||
import static org.apache.nifi.processors.standard.util.JmsProperties.BATCH_SIZE;
|
||||
import static org.apache.nifi.processors.standard.util.JmsProperties.CLIENT_ID_PREFIX;
|
||||
import static org.apache.nifi.processors.standard.util.JmsProperties.DESTINATION_NAME;
|
||||
import static org.apache.nifi.processors.standard.util.JmsProperties.JMS_PROPS_TO_ATTRIBUTES;
|
||||
import static org.apache.nifi.processors.standard.util.JmsProperties.JMS_PROVIDER;
|
||||
import static org.apache.nifi.processors.standard.util.JmsProperties.MESSAGE_SELECTOR;
|
||||
import static org.apache.nifi.processors.standard.util.JmsProperties.PASSWORD;
|
||||
import static org.apache.nifi.processors.standard.util.JmsProperties.SSL_CONTEXT_SERVICE;
|
||||
import static org.apache.nifi.processors.standard.util.JmsProperties.TIMEOUT;
|
||||
import static org.apache.nifi.processors.standard.util.JmsProperties.URL;
|
||||
import static org.apache.nifi.processors.standard.util.JmsProperties.USERNAME;
|
||||
|
||||
public abstract class JmsConsumer extends AbstractProcessor {
|
||||
|
||||
public static final String MAP_MESSAGE_PREFIX = "jms.mapmessage.";
|
||||
|
||||
public static final Relationship REL_SUCCESS = new Relationship.Builder()
|
||||
.name("success")
|
||||
.description("All FlowFiles are routed to success")
|
||||
.build();
|
||||
|
||||
private final Set<Relationship> relationships;
|
||||
private final List<PropertyDescriptor> propertyDescriptors;
|
||||
|
||||
public JmsConsumer() {
|
||||
final Set<Relationship> rels = new HashSet<>();
|
||||
rels.add(REL_SUCCESS);
|
||||
this.relationships = Collections.unmodifiableSet(rels);
|
||||
|
||||
final List<PropertyDescriptor> descriptors = new ArrayList<>();
|
||||
descriptors.add(JMS_PROVIDER);
|
||||
descriptors.add(URL);
|
||||
descriptors.add(DESTINATION_NAME);
|
||||
descriptors.add(TIMEOUT);
|
||||
descriptors.add(BATCH_SIZE);
|
||||
descriptors.add(USERNAME);
|
||||
descriptors.add(PASSWORD);
|
||||
descriptors.add(SSL_CONTEXT_SERVICE);
|
||||
descriptors.add(ACKNOWLEDGEMENT_MODE);
|
||||
descriptors.add(MESSAGE_SELECTOR);
|
||||
descriptors.add(JMS_PROPS_TO_ATTRIBUTES);
|
||||
descriptors.add(CLIENT_ID_PREFIX);
|
||||
this.propertyDescriptors = Collections.unmodifiableList(descriptors);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Set<Relationship> getRelationships() {
|
||||
return relationships;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
|
||||
return propertyDescriptors;
|
||||
}
|
||||
|
||||
public void consume(final ProcessContext context, final ProcessSession session, final WrappedMessageConsumer wrappedConsumer) throws ProcessException {
|
||||
final ComponentLog logger = getLogger();
|
||||
|
||||
final MessageConsumer consumer = wrappedConsumer.getConsumer();
|
||||
final boolean clientAcknowledge = context.getProperty(ACKNOWLEDGEMENT_MODE).getValue().equalsIgnoreCase(ACK_MODE_CLIENT);
|
||||
final long timeout = context.getProperty(TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS);
|
||||
final boolean addAttributes = context.getProperty(JMS_PROPS_TO_ATTRIBUTES).asBoolean();
|
||||
final int batchSize = context.getProperty(BATCH_SIZE).asInteger();
|
||||
|
||||
final JmsProcessingSummary processingSummary = new JmsProcessingSummary();
|
||||
|
||||
final StopWatch stopWatch = new StopWatch(true);
|
||||
for (int i = 0; i < batchSize; i++) {
|
||||
|
||||
final Message message;
|
||||
try {
|
||||
// If we haven't received a message, wait until one is available. If we have already received at least one
|
||||
// message, then we are not willing to wait for more to become available, but we are willing to keep receiving
|
||||
// all messages that are immediately available.
|
||||
if (processingSummary.getMessagesReceived() == 0) {
|
||||
message = consumer.receive(timeout);
|
||||
} else {
|
||||
message = consumer.receiveNoWait();
|
||||
}
|
||||
} catch (final JMSException e) {
|
||||
logger.error("Failed to receive JMS Message due to {}", e);
|
||||
wrappedConsumer.close(logger);
|
||||
break;
|
||||
}
|
||||
|
||||
if (message == null) { // if no messages, we're done
|
||||
break;
|
||||
}
|
||||
|
||||
try {
|
||||
processingSummary.add(map2FlowFile(context, session, message, addAttributes, logger));
|
||||
} catch (Exception e) {
|
||||
logger.error("Failed to receive JMS Message due to {}", e);
|
||||
wrappedConsumer.close(logger);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (processingSummary.getFlowFilesCreated() == 0) {
|
||||
context.yield();
|
||||
return;
|
||||
}
|
||||
|
||||
session.commitAsync(() -> {
|
||||
// if we need to acknowledge the messages, do so now.
|
||||
final Message lastMessage = processingSummary.getLastMessageReceived();
|
||||
if (clientAcknowledge && lastMessage != null) {
|
||||
try {
|
||||
lastMessage.acknowledge(); // acknowledge all received messages by acknowledging only the last.
|
||||
} catch (final JMSException e) {
|
||||
logger.error("Failed to acknowledge {} JMS Message(s). This may result in duplicate messages. Reason for failure: {}",
|
||||
new Object[]{processingSummary.getMessagesReceived(), e});
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
stopWatch.stop();
|
||||
if (processingSummary.getFlowFilesCreated() > 0) {
|
||||
final float secs = (stopWatch.getDuration(TimeUnit.MILLISECONDS) / 1000F);
|
||||
float messagesPerSec = (processingSummary.getMessagesReceived()) / secs;
|
||||
final String dataRate = stopWatch.calculateDataRate(processingSummary.getBytesReceived());
|
||||
logger.info("Received {} messages in {} milliseconds, at a rate of {} messages/sec or {}",
|
||||
new Object[]{processingSummary.getMessagesReceived(), stopWatch.getDuration(TimeUnit.MILLISECONDS), messagesPerSec, dataRate});
|
||||
}
|
||||
}
|
||||
|
||||
public static JmsProcessingSummary map2FlowFile(final ProcessContext context, final ProcessSession session, final Message message, final boolean addAttributes, ComponentLog logger)
|
||||
throws Exception {
|
||||
|
||||
// Currently not very useful, because always one Message == one FlowFile
|
||||
final AtomicInteger msgsThisFlowFile = new AtomicInteger(1);
|
||||
|
||||
FlowFile flowFile = session.create();
|
||||
try {
|
||||
// MapMessage is exception, add only name-value pairs to FlowFile attributes
|
||||
if (message instanceof MapMessage) {
|
||||
MapMessage mapMessage = (MapMessage) message;
|
||||
flowFile = session.putAllAttributes(flowFile, createMapMessageValues(mapMessage));
|
||||
} else { // all other message types, write Message body to FlowFile content
|
||||
flowFile = session.write(flowFile, new OutputStreamCallback() {
|
||||
@Override
|
||||
public void process(final OutputStream rawOut) throws IOException {
|
||||
try (final OutputStream out = new BufferedOutputStream(rawOut, 65536)) {
|
||||
final byte[] messageBody = JmsFactory.createByteArray(message);
|
||||
out.write(messageBody);
|
||||
} catch (final JMSException e) {
|
||||
throw new ProcessException("Failed to receive JMS Message due to " + e.getMessage(), e);
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
if (addAttributes) {
|
||||
flowFile = session.putAllAttributes(flowFile, JmsFactory.createAttributeMap(message));
|
||||
}
|
||||
|
||||
session.getProvenanceReporter().receive(flowFile, context.getProperty(URL).getValue());
|
||||
session.transfer(flowFile, REL_SUCCESS);
|
||||
logger.info("Created {} from {} messages received from JMS Server and transferred to 'success'",
|
||||
new Object[]{flowFile, msgsThisFlowFile.get()});
|
||||
|
||||
return new JmsProcessingSummary(flowFile.getSize(), message, flowFile);
|
||||
|
||||
} catch (Exception e) {
|
||||
session.remove(flowFile);
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
|
||||
public static Map<String, String> createMapMessageValues(final MapMessage mapMessage) throws JMSException {
|
||||
final Map<String, String> valueMap = new HashMap<>();
|
||||
|
||||
final Enumeration<?> enumeration = mapMessage.getMapNames();
|
||||
while (enumeration.hasMoreElements()) {
|
||||
final String name = (String) enumeration.nextElement();
|
||||
|
||||
final Object value = mapMessage.getObject(name);
|
||||
if (value == null) {
|
||||
valueMap.put(MAP_MESSAGE_PREFIX + name, "");
|
||||
} else {
|
||||
valueMap.put(MAP_MESSAGE_PREFIX + name, value.toString());
|
||||
}
|
||||
}
|
||||
|
||||
return valueMap;
|
||||
}
|
||||
|
||||
}
|
|
@ -1,57 +0,0 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.processors.standard.util;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import org.apache.nifi.processor.io.InputStreamCallback;
|
||||
import org.apache.nifi.xml.processing.ProcessingException;
|
||||
import org.apache.nifi.xml.processing.parsers.StandardDocumentProvider;
|
||||
import org.w3c.dom.Document;
|
||||
|
||||
public class DocumentReaderCallback implements InputStreamCallback {
|
||||
|
||||
private final boolean isNamespaceAware;
|
||||
private Document document;
|
||||
|
||||
/**
|
||||
* Creates a new DocumentReaderCallback.
|
||||
*
|
||||
* @param isNamespaceAware Whether or not the parse should consider namespaces
|
||||
*/
|
||||
public DocumentReaderCallback(boolean isNamespaceAware) {
|
||||
this.isNamespaceAware = isNamespaceAware;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void process(final InputStream stream) throws IOException {
|
||||
try {
|
||||
final StandardDocumentProvider documentProvider = new StandardDocumentProvider();
|
||||
documentProvider.setNamespaceAware(isNamespaceAware);
|
||||
document = documentProvider.parse(stream);
|
||||
} catch (final ProcessingException e) {
|
||||
throw new IOException(e.getLocalizedMessage(), e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @return the document
|
||||
*/
|
||||
public Document getDocument() {
|
||||
return document;
|
||||
}
|
||||
}
|
|
@ -1,514 +0,0 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.processors.standard.util;
|
||||
|
||||
import static org.apache.nifi.processors.standard.util.JmsProperties.ACKNOWLEDGEMENT_MODE;
|
||||
import static org.apache.nifi.processors.standard.util.JmsProperties.ACK_MODE_AUTO;
|
||||
import static org.apache.nifi.processors.standard.util.JmsProperties.ACTIVEMQ_PROVIDER;
|
||||
import static org.apache.nifi.processors.standard.util.JmsProperties.CLIENT_ID_PREFIX;
|
||||
import static org.apache.nifi.processors.standard.util.JmsProperties.DESTINATION_NAME;
|
||||
import static org.apache.nifi.processors.standard.util.JmsProperties.DESTINATION_TYPE;
|
||||
import static org.apache.nifi.processors.standard.util.JmsProperties.DESTINATION_TYPE_QUEUE;
|
||||
import static org.apache.nifi.processors.standard.util.JmsProperties.DESTINATION_TYPE_TOPIC;
|
||||
import static org.apache.nifi.processors.standard.util.JmsProperties.DURABLE_SUBSCRIPTION;
|
||||
import static org.apache.nifi.processors.standard.util.JmsProperties.JMS_PROVIDER;
|
||||
import static org.apache.nifi.processors.standard.util.JmsProperties.MESSAGE_SELECTOR;
|
||||
import static org.apache.nifi.processors.standard.util.JmsProperties.PASSWORD;
|
||||
import static org.apache.nifi.processors.standard.util.JmsProperties.SSL_CONTEXT_SERVICE;
|
||||
import static org.apache.nifi.processors.standard.util.JmsProperties.TIMEOUT;
|
||||
import static org.apache.nifi.processors.standard.util.JmsProperties.URL;
|
||||
import static org.apache.nifi.processors.standard.util.JmsProperties.USERNAME;
|
||||
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.IOException;
|
||||
import java.io.ObjectOutputStream;
|
||||
import java.net.URI;
|
||||
import java.net.URISyntaxException;
|
||||
import java.util.Enumeration;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import javax.jms.BytesMessage;
|
||||
import javax.jms.Connection;
|
||||
import javax.jms.ConnectionFactory;
|
||||
import javax.jms.Destination;
|
||||
import javax.jms.JMSException;
|
||||
import javax.jms.MapMessage;
|
||||
import javax.jms.Message;
|
||||
import javax.jms.MessageConsumer;
|
||||
import javax.jms.MessageProducer;
|
||||
import javax.jms.ObjectMessage;
|
||||
import javax.jms.Queue;
|
||||
import javax.jms.Session;
|
||||
import javax.jms.StreamMessage;
|
||||
import javax.jms.TextMessage;
|
||||
import javax.jms.Topic;
|
||||
import org.apache.activemq.ActiveMQConnectionFactory;
|
||||
import org.apache.activemq.ActiveMQSslConnectionFactory;
|
||||
import org.apache.activemq.command.ActiveMQQueue;
|
||||
import org.apache.activemq.command.ActiveMQTopic;
|
||||
import org.apache.activemq.util.URISupport;
|
||||
import org.apache.activemq.util.URISupport.CompositeData;
|
||||
import org.apache.nifi.processor.ProcessContext;
|
||||
import org.apache.nifi.ssl.SSLContextService;
|
||||
|
||||
public class JmsFactory {
|
||||
|
||||
public static final boolean DEFAULT_IS_TRANSACTED = false;
|
||||
public static final String ATTRIBUTE_PREFIX = "jms.";
|
||||
public static final String ATTRIBUTE_TYPE_SUFFIX = ".type";
|
||||
public static final String CLIENT_ID_FIXED_PREFIX = "NiFi-";
|
||||
|
||||
// JMS Metadata Fields
|
||||
public static final String JMS_MESSAGE_ID = "JMSMessageID";
|
||||
public static final String JMS_DESTINATION = "JMSDestination";
|
||||
public static final String JMS_REPLY_TO = "JMSReplyTo";
|
||||
public static final String JMS_DELIVERY_MODE = "JMSDeliveryMode";
|
||||
public static final String JMS_REDELIVERED = "JMSRedelivered";
|
||||
public static final String JMS_CORRELATION_ID = "JMSCorrelationID";
|
||||
public static final String JMS_TYPE = "JMSType";
|
||||
public static final String JMS_TIMESTAMP = "JMSTimestamp";
|
||||
public static final String JMS_EXPIRATION = "JMSExpiration";
|
||||
public static final String JMS_PRIORITY = "JMSPriority";
|
||||
|
||||
// JMS Property Types.
|
||||
public static final String PROP_TYPE_STRING = "string";
|
||||
public static final String PROP_TYPE_INTEGER = "integer";
|
||||
public static final String PROP_TYPE_OBJECT = "object";
|
||||
public static final String PROP_TYPE_BYTE = "byte";
|
||||
public static final String PROP_TYPE_DOUBLE = "double";
|
||||
public static final String PROP_TYPE_FLOAT = "float";
|
||||
public static final String PROP_TYPE_LONG = "long";
|
||||
public static final String PROP_TYPE_SHORT = "short";
|
||||
public static final String PROP_TYPE_BOOLEAN = "boolean";
|
||||
|
||||
public static Connection createConnection(final ProcessContext context) throws JMSException {
|
||||
return createConnection(context, createClientId(context));
|
||||
}
|
||||
|
||||
public static Connection createConnection(final ProcessContext context, final String clientId) throws JMSException {
|
||||
Objects.requireNonNull(context);
|
||||
Objects.requireNonNull(clientId);
|
||||
|
||||
final ConnectionFactory connectionFactory = createConnectionFactory(context);
|
||||
|
||||
final String username = context.getProperty(USERNAME).getValue();
|
||||
final String password = context.getProperty(PASSWORD).getValue();
|
||||
final Connection connection = (username == null && password == null) ? connectionFactory.createConnection() : connectionFactory.createConnection(username, password);
|
||||
|
||||
connection.setClientID(clientId);
|
||||
connection.start();
|
||||
return connection;
|
||||
}
|
||||
|
||||
public static Connection createConnection(final String url, final String jmsProvider, final String username, final String password, final int timeoutMillis) throws JMSException {
|
||||
final ConnectionFactory connectionFactory = createConnectionFactory(url, timeoutMillis, jmsProvider);
|
||||
return (username == null && password == null) ? connectionFactory.createConnection() : connectionFactory.createConnection(username, password);
|
||||
}
|
||||
|
||||
public static String createClientId(final ProcessContext context) {
|
||||
final String clientIdPrefix = context.getProperty(CLIENT_ID_PREFIX).getValue();
|
||||
return CLIENT_ID_FIXED_PREFIX + (clientIdPrefix == null ? "" : clientIdPrefix) + "-" + UUID.randomUUID().toString();
|
||||
}
|
||||
|
||||
public static boolean clientIdPrefixEquals(final String one, final String two) {
|
||||
if (one == null) {
|
||||
return two == null;
|
||||
} else if (two == null) {
|
||||
return false;
|
||||
}
|
||||
int uuidLen = UUID.randomUUID().toString().length();
|
||||
if (one.length() <= uuidLen || two.length() <= uuidLen) {
|
||||
return false;
|
||||
}
|
||||
return one.substring(0, one.length() - uuidLen).equals(two.substring(0, two.length() - uuidLen));
|
||||
}
|
||||
|
||||
public static byte[] createByteArray(final Message message) throws JMSException {
|
||||
if (message instanceof TextMessage) {
|
||||
return getMessageBytes((TextMessage) message);
|
||||
} else if (message instanceof BytesMessage) {
|
||||
return getMessageBytes((BytesMessage) message);
|
||||
} else if (message instanceof StreamMessage) {
|
||||
return getMessageBytes((StreamMessage) message);
|
||||
} else if (message instanceof MapMessage) {
|
||||
return getMessageBytes((MapMessage) message);
|
||||
} else if (message instanceof ObjectMessage) {
|
||||
return getMessageBytes((ObjectMessage) message);
|
||||
}
|
||||
return new byte[0];
|
||||
}
|
||||
|
||||
private static byte[] getMessageBytes(TextMessage message) throws JMSException {
|
||||
return (message.getText() == null) ? new byte[0] : message.getText().getBytes();
|
||||
}
|
||||
|
||||
private static byte[] getMessageBytes(BytesMessage message) throws JMSException {
|
||||
final long byteCount = message.getBodyLength();
|
||||
if (byteCount > Integer.MAX_VALUE) {
|
||||
throw new JMSException("Incoming message cannot be written to a FlowFile because its size is "
|
||||
+ byteCount
|
||||
+ " bytes, and the maximum size that this processor can handle is "
|
||||
+ Integer.MAX_VALUE);
|
||||
}
|
||||
|
||||
byte[] bytes = new byte[(int) byteCount];
|
||||
message.readBytes(bytes);
|
||||
|
||||
return bytes;
|
||||
}
|
||||
|
||||
private static byte[] getMessageBytes(StreamMessage message) throws JMSException {
|
||||
final ByteArrayOutputStream baos = new ByteArrayOutputStream();
|
||||
|
||||
byte[] byteBuffer = new byte[4096];
|
||||
int byteCount;
|
||||
while ((byteCount = message.readBytes(byteBuffer)) != -1) {
|
||||
baos.write(byteBuffer, 0, byteCount);
|
||||
}
|
||||
|
||||
try {
|
||||
baos.close();
|
||||
} catch (final IOException ioe) {
|
||||
}
|
||||
|
||||
return baos.toByteArray();
|
||||
}
|
||||
|
||||
@SuppressWarnings("rawtypes")
|
||||
private static byte[] getMessageBytes(MapMessage message) throws JMSException {
|
||||
Map<String, String> map = new HashMap<>();
|
||||
Enumeration elements = message.getMapNames();
|
||||
while (elements.hasMoreElements()) {
|
||||
String key = (String) elements.nextElement();
|
||||
map.put(key, message.getString(key));
|
||||
}
|
||||
return map.toString().getBytes();
|
||||
}
|
||||
|
||||
private static byte[] getMessageBytes(ObjectMessage message) throws JMSException {
|
||||
try {
|
||||
ByteArrayOutputStream baos = new ByteArrayOutputStream();
|
||||
// will fail if Object is not Serializable
|
||||
try (ObjectOutputStream oos = new ObjectOutputStream(baos)) {
|
||||
// will fail if Object is not Serializable
|
||||
oos.writeObject(message.getObject());
|
||||
oos.flush();
|
||||
}
|
||||
return baos.toByteArray();
|
||||
} catch (IOException e) {
|
||||
return new byte[0];
|
||||
}
|
||||
}
|
||||
|
||||
public static Session createSession(final ProcessContext context, final Connection connection, final boolean transacted) throws JMSException {
|
||||
final String configuredAckMode = context.getProperty(ACKNOWLEDGEMENT_MODE).getValue();
|
||||
return createSession(connection, configuredAckMode, transacted);
|
||||
}
|
||||
|
||||
public static Session createSession(final Connection connection, final String configuredAckMode, final boolean transacted) throws JMSException {
|
||||
final int ackMode;
|
||||
if (configuredAckMode == null) {
|
||||
ackMode = Session.AUTO_ACKNOWLEDGE;
|
||||
} else {
|
||||
ackMode = configuredAckMode.equalsIgnoreCase(ACK_MODE_AUTO) ? Session.AUTO_ACKNOWLEDGE : Session.CLIENT_ACKNOWLEDGE;
|
||||
}
|
||||
|
||||
final Session session = connection.createSession(transacted, ackMode);
|
||||
return session;
|
||||
}
|
||||
|
||||
public static WrappedMessageConsumer createQueueMessageConsumer(final ProcessContext context) throws JMSException {
|
||||
Connection connection = null;
|
||||
Session jmsSession = null;
|
||||
try {
|
||||
connection = JmsFactory.createConnection(context);
|
||||
jmsSession = JmsFactory.createSession(context, connection, DEFAULT_IS_TRANSACTED);
|
||||
|
||||
final String messageSelector = context.getProperty(MESSAGE_SELECTOR).getValue();
|
||||
final Destination destination = createQueue(context);
|
||||
final MessageConsumer messageConsumer = jmsSession.createConsumer(destination, messageSelector, false);
|
||||
|
||||
return new WrappedMessageConsumer(connection, jmsSession, messageConsumer);
|
||||
} catch (JMSException e) {
|
||||
if (jmsSession != null) {
|
||||
jmsSession.close();
|
||||
}
|
||||
if (connection != null) {
|
||||
connection.close();
|
||||
}
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
|
||||
public static WrappedMessageConsumer createTopicMessageConsumer(final ProcessContext context) throws JMSException {
|
||||
return createTopicMessageConsumer(context, createClientId(context));
|
||||
}
|
||||
|
||||
public static WrappedMessageConsumer createTopicMessageConsumer(final ProcessContext context, final String clientId) throws JMSException {
|
||||
Objects.requireNonNull(context);
|
||||
Objects.requireNonNull(clientId);
|
||||
|
||||
Connection connection = null;
|
||||
Session jmsSession = null;
|
||||
try {
|
||||
connection = JmsFactory.createConnection(context, clientId);
|
||||
jmsSession = JmsFactory.createSession(context, connection, DEFAULT_IS_TRANSACTED);
|
||||
|
||||
final String messageSelector = context.getProperty(MESSAGE_SELECTOR).getValue();
|
||||
final Topic topic = createTopic(context);
|
||||
final MessageConsumer messageConsumer;
|
||||
if (context.getProperty(DURABLE_SUBSCRIPTION).asBoolean()) {
|
||||
messageConsumer = jmsSession.createDurableSubscriber(topic, clientId, messageSelector, false);
|
||||
} else {
|
||||
messageConsumer = jmsSession.createConsumer(topic, messageSelector, false);
|
||||
}
|
||||
|
||||
return new WrappedMessageConsumer(connection, jmsSession, messageConsumer);
|
||||
} catch (JMSException e) {
|
||||
if (jmsSession != null) {
|
||||
jmsSession.close();
|
||||
}
|
||||
if (connection != null) {
|
||||
connection.close();
|
||||
}
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
|
||||
private static Destination getDestination(final ProcessContext context) throws JMSException {
|
||||
final String destinationType = context.getProperty(DESTINATION_TYPE).getValue();
|
||||
switch (destinationType) {
|
||||
case DESTINATION_TYPE_TOPIC:
|
||||
return createTopic(context);
|
||||
case DESTINATION_TYPE_QUEUE:
|
||||
default:
|
||||
return createQueue(context);
|
||||
}
|
||||
}
|
||||
|
||||
public static WrappedMessageProducer createMessageProducer(final ProcessContext context) throws JMSException {
|
||||
return createMessageProducer(context, false);
|
||||
}
|
||||
|
||||
public static WrappedMessageProducer createMessageProducer(final ProcessContext context, final boolean transacted) throws JMSException {
|
||||
Connection connection = null;
|
||||
Session jmsSession = null;
|
||||
|
||||
try {
|
||||
connection = JmsFactory.createConnection(context);
|
||||
jmsSession = JmsFactory.createSession(context, connection, transacted);
|
||||
|
||||
final Destination destination = getDestination(context);
|
||||
final MessageProducer messageProducer = jmsSession.createProducer(destination);
|
||||
|
||||
return new WrappedMessageProducer(connection, jmsSession, messageProducer);
|
||||
} catch (JMSException e) {
|
||||
if (connection != null) {
|
||||
connection.close();
|
||||
}
|
||||
if (jmsSession != null) {
|
||||
jmsSession.close();
|
||||
}
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
|
||||
public static Destination createQueue(final ProcessContext context) {
|
||||
return createQueue(context, context.getProperty(DESTINATION_NAME).getValue());
|
||||
}
|
||||
|
||||
public static Queue createQueue(final ProcessContext context, final String queueName) {
|
||||
return createQueue(context.getProperty(JMS_PROVIDER).getValue(), queueName);
|
||||
}
|
||||
|
||||
public static Queue createQueue(final String jmsProvider, final String queueName) {
|
||||
switch (jmsProvider) {
|
||||
case ACTIVEMQ_PROVIDER:
|
||||
default:
|
||||
return new ActiveMQQueue(queueName);
|
||||
}
|
||||
}
|
||||
|
||||
private static Topic createTopic(final ProcessContext context) {
|
||||
final String topicName = context.getProperty(DESTINATION_NAME).getValue();
|
||||
switch (context.getProperty(JMS_PROVIDER).getValue()) {
|
||||
case ACTIVEMQ_PROVIDER:
|
||||
default:
|
||||
return new ActiveMQTopic(topicName);
|
||||
}
|
||||
}
|
||||
|
||||
private static ConnectionFactory createConnectionFactory(final ProcessContext context) throws JMSException {
|
||||
final URI uri;
|
||||
try {
|
||||
uri = new URI(context.getProperty(URL).getValue());
|
||||
} catch (URISyntaxException e) {
|
||||
// Should not happen - URI was validated
|
||||
throw new IllegalArgumentException("Validated URI [" + context.getProperty(URL) + "] was invalid", e);
|
||||
}
|
||||
final int timeoutMillis = context.getProperty(TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue();
|
||||
final String provider = context.getProperty(JMS_PROVIDER).getValue();
|
||||
if (isSSL(uri)) {
|
||||
final SSLContextService sslContextService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
|
||||
if (sslContextService == null) {
|
||||
throw new IllegalArgumentException("Attempting to initiate SSL JMS connection and SSL Context is not set.");
|
||||
}
|
||||
return createSslConnectionFactory(uri, timeoutMillis, provider, sslContextService.getKeyStoreFile(),
|
||||
sslContextService.getKeyStorePassword(), sslContextService.getTrustStoreFile(), sslContextService.getTrustStorePassword());
|
||||
} else {
|
||||
return createConnectionFactory(uri, timeoutMillis, provider);
|
||||
}
|
||||
}
|
||||
|
||||
private static boolean isSSL(URI uri) {
|
||||
try {
|
||||
CompositeData compositeData = URISupport.parseComposite(uri);
|
||||
if ("ssl".equals(compositeData.getScheme())) {
|
||||
return true;
|
||||
}
|
||||
for(URI component : compositeData.getComponents()){
|
||||
if ("ssl".equals(component.getScheme())) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
} catch (URISyntaxException e) {
|
||||
throw new IllegalArgumentException("Attempting to initiate JMS with invalid composite URI [" + uri + "]", e);
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
public static ConnectionFactory createConnectionFactory(final URI uri, final int timeoutMillis, final String jmsProvider) throws JMSException {
|
||||
return createConnectionFactory(uri.toString(), timeoutMillis, jmsProvider);
|
||||
}
|
||||
|
||||
public static ConnectionFactory createConnectionFactory(final String url, final int timeoutMillis, final String jmsProvider) throws JMSException {
|
||||
switch (jmsProvider) {
|
||||
case ACTIVEMQ_PROVIDER: {
|
||||
final ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(url);
|
||||
factory.setSendTimeout(timeoutMillis);
|
||||
return factory;
|
||||
}
|
||||
default:
|
||||
throw new IllegalArgumentException("Unknown JMS Provider: " + jmsProvider);
|
||||
}
|
||||
}
|
||||
|
||||
public static ConnectionFactory createSslConnectionFactory(final URI uri, final int timeoutMillis, final String jmsProvider,
|
||||
final String keystore, final String keystorePassword, final String truststore, final String truststorePassword) throws JMSException {
|
||||
return createSslConnectionFactory(uri.toString(), timeoutMillis, jmsProvider, keystore, keystorePassword, truststore, truststorePassword);
|
||||
}
|
||||
|
||||
public static ConnectionFactory createSslConnectionFactory(final String url, final int timeoutMillis, final String jmsProvider,
|
||||
final String keystore, final String keystorePassword, final String truststore, final String truststorePassword) throws JMSException {
|
||||
switch (jmsProvider) {
|
||||
case ACTIVEMQ_PROVIDER: {
|
||||
final ActiveMQSslConnectionFactory factory = new ActiveMQSslConnectionFactory(url);
|
||||
try {
|
||||
factory.setKeyStore(keystore);
|
||||
} catch (Exception e) {
|
||||
throw new JMSException("Problem Setting the KeyStore: " + e.getMessage());
|
||||
}
|
||||
factory.setKeyStorePassword(keystorePassword);
|
||||
try {
|
||||
factory.setTrustStore(truststore);
|
||||
} catch (Exception e) {
|
||||
throw new JMSException("Problem Setting the TrustStore: " + e.getMessage());
|
||||
}
|
||||
factory.setTrustStorePassword(truststorePassword);
|
||||
factory.setSendTimeout(timeoutMillis);
|
||||
return factory;
|
||||
}
|
||||
default:
|
||||
throw new IllegalArgumentException("Unknown JMS Provider: " + jmsProvider);
|
||||
}
|
||||
}
|
||||
|
||||
public static Map<String, String> createAttributeMap(final Message message) throws JMSException {
|
||||
final Map<String, String> attributes = new HashMap<>();
|
||||
|
||||
final Enumeration<?> enumeration = message.getPropertyNames();
|
||||
while (enumeration.hasMoreElements()) {
|
||||
final String propName = (String) enumeration.nextElement();
|
||||
|
||||
final Object value = message.getObjectProperty(propName);
|
||||
|
||||
if (value == null) {
|
||||
attributes.put(ATTRIBUTE_PREFIX + propName, "");
|
||||
attributes.put(ATTRIBUTE_PREFIX + propName + ATTRIBUTE_TYPE_SUFFIX, "Unknown");
|
||||
continue;
|
||||
}
|
||||
|
||||
final String valueString = value.toString();
|
||||
attributes.put(ATTRIBUTE_PREFIX + propName, valueString);
|
||||
|
||||
final String propType;
|
||||
if (value instanceof String) {
|
||||
propType = PROP_TYPE_STRING;
|
||||
} else if (value instanceof Double) {
|
||||
propType = PROP_TYPE_DOUBLE;
|
||||
} else if (value instanceof Float) {
|
||||
propType = PROP_TYPE_FLOAT;
|
||||
} else if (value instanceof Long) {
|
||||
propType = PROP_TYPE_LONG;
|
||||
} else if (value instanceof Integer) {
|
||||
propType = PROP_TYPE_INTEGER;
|
||||
} else if (value instanceof Short) {
|
||||
propType = PROP_TYPE_SHORT;
|
||||
} else if (value instanceof Byte) {
|
||||
propType = PROP_TYPE_BYTE;
|
||||
} else if (value instanceof Boolean) {
|
||||
propType = PROP_TYPE_BOOLEAN;
|
||||
} else {
|
||||
propType = PROP_TYPE_OBJECT;
|
||||
}
|
||||
|
||||
attributes.put(ATTRIBUTE_PREFIX + propName + ATTRIBUTE_TYPE_SUFFIX, propType);
|
||||
}
|
||||
|
||||
if (message.getJMSCorrelationID() != null) {
|
||||
attributes.put(ATTRIBUTE_PREFIX + JMS_CORRELATION_ID, message.getJMSCorrelationID());
|
||||
}
|
||||
if (message.getJMSDestination() != null) {
|
||||
String destinationName;
|
||||
if (message.getJMSDestination() instanceof Queue) {
|
||||
destinationName = ((Queue) message.getJMSDestination()).getQueueName();
|
||||
} else {
|
||||
destinationName = ((Topic) message.getJMSDestination()).getTopicName();
|
||||
}
|
||||
attributes.put(ATTRIBUTE_PREFIX + JMS_DESTINATION, destinationName);
|
||||
}
|
||||
if (message.getJMSMessageID() != null) {
|
||||
attributes.put(ATTRIBUTE_PREFIX + JMS_MESSAGE_ID, message.getJMSMessageID());
|
||||
}
|
||||
if (message.getJMSReplyTo() != null) {
|
||||
attributes.put(ATTRIBUTE_PREFIX + JMS_REPLY_TO, message.getJMSReplyTo().toString());
|
||||
}
|
||||
if (message.getJMSType() != null) {
|
||||
attributes.put(ATTRIBUTE_PREFIX + JMS_TYPE, message.getJMSType());
|
||||
}
|
||||
|
||||
attributes.put(ATTRIBUTE_PREFIX + JMS_DELIVERY_MODE, String.valueOf(message.getJMSDeliveryMode()));
|
||||
attributes.put(ATTRIBUTE_PREFIX + JMS_EXPIRATION, String.valueOf(message.getJMSExpiration()));
|
||||
attributes.put(ATTRIBUTE_PREFIX + JMS_PRIORITY, String.valueOf(message.getJMSPriority()));
|
||||
attributes.put(ATTRIBUTE_PREFIX + JMS_REDELIVERED, String.valueOf(message.getJMSRedelivered()));
|
||||
attributes.put(ATTRIBUTE_PREFIX + JMS_TIMESTAMP, String.valueOf(message.getJMSTimestamp()));
|
||||
return attributes;
|
||||
}
|
||||
}
|
|
@ -1,81 +0,0 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.processors.standard.util;
|
||||
|
||||
import javax.jms.Message;
|
||||
|
||||
import org.apache.nifi.flowfile.FlowFile;
|
||||
|
||||
/**
|
||||
* Data structure which allows to collect processing summary data.
|
||||
*
|
||||
*/
|
||||
public class JmsProcessingSummary {
|
||||
|
||||
private int messagesReceived;
|
||||
private long bytesReceived;
|
||||
private Message lastMessageReceived;
|
||||
private int flowFilesCreated;
|
||||
private FlowFile lastFlowFile; // helps testing
|
||||
|
||||
public JmsProcessingSummary() {
|
||||
super();
|
||||
this.messagesReceived = 0;
|
||||
this.bytesReceived = 0;
|
||||
this.lastMessageReceived = null;
|
||||
this.flowFilesCreated = 0;
|
||||
this.lastFlowFile = null;
|
||||
}
|
||||
|
||||
public JmsProcessingSummary(long bytesReceived, Message lastMessageReceived, FlowFile lastFlowFile) {
|
||||
super();
|
||||
this.messagesReceived = 1;
|
||||
this.bytesReceived = bytesReceived;
|
||||
this.lastMessageReceived = lastMessageReceived;
|
||||
this.flowFilesCreated = 1;
|
||||
this.lastFlowFile = lastFlowFile;
|
||||
}
|
||||
|
||||
public void add(JmsProcessingSummary jmsProcessingSummary) {
|
||||
this.messagesReceived += jmsProcessingSummary.messagesReceived;
|
||||
this.bytesReceived += jmsProcessingSummary.bytesReceived;
|
||||
this.lastMessageReceived = jmsProcessingSummary.lastMessageReceived;
|
||||
this.flowFilesCreated += jmsProcessingSummary.flowFilesCreated;
|
||||
this.lastFlowFile = jmsProcessingSummary.lastFlowFile;
|
||||
}
|
||||
|
||||
public int getMessagesReceived() {
|
||||
return messagesReceived;
|
||||
}
|
||||
|
||||
public long getBytesReceived() {
|
||||
return bytesReceived;
|
||||
}
|
||||
|
||||
public Message getLastMessageReceived() {
|
||||
return lastMessageReceived;
|
||||
}
|
||||
|
||||
public int getFlowFilesCreated() {
|
||||
return flowFilesCreated;
|
||||
}
|
||||
|
||||
public FlowFile getLastFlowFile() {
|
||||
return lastFlowFile;
|
||||
}
|
||||
|
||||
}
|
|
@ -1,189 +0,0 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.processors.standard.util;
|
||||
|
||||
import org.apache.nifi.components.PropertyDescriptor;
|
||||
import org.apache.nifi.components.Validator;
|
||||
import org.apache.nifi.expression.ExpressionLanguageScope;
|
||||
import org.apache.nifi.processor.util.StandardValidators;
|
||||
import org.apache.nifi.ssl.SSLContextService;
|
||||
|
||||
public class JmsProperties {
|
||||
|
||||
public static final String ACTIVEMQ_PROVIDER = "ActiveMQ";
|
||||
|
||||
public static final String ACK_MODE_CLIENT = "Client Acknowledge";
|
||||
public static final String ACK_MODE_AUTO = "Auto Acknowledge";
|
||||
|
||||
public static final String DESTINATION_TYPE_QUEUE = "Queue";
|
||||
public static final String DESTINATION_TYPE_TOPIC = "Topic";
|
||||
|
||||
public static final String MSG_TYPE_BYTE = "byte";
|
||||
public static final String MSG_TYPE_TEXT = "text";
|
||||
public static final String MSG_TYPE_STREAM = "stream";
|
||||
public static final String MSG_TYPE_MAP = "map";
|
||||
public static final String MSG_TYPE_EMPTY = "empty";
|
||||
|
||||
// Standard JMS Properties
|
||||
public static final PropertyDescriptor JMS_PROVIDER = new PropertyDescriptor.Builder()
|
||||
.name("JMS Provider")
|
||||
.description("The Provider used for the JMS Server")
|
||||
.required(true)
|
||||
.allowableValues(ACTIVEMQ_PROVIDER)
|
||||
.defaultValue(ACTIVEMQ_PROVIDER)
|
||||
.build();
|
||||
public static final PropertyDescriptor URL = new PropertyDescriptor.Builder()
|
||||
.name("URL")
|
||||
.description("The URL of the JMS Server")
|
||||
.addValidator(StandardValidators.URI_VALIDATOR)
|
||||
.required(true)
|
||||
.build();
|
||||
public static final PropertyDescriptor TIMEOUT = new PropertyDescriptor.Builder()
|
||||
.name("Communications Timeout")
|
||||
.description("The amount of time to wait when attempting to receive a message before giving up and assuming failure")
|
||||
.required(true)
|
||||
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
|
||||
.defaultValue("30 sec")
|
||||
.build();
|
||||
public static final PropertyDescriptor USERNAME = new PropertyDescriptor.Builder()
|
||||
.name("Username")
|
||||
.description("Username used for authentication and authorization")
|
||||
.required(false)
|
||||
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
||||
.build();
|
||||
public static final PropertyDescriptor PASSWORD = new PropertyDescriptor.Builder()
|
||||
.name("Password")
|
||||
.description("Password used for authentication and authorization")
|
||||
.required(false)
|
||||
.addValidator(Validator.VALID)
|
||||
.sensitive(true)
|
||||
.build();
|
||||
public static final PropertyDescriptor CLIENT_ID_PREFIX = new PropertyDescriptor.Builder()
|
||||
.name("Client ID Prefix")
|
||||
.description("A human-readable ID that can be used to associate connections with yourself so that the maintainers of the JMS Server know who to contact if problems arise")
|
||||
.required(false)
|
||||
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
||||
.build();
|
||||
|
||||
// Topic/Queue determination Properties
|
||||
public static final PropertyDescriptor DESTINATION_NAME = new PropertyDescriptor.Builder()
|
||||
.name("Destination Name")
|
||||
.description("The name of the JMS Topic or queue to use")
|
||||
.required(true)
|
||||
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
||||
.build();
|
||||
public static final PropertyDescriptor DESTINATION_TYPE = new PropertyDescriptor.Builder()
|
||||
.name("Destination Type")
|
||||
.description("The type of the JMS Destination to use")
|
||||
.required(true)
|
||||
.allowableValues(DESTINATION_TYPE_QUEUE, DESTINATION_TYPE_TOPIC)
|
||||
.defaultValue(DESTINATION_TYPE_QUEUE)
|
||||
.build();
|
||||
|
||||
public static final PropertyDescriptor DURABLE_SUBSCRIPTION = new PropertyDescriptor.Builder()
|
||||
.name("Use Durable Subscription")
|
||||
.description("If true, connections to the specified topic will use Durable Subscription so that messages are queued when we are not pulling them")
|
||||
.required(true)
|
||||
.allowableValues("true", "false")
|
||||
.defaultValue("false")
|
||||
.build();
|
||||
|
||||
// JMS Publisher Properties
|
||||
public static final PropertyDescriptor ATTRIBUTES_TO_JMS_PROPS = new PropertyDescriptor.Builder()
|
||||
.name("Copy Attributes to JMS Properties")
|
||||
.description("Whether or not FlowFile Attributes should be translated into JMS Message Properties. If true, all "
|
||||
+ "attributes starting with 'jms.' will be set as Properties on the JMS Message (without the 'jms.' prefix). "
|
||||
+ "If an attribute exists that starts with the same value but ends in '.type', that attribute will be used "
|
||||
+ "to determine the JMS Message Property type.")
|
||||
.required(true)
|
||||
.allowableValues("true", "false")
|
||||
.defaultValue("true")
|
||||
.build();
|
||||
|
||||
// JMS Listener Properties
|
||||
public static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
|
||||
.name("Message Batch Size")
|
||||
.description("The number of messages to pull/push in a single iteration of the processor")
|
||||
.required(true)
|
||||
.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
|
||||
.defaultValue("10")
|
||||
.build();
|
||||
public static final PropertyDescriptor ACKNOWLEDGEMENT_MODE = new PropertyDescriptor.Builder()
|
||||
.name("Acknowledgement Mode")
|
||||
.description("The JMS Acknowledgement Mode. Using Auto Acknowledge can cause messages to be lost on restart of NiFi but may provide better performance than Client Acknowledge.")
|
||||
.required(true)
|
||||
.allowableValues(ACK_MODE_CLIENT, ACK_MODE_AUTO)
|
||||
.defaultValue(ACK_MODE_CLIENT)
|
||||
.build();
|
||||
public static final PropertyDescriptor JMS_PROPS_TO_ATTRIBUTES = new PropertyDescriptor.Builder()
|
||||
.name("Copy JMS Properties to Attributes")
|
||||
.description("Whether or not the JMS Message Properties should be copied to the FlowFile Attributes; if so, the attribute name will be jms.XXX, where XXX is the JMS Property name")
|
||||
.required(true)
|
||||
.allowableValues("true", "false")
|
||||
.defaultValue("true")
|
||||
.build();
|
||||
public static final PropertyDescriptor MESSAGE_SELECTOR = new PropertyDescriptor.Builder()
|
||||
.name("Message Selector")
|
||||
.description("The JMS Message Selector to use in order to narrow the messages that are pulled")
|
||||
.required(false)
|
||||
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
||||
.build();
|
||||
|
||||
// JMS Producer Properties
|
||||
public static final PropertyDescriptor MESSAGE_TYPE = new PropertyDescriptor.Builder()
|
||||
.name("Message Type")
|
||||
.description("The Type of JMS Message to Construct")
|
||||
.required(true)
|
||||
.allowableValues(MSG_TYPE_BYTE, MSG_TYPE_STREAM, MSG_TYPE_TEXT, MSG_TYPE_MAP, MSG_TYPE_EMPTY)
|
||||
.defaultValue(MSG_TYPE_BYTE)
|
||||
.build();
|
||||
public static final PropertyDescriptor MESSAGE_PRIORITY = new PropertyDescriptor.Builder()
|
||||
.name("Message Priority")
|
||||
.description("The Priority of the Message")
|
||||
.required(false)
|
||||
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
||||
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
|
||||
.build();
|
||||
public static final PropertyDescriptor REPLY_TO_QUEUE = new PropertyDescriptor.Builder()
|
||||
.name("Reply-To Queue")
|
||||
.description("The name of the queue to which a reply to should be added")
|
||||
.required(false)
|
||||
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
||||
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
|
||||
.build();
|
||||
public static final PropertyDescriptor MESSAGE_TTL = new PropertyDescriptor.Builder()
|
||||
.name("Message Time to Live")
|
||||
.description("The amount of time that the message should live on the destination before being removed; if not specified, the message will never expire.")
|
||||
.required(false)
|
||||
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
|
||||
.build();
|
||||
public static final PropertyDescriptor MAX_BUFFER_SIZE = new PropertyDescriptor.Builder()
|
||||
.name("Max Buffer Size")
|
||||
.description("The maximum amount of data that can be buffered for a JMS Message. If a FlowFile's size exceeds this value, the FlowFile will be routed to failure.")
|
||||
.required(true)
|
||||
.addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
|
||||
.defaultValue("1 MB")
|
||||
.build();
|
||||
|
||||
// JMS SSL Properties
|
||||
public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder()
|
||||
.name("SSL Context Service")
|
||||
.description("The Controller Service to use in order to obtain an SSL Context.")
|
||||
.required(false)
|
||||
.identifiesControllerService(SSLContextService.class)
|
||||
.build();
|
||||
}
|
|
@ -1,214 +0,0 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.processors.standard.util;
|
||||
|
||||
import java.io.BufferedOutputStream;
|
||||
import java.io.IOException;
|
||||
import java.io.OutputStream;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.nio.channels.Channels;
|
||||
import java.nio.channels.WritableByteChannel;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.BlockingQueue;
|
||||
import java.util.concurrent.LinkedBlockingQueue;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import org.apache.commons.lang3.builder.EqualsBuilder;
|
||||
import org.apache.commons.lang3.builder.HashCodeBuilder;
|
||||
import org.apache.commons.lang3.builder.ToStringBuilder;
|
||||
import org.apache.nifi.flowfile.FlowFile;
|
||||
import org.apache.nifi.io.nio.BufferPool;
|
||||
import org.apache.nifi.io.nio.consumer.StreamConsumer;
|
||||
import org.apache.nifi.logging.ComponentLog;
|
||||
import org.apache.nifi.processor.ProcessSession;
|
||||
import org.apache.nifi.processor.io.OutputStreamCallback;
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
public class UDPStreamConsumer implements StreamConsumer {
|
||||
|
||||
private final ComponentLog logger;
|
||||
final List<FlowFile> newFlowFileQueue;
|
||||
private final String uniqueId;
|
||||
private BufferPool bufferPool = null;
|
||||
private final BlockingQueue<ByteBuffer> filledBuffers = new LinkedBlockingQueue<>();
|
||||
private final AtomicBoolean streamEnded = new AtomicBoolean(false);
|
||||
private final AtomicBoolean consumerDone = new AtomicBoolean(false);
|
||||
private ProcessSession session;
|
||||
private final UDPConsumerCallback udpCallback;
|
||||
|
||||
public UDPStreamConsumer(final String streamId, final List<FlowFile> newFlowFiles, final long fileSizeTrigger, final ComponentLog logger,
|
||||
final boolean flowFilePerDatagram) {
|
||||
this.uniqueId = streamId;
|
||||
this.newFlowFileQueue = newFlowFiles;
|
||||
this.logger = logger;
|
||||
this.udpCallback = new UDPConsumerCallback(filledBuffers, fileSizeTrigger, flowFilePerDatagram);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setReturnBufferQueue(final BufferPool pool) {
|
||||
this.bufferPool = pool;
|
||||
this.udpCallback.setBufferPool(pool);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void addFilledBuffer(final ByteBuffer buffer) {
|
||||
if (isConsumerFinished()) {
|
||||
bufferPool.returnBuffer(buffer, 0);
|
||||
} else {
|
||||
filledBuffers.add(buffer);
|
||||
}
|
||||
}
|
||||
|
||||
private void close() {
|
||||
if (isConsumerFinished()) {
|
||||
return;
|
||||
}
|
||||
consumerDone.set(true);
|
||||
ByteBuffer buf = null;
|
||||
while ((buf = filledBuffers.poll()) != null) {
|
||||
bufferPool.returnBuffer(buf, 0);
|
||||
}
|
||||
}
|
||||
|
||||
public void setSession(ProcessSession session) {
|
||||
this.session = session;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void process() throws IOException {
|
||||
if (isConsumerFinished()) {
|
||||
return;
|
||||
}
|
||||
|
||||
FlowFile newFlowFile = null;
|
||||
try {
|
||||
if (streamEnded.get() && filledBuffers.isEmpty()) {
|
||||
close();
|
||||
return;
|
||||
}
|
||||
if (filledBuffers.isEmpty()) {
|
||||
return;
|
||||
}
|
||||
// time to make a new flow file
|
||||
newFlowFile = session.create();
|
||||
newFlowFile = session.putAttribute(newFlowFile, "source.stream.identifier", uniqueId);
|
||||
newFlowFile = session.write(newFlowFile, udpCallback);
|
||||
if (newFlowFile.getSize() == 0) {
|
||||
session.remove(newFlowFile);
|
||||
return;
|
||||
}
|
||||
newFlowFileQueue.add(newFlowFile);
|
||||
} catch (final Exception ex) {
|
||||
close();
|
||||
if (newFlowFile != null) {
|
||||
try {
|
||||
session.remove(newFlowFile);
|
||||
} catch (final Exception ex2) {
|
||||
logger.warn("Unable to delete partial flow file due to: ", ex2);
|
||||
}
|
||||
}
|
||||
throw new IOException("Problem while processing data stream", ex);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void signalEndOfStream() {
|
||||
streamEnded.set(true);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isConsumerFinished() {
|
||||
return consumerDone.get();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getId() {
|
||||
return uniqueId;
|
||||
}
|
||||
|
||||
@Override
|
||||
public final boolean equals(final Object obj) {
|
||||
if (obj == null) {
|
||||
return false;
|
||||
}
|
||||
if (obj == this) {
|
||||
return true;
|
||||
}
|
||||
if (obj.getClass() != getClass()) {
|
||||
return false;
|
||||
}
|
||||
UDPStreamConsumer rhs = (UDPStreamConsumer) obj;
|
||||
return new EqualsBuilder().appendSuper(super.equals(obj)).append(uniqueId, rhs.uniqueId).isEquals();
|
||||
}
|
||||
|
||||
@Override
|
||||
public final int hashCode() {
|
||||
return new HashCodeBuilder(17, 37).append(uniqueId).toHashCode();
|
||||
}
|
||||
|
||||
@Override
|
||||
public final String toString() {
|
||||
return new ToStringBuilder(this).append(uniqueId).toString();
|
||||
}
|
||||
|
||||
public static final class UDPConsumerCallback implements OutputStreamCallback {
|
||||
|
||||
BufferPool bufferPool;
|
||||
final BlockingQueue<ByteBuffer> filledBuffers;
|
||||
final long fileSizeTrigger;
|
||||
final boolean flowFilePerDatagram;
|
||||
|
||||
public UDPConsumerCallback(final BlockingQueue<ByteBuffer> filledBuffers, final long fileSizeTrigger, final boolean flowFilePerDatagram) {
|
||||
this.filledBuffers = filledBuffers;
|
||||
this.fileSizeTrigger = fileSizeTrigger;
|
||||
this.flowFilePerDatagram = flowFilePerDatagram;
|
||||
}
|
||||
|
||||
public void setBufferPool(BufferPool pool) {
|
||||
this.bufferPool = pool;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void process(final OutputStream out) throws IOException {
|
||||
try {
|
||||
long totalBytes = 0L;
|
||||
try (WritableByteChannel wbc = Channels.newChannel(new BufferedOutputStream(out))) {
|
||||
ByteBuffer buffer = null;
|
||||
while ((buffer = filledBuffers.poll(50, TimeUnit.MILLISECONDS)) != null) {
|
||||
int bytesWrittenThisPass = 0;
|
||||
try {
|
||||
while (buffer.hasRemaining()) {
|
||||
bytesWrittenThisPass += wbc.write(buffer);
|
||||
}
|
||||
totalBytes += bytesWrittenThisPass;
|
||||
if (totalBytes > fileSizeTrigger || flowFilePerDatagram) {
|
||||
break;// this is enough data
|
||||
}
|
||||
} finally {
|
||||
bufferPool.returnBuffer(buffer, bytesWrittenThisPass);
|
||||
}
|
||||
}
|
||||
}
|
||||
} catch (final InterruptedException ie) {
|
||||
// irrelevant
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
}
|
|
@ -1,77 +0,0 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.processors.standard.util;
|
||||
|
||||
import javax.jms.Connection;
|
||||
import javax.jms.JMSException;
|
||||
import javax.jms.MessageConsumer;
|
||||
import javax.jms.Session;
|
||||
|
||||
import org.apache.nifi.logging.ComponentLog;
|
||||
|
||||
public class WrappedMessageConsumer {
|
||||
|
||||
private final Connection connection;
|
||||
private final Session session;
|
||||
private final MessageConsumer consumer;
|
||||
|
||||
private boolean closed = false;
|
||||
|
||||
public WrappedMessageConsumer(final Connection connection, final Session jmsSession, final MessageConsumer messageConsumer) {
|
||||
this.connection = connection;
|
||||
this.session = jmsSession;
|
||||
this.consumer = messageConsumer;
|
||||
}
|
||||
|
||||
public Connection getConnection() {
|
||||
return connection;
|
||||
}
|
||||
|
||||
public Session getSession() {
|
||||
return session;
|
||||
}
|
||||
|
||||
public MessageConsumer getConsumer() {
|
||||
return consumer;
|
||||
}
|
||||
|
||||
public void close(final ComponentLog logger) {
|
||||
closed = true;
|
||||
|
||||
try {
|
||||
connection.close();
|
||||
} catch (final JMSException e) {
|
||||
logger.warn("unable to close connection to JMS Server due to {}; resources may not be cleaned up appropriately", e);
|
||||
}
|
||||
|
||||
try {
|
||||
session.close();
|
||||
} catch (final JMSException e) {
|
||||
logger.warn("unable to close connection to JMS Server due to {}; resources may not be cleaned up appropriately", e);
|
||||
}
|
||||
|
||||
try {
|
||||
consumer.close();
|
||||
} catch (final JMSException e) {
|
||||
logger.warn("unable to close connection to JMS Server due to {}; resources may not be cleaned up appropriately", e);
|
||||
}
|
||||
}
|
||||
|
||||
public boolean isClosed() {
|
||||
return closed;
|
||||
}
|
||||
}
|
|
@ -1,77 +0,0 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.processors.standard.util;
|
||||
|
||||
import javax.jms.Connection;
|
||||
import javax.jms.JMSException;
|
||||
import javax.jms.MessageProducer;
|
||||
import javax.jms.Session;
|
||||
|
||||
import org.apache.nifi.logging.ComponentLog;
|
||||
|
||||
public class WrappedMessageProducer {
|
||||
|
||||
private final Connection connection;
|
||||
private final Session session;
|
||||
private final MessageProducer producer;
|
||||
|
||||
private boolean closed = false;
|
||||
|
||||
public WrappedMessageProducer(final Connection connection, final Session jmsSession, final MessageProducer messageProducer) {
|
||||
this.connection = connection;
|
||||
this.session = jmsSession;
|
||||
this.producer = messageProducer;
|
||||
}
|
||||
|
||||
public Connection getConnection() {
|
||||
return connection;
|
||||
}
|
||||
|
||||
public Session getSession() {
|
||||
return session;
|
||||
}
|
||||
|
||||
public MessageProducer getProducer() {
|
||||
return producer;
|
||||
}
|
||||
|
||||
public void close(final ComponentLog logger) {
|
||||
closed = true;
|
||||
|
||||
try {
|
||||
connection.close();
|
||||
} catch (final JMSException e) {
|
||||
logger.warn("unable to close connection to JMS Server due to {}; resources may not be cleaned up appropriately", e);
|
||||
}
|
||||
|
||||
try {
|
||||
session.close();
|
||||
} catch (final JMSException e) {
|
||||
logger.warn("unable to close connection to JMS Server due to {}; resources may not be cleaned up appropriately", e);
|
||||
}
|
||||
|
||||
try {
|
||||
producer.close();
|
||||
} catch (final JMSException e) {
|
||||
logger.warn("unable to close connection to JMS Server due to {}; resources may not be cleaned up appropriately", e);
|
||||
}
|
||||
}
|
||||
|
||||
public boolean isClosed() {
|
||||
return closed;
|
||||
}
|
||||
}
|
|
@ -16,7 +16,6 @@
|
|||
*/
|
||||
package org.apache.nifi.processors.standard;
|
||||
|
||||
import org.apache.activemq.util.ByteArrayOutputStream;
|
||||
import org.apache.nifi.distributed.cache.client.AtomicCacheEntry;
|
||||
import org.apache.nifi.distributed.cache.client.AtomicDistributedMapCacheClient;
|
||||
import org.apache.nifi.distributed.cache.client.exception.DeserializationException;
|
||||
|
@ -26,6 +25,7 @@ import org.junit.jupiter.api.BeforeEach;
|
|||
import org.junit.jupiter.api.Test;
|
||||
import org.mockito.stubbing.Answer;
|
||||
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.lang.reflect.Field;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.ArrayList;
|
||||
|
|
|
@ -176,17 +176,6 @@
|
|||
<artifactId>javax.jms-api</artifactId>
|
||||
<version>2.0.1</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.activemq</groupId>
|
||||
<artifactId>activemq-client</artifactId>
|
||||
<version>5.15.15</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.activemq</groupId>
|
||||
<artifactId>activemq-broker</artifactId>
|
||||
<version>5.15.14</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.derby</groupId>
|
||||
<artifactId>derby</artifactId>
|
||||
|
|
Loading…
Reference in New Issue