diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml index 51d63fd09f..1f28987cbe 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml @@ -269,10 +269,6 @@ javax.jms javax.jms-api - - org.apache.activemq - activemq-client - com.jayway.jsonpath json-path @@ -345,11 +341,6 @@ 2.0.0-SNAPSHOT test - - org.apache.activemq - activemq-broker - test - org.apache.nifi nifi-ssl-context-service diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/JmsConsumer.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/JmsConsumer.java deleted file mode 100644 index 2d8f96f5f4..0000000000 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/JmsConsumer.java +++ /dev/null @@ -1,240 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.processors.standard; - -import org.apache.nifi.components.PropertyDescriptor; -import org.apache.nifi.flowfile.FlowFile; -import org.apache.nifi.logging.ComponentLog; -import org.apache.nifi.processor.AbstractProcessor; -import org.apache.nifi.processor.ProcessContext; -import org.apache.nifi.processor.ProcessSession; -import org.apache.nifi.processor.Relationship; -import org.apache.nifi.processor.exception.ProcessException; -import org.apache.nifi.processor.io.OutputStreamCallback; -import org.apache.nifi.processors.standard.util.JmsFactory; -import org.apache.nifi.processors.standard.util.JmsProcessingSummary; -import org.apache.nifi.processors.standard.util.WrappedMessageConsumer; -import org.apache.nifi.util.StopWatch; - -import javax.jms.JMSException; -import javax.jms.MapMessage; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import java.io.BufferedOutputStream; -import java.io.IOException; -import java.io.OutputStream; -import java.util.ArrayList; -import java.util.Collections; -import java.util.Enumeration; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; - -import static org.apache.nifi.processors.standard.util.JmsProperties.ACKNOWLEDGEMENT_MODE; -import static org.apache.nifi.processors.standard.util.JmsProperties.ACK_MODE_CLIENT; -import static org.apache.nifi.processors.standard.util.JmsProperties.BATCH_SIZE; -import static org.apache.nifi.processors.standard.util.JmsProperties.CLIENT_ID_PREFIX; -import static org.apache.nifi.processors.standard.util.JmsProperties.DESTINATION_NAME; -import static org.apache.nifi.processors.standard.util.JmsProperties.JMS_PROPS_TO_ATTRIBUTES; -import static org.apache.nifi.processors.standard.util.JmsProperties.JMS_PROVIDER; -import static org.apache.nifi.processors.standard.util.JmsProperties.MESSAGE_SELECTOR; -import static org.apache.nifi.processors.standard.util.JmsProperties.PASSWORD; -import static org.apache.nifi.processors.standard.util.JmsProperties.SSL_CONTEXT_SERVICE; -import static org.apache.nifi.processors.standard.util.JmsProperties.TIMEOUT; -import static org.apache.nifi.processors.standard.util.JmsProperties.URL; -import static org.apache.nifi.processors.standard.util.JmsProperties.USERNAME; - -public abstract class JmsConsumer extends AbstractProcessor { - - public static final String MAP_MESSAGE_PREFIX = "jms.mapmessage."; - - public static final Relationship REL_SUCCESS = new Relationship.Builder() - .name("success") - .description("All FlowFiles are routed to success") - .build(); - - private final Set relationships; - private final List propertyDescriptors; - - public JmsConsumer() { - final Set rels = new HashSet<>(); - rels.add(REL_SUCCESS); - this.relationships = Collections.unmodifiableSet(rels); - - final List descriptors = new ArrayList<>(); - descriptors.add(JMS_PROVIDER); - descriptors.add(URL); - descriptors.add(DESTINATION_NAME); - descriptors.add(TIMEOUT); - descriptors.add(BATCH_SIZE); - descriptors.add(USERNAME); - descriptors.add(PASSWORD); - descriptors.add(SSL_CONTEXT_SERVICE); - descriptors.add(ACKNOWLEDGEMENT_MODE); - descriptors.add(MESSAGE_SELECTOR); - descriptors.add(JMS_PROPS_TO_ATTRIBUTES); - descriptors.add(CLIENT_ID_PREFIX); - this.propertyDescriptors = Collections.unmodifiableList(descriptors); - } - - @Override - public Set getRelationships() { - return relationships; - } - - @Override - protected List getSupportedPropertyDescriptors() { - return propertyDescriptors; - } - - public void consume(final ProcessContext context, final ProcessSession session, final WrappedMessageConsumer wrappedConsumer) throws ProcessException { - final ComponentLog logger = getLogger(); - - final MessageConsumer consumer = wrappedConsumer.getConsumer(); - final boolean clientAcknowledge = context.getProperty(ACKNOWLEDGEMENT_MODE).getValue().equalsIgnoreCase(ACK_MODE_CLIENT); - final long timeout = context.getProperty(TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS); - final boolean addAttributes = context.getProperty(JMS_PROPS_TO_ATTRIBUTES).asBoolean(); - final int batchSize = context.getProperty(BATCH_SIZE).asInteger(); - - final JmsProcessingSummary processingSummary = new JmsProcessingSummary(); - - final StopWatch stopWatch = new StopWatch(true); - for (int i = 0; i < batchSize; i++) { - - final Message message; - try { - // If we haven't received a message, wait until one is available. If we have already received at least one - // message, then we are not willing to wait for more to become available, but we are willing to keep receiving - // all messages that are immediately available. - if (processingSummary.getMessagesReceived() == 0) { - message = consumer.receive(timeout); - } else { - message = consumer.receiveNoWait(); - } - } catch (final JMSException e) { - logger.error("Failed to receive JMS Message due to {}", e); - wrappedConsumer.close(logger); - break; - } - - if (message == null) { // if no messages, we're done - break; - } - - try { - processingSummary.add(map2FlowFile(context, session, message, addAttributes, logger)); - } catch (Exception e) { - logger.error("Failed to receive JMS Message due to {}", e); - wrappedConsumer.close(logger); - break; - } - } - - if (processingSummary.getFlowFilesCreated() == 0) { - context.yield(); - return; - } - - session.commitAsync(() -> { - // if we need to acknowledge the messages, do so now. - final Message lastMessage = processingSummary.getLastMessageReceived(); - if (clientAcknowledge && lastMessage != null) { - try { - lastMessage.acknowledge(); // acknowledge all received messages by acknowledging only the last. - } catch (final JMSException e) { - logger.error("Failed to acknowledge {} JMS Message(s). This may result in duplicate messages. Reason for failure: {}", - new Object[]{processingSummary.getMessagesReceived(), e}); - } - } - }); - - stopWatch.stop(); - if (processingSummary.getFlowFilesCreated() > 0) { - final float secs = (stopWatch.getDuration(TimeUnit.MILLISECONDS) / 1000F); - float messagesPerSec = (processingSummary.getMessagesReceived()) / secs; - final String dataRate = stopWatch.calculateDataRate(processingSummary.getBytesReceived()); - logger.info("Received {} messages in {} milliseconds, at a rate of {} messages/sec or {}", - new Object[]{processingSummary.getMessagesReceived(), stopWatch.getDuration(TimeUnit.MILLISECONDS), messagesPerSec, dataRate}); - } - } - - public static JmsProcessingSummary map2FlowFile(final ProcessContext context, final ProcessSession session, final Message message, final boolean addAttributes, ComponentLog logger) - throws Exception { - - // Currently not very useful, because always one Message == one FlowFile - final AtomicInteger msgsThisFlowFile = new AtomicInteger(1); - - FlowFile flowFile = session.create(); - try { - // MapMessage is exception, add only name-value pairs to FlowFile attributes - if (message instanceof MapMessage) { - MapMessage mapMessage = (MapMessage) message; - flowFile = session.putAllAttributes(flowFile, createMapMessageValues(mapMessage)); - } else { // all other message types, write Message body to FlowFile content - flowFile = session.write(flowFile, new OutputStreamCallback() { - @Override - public void process(final OutputStream rawOut) throws IOException { - try (final OutputStream out = new BufferedOutputStream(rawOut, 65536)) { - final byte[] messageBody = JmsFactory.createByteArray(message); - out.write(messageBody); - } catch (final JMSException e) { - throw new ProcessException("Failed to receive JMS Message due to " + e.getMessage(), e); - } - } - }); - } - - if (addAttributes) { - flowFile = session.putAllAttributes(flowFile, JmsFactory.createAttributeMap(message)); - } - - session.getProvenanceReporter().receive(flowFile, context.getProperty(URL).getValue()); - session.transfer(flowFile, REL_SUCCESS); - logger.info("Created {} from {} messages received from JMS Server and transferred to 'success'", - new Object[]{flowFile, msgsThisFlowFile.get()}); - - return new JmsProcessingSummary(flowFile.getSize(), message, flowFile); - - } catch (Exception e) { - session.remove(flowFile); - throw e; - } - } - - public static Map createMapMessageValues(final MapMessage mapMessage) throws JMSException { - final Map valueMap = new HashMap<>(); - - final Enumeration enumeration = mapMessage.getMapNames(); - while (enumeration.hasMoreElements()) { - final String name = (String) enumeration.nextElement(); - - final Object value = mapMessage.getObject(name); - if (value == null) { - valueMap.put(MAP_MESSAGE_PREFIX + name, ""); - } else { - valueMap.put(MAP_MESSAGE_PREFIX + name, value.toString()); - } - } - - return valueMap; - } - -} diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/DocumentReaderCallback.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/DocumentReaderCallback.java deleted file mode 100644 index f2230e2c3f..0000000000 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/DocumentReaderCallback.java +++ /dev/null @@ -1,57 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.processors.standard.util; - -import java.io.IOException; -import java.io.InputStream; -import org.apache.nifi.processor.io.InputStreamCallback; -import org.apache.nifi.xml.processing.ProcessingException; -import org.apache.nifi.xml.processing.parsers.StandardDocumentProvider; -import org.w3c.dom.Document; - -public class DocumentReaderCallback implements InputStreamCallback { - - private final boolean isNamespaceAware; - private Document document; - - /** - * Creates a new DocumentReaderCallback. - * - * @param isNamespaceAware Whether or not the parse should consider namespaces - */ - public DocumentReaderCallback(boolean isNamespaceAware) { - this.isNamespaceAware = isNamespaceAware; - } - - @Override - public void process(final InputStream stream) throws IOException { - try { - final StandardDocumentProvider documentProvider = new StandardDocumentProvider(); - documentProvider.setNamespaceAware(isNamespaceAware); - document = documentProvider.parse(stream); - } catch (final ProcessingException e) { - throw new IOException(e.getLocalizedMessage(), e); - } - } - - /** - * @return the document - */ - public Document getDocument() { - return document; - } -} diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JmsFactory.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JmsFactory.java deleted file mode 100644 index 26f08af1e6..0000000000 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JmsFactory.java +++ /dev/null @@ -1,514 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.processors.standard.util; - -import static org.apache.nifi.processors.standard.util.JmsProperties.ACKNOWLEDGEMENT_MODE; -import static org.apache.nifi.processors.standard.util.JmsProperties.ACK_MODE_AUTO; -import static org.apache.nifi.processors.standard.util.JmsProperties.ACTIVEMQ_PROVIDER; -import static org.apache.nifi.processors.standard.util.JmsProperties.CLIENT_ID_PREFIX; -import static org.apache.nifi.processors.standard.util.JmsProperties.DESTINATION_NAME; -import static org.apache.nifi.processors.standard.util.JmsProperties.DESTINATION_TYPE; -import static org.apache.nifi.processors.standard.util.JmsProperties.DESTINATION_TYPE_QUEUE; -import static org.apache.nifi.processors.standard.util.JmsProperties.DESTINATION_TYPE_TOPIC; -import static org.apache.nifi.processors.standard.util.JmsProperties.DURABLE_SUBSCRIPTION; -import static org.apache.nifi.processors.standard.util.JmsProperties.JMS_PROVIDER; -import static org.apache.nifi.processors.standard.util.JmsProperties.MESSAGE_SELECTOR; -import static org.apache.nifi.processors.standard.util.JmsProperties.PASSWORD; -import static org.apache.nifi.processors.standard.util.JmsProperties.SSL_CONTEXT_SERVICE; -import static org.apache.nifi.processors.standard.util.JmsProperties.TIMEOUT; -import static org.apache.nifi.processors.standard.util.JmsProperties.URL; -import static org.apache.nifi.processors.standard.util.JmsProperties.USERNAME; - -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.io.ObjectOutputStream; -import java.net.URI; -import java.net.URISyntaxException; -import java.util.Enumeration; -import java.util.HashMap; -import java.util.Map; -import java.util.Objects; -import java.util.UUID; -import java.util.concurrent.TimeUnit; -import javax.jms.BytesMessage; -import javax.jms.Connection; -import javax.jms.ConnectionFactory; -import javax.jms.Destination; -import javax.jms.JMSException; -import javax.jms.MapMessage; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.MessageProducer; -import javax.jms.ObjectMessage; -import javax.jms.Queue; -import javax.jms.Session; -import javax.jms.StreamMessage; -import javax.jms.TextMessage; -import javax.jms.Topic; -import org.apache.activemq.ActiveMQConnectionFactory; -import org.apache.activemq.ActiveMQSslConnectionFactory; -import org.apache.activemq.command.ActiveMQQueue; -import org.apache.activemq.command.ActiveMQTopic; -import org.apache.activemq.util.URISupport; -import org.apache.activemq.util.URISupport.CompositeData; -import org.apache.nifi.processor.ProcessContext; -import org.apache.nifi.ssl.SSLContextService; - -public class JmsFactory { - - public static final boolean DEFAULT_IS_TRANSACTED = false; - public static final String ATTRIBUTE_PREFIX = "jms."; - public static final String ATTRIBUTE_TYPE_SUFFIX = ".type"; - public static final String CLIENT_ID_FIXED_PREFIX = "NiFi-"; - - // JMS Metadata Fields - public static final String JMS_MESSAGE_ID = "JMSMessageID"; - public static final String JMS_DESTINATION = "JMSDestination"; - public static final String JMS_REPLY_TO = "JMSReplyTo"; - public static final String JMS_DELIVERY_MODE = "JMSDeliveryMode"; - public static final String JMS_REDELIVERED = "JMSRedelivered"; - public static final String JMS_CORRELATION_ID = "JMSCorrelationID"; - public static final String JMS_TYPE = "JMSType"; - public static final String JMS_TIMESTAMP = "JMSTimestamp"; - public static final String JMS_EXPIRATION = "JMSExpiration"; - public static final String JMS_PRIORITY = "JMSPriority"; - - // JMS Property Types. - public static final String PROP_TYPE_STRING = "string"; - public static final String PROP_TYPE_INTEGER = "integer"; - public static final String PROP_TYPE_OBJECT = "object"; - public static final String PROP_TYPE_BYTE = "byte"; - public static final String PROP_TYPE_DOUBLE = "double"; - public static final String PROP_TYPE_FLOAT = "float"; - public static final String PROP_TYPE_LONG = "long"; - public static final String PROP_TYPE_SHORT = "short"; - public static final String PROP_TYPE_BOOLEAN = "boolean"; - - public static Connection createConnection(final ProcessContext context) throws JMSException { - return createConnection(context, createClientId(context)); - } - - public static Connection createConnection(final ProcessContext context, final String clientId) throws JMSException { - Objects.requireNonNull(context); - Objects.requireNonNull(clientId); - - final ConnectionFactory connectionFactory = createConnectionFactory(context); - - final String username = context.getProperty(USERNAME).getValue(); - final String password = context.getProperty(PASSWORD).getValue(); - final Connection connection = (username == null && password == null) ? connectionFactory.createConnection() : connectionFactory.createConnection(username, password); - - connection.setClientID(clientId); - connection.start(); - return connection; - } - - public static Connection createConnection(final String url, final String jmsProvider, final String username, final String password, final int timeoutMillis) throws JMSException { - final ConnectionFactory connectionFactory = createConnectionFactory(url, timeoutMillis, jmsProvider); - return (username == null && password == null) ? connectionFactory.createConnection() : connectionFactory.createConnection(username, password); - } - - public static String createClientId(final ProcessContext context) { - final String clientIdPrefix = context.getProperty(CLIENT_ID_PREFIX).getValue(); - return CLIENT_ID_FIXED_PREFIX + (clientIdPrefix == null ? "" : clientIdPrefix) + "-" + UUID.randomUUID().toString(); - } - - public static boolean clientIdPrefixEquals(final String one, final String two) { - if (one == null) { - return two == null; - } else if (two == null) { - return false; - } - int uuidLen = UUID.randomUUID().toString().length(); - if (one.length() <= uuidLen || two.length() <= uuidLen) { - return false; - } - return one.substring(0, one.length() - uuidLen).equals(two.substring(0, two.length() - uuidLen)); - } - - public static byte[] createByteArray(final Message message) throws JMSException { - if (message instanceof TextMessage) { - return getMessageBytes((TextMessage) message); - } else if (message instanceof BytesMessage) { - return getMessageBytes((BytesMessage) message); - } else if (message instanceof StreamMessage) { - return getMessageBytes((StreamMessage) message); - } else if (message instanceof MapMessage) { - return getMessageBytes((MapMessage) message); - } else if (message instanceof ObjectMessage) { - return getMessageBytes((ObjectMessage) message); - } - return new byte[0]; - } - - private static byte[] getMessageBytes(TextMessage message) throws JMSException { - return (message.getText() == null) ? new byte[0] : message.getText().getBytes(); - } - - private static byte[] getMessageBytes(BytesMessage message) throws JMSException { - final long byteCount = message.getBodyLength(); - if (byteCount > Integer.MAX_VALUE) { - throw new JMSException("Incoming message cannot be written to a FlowFile because its size is " - + byteCount - + " bytes, and the maximum size that this processor can handle is " - + Integer.MAX_VALUE); - } - - byte[] bytes = new byte[(int) byteCount]; - message.readBytes(bytes); - - return bytes; - } - - private static byte[] getMessageBytes(StreamMessage message) throws JMSException { - final ByteArrayOutputStream baos = new ByteArrayOutputStream(); - - byte[] byteBuffer = new byte[4096]; - int byteCount; - while ((byteCount = message.readBytes(byteBuffer)) != -1) { - baos.write(byteBuffer, 0, byteCount); - } - - try { - baos.close(); - } catch (final IOException ioe) { - } - - return baos.toByteArray(); - } - - @SuppressWarnings("rawtypes") - private static byte[] getMessageBytes(MapMessage message) throws JMSException { - Map map = new HashMap<>(); - Enumeration elements = message.getMapNames(); - while (elements.hasMoreElements()) { - String key = (String) elements.nextElement(); - map.put(key, message.getString(key)); - } - return map.toString().getBytes(); - } - - private static byte[] getMessageBytes(ObjectMessage message) throws JMSException { - try { - ByteArrayOutputStream baos = new ByteArrayOutputStream(); - // will fail if Object is not Serializable - try (ObjectOutputStream oos = new ObjectOutputStream(baos)) { - // will fail if Object is not Serializable - oos.writeObject(message.getObject()); - oos.flush(); - } - return baos.toByteArray(); - } catch (IOException e) { - return new byte[0]; - } - } - - public static Session createSession(final ProcessContext context, final Connection connection, final boolean transacted) throws JMSException { - final String configuredAckMode = context.getProperty(ACKNOWLEDGEMENT_MODE).getValue(); - return createSession(connection, configuredAckMode, transacted); - } - - public static Session createSession(final Connection connection, final String configuredAckMode, final boolean transacted) throws JMSException { - final int ackMode; - if (configuredAckMode == null) { - ackMode = Session.AUTO_ACKNOWLEDGE; - } else { - ackMode = configuredAckMode.equalsIgnoreCase(ACK_MODE_AUTO) ? Session.AUTO_ACKNOWLEDGE : Session.CLIENT_ACKNOWLEDGE; - } - - final Session session = connection.createSession(transacted, ackMode); - return session; - } - - public static WrappedMessageConsumer createQueueMessageConsumer(final ProcessContext context) throws JMSException { - Connection connection = null; - Session jmsSession = null; - try { - connection = JmsFactory.createConnection(context); - jmsSession = JmsFactory.createSession(context, connection, DEFAULT_IS_TRANSACTED); - - final String messageSelector = context.getProperty(MESSAGE_SELECTOR).getValue(); - final Destination destination = createQueue(context); - final MessageConsumer messageConsumer = jmsSession.createConsumer(destination, messageSelector, false); - - return new WrappedMessageConsumer(connection, jmsSession, messageConsumer); - } catch (JMSException e) { - if (jmsSession != null) { - jmsSession.close(); - } - if (connection != null) { - connection.close(); - } - throw e; - } - } - - public static WrappedMessageConsumer createTopicMessageConsumer(final ProcessContext context) throws JMSException { - return createTopicMessageConsumer(context, createClientId(context)); - } - - public static WrappedMessageConsumer createTopicMessageConsumer(final ProcessContext context, final String clientId) throws JMSException { - Objects.requireNonNull(context); - Objects.requireNonNull(clientId); - - Connection connection = null; - Session jmsSession = null; - try { - connection = JmsFactory.createConnection(context, clientId); - jmsSession = JmsFactory.createSession(context, connection, DEFAULT_IS_TRANSACTED); - - final String messageSelector = context.getProperty(MESSAGE_SELECTOR).getValue(); - final Topic topic = createTopic(context); - final MessageConsumer messageConsumer; - if (context.getProperty(DURABLE_SUBSCRIPTION).asBoolean()) { - messageConsumer = jmsSession.createDurableSubscriber(topic, clientId, messageSelector, false); - } else { - messageConsumer = jmsSession.createConsumer(topic, messageSelector, false); - } - - return new WrappedMessageConsumer(connection, jmsSession, messageConsumer); - } catch (JMSException e) { - if (jmsSession != null) { - jmsSession.close(); - } - if (connection != null) { - connection.close(); - } - throw e; - } - } - - private static Destination getDestination(final ProcessContext context) throws JMSException { - final String destinationType = context.getProperty(DESTINATION_TYPE).getValue(); - switch (destinationType) { - case DESTINATION_TYPE_TOPIC: - return createTopic(context); - case DESTINATION_TYPE_QUEUE: - default: - return createQueue(context); - } - } - - public static WrappedMessageProducer createMessageProducer(final ProcessContext context) throws JMSException { - return createMessageProducer(context, false); - } - - public static WrappedMessageProducer createMessageProducer(final ProcessContext context, final boolean transacted) throws JMSException { - Connection connection = null; - Session jmsSession = null; - - try { - connection = JmsFactory.createConnection(context); - jmsSession = JmsFactory.createSession(context, connection, transacted); - - final Destination destination = getDestination(context); - final MessageProducer messageProducer = jmsSession.createProducer(destination); - - return new WrappedMessageProducer(connection, jmsSession, messageProducer); - } catch (JMSException e) { - if (connection != null) { - connection.close(); - } - if (jmsSession != null) { - jmsSession.close(); - } - throw e; - } - } - - public static Destination createQueue(final ProcessContext context) { - return createQueue(context, context.getProperty(DESTINATION_NAME).getValue()); - } - - public static Queue createQueue(final ProcessContext context, final String queueName) { - return createQueue(context.getProperty(JMS_PROVIDER).getValue(), queueName); - } - - public static Queue createQueue(final String jmsProvider, final String queueName) { - switch (jmsProvider) { - case ACTIVEMQ_PROVIDER: - default: - return new ActiveMQQueue(queueName); - } - } - - private static Topic createTopic(final ProcessContext context) { - final String topicName = context.getProperty(DESTINATION_NAME).getValue(); - switch (context.getProperty(JMS_PROVIDER).getValue()) { - case ACTIVEMQ_PROVIDER: - default: - return new ActiveMQTopic(topicName); - } - } - - private static ConnectionFactory createConnectionFactory(final ProcessContext context) throws JMSException { - final URI uri; - try { - uri = new URI(context.getProperty(URL).getValue()); - } catch (URISyntaxException e) { - // Should not happen - URI was validated - throw new IllegalArgumentException("Validated URI [" + context.getProperty(URL) + "] was invalid", e); - } - final int timeoutMillis = context.getProperty(TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue(); - final String provider = context.getProperty(JMS_PROVIDER).getValue(); - if (isSSL(uri)) { - final SSLContextService sslContextService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class); - if (sslContextService == null) { - throw new IllegalArgumentException("Attempting to initiate SSL JMS connection and SSL Context is not set."); - } - return createSslConnectionFactory(uri, timeoutMillis, provider, sslContextService.getKeyStoreFile(), - sslContextService.getKeyStorePassword(), sslContextService.getTrustStoreFile(), sslContextService.getTrustStorePassword()); - } else { - return createConnectionFactory(uri, timeoutMillis, provider); - } - } - - private static boolean isSSL(URI uri) { - try { - CompositeData compositeData = URISupport.parseComposite(uri); - if ("ssl".equals(compositeData.getScheme())) { - return true; - } - for(URI component : compositeData.getComponents()){ - if ("ssl".equals(component.getScheme())) { - return true; - } - } - } catch (URISyntaxException e) { - throw new IllegalArgumentException("Attempting to initiate JMS with invalid composite URI [" + uri + "]", e); - } - return false; - } - - public static ConnectionFactory createConnectionFactory(final URI uri, final int timeoutMillis, final String jmsProvider) throws JMSException { - return createConnectionFactory(uri.toString(), timeoutMillis, jmsProvider); - } - - public static ConnectionFactory createConnectionFactory(final String url, final int timeoutMillis, final String jmsProvider) throws JMSException { - switch (jmsProvider) { - case ACTIVEMQ_PROVIDER: { - final ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(url); - factory.setSendTimeout(timeoutMillis); - return factory; - } - default: - throw new IllegalArgumentException("Unknown JMS Provider: " + jmsProvider); - } - } - - public static ConnectionFactory createSslConnectionFactory(final URI uri, final int timeoutMillis, final String jmsProvider, - final String keystore, final String keystorePassword, final String truststore, final String truststorePassword) throws JMSException { - return createSslConnectionFactory(uri.toString(), timeoutMillis, jmsProvider, keystore, keystorePassword, truststore, truststorePassword); - } - - public static ConnectionFactory createSslConnectionFactory(final String url, final int timeoutMillis, final String jmsProvider, - final String keystore, final String keystorePassword, final String truststore, final String truststorePassword) throws JMSException { - switch (jmsProvider) { - case ACTIVEMQ_PROVIDER: { - final ActiveMQSslConnectionFactory factory = new ActiveMQSslConnectionFactory(url); - try { - factory.setKeyStore(keystore); - } catch (Exception e) { - throw new JMSException("Problem Setting the KeyStore: " + e.getMessage()); - } - factory.setKeyStorePassword(keystorePassword); - try { - factory.setTrustStore(truststore); - } catch (Exception e) { - throw new JMSException("Problem Setting the TrustStore: " + e.getMessage()); - } - factory.setTrustStorePassword(truststorePassword); - factory.setSendTimeout(timeoutMillis); - return factory; - } - default: - throw new IllegalArgumentException("Unknown JMS Provider: " + jmsProvider); - } - } - - public static Map createAttributeMap(final Message message) throws JMSException { - final Map attributes = new HashMap<>(); - - final Enumeration enumeration = message.getPropertyNames(); - while (enumeration.hasMoreElements()) { - final String propName = (String) enumeration.nextElement(); - - final Object value = message.getObjectProperty(propName); - - if (value == null) { - attributes.put(ATTRIBUTE_PREFIX + propName, ""); - attributes.put(ATTRIBUTE_PREFIX + propName + ATTRIBUTE_TYPE_SUFFIX, "Unknown"); - continue; - } - - final String valueString = value.toString(); - attributes.put(ATTRIBUTE_PREFIX + propName, valueString); - - final String propType; - if (value instanceof String) { - propType = PROP_TYPE_STRING; - } else if (value instanceof Double) { - propType = PROP_TYPE_DOUBLE; - } else if (value instanceof Float) { - propType = PROP_TYPE_FLOAT; - } else if (value instanceof Long) { - propType = PROP_TYPE_LONG; - } else if (value instanceof Integer) { - propType = PROP_TYPE_INTEGER; - } else if (value instanceof Short) { - propType = PROP_TYPE_SHORT; - } else if (value instanceof Byte) { - propType = PROP_TYPE_BYTE; - } else if (value instanceof Boolean) { - propType = PROP_TYPE_BOOLEAN; - } else { - propType = PROP_TYPE_OBJECT; - } - - attributes.put(ATTRIBUTE_PREFIX + propName + ATTRIBUTE_TYPE_SUFFIX, propType); - } - - if (message.getJMSCorrelationID() != null) { - attributes.put(ATTRIBUTE_PREFIX + JMS_CORRELATION_ID, message.getJMSCorrelationID()); - } - if (message.getJMSDestination() != null) { - String destinationName; - if (message.getJMSDestination() instanceof Queue) { - destinationName = ((Queue) message.getJMSDestination()).getQueueName(); - } else { - destinationName = ((Topic) message.getJMSDestination()).getTopicName(); - } - attributes.put(ATTRIBUTE_PREFIX + JMS_DESTINATION, destinationName); - } - if (message.getJMSMessageID() != null) { - attributes.put(ATTRIBUTE_PREFIX + JMS_MESSAGE_ID, message.getJMSMessageID()); - } - if (message.getJMSReplyTo() != null) { - attributes.put(ATTRIBUTE_PREFIX + JMS_REPLY_TO, message.getJMSReplyTo().toString()); - } - if (message.getJMSType() != null) { - attributes.put(ATTRIBUTE_PREFIX + JMS_TYPE, message.getJMSType()); - } - - attributes.put(ATTRIBUTE_PREFIX + JMS_DELIVERY_MODE, String.valueOf(message.getJMSDeliveryMode())); - attributes.put(ATTRIBUTE_PREFIX + JMS_EXPIRATION, String.valueOf(message.getJMSExpiration())); - attributes.put(ATTRIBUTE_PREFIX + JMS_PRIORITY, String.valueOf(message.getJMSPriority())); - attributes.put(ATTRIBUTE_PREFIX + JMS_REDELIVERED, String.valueOf(message.getJMSRedelivered())); - attributes.put(ATTRIBUTE_PREFIX + JMS_TIMESTAMP, String.valueOf(message.getJMSTimestamp())); - return attributes; - } -} diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JmsProcessingSummary.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JmsProcessingSummary.java deleted file mode 100644 index 5da67fefe4..0000000000 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JmsProcessingSummary.java +++ /dev/null @@ -1,81 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.processors.standard.util; - -import javax.jms.Message; - -import org.apache.nifi.flowfile.FlowFile; - -/** - * Data structure which allows to collect processing summary data. - * - */ -public class JmsProcessingSummary { - - private int messagesReceived; - private long bytesReceived; - private Message lastMessageReceived; - private int flowFilesCreated; - private FlowFile lastFlowFile; // helps testing - - public JmsProcessingSummary() { - super(); - this.messagesReceived = 0; - this.bytesReceived = 0; - this.lastMessageReceived = null; - this.flowFilesCreated = 0; - this.lastFlowFile = null; - } - - public JmsProcessingSummary(long bytesReceived, Message lastMessageReceived, FlowFile lastFlowFile) { - super(); - this.messagesReceived = 1; - this.bytesReceived = bytesReceived; - this.lastMessageReceived = lastMessageReceived; - this.flowFilesCreated = 1; - this.lastFlowFile = lastFlowFile; - } - - public void add(JmsProcessingSummary jmsProcessingSummary) { - this.messagesReceived += jmsProcessingSummary.messagesReceived; - this.bytesReceived += jmsProcessingSummary.bytesReceived; - this.lastMessageReceived = jmsProcessingSummary.lastMessageReceived; - this.flowFilesCreated += jmsProcessingSummary.flowFilesCreated; - this.lastFlowFile = jmsProcessingSummary.lastFlowFile; - } - - public int getMessagesReceived() { - return messagesReceived; - } - - public long getBytesReceived() { - return bytesReceived; - } - - public Message getLastMessageReceived() { - return lastMessageReceived; - } - - public int getFlowFilesCreated() { - return flowFilesCreated; - } - - public FlowFile getLastFlowFile() { - return lastFlowFile; - } - -} diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JmsProperties.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JmsProperties.java deleted file mode 100644 index 7a3ee0add2..0000000000 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JmsProperties.java +++ /dev/null @@ -1,189 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.processors.standard.util; - -import org.apache.nifi.components.PropertyDescriptor; -import org.apache.nifi.components.Validator; -import org.apache.nifi.expression.ExpressionLanguageScope; -import org.apache.nifi.processor.util.StandardValidators; -import org.apache.nifi.ssl.SSLContextService; - -public class JmsProperties { - - public static final String ACTIVEMQ_PROVIDER = "ActiveMQ"; - - public static final String ACK_MODE_CLIENT = "Client Acknowledge"; - public static final String ACK_MODE_AUTO = "Auto Acknowledge"; - - public static final String DESTINATION_TYPE_QUEUE = "Queue"; - public static final String DESTINATION_TYPE_TOPIC = "Topic"; - - public static final String MSG_TYPE_BYTE = "byte"; - public static final String MSG_TYPE_TEXT = "text"; - public static final String MSG_TYPE_STREAM = "stream"; - public static final String MSG_TYPE_MAP = "map"; - public static final String MSG_TYPE_EMPTY = "empty"; - - // Standard JMS Properties - public static final PropertyDescriptor JMS_PROVIDER = new PropertyDescriptor.Builder() - .name("JMS Provider") - .description("The Provider used for the JMS Server") - .required(true) - .allowableValues(ACTIVEMQ_PROVIDER) - .defaultValue(ACTIVEMQ_PROVIDER) - .build(); - public static final PropertyDescriptor URL = new PropertyDescriptor.Builder() - .name("URL") - .description("The URL of the JMS Server") - .addValidator(StandardValidators.URI_VALIDATOR) - .required(true) - .build(); - public static final PropertyDescriptor TIMEOUT = new PropertyDescriptor.Builder() - .name("Communications Timeout") - .description("The amount of time to wait when attempting to receive a message before giving up and assuming failure") - .required(true) - .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) - .defaultValue("30 sec") - .build(); - public static final PropertyDescriptor USERNAME = new PropertyDescriptor.Builder() - .name("Username") - .description("Username used for authentication and authorization") - .required(false) - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .build(); - public static final PropertyDescriptor PASSWORD = new PropertyDescriptor.Builder() - .name("Password") - .description("Password used for authentication and authorization") - .required(false) - .addValidator(Validator.VALID) - .sensitive(true) - .build(); - public static final PropertyDescriptor CLIENT_ID_PREFIX = new PropertyDescriptor.Builder() - .name("Client ID Prefix") - .description("A human-readable ID that can be used to associate connections with yourself so that the maintainers of the JMS Server know who to contact if problems arise") - .required(false) - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .build(); - - // Topic/Queue determination Properties - public static final PropertyDescriptor DESTINATION_NAME = new PropertyDescriptor.Builder() - .name("Destination Name") - .description("The name of the JMS Topic or queue to use") - .required(true) - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .build(); - public static final PropertyDescriptor DESTINATION_TYPE = new PropertyDescriptor.Builder() - .name("Destination Type") - .description("The type of the JMS Destination to use") - .required(true) - .allowableValues(DESTINATION_TYPE_QUEUE, DESTINATION_TYPE_TOPIC) - .defaultValue(DESTINATION_TYPE_QUEUE) - .build(); - - public static final PropertyDescriptor DURABLE_SUBSCRIPTION = new PropertyDescriptor.Builder() - .name("Use Durable Subscription") - .description("If true, connections to the specified topic will use Durable Subscription so that messages are queued when we are not pulling them") - .required(true) - .allowableValues("true", "false") - .defaultValue("false") - .build(); - - // JMS Publisher Properties - public static final PropertyDescriptor ATTRIBUTES_TO_JMS_PROPS = new PropertyDescriptor.Builder() - .name("Copy Attributes to JMS Properties") - .description("Whether or not FlowFile Attributes should be translated into JMS Message Properties. If true, all " - + "attributes starting with 'jms.' will be set as Properties on the JMS Message (without the 'jms.' prefix). " - + "If an attribute exists that starts with the same value but ends in '.type', that attribute will be used " - + "to determine the JMS Message Property type.") - .required(true) - .allowableValues("true", "false") - .defaultValue("true") - .build(); - - // JMS Listener Properties - public static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder() - .name("Message Batch Size") - .description("The number of messages to pull/push in a single iteration of the processor") - .required(true) - .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) - .defaultValue("10") - .build(); - public static final PropertyDescriptor ACKNOWLEDGEMENT_MODE = new PropertyDescriptor.Builder() - .name("Acknowledgement Mode") - .description("The JMS Acknowledgement Mode. Using Auto Acknowledge can cause messages to be lost on restart of NiFi but may provide better performance than Client Acknowledge.") - .required(true) - .allowableValues(ACK_MODE_CLIENT, ACK_MODE_AUTO) - .defaultValue(ACK_MODE_CLIENT) - .build(); - public static final PropertyDescriptor JMS_PROPS_TO_ATTRIBUTES = new PropertyDescriptor.Builder() - .name("Copy JMS Properties to Attributes") - .description("Whether or not the JMS Message Properties should be copied to the FlowFile Attributes; if so, the attribute name will be jms.XXX, where XXX is the JMS Property name") - .required(true) - .allowableValues("true", "false") - .defaultValue("true") - .build(); - public static final PropertyDescriptor MESSAGE_SELECTOR = new PropertyDescriptor.Builder() - .name("Message Selector") - .description("The JMS Message Selector to use in order to narrow the messages that are pulled") - .required(false) - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .build(); - - // JMS Producer Properties - public static final PropertyDescriptor MESSAGE_TYPE = new PropertyDescriptor.Builder() - .name("Message Type") - .description("The Type of JMS Message to Construct") - .required(true) - .allowableValues(MSG_TYPE_BYTE, MSG_TYPE_STREAM, MSG_TYPE_TEXT, MSG_TYPE_MAP, MSG_TYPE_EMPTY) - .defaultValue(MSG_TYPE_BYTE) - .build(); - public static final PropertyDescriptor MESSAGE_PRIORITY = new PropertyDescriptor.Builder() - .name("Message Priority") - .description("The Priority of the Message") - .required(false) - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) - .build(); - public static final PropertyDescriptor REPLY_TO_QUEUE = new PropertyDescriptor.Builder() - .name("Reply-To Queue") - .description("The name of the queue to which a reply to should be added") - .required(false) - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) - .build(); - public static final PropertyDescriptor MESSAGE_TTL = new PropertyDescriptor.Builder() - .name("Message Time to Live") - .description("The amount of time that the message should live on the destination before being removed; if not specified, the message will never expire.") - .required(false) - .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) - .build(); - public static final PropertyDescriptor MAX_BUFFER_SIZE = new PropertyDescriptor.Builder() - .name("Max Buffer Size") - .description("The maximum amount of data that can be buffered for a JMS Message. If a FlowFile's size exceeds this value, the FlowFile will be routed to failure.") - .required(true) - .addValidator(StandardValidators.DATA_SIZE_VALIDATOR) - .defaultValue("1 MB") - .build(); - - // JMS SSL Properties - public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder() - .name("SSL Context Service") - .description("The Controller Service to use in order to obtain an SSL Context.") - .required(false) - .identifiesControllerService(SSLContextService.class) - .build(); -} diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/UDPStreamConsumer.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/UDPStreamConsumer.java deleted file mode 100644 index 80806df8c9..0000000000 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/UDPStreamConsumer.java +++ /dev/null @@ -1,214 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.processors.standard.util; - -import java.io.BufferedOutputStream; -import java.io.IOException; -import java.io.OutputStream; -import java.nio.ByteBuffer; -import java.nio.channels.Channels; -import java.nio.channels.WritableByteChannel; -import java.util.List; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import org.apache.commons.lang3.builder.EqualsBuilder; -import org.apache.commons.lang3.builder.HashCodeBuilder; -import org.apache.commons.lang3.builder.ToStringBuilder; -import org.apache.nifi.flowfile.FlowFile; -import org.apache.nifi.io.nio.BufferPool; -import org.apache.nifi.io.nio.consumer.StreamConsumer; -import org.apache.nifi.logging.ComponentLog; -import org.apache.nifi.processor.ProcessSession; -import org.apache.nifi.processor.io.OutputStreamCallback; - -/** - * - */ -public class UDPStreamConsumer implements StreamConsumer { - - private final ComponentLog logger; - final List newFlowFileQueue; - private final String uniqueId; - private BufferPool bufferPool = null; - private final BlockingQueue filledBuffers = new LinkedBlockingQueue<>(); - private final AtomicBoolean streamEnded = new AtomicBoolean(false); - private final AtomicBoolean consumerDone = new AtomicBoolean(false); - private ProcessSession session; - private final UDPConsumerCallback udpCallback; - - public UDPStreamConsumer(final String streamId, final List newFlowFiles, final long fileSizeTrigger, final ComponentLog logger, - final boolean flowFilePerDatagram) { - this.uniqueId = streamId; - this.newFlowFileQueue = newFlowFiles; - this.logger = logger; - this.udpCallback = new UDPConsumerCallback(filledBuffers, fileSizeTrigger, flowFilePerDatagram); - } - - @Override - public void setReturnBufferQueue(final BufferPool pool) { - this.bufferPool = pool; - this.udpCallback.setBufferPool(pool); - } - - @Override - public void addFilledBuffer(final ByteBuffer buffer) { - if (isConsumerFinished()) { - bufferPool.returnBuffer(buffer, 0); - } else { - filledBuffers.add(buffer); - } - } - - private void close() { - if (isConsumerFinished()) { - return; - } - consumerDone.set(true); - ByteBuffer buf = null; - while ((buf = filledBuffers.poll()) != null) { - bufferPool.returnBuffer(buf, 0); - } - } - - public void setSession(ProcessSession session) { - this.session = session; - } - - @Override - public void process() throws IOException { - if (isConsumerFinished()) { - return; - } - - FlowFile newFlowFile = null; - try { - if (streamEnded.get() && filledBuffers.isEmpty()) { - close(); - return; - } - if (filledBuffers.isEmpty()) { - return; - } - // time to make a new flow file - newFlowFile = session.create(); - newFlowFile = session.putAttribute(newFlowFile, "source.stream.identifier", uniqueId); - newFlowFile = session.write(newFlowFile, udpCallback); - if (newFlowFile.getSize() == 0) { - session.remove(newFlowFile); - return; - } - newFlowFileQueue.add(newFlowFile); - } catch (final Exception ex) { - close(); - if (newFlowFile != null) { - try { - session.remove(newFlowFile); - } catch (final Exception ex2) { - logger.warn("Unable to delete partial flow file due to: ", ex2); - } - } - throw new IOException("Problem while processing data stream", ex); - } - } - - @Override - public void signalEndOfStream() { - streamEnded.set(true); - } - - @Override - public boolean isConsumerFinished() { - return consumerDone.get(); - } - - @Override - public String getId() { - return uniqueId; - } - - @Override - public final boolean equals(final Object obj) { - if (obj == null) { - return false; - } - if (obj == this) { - return true; - } - if (obj.getClass() != getClass()) { - return false; - } - UDPStreamConsumer rhs = (UDPStreamConsumer) obj; - return new EqualsBuilder().appendSuper(super.equals(obj)).append(uniqueId, rhs.uniqueId).isEquals(); - } - - @Override - public final int hashCode() { - return new HashCodeBuilder(17, 37).append(uniqueId).toHashCode(); - } - - @Override - public final String toString() { - return new ToStringBuilder(this).append(uniqueId).toString(); - } - - public static final class UDPConsumerCallback implements OutputStreamCallback { - - BufferPool bufferPool; - final BlockingQueue filledBuffers; - final long fileSizeTrigger; - final boolean flowFilePerDatagram; - - public UDPConsumerCallback(final BlockingQueue filledBuffers, final long fileSizeTrigger, final boolean flowFilePerDatagram) { - this.filledBuffers = filledBuffers; - this.fileSizeTrigger = fileSizeTrigger; - this.flowFilePerDatagram = flowFilePerDatagram; - } - - public void setBufferPool(BufferPool pool) { - this.bufferPool = pool; - } - - @Override - public void process(final OutputStream out) throws IOException { - try { - long totalBytes = 0L; - try (WritableByteChannel wbc = Channels.newChannel(new BufferedOutputStream(out))) { - ByteBuffer buffer = null; - while ((buffer = filledBuffers.poll(50, TimeUnit.MILLISECONDS)) != null) { - int bytesWrittenThisPass = 0; - try { - while (buffer.hasRemaining()) { - bytesWrittenThisPass += wbc.write(buffer); - } - totalBytes += bytesWrittenThisPass; - if (totalBytes > fileSizeTrigger || flowFilePerDatagram) { - break;// this is enough data - } - } finally { - bufferPool.returnBuffer(buffer, bytesWrittenThisPass); - } - } - } - } catch (final InterruptedException ie) { - // irrelevant - } - } - - } -} diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/WrappedMessageConsumer.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/WrappedMessageConsumer.java deleted file mode 100644 index cc33184807..0000000000 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/WrappedMessageConsumer.java +++ /dev/null @@ -1,77 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.processors.standard.util; - -import javax.jms.Connection; -import javax.jms.JMSException; -import javax.jms.MessageConsumer; -import javax.jms.Session; - -import org.apache.nifi.logging.ComponentLog; - -public class WrappedMessageConsumer { - - private final Connection connection; - private final Session session; - private final MessageConsumer consumer; - - private boolean closed = false; - - public WrappedMessageConsumer(final Connection connection, final Session jmsSession, final MessageConsumer messageConsumer) { - this.connection = connection; - this.session = jmsSession; - this.consumer = messageConsumer; - } - - public Connection getConnection() { - return connection; - } - - public Session getSession() { - return session; - } - - public MessageConsumer getConsumer() { - return consumer; - } - - public void close(final ComponentLog logger) { - closed = true; - - try { - connection.close(); - } catch (final JMSException e) { - logger.warn("unable to close connection to JMS Server due to {}; resources may not be cleaned up appropriately", e); - } - - try { - session.close(); - } catch (final JMSException e) { - logger.warn("unable to close connection to JMS Server due to {}; resources may not be cleaned up appropriately", e); - } - - try { - consumer.close(); - } catch (final JMSException e) { - logger.warn("unable to close connection to JMS Server due to {}; resources may not be cleaned up appropriately", e); - } - } - - public boolean isClosed() { - return closed; - } -} diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/WrappedMessageProducer.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/WrappedMessageProducer.java deleted file mode 100644 index 49e586da85..0000000000 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/WrappedMessageProducer.java +++ /dev/null @@ -1,77 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.processors.standard.util; - -import javax.jms.Connection; -import javax.jms.JMSException; -import javax.jms.MessageProducer; -import javax.jms.Session; - -import org.apache.nifi.logging.ComponentLog; - -public class WrappedMessageProducer { - - private final Connection connection; - private final Session session; - private final MessageProducer producer; - - private boolean closed = false; - - public WrappedMessageProducer(final Connection connection, final Session jmsSession, final MessageProducer messageProducer) { - this.connection = connection; - this.session = jmsSession; - this.producer = messageProducer; - } - - public Connection getConnection() { - return connection; - } - - public Session getSession() { - return session; - } - - public MessageProducer getProducer() { - return producer; - } - - public void close(final ComponentLog logger) { - closed = true; - - try { - connection.close(); - } catch (final JMSException e) { - logger.warn("unable to close connection to JMS Server due to {}; resources may not be cleaned up appropriately", e); - } - - try { - session.close(); - } catch (final JMSException e) { - logger.warn("unable to close connection to JMS Server due to {}; resources may not be cleaned up appropriately", e); - } - - try { - producer.close(); - } catch (final JMSException e) { - logger.warn("unable to close connection to JMS Server due to {}; resources may not be cleaned up appropriately", e); - } - } - - public boolean isClosed() { - return closed; - } -} diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestWaitNotifyProtocol.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestWaitNotifyProtocol.java index 9b4a6ef58f..5429bf3f43 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestWaitNotifyProtocol.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestWaitNotifyProtocol.java @@ -16,7 +16,6 @@ */ package org.apache.nifi.processors.standard; -import org.apache.activemq.util.ByteArrayOutputStream; import org.apache.nifi.distributed.cache.client.AtomicCacheEntry; import org.apache.nifi.distributed.cache.client.AtomicDistributedMapCacheClient; import org.apache.nifi.distributed.cache.client.exception.DeserializationException; @@ -26,6 +25,7 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.mockito.stubbing.Answer; +import java.io.ByteArrayOutputStream; import java.lang.reflect.Field; import java.nio.charset.StandardCharsets; import java.util.ArrayList; diff --git a/nifi-nar-bundles/nifi-standard-bundle/pom.xml b/nifi-nar-bundles/nifi-standard-bundle/pom.xml index db389fba2e..4de0a512c0 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/pom.xml +++ b/nifi-nar-bundles/nifi-standard-bundle/pom.xml @@ -176,17 +176,6 @@ javax.jms-api 2.0.1 - - org.apache.activemq - activemq-client - 5.15.15 - - - org.apache.activemq - activemq-broker - 5.15.14 - test - org.apache.derby derby