From 39556e35131638bfb5795dd41736c0faae8aaf39 Mon Sep 17 00:00:00 2001 From: Mark Payne Date: Wed, 21 Feb 2018 09:31:36 -0500 Subject: [PATCH] NIFI-4902: This closes #2485. Updated ConsumeAMQP, PublishAMQP to use one connection per concurrent task instead of a single connection shared by all concurrent tasks. This offers far better throughput when the network latency is non-trivial. Also refactored to simplify code Signed-off-by: joewitt --- .../nifi-amqp-processors/pom.xml | 2 +- .../nifi/amqp/processors/AMQPConsumer.java | 11 +- .../nifi/amqp/processors/AMQPPublisher.java | 36 +- .../nifi/amqp/processors/AMQPResource.java | 70 ++++ .../nifi/amqp/processors/AMQPUtils.java | 240 ----------- .../nifi/amqp/processors/AMQPWorker.java | 18 +- .../processors/AbstractAMQPProcessor.java | 136 +++---- .../nifi/amqp/processors/ConsumeAMQP.java | 137 ++++--- .../nifi/amqp/processors/PublishAMQP.java | 243 ++++++----- .../amqp/processors/AMQPPublisherTest.java | 4 +- .../nifi/amqp/processors/AMQPUtilsTest.java | 52 --- .../processors/AbstractAMQPProcessorTest.java | 6 +- .../nifi/amqp/processors/ConsumeAMQPTest.java | 32 +- .../nifi/amqp/processors/PublishAMQPTest.java | 65 +-- .../nifi/amqp/processors/TestChannel.java | 376 ++++++++++-------- .../nifi/amqp/processors/TestConnection.java | 93 +++-- 16 files changed, 633 insertions(+), 888 deletions(-) create mode 100644 nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/AMQPResource.java delete mode 100644 nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/AMQPUtils.java delete mode 100644 nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/AMQPUtilsTest.java diff --git a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/pom.xml b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/pom.xml index 0511323b07..61f3facde2 100644 --- a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/pom.xml +++ b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/pom.xml @@ -20,7 +20,7 @@ language governing permissions and limitations under the License. --> jar - 3.6.0 + 5.2.0 diff --git a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/AMQPConsumer.java b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/AMQPConsumer.java index 96d5385f96..04664698c0 100644 --- a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/AMQPConsumer.java +++ b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/AMQPConsumer.java @@ -33,14 +33,8 @@ import com.rabbitmq.client.GetResponse; final class AMQPConsumer extends AMQPWorker { private final static Logger logger = LoggerFactory.getLogger(AMQPConsumer.class); - private final String queueName; - /** - * Creates an instance of this consumer - * @param connection instance of AMQP {@link Connection} - * @param queueName name of the queue from which messages will be consumed. - */ AMQPConsumer(Connection connection, String queueName) { super(connection); this.validateStringProperty("queueName", queueName); @@ -60,7 +54,7 @@ final class AMQPConsumer extends AMQPWorker { */ public GetResponse consume() { try { - return this.channel.basicGet(this.queueName, true); + return getChannel().basicGet(this.queueName, true); } catch (IOException e) { logger.error("Failed to receive message from AMQP; " + this + ". Possible reasons: Queue '" + this.queueName + "' may not have been defined", e); @@ -68,9 +62,6 @@ final class AMQPConsumer extends AMQPWorker { } } - /** - * - */ @Override public String toString() { return super.toString() + ", QUEUE:" + this.queueName; diff --git a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/AMQPPublisher.java b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/AMQPPublisher.java index 6f2fddc41d..553fc835cc 100644 --- a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/AMQPPublisher.java +++ b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/AMQPPublisher.java @@ -21,6 +21,7 @@ import java.io.IOException; import org.apache.nifi.logging.ComponentLog; import com.rabbitmq.client.AMQP.BasicProperties; +import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ReturnListener; @@ -31,19 +32,17 @@ import com.rabbitmq.client.ReturnListener; final class AMQPPublisher extends AMQPWorker { private final ComponentLog processLog; - private final String connectionString; /** * Creates an instance of this publisher * - * @param connection - * instance of AMQP {@link Connection} + * @param connection instance of AMQP {@link Connection} */ AMQPPublisher(Connection connection, ComponentLog processLog) { super(connection); this.processLog = processLog; - this.channel.addReturnListener(new UndeliverableMessageLogger()); + getChannel().addReturnListener(new UndeliverableMessageLogger()); this.connectionString = connection.toString(); } @@ -51,15 +50,11 @@ final class AMQPPublisher extends AMQPWorker { * Publishes message with provided AMQP properties (see * {@link BasicProperties}) to a pre-defined AMQP Exchange. * - * @param bytes - * bytes representing a message. - * @param properties - * instance of {@link BasicProperties} - * @param exchange - * the name of AMQP exchange to which messages will be published. + * @param bytes bytes representing a message. + * @param properties instance of {@link BasicProperties} + * @param exchange the name of AMQP exchange to which messages will be published. * If not provided 'default' exchange will be used. - * @param routingKey - * (required) the name of the routingKey to be used by AMQP-based + * @param routingKey (required) the name of the routingKey to be used by AMQP-based * system to route messages to its final destination (queue). */ void publish(byte[] bytes, BasicProperties properties, String routingKey, String exchange) { @@ -71,22 +66,18 @@ final class AMQPPublisher extends AMQPWorker { processLog.info("Successfully connected AMQPPublisher to " + this.connectionString + " and '" + exchange + "' exchange with '" + routingKey + "' as a routing key."); - if (this.channel.isOpen()) { + final Channel channel = getChannel(); + if (channel.isOpen()) { try { - this.channel.basicPublish(exchange, routingKey, true, properties, bytes); + channel.basicPublish(exchange, routingKey, true, properties, bytes); } catch (Exception e) { - throw new IllegalStateException("Failed to publish to '" + - exchange + "' with '" + routingKey + "'.", e); + throw new IllegalStateException("Failed to publish to Exchange '" + exchange + "' with Routing Key '" + routingKey + "'.", e); } } else { - throw new IllegalStateException("This instance of AMQPPublisher is invalid since " - + "its publishingChannel is closed"); + throw new IllegalStateException("This instance of AMQPPublisher is invalid since its publishingChannel is closed"); } } - /** - * - */ @Override public String toString() { return this.connectionString; @@ -106,8 +97,7 @@ final class AMQPPublisher extends AMQPWorker { */ private final class UndeliverableMessageLogger implements ReturnListener { @Override - public void handleReturn(int replyCode, String replyText, String exchangeName, String routingKey, BasicProperties properties, byte[] message) - throws IOException { + public void handleReturn(int replyCode, String replyText, String exchangeName, String routingKey, BasicProperties properties, byte[] message) throws IOException { String logMessage = "Message destined for '" + exchangeName + "' exchange with '" + routingKey + "' as routing key came back with replyCode=" + replyCode + " and replyText=" + replyText + "."; processLog.warn(logMessage); diff --git a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/AMQPResource.java b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/AMQPResource.java new file mode 100644 index 0000000000..2319e7a90d --- /dev/null +++ b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/AMQPResource.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.amqp.processors; + +import java.io.Closeable; +import java.io.IOException; +import java.util.concurrent.TimeoutException; + +import com.rabbitmq.client.Connection; + +public class AMQPResource implements Closeable { + private final Connection connection; + private final T worker; + + public AMQPResource(final Connection connection, final T worker) { + this.connection = connection; + this.worker = worker; + } + + public Connection getConnection() { + return connection; + } + + public T getWorker() { + return worker; + } + + @Override + public void close() throws IOException { + IOException ioe = null; + + try { + worker.close(); + } catch (final IOException e) { + ioe = e; + } catch (final TimeoutException e) { + ioe = new IOException(e); + } + + try { + connection.close(); + } catch (final IOException e) { + if (ioe == null) { + ioe = e; + } else { + ioe.addSuppressed(e); + } + } + + if (ioe != null) { + throw ioe; + } + } + +} diff --git a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/AMQPUtils.java b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/AMQPUtils.java deleted file mode 100644 index 68302a2ed3..0000000000 --- a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/AMQPUtils.java +++ /dev/null @@ -1,240 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.amqp.processors; - -import java.lang.reflect.Method; -import java.lang.reflect.Modifier; -import java.util.Date; -import java.util.HashMap; -import java.util.Map; - -import org.apache.nifi.flowfile.FlowFile; -import org.apache.nifi.flowfile.attributes.CoreAttributes; -import org.apache.nifi.processor.ProcessSession; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.rabbitmq.client.AMQP.BasicProperties; - -/** - * Utility helper class simplify interactions with target AMQP API and NIFI API. - */ -abstract class AMQPUtils { - - public final static String AMQP_PROP_DELIMITER = "$"; - - public final static String AMQP_PROP_PREFIX = "amqp" + AMQP_PROP_DELIMITER; - - private final static Logger logger = LoggerFactory.getLogger(AMQPUtils.class); - - public enum PropertyNames { - CONTENT_TYPE(AMQP_PROP_PREFIX + "contentType"), - CONTENT_ENCODING(AMQP_PROP_PREFIX + "contentEncoding"), - HEADERS(AMQP_PROP_PREFIX + "headers"), - DELIVERY_MODE(AMQP_PROP_PREFIX + "deliveryMode"), - PRIORITY(AMQP_PROP_PREFIX + "priority"), - CORRELATION_ID(AMQP_PROP_PREFIX + "correlationId"), - REPLY_TO(AMQP_PROP_PREFIX + "replyTo"), - EXPIRATION(AMQP_PROP_PREFIX + "expiration"), - MESSAGE_ID(AMQP_PROP_PREFIX + "messageId"), - TIMESTAMP(AMQP_PROP_PREFIX + "timestamp"), - TYPE(AMQP_PROP_PREFIX + "type"), - USER_ID(AMQP_PROP_PREFIX + "userId"), - APP_ID(AMQP_PROP_PREFIX + "appId"), - CLUSTER_ID(AMQP_PROP_PREFIX + "clusterId"); - - PropertyNames(String value) { - this.value = value; - } - - private final String value; - - private static final Map lookup = new HashMap<>(); - - public static PropertyNames fromValue(String s) { - return lookup.get(s); - } - - static { - for (PropertyNames propertyNames : PropertyNames.values()) { - lookup.put(propertyNames.getValue(), propertyNames); - } - } - - public String getValue() { - return value; - } - - @Override - public String toString() { - return value; - } - } - - /** - * Updates {@link FlowFile} with attributes representing AMQP properties - * - * @param amqpProperties instance of {@link BasicProperties} - * @param flowFile instance of target {@link FlowFile} - * @param processSession instance of {@link ProcessSession} - */ - public static FlowFile updateFlowFileAttributesWithAmqpProperties(BasicProperties amqpProperties, FlowFile flowFile, ProcessSession processSession) { - if (amqpProperties != null) { - try { - Method[] methods = BasicProperties.class.getDeclaredMethods(); - Map attributes = new HashMap<>(); - for (Method method : methods) { - if (Modifier.isPublic(method.getModifiers()) && method.getName().startsWith("get")) { - Object amqpPropertyValue = method.invoke(amqpProperties); - if (amqpPropertyValue != null) { - String propertyName = extractPropertyNameFromMethod(method); - if (isValidAmqpPropertyName(propertyName)) { - if (propertyName.equals(PropertyNames.CONTENT_TYPE.getValue())) { - attributes.put(CoreAttributes.MIME_TYPE.key(), amqpPropertyValue.toString()); - } - attributes.put(propertyName, amqpPropertyValue.toString()); - } - } - } - } - flowFile = processSession.putAllAttributes(flowFile, attributes); - } catch (Exception e) { - logger.warn("Failed to update FlowFile with AMQP attributes", e); - } - } - return flowFile; - } - - /** - * Will validate if provided name corresponds to valid AMQP property. - * - * @param name the name of the property - * @return 'true' if valid otherwise 'false' - */ - public static boolean isValidAmqpPropertyName(String name) { - return PropertyNames.fromValue(name) != null; - } - - /** - * - */ - private static String extractPropertyNameFromMethod(Method method) { - char c[] = method.getName().substring(3).toCharArray(); - c[0] = Character.toLowerCase(c[0]); - return AMQP_PROP_PREFIX + new String(c); - } - - /** - * Will validate if provided amqpPropValue can be converted to a {@link Map}. - * Should be passed in the format: amqp$headers=key=value,key=value etc. - * - * @param amqpPropValue the value of the property - * @return {@link Map} if valid otherwise null - */ - public static Map validateAMQPHeaderProperty(String amqpPropValue) { - String[] strEntries = amqpPropValue.split(","); - Map headers = new HashMap<>(); - for (String strEntry : strEntries) { - String[] kv = strEntry.split("="); - if (kv.length == 2) { - headers.put(kv[0].trim(), kv[1].trim()); - } else { - logger.warn("Malformed key value pair for AMQP header property: " + amqpPropValue); - } - } - - return headers; - } - - /** - * Will validate if provided amqpPropValue can be converted to an {@link Integer}, and that its - * value is 1 or 2. - * - * @param amqpPropValue the value of the property - * @return {@link Integer} if valid otherwise null - */ - public static Integer validateAMQPDeliveryModeProperty(String amqpPropValue) { - Integer deliveryMode = toInt(amqpPropValue); - - if (deliveryMode == null || !(deliveryMode == 1 || deliveryMode == 2)) { - logger.warn("Invalid value for AMQP deliveryMode property: " + amqpPropValue); - } - return deliveryMode; - } - - /** - * Will validate if provided amqpPropValue can be converted to an {@link Integer} and that its - * value is between 0 and 9 (inclusive). - * - * @param amqpPropValue the value of the property - * @return {@link Integer} if valid otherwise null - */ - public static Integer validateAMQPPriorityProperty(String amqpPropValue) { - Integer priority = toInt(amqpPropValue); - - if (priority == null || !(priority >= 0 && priority <= 9)) { - logger.warn("Invalid value for AMQP priority property: " + amqpPropValue); - } - return priority; - } - - /** - * Will validate if provided amqpPropValue can be converted to a {@link Date}. - * - * @param amqpPropValue the value of the property - * @return {@link Date} if valid otherwise null - */ - public static Date validateAMQPTimestampProperty(String amqpPropValue) { - Long timestamp = toLong(amqpPropValue); - - if (timestamp == null) { - logger.warn("Invalid value for AMQP timestamp property: " + amqpPropValue); - return null; - } - - //milliseconds are lost when sending to AMQP - return new Date(timestamp); - } - - /** - * Takes a {@link String} and tries to convert to an {@link Integer}. - * - * @param strVal the value to be converted - * @return {@link Integer} if valid otherwise null - */ - private static Integer toInt(String strVal) { - try { - return Integer.parseInt(strVal); - } catch (NumberFormatException aE) { - return null; - } - } - - /** - * Takes a {@link String} and tries to convert to a {@link Long}. - * - * @param strVal the value to be converted - * @return {@link Long} if valid otherwise null - */ - private static Long toLong(String strVal) { - try { - return Long.parseLong(strVal); - } catch (NumberFormatException aE) { - return null; - } - } -} diff --git a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/AMQPWorker.java b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/AMQPWorker.java index a4de05ecb6..990ed0b4bf 100644 --- a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/AMQPWorker.java +++ b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/AMQPWorker.java @@ -34,19 +34,18 @@ import com.rabbitmq.client.Connection; abstract class AMQPWorker implements AutoCloseable { private final static Logger logger = LoggerFactory.getLogger(AMQPWorker.class); - - protected final Channel channel; + private final Channel channel; /** * Creates an instance of this worker initializing it with AMQP * {@link Connection} and creating a target {@link Channel} used by * sub-classes to interact with AMQP-based messaging system. * - * @param connection - * instance of {@link Connection} + * @param connection instance of {@link Connection} */ - public AMQPWorker(Connection connection) { - this.validateConnection(connection); + public AMQPWorker(final Connection connection) { + validateConnection(connection); + try { this.channel = connection.createChannel(); } catch (IOException e) { @@ -55,6 +54,10 @@ abstract class AMQPWorker implements AutoCloseable { } } + protected Channel getChannel() { + return channel; + } + /** * Closes {@link Channel} created when instance of this class was created. */ @@ -91,8 +94,7 @@ abstract class AMQPWorker implements AutoCloseable { /** * Validates that {@link Connection} is not null and open. * - * @param connection - * instance of {@link Connection} + * @param connection instance of {@link Connection} */ private void validateConnection(Connection connection) { if (connection == null) { diff --git a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/AbstractAMQPProcessor.java b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/AbstractAMQPProcessor.java index efd1be55ed..3e5528325e 100644 --- a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/AbstractAMQPProcessor.java +++ b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/AbstractAMQPProcessor.java @@ -16,9 +16,11 @@ */ package org.apache.nifi.amqp.processors; -import java.io.IOException; import java.util.ArrayList; +import java.util.Collections; import java.util.List; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; import javax.net.ssl.SSLContext; @@ -29,7 +31,6 @@ import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.processor.AbstractProcessor; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessSession; -import org.apache.nifi.processor.Processor; import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.security.util.SslContextFactory; @@ -119,112 +120,95 @@ abstract class AbstractAMQPProcessor extends AbstractProce .defaultValue("REQUIRED") .build(); - static List descriptors = new ArrayList<>(); + private static final List propertyDescriptors; - /* - * Will ensure that list of PropertyDescriptors is build only once, since - * all other lifecycle methods are invoked multiple times. - */ static { - descriptors.add(HOST); - descriptors.add(PORT); - descriptors.add(V_HOST); - descriptors.add(USER); - descriptors.add(PASSWORD); - descriptors.add(AMQP_VERSION); - descriptors.add(SSL_CONTEXT_SERVICE); - descriptors.add(USE_CERT_AUTHENTICATION); - descriptors.add(CLIENT_AUTH); + final List properties = new ArrayList<>(); + properties.add(HOST); + properties.add(PORT); + properties.add(V_HOST); + properties.add(USER); + properties.add(PASSWORD); + properties.add(AMQP_VERSION); + properties.add(SSL_CONTEXT_SERVICE); + properties.add(USE_CERT_AUTHENTICATION); + properties.add(CLIENT_AUTH); + propertyDescriptors = Collections.unmodifiableList(properties); } - protected volatile Connection amqpConnection; + protected static List getCommonPropertyDescriptors() { + return propertyDescriptors; + } - protected volatile T targetResource; + private final BlockingQueue> resourceQueue = new LinkedBlockingQueue<>(); /** - * Will builds target resource ({@link AMQPPublisher} or - * {@link AMQPConsumer}) upon first invocation and will delegate to the - * implementation of - * {@link #rendezvousWithAmqp(ProcessContext, ProcessSession)} method for - * further processing. + * Will builds target resource ({@link AMQPPublisher} or {@link AMQPConsumer}) upon first invocation and will delegate to the + * implementation of {@link #processResource(ProcessContext, ProcessSession)} method for further processing. */ @Override - public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException { - synchronized (this) { - this.buildTargetResource(context); + public final void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { + AMQPResource resource = resourceQueue.poll(); + if (resource == null) { + resource = createResource(context); + } + + try { + processResource(resource.getConnection(), resource.getWorker(), context, session); + resourceQueue.offer(resource); + } catch (final Exception e) { + try { + resource.close(); + } catch (final Exception e2) { + e.addSuppressed(e2); + } + + throw e; } - this.rendezvousWithAmqp(context, session); } - /** - * Will close current AMQP connection. - */ + @OnStopped public void close() { - try { - if (this.targetResource != null) { - this.targetResource.close(); + AMQPResource resource; + while ((resource = resourceQueue.poll()) != null) { + try { + resource.close(); + } catch (final Exception e) { + getLogger().warn("Failed to close AMQP Connection", e); } - } catch (Exception e) { - this.getLogger().warn("Failure while closing target resource " + this.targetResource, e); } - try { - if (this.amqpConnection != null) { - this.amqpConnection.close(); - } - } catch (IOException e) { - this.getLogger().warn("Failure while closing connection", e); - } - this.amqpConnection = null; } /** - * Delegate method to supplement - * {@link #onTrigger(ProcessContext, ProcessSession)}. It is implemented by - * sub-classes to perform {@link Processor} specific functionality. - * - * @param context - * instance of {@link ProcessContext} - * @param session - * instance of {@link ProcessSession} + * Performs functionality of the Processor, given the appropriate connection and worker */ - protected abstract void rendezvousWithAmqp(ProcessContext context, ProcessSession session) throws ProcessException; + protected abstract void processResource(Connection connection, T worker, ProcessContext context, ProcessSession session) throws ProcessException; /** - * Delegate method to supplement building of target {@link AMQPWorker} (see - * {@link AMQPPublisher} or {@link AMQPConsumer}) and is implemented by - * sub-classes. + * Builds the appropriate AMQP Worker for the implementing processor * - * @param context - * instance of {@link ProcessContext} + * @param context instance of {@link ProcessContext} * @return new instance of {@link AMQPWorker} */ - protected abstract T finishBuildingTargetResource(ProcessContext context); + protected abstract T createAMQPWorker(ProcessContext context, Connection connection); - /** - * Builds target resource ({@link AMQPPublisher} or {@link AMQPConsumer}). - * It does so by making a {@link Connection} and then delegating to the - * implementation of {@link #finishBuildingTargetResource(ProcessContext)} - * which will build {@link AMQPWorker} (see {@link AMQPPublisher} or - * {@link AMQPConsumer}). - */ - private void buildTargetResource(ProcessContext context) { - if (this.amqpConnection == null || !this.amqpConnection.isOpen()) { - this.amqpConnection = this.createConnection(context); - this.targetResource = this.finishBuildingTargetResource(context); - } + + private AMQPResource createResource(final ProcessContext context) { + final Connection connection = createConnection(context); + final T worker = createAMQPWorker(context, connection); + return new AMQPResource<>(connection, worker); } - /** - * Creates {@link Connection} to AMQP system. - */ - private Connection createConnection(ProcessContext context) { - ConnectionFactory cf = new ConnectionFactory(); + + protected Connection createConnection(ProcessContext context) { + final ConnectionFactory cf = new ConnectionFactory(); cf.setHost(context.getProperty(HOST).getValue()); cf.setPort(Integer.parseInt(context.getProperty(PORT).getValue())); cf.setUsername(context.getProperty(USER).getValue()); cf.setPassword(context.getProperty(PASSWORD).getValue()); - String vHost = context.getProperty(V_HOST).getValue(); + + final String vHost = context.getProperty(V_HOST).getValue(); if (vHost != null) { cf.setVirtualHost(vHost); } diff --git a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/ConsumeAMQP.java b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/ConsumeAMQP.java index db1f29a373..1b0ee525eb 100644 --- a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/ConsumeAMQP.java +++ b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/ConsumeAMQP.java @@ -16,16 +16,18 @@ */ package org.apache.nifi.amqp.processors; -import java.io.IOException; -import java.io.OutputStream; import java.util.ArrayList; import java.util.Collections; +import java.util.HashMap; import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.Set; import org.apache.nifi.annotation.behavior.InputRequirement; import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.components.PropertyDescriptor; @@ -33,24 +35,34 @@ import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.Relationship; -import org.apache.nifi.processor.exception.ProcessException; -import org.apache.nifi.processor.io.OutputStreamCallback; import org.apache.nifi.processor.util.StandardValidators; import com.rabbitmq.client.AMQP.BasicProperties; +import com.rabbitmq.client.Connection; import com.rabbitmq.client.GetResponse; -/** - * Consuming AMQP processor which upon each invocation of - * {@link #onTrigger(ProcessContext, ProcessSession)} method will construct a - * {@link FlowFile} containing the body of the consumed AMQP message and AMQP - * properties that came with message which are added to a {@link FlowFile} as - * attributes. - */ @Tags({ "amqp", "rabbit", "get", "message", "receive", "consume" }) @InputRequirement(Requirement.INPUT_FORBIDDEN) -@CapabilityDescription("Consumes AMQP Message transforming its content to a FlowFile and transitioning it to 'success' relationship") +@CapabilityDescription("Consumes AMQP Messages from an AMQP Broker using the AMQP 0.9.1 protocol. Each message that is received from the AMQP Broker will be " + + "emitted as its own FlowFile to the 'success' relationship.") +@WritesAttributes({ + @WritesAttribute(attribute = "amqp$appId", description = "The App ID field from the AMQP Message"), + @WritesAttribute(attribute = "amqp$contentEncoding", description = "The Content Encoding reported by the AMQP Message"), + @WritesAttribute(attribute = "amqp$contentType", description = "The Content Type reported by the AMQP Message"), + @WritesAttribute(attribute = "amqp$headers", description = "The headers present on the AMQP Message"), + @WritesAttribute(attribute = "amqp$deliveryMode", description = "The numeric indicator for the Message's Delivery Mode"), + @WritesAttribute(attribute = "amqp$priority", description = "The Message priority"), + @WritesAttribute(attribute = "amqp$correlationId", description = "The Message's Correlation ID"), + @WritesAttribute(attribute = "amqp$replyTo", description = "The value of the Message's Reply-To field"), + @WritesAttribute(attribute = "amqp$expiration", description = "The Message Expiration"), + @WritesAttribute(attribute = "amqp$messageId", description = "The unique ID of the Message"), + @WritesAttribute(attribute = "amqp$timestamp", description = "The timestamp of the Message, as the number of milliseconds since epoch"), + @WritesAttribute(attribute = "amqp$type", description = "The type of message"), + @WritesAttribute(attribute = "amqp$userId", description = "The ID of the user"), + @WritesAttribute(attribute = "amqp$clusterId", description = "The ID of the AMQP Cluster"), +}) public class ConsumeAMQP extends AbstractAMQPProcessor { + private static final String ATTRIBUTES_PREFIX = "amqp$"; public static final PropertyDescriptor QUEUE = new PropertyDescriptor.Builder() .name("Queue") @@ -64,73 +76,82 @@ public class ConsumeAMQP extends AbstractAMQPProcessor { .description("All FlowFiles that are received from the AMQP queue are routed to this relationship") .build(); - private final static List propertyDescriptors; + private static final List propertyDescriptors; + private static final Set relationships; - private final static Set relationships; - - /* - * Will ensure that the list of property descriptors is build only once. - * Will also create a Set of relationships - */ static { - List _propertyDescriptors = new ArrayList<>(); - _propertyDescriptors.add(QUEUE); - _propertyDescriptors.addAll(descriptors); - propertyDescriptors = Collections.unmodifiableList(_propertyDescriptors); + List properties = new ArrayList<>(); + properties.add(QUEUE); + properties.addAll(getCommonPropertyDescriptors()); + propertyDescriptors = Collections.unmodifiableList(properties); - Set _relationships = new HashSet<>(); - _relationships.add(REL_SUCCESS); - relationships = Collections.unmodifiableSet(_relationships); + Set rels = new HashSet<>(); + rels.add(REL_SUCCESS); + relationships = Collections.unmodifiableSet(rels); } /** - * Will construct a {@link FlowFile} containing the body of the consumed - * AMQP message (if {@link GetResponse} returned by {@link AMQPConsumer} is - * not null) and AMQP properties that came with message which are added to a - * {@link FlowFile} as attributes, transferring {@link FlowFile} to + * Will construct a {@link FlowFile} containing the body of the consumed AMQP message (if {@link GetResponse} returned by {@link AMQPConsumer} is + * not null) and AMQP properties that came with message which are added to a {@link FlowFile} as attributes, transferring {@link FlowFile} to * 'success' {@link Relationship}. */ @Override - protected void rendezvousWithAmqp(ProcessContext context, ProcessSession processSession) throws ProcessException { - final GetResponse response = this.targetResource.consume(); - if (response != null){ - FlowFile flowFile = processSession.create(); - flowFile = processSession.write(flowFile, new OutputStreamCallback() { - @Override - public void process(final OutputStream out) throws IOException { - out.write(response.getBody()); - } - }); - BasicProperties amqpProperties = response.getProps(); - flowFile = AMQPUtils.updateFlowFileAttributesWithAmqpProperties(amqpProperties, flowFile, processSession); - processSession.getProvenanceReporter().receive(flowFile, - this.amqpConnection.toString() + "/" + context.getProperty(QUEUE).getValue()); - processSession.transfer(flowFile, REL_SUCCESS); - } else { + protected void processResource(final Connection connection, final AMQPConsumer consumer, final ProcessContext context, final ProcessSession session) { + final GetResponse response = consumer.consume(); + if (response == null) { context.yield(); + return; } + + FlowFile flowFile = session.create(); + flowFile = session.write(flowFile, out -> out.write(response.getBody())); + + final BasicProperties amqpProperties = response.getProps(); + final Map attributes = buildAttributes(amqpProperties); + flowFile = session.putAllAttributes(flowFile, attributes); + + session.getProvenanceReporter().receive(flowFile, connection.toString() + "/" + context.getProperty(QUEUE).getValue()); + session.transfer(flowFile, REL_SUCCESS); + } + + private Map buildAttributes(final BasicProperties properties) { + final Map attributes = new HashMap<>(); + addAttribute(attributes, ATTRIBUTES_PREFIX + "appId", properties.getAppId()); + addAttribute(attributes, ATTRIBUTES_PREFIX + "contentEncoding", properties.getContentEncoding()); + addAttribute(attributes, ATTRIBUTES_PREFIX + "contentType", properties.getContentType()); + addAttribute(attributes, ATTRIBUTES_PREFIX + "headers", properties.getHeaders()); + addAttribute(attributes, ATTRIBUTES_PREFIX + "deliveryMode", properties.getDeliveryMode()); + addAttribute(attributes, ATTRIBUTES_PREFIX + "priority", properties.getPriority()); + addAttribute(attributes, ATTRIBUTES_PREFIX + "correlationId", properties.getCorrelationId()); + addAttribute(attributes, ATTRIBUTES_PREFIX + "replyTo", properties.getReplyTo()); + addAttribute(attributes, ATTRIBUTES_PREFIX + "expiration", properties.getExpiration()); + addAttribute(attributes, ATTRIBUTES_PREFIX + "messageId", properties.getMessageId()); + addAttribute(attributes, ATTRIBUTES_PREFIX + "timestamp", properties.getTimestamp() == null ? null : properties.getTimestamp().getTime()); + addAttribute(attributes, ATTRIBUTES_PREFIX + "type", properties.getType()); + addAttribute(attributes, ATTRIBUTES_PREFIX + "userId", properties.getUserId()); + addAttribute(attributes, ATTRIBUTES_PREFIX + "clusterId", properties.getClusterId()); + return attributes; + } + + private void addAttribute(final Map attributes, final String attributeName, final Object value) { + if (value == null) { + return; + } + + attributes.put(attributeName, value.toString()); } - /** - * Will create an instance of {@link AMQPConsumer} - */ @Override - protected AMQPConsumer finishBuildingTargetResource(ProcessContext context) { - String queueName = context.getProperty(QUEUE).getValue(); - return new AMQPConsumer(this.amqpConnection, queueName); + protected AMQPConsumer createAMQPWorker(final ProcessContext context, final Connection connection) { + final String queueName = context.getProperty(QUEUE).getValue(); + return new AMQPConsumer(connection, queueName); } - /** - * - */ @Override protected List getSupportedPropertyDescriptors() { return propertyDescriptors; } - /** - * - */ @Override public Set getRelationships() { return relationships; diff --git a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/PublishAMQP.java b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/PublishAMQP.java index 857e591eca..7dce05eaf6 100644 --- a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/PublishAMQP.java +++ b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/PublishAMQP.java @@ -20,16 +20,20 @@ import java.io.IOException; import java.io.InputStream; import java.util.ArrayList; import java.util.Collections; +import java.util.Date; +import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; -import java.util.Map.Entry; import java.util.Set; +import java.util.function.Consumer; import org.apache.nifi.annotation.behavior.SystemResourceConsideration; import org.apache.nifi.annotation.behavior.InputRequirement; import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; import org.apache.nifi.annotation.behavior.SystemResource; +import org.apache.nifi.annotation.behavior.ReadsAttribute; +import org.apache.nifi.annotation.behavior.ReadsAttributes; import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.components.PropertyDescriptor; @@ -45,27 +49,34 @@ import org.apache.nifi.stream.io.StreamUtils; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.AMQP.BasicProperties; +import com.rabbitmq.client.Connection; -/** - * Publishing AMQP processor which upon each invocation of - * {@link #onTrigger(ProcessContext, ProcessSession)} method will construct an - * AMQP message sending it to an exchange identified during construction of this - * class while transferring the incoming {@link FlowFile} to 'success' - * {@link Relationship}. - * - * Expects that queues, exchanges and bindings are pre-defined by an AMQP - * administrator - */ @Tags({ "amqp", "rabbit", "put", "message", "send", "publish" }) @InputRequirement(Requirement.INPUT_REQUIRED) -@CapabilityDescription("Creates a AMQP Message from the contents of a FlowFile and sends the message to an AMQP Exchange." - + "In a typical AMQP exchange model, the message that is sent to the AMQP Exchange will be routed based on the 'Routing Key' " - + "to its final destination in the queue (the binding). If due to some misconfiguration the binding between the Exchange, Routing Key " - + "and Queue is not set up, the message will have no final destination and will return (i.e., the data will not make it to the queue). If " - + "that happens you will see a log in both app-log and bulletin stating to that effect. Fixing the binding " - + "(normally done by AMQP administrator) will resolve the issue.") +@CapabilityDescription("Creates an AMQP Message from the contents of a FlowFile and sends the message to an AMQP Exchange. " + + "In a typical AMQP exchange model, the message that is sent to the AMQP Exchange will be routed based on the 'Routing Key' " + + "to its final destination in the queue (the binding). If due to some misconfiguration the binding between the Exchange, Routing Key " + + "and Queue is not set up, the message will have no final destination and will return (i.e., the data will not make it to the queue). If " + + "that happens you will see a log in both app-log and bulletin stating to that effect, and the FlowFile will be routed to the 'failure' relationship.") @SystemResourceConsideration(resource = SystemResource.MEMORY) +@ReadsAttributes({ + @ReadsAttribute(attribute = "amqp$appId", description = "The App ID field to set on the AMQP Message"), + @ReadsAttribute(attribute = "amqp$contentEncoding", description = "The Content Encoding to set on the AMQP Message"), + @ReadsAttribute(attribute = "amqp$contentType", description = "The Content Type to set on the AMQP Message"), + @ReadsAttribute(attribute = "amqp$headers", description = "The headers to set on the AMQP Message"), + @ReadsAttribute(attribute = "amqp$deliveryMode", description = "The numeric indicator for the Message's Delivery Mode"), + @ReadsAttribute(attribute = "amqp$priority", description = "The Message priority"), + @ReadsAttribute(attribute = "amqp$correlationId", description = "The Message's Correlation ID"), + @ReadsAttribute(attribute = "amqp$replyTo", description = "The value of the Message's Reply-To field"), + @ReadsAttribute(attribute = "amqp$expiration", description = "The Message Expiration"), + @ReadsAttribute(attribute = "amqp$messageId", description = "The unique ID of the Message"), + @ReadsAttribute(attribute = "amqp$timestamp", description = "The timestamp of the Message, as the number of milliseconds since epoch"), + @ReadsAttribute(attribute = "amqp$type", description = "The type of message"), + @ReadsAttribute(attribute = "amqp$userId", description = "The ID of the user"), + @ReadsAttribute(attribute = "amqp$clusterId", description = "The ID of the AMQP Cluster"), +}) public class PublishAMQP extends AbstractAMQPProcessor { + private static final String ATTRIBUTES_PREFIX = "amqp$"; public static final PropertyDescriptor EXCHANGE = new PropertyDescriptor.Builder() .name("Exchange Name") @@ -100,84 +111,71 @@ public class PublishAMQP extends AbstractAMQPProcessor { private final static Set relationships; - /* - * Will ensure that the list of property descriptors is build only once. - * Will also create a Set of relationships - */ static { - List _propertyDescriptors = new ArrayList<>(); - _propertyDescriptors.add(EXCHANGE); - _propertyDescriptors.add(ROUTING_KEY); - _propertyDescriptors.addAll(descriptors); - propertyDescriptors = Collections.unmodifiableList(_propertyDescriptors); + List properties = new ArrayList<>(); + properties.add(EXCHANGE); + properties.add(ROUTING_KEY); + properties.addAll(getCommonPropertyDescriptors()); + propertyDescriptors = Collections.unmodifiableList(properties); - Set _relationships = new HashSet<>(); - _relationships.add(REL_SUCCESS); - _relationships.add(REL_FAILURE); - relationships = Collections.unmodifiableSet(_relationships); + Set rels = new HashSet<>(); + rels.add(REL_SUCCESS); + rels.add(REL_FAILURE); + relationships = Collections.unmodifiableSet(rels); } + /** - * Will construct AMQP message by extracting its body from the incoming - * {@link FlowFile}. AMQP Properties will be extracted from the - * {@link FlowFile} and converted to {@link BasicProperties} to be sent - * along with the message. Upon success the incoming {@link FlowFile} is - * transferred to 'success' {@link Relationship} and upon failure FlowFile is - * penalized and transferred to the 'failure' {@link Relationship} + * Will construct AMQP message by extracting its body from the incoming {@link FlowFile}. AMQP Properties will be extracted from the + * {@link FlowFile} and converted to {@link BasicProperties} to be sent along with the message. Upon success the incoming {@link FlowFile} is + * transferred to 'success' {@link Relationship} and upon failure FlowFile is penalized and transferred to the 'failure' {@link Relationship} *
+ * * NOTE: Attributes extracted from {@link FlowFile} are considered * candidates for AMQP properties if their names are prefixed with * {@link AMQPUtils#AMQP_PROP_PREFIX} (e.g., amqp$contentType=text/xml) - * */ @Override - protected void rendezvousWithAmqp(ProcessContext context, ProcessSession processSession) throws ProcessException { - FlowFile flowFile = processSession.get(); - if (flowFile != null) { - BasicProperties amqpProperties = this.extractAmqpPropertiesFromFlowFile(flowFile); - String routingKey = context.getProperty(ROUTING_KEY).evaluateAttributeExpressions(flowFile).getValue(); - if (routingKey == null){ - throw new IllegalArgumentException("Failed to determine 'routing key' with provided value '" - + context.getProperty(ROUTING_KEY) + "' after evaluating it as expression against incoming FlowFile."); - } - String exchange = context.getProperty(EXCHANGE).evaluateAttributeExpressions(flowFile).getValue(); + protected void processResource(final Connection connection, final AMQPPublisher publisher, ProcessContext context, ProcessSession session) throws ProcessException { + FlowFile flowFile = session.get(); + if (flowFile == null) { + return; + } - byte[] messageContent = this.extractMessage(flowFile, processSession); + final BasicProperties amqpProperties = extractAmqpPropertiesFromFlowFile(flowFile); + final String routingKey = context.getProperty(ROUTING_KEY).evaluateAttributeExpressions(flowFile).getValue(); + if (routingKey == null) { + throw new IllegalArgumentException("Failed to determine 'routing key' with provided value '" + + context.getProperty(ROUTING_KEY) + "' after evaluating it as expression against incoming FlowFile."); + } - try { - this.targetResource.publish(messageContent, amqpProperties, routingKey, exchange); - processSession.transfer(flowFile, REL_SUCCESS); - processSession.getProvenanceReporter().send(flowFile, this.amqpConnection.toString() + "/E:" + exchange + "/RK:" + routingKey); - } catch (Exception e) { - processSession.transfer(processSession.penalize(flowFile), REL_FAILURE); - this.getLogger().error("Failed while sending message to AMQP via " + this.targetResource, e); - context.yield(); - } + final String exchange = context.getProperty(EXCHANGE).evaluateAttributeExpressions(flowFile).getValue(); + final byte[] messageContent = extractMessage(flowFile, session); + + try { + publisher.publish(messageContent, amqpProperties, routingKey, exchange); + session.transfer(flowFile, REL_SUCCESS); + session.getProvenanceReporter().send(flowFile, connection.toString() + "/E:" + exchange + "/RK:" + routingKey); + } catch (Exception e) { + session.transfer(session.penalize(flowFile), REL_FAILURE); + getLogger().error("Failed while sending message to AMQP via " + publisher, e); } } - /** - * - */ + @Override protected List getSupportedPropertyDescriptors() { return propertyDescriptors; } - /** - * - */ @Override public Set getRelationships() { return relationships; } - /** - * Will create an instance of {@link AMQPPublisher} - */ @Override - protected AMQPPublisher finishBuildingTargetResource(ProcessContext context) { - return new AMQPPublisher(this.amqpConnection, this.getLogger()); + protected AMQPPublisher createAMQPWorker(final ProcessContext context, final Connection connection) { + return new AMQPPublisher(connection, getLogger()); } /** @@ -194,6 +192,20 @@ public class PublishAMQP extends AbstractAMQPProcessor { return messageContent; } + + private void updateBuilderFromAttribute(final FlowFile flowFile, final String attribute, final Consumer updater) { + final String attributeValue = flowFile.getAttribute(ATTRIBUTES_PREFIX + attribute); + if (attributeValue == null) { + return; + } + + try { + updater.accept(attributeValue); + } catch (final Exception e) { + getLogger().warn("Failed to update AMQP Message Property " + attribute, e); + } + } + /** * Extracts AMQP properties from the {@link FlowFile} attributes. Attributes * extracted from {@link FlowFile} are considered candidates for AMQP @@ -208,66 +220,45 @@ public class PublishAMQP extends AbstractAMQPProcessor { * {@link AMQPUtils#validateAMQPTimestampProperty} */ private BasicProperties extractAmqpPropertiesFromFlowFile(FlowFile flowFile) { - Map attributes = flowFile.getAttributes(); - AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder(); - for (Entry attributeEntry : attributes.entrySet()) { - if (attributeEntry.getKey().startsWith(AMQPUtils.AMQP_PROP_PREFIX)) { - String amqpPropName = attributeEntry.getKey(); - String amqpPropValue = attributeEntry.getValue(); + final AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder(); - AMQPUtils.PropertyNames propertyNames = AMQPUtils.PropertyNames.fromValue(amqpPropName); + updateBuilderFromAttribute(flowFile, "contentType", builder::contentType); + updateBuilderFromAttribute(flowFile, "contentEncoding", builder::contentEncoding); + updateBuilderFromAttribute(flowFile, "deliveryMode", mode -> builder.deliveryMode(Integer.parseInt(mode))); + updateBuilderFromAttribute(flowFile, "priority", pri -> builder.priority(Integer.parseInt(pri))); + updateBuilderFromAttribute(flowFile, "correlationId", builder::correlationId); + updateBuilderFromAttribute(flowFile, "replyTo", builder::replyTo); + updateBuilderFromAttribute(flowFile, "expiration", builder::expiration); + updateBuilderFromAttribute(flowFile, "messageId", builder::messageId); + updateBuilderFromAttribute(flowFile, "timestamp", ts -> builder.timestamp(new Date(Long.parseLong(ts)))); + updateBuilderFromAttribute(flowFile, "type", builder::type); + updateBuilderFromAttribute(flowFile, "userId", builder::userId); + updateBuilderFromAttribute(flowFile, "appId", builder::appId); + updateBuilderFromAttribute(flowFile, "clusterId", builder::clusterId); + updateBuilderFromAttribute(flowFile, "headers", headers -> builder.headers(validateAMQPHeaderProperty(headers))); - if (propertyNames != null) { - switch (propertyNames){ - case CONTENT_TYPE: - builder.contentType(amqpPropValue); - break; - case CONTENT_ENCODING: - builder.contentEncoding(amqpPropValue); - break; - case HEADERS: - builder.headers(AMQPUtils.validateAMQPHeaderProperty(amqpPropValue)); - break; - case DELIVERY_MODE: - builder.deliveryMode(AMQPUtils.validateAMQPDeliveryModeProperty(amqpPropValue)); - break; - case PRIORITY: - builder.priority(AMQPUtils.validateAMQPPriorityProperty(amqpPropValue)); - break; - case CORRELATION_ID: - builder.correlationId(amqpPropValue); - break; - case REPLY_TO: - builder.replyTo(amqpPropValue); - break; - case EXPIRATION: - builder.expiration(amqpPropValue); - break; - case MESSAGE_ID: - builder.messageId(amqpPropValue); - break; - case TIMESTAMP: - builder.timestamp(AMQPUtils.validateAMQPTimestampProperty(amqpPropValue)); - break; - case TYPE: - builder.type(amqpPropValue); - break; - case USER_ID: - builder.userId(amqpPropValue); - break; - case APP_ID: - builder.appId(amqpPropValue); - break; - case CLUSTER_ID: - builder.clusterId(amqpPropValue); - break; - } - - } else { - getLogger().warn("Unrecognised AMQP property '" + amqpPropName + "', will ignore."); - } - } - } return builder.build(); } + + /** + * Will validate if provided amqpPropValue can be converted to a {@link Map}. + * Should be passed in the format: amqp$headers=key=value,key=value etc. + * + * @param amqpPropValue the value of the property + * @return {@link Map} if valid otherwise null + */ + private Map validateAMQPHeaderProperty(String amqpPropValue) { + String[] strEntries = amqpPropValue.split(","); + Map headers = new HashMap<>(); + for (String strEntry : strEntries) { + String[] kv = strEntry.split("="); + if (kv.length == 2) { + headers.put(kv[0].trim(), kv[1].trim()); + } else { + getLogger().warn("Malformed key value pair for AMQP header property: " + amqpPropValue); + } + } + + return headers; + } } diff --git a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/AMQPPublisherTest.java b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/AMQPPublisherTest.java index 33844c308c..51bd59fa84 100644 --- a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/AMQPPublisherTest.java +++ b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/AMQPPublisherTest.java @@ -72,7 +72,6 @@ public class AMQPPublisherTest { try (AMQPPublisher sender = new AMQPPublisher(connection, mock(ComponentLog.class))) { sender.publish("hello".getBytes(), null, "key1", "myExchange"); - Thread.sleep(200); } assertNotNull(connection.createChannel().basicGet("queue1", true)); @@ -95,9 +94,8 @@ public class AMQPPublisherTest { try (AMQPPublisher sender = new AMQPPublisher(connection, new MockComponentLog("foo", ""))) { sender.publish("hello".getBytes(), null, "key1", "myExchange"); - Thread.sleep(1000); } - Thread.sleep(200); + verify(retListener, atMost(1)).handleReturn(Mockito.anyInt(), Mockito.anyString(), Mockito.anyString(), Mockito.anyString(), Mockito.any(BasicProperties.class), (byte[]) Mockito.any()); connection.close(); diff --git a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/AMQPUtilsTest.java b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/AMQPUtilsTest.java deleted file mode 100644 index 5452809900..0000000000 --- a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/AMQPUtilsTest.java +++ /dev/null @@ -1,52 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.amqp.processors; - -import static org.junit.Assert.assertEquals; - -import java.util.concurrent.atomic.AtomicLong; - -import org.apache.nifi.flowfile.FlowFile; -import org.apache.nifi.processor.ProcessSession; -import org.apache.nifi.util.MockProcessSession; -import org.apache.nifi.util.SharedSessionState; -import org.junit.Test; - -import com.rabbitmq.client.AMQP; -import com.rabbitmq.client.AMQP.BasicProperties; - -public class AMQPUtilsTest { - - - @Test - public void validateUpdateFlowFileAttributesWithAmqpProperties() { - PublishAMQP processor = new PublishAMQP(); - ProcessSession processSession = new MockProcessSession(new SharedSessionState(processor, new AtomicLong()), - processor); - FlowFile sourceFlowFile = processSession.create(); - BasicProperties amqpProperties = new AMQP.BasicProperties.Builder() - .contentType("text/plain").deliveryMode(2) - .priority(1).userId("joe") - .build(); - FlowFile f2 = AMQPUtils.updateFlowFileAttributesWithAmqpProperties(amqpProperties, sourceFlowFile, - processSession); - - assertEquals("text/plain", f2.getAttributes().get(AMQPUtils.AMQP_PROP_PREFIX + "contentType")); - assertEquals("joe", f2.getAttributes().get(AMQPUtils.AMQP_PROP_PREFIX + "userId")); - assertEquals("2", f2.getAttributes().get(AMQPUtils.AMQP_PROP_PREFIX + "deliveryMode")); - } -} diff --git a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/AbstractAMQPProcessorTest.java b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/AbstractAMQPProcessorTest.java index 0657a6553a..bc4c32d830 100644 --- a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/AbstractAMQPProcessorTest.java +++ b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/AbstractAMQPProcessorTest.java @@ -29,6 +29,8 @@ import org.apache.nifi.util.TestRunners; import org.junit.Before; import org.junit.Test; +import com.rabbitmq.client.Connection; + /** * Unit tests for the AbstractAMQPProcessor class @@ -77,12 +79,12 @@ public class AbstractAMQPProcessorTest { */ public static class MockAbstractAMQPProcessor extends AbstractAMQPProcessor { @Override - protected void rendezvousWithAmqp(ProcessContext context, ProcessSession session) throws ProcessException { + protected void processResource(Connection connection, AMQPConsumer consumer, ProcessContext context, ProcessSession session) throws ProcessException { // nothing to do } @Override - protected AMQPConsumer finishBuildingTargetResource(ProcessContext context) { + protected AMQPConsumer createAMQPWorker(ProcessContext context, Connection connection) { return null; } } diff --git a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/ConsumeAMQPTest.java b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/ConsumeAMQPTest.java index 52b48d87b4..66abb2d0e5 100644 --- a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/ConsumeAMQPTest.java +++ b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/ConsumeAMQPTest.java @@ -20,14 +20,12 @@ import static org.junit.Assert.assertNotNull; import static org.mockito.Mockito.mock; import java.util.Arrays; -import java.util.HashMap; +import java.util.Collections; import java.util.List; import java.util.Map; import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.processor.ProcessContext; -import org.apache.nifi.processor.ProcessSession; -import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.util.MockFlowFile; import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunners; @@ -41,12 +39,10 @@ public class ConsumeAMQPTest { @Test public void validateSuccessfullConsumeAndTransferToSuccess() throws Exception { - Map> routingMap = new HashMap<>(); - routingMap.put("key1", Arrays.asList("queue1", "queue2")); - Map exchangeToRoutingKeymap = new HashMap<>(); - exchangeToRoutingKeymap.put("myExchange", "key1"); + final Map> routingMap = Collections.singletonMap("key1", Arrays.asList("queue1", "queue2")); + final Map exchangeToRoutingKeymap = Collections.singletonMap("myExchange", "key1"); - Connection connection = new TestConnection(exchangeToRoutingKeymap, routingMap); + final Connection connection = new TestConnection(exchangeToRoutingKeymap, routingMap); try (AMQPPublisher sender = new AMQPPublisher(connection, mock(ComponentLog.class))) { sender.publish("hello".getBytes(), MessageProperties.PERSISTENT_TEXT_PLAIN, "key1", "myExchange"); @@ -57,7 +53,6 @@ public class ConsumeAMQPTest { runner.setProperty(ConsumeAMQP.QUEUE, "queue1"); runner.run(); - Thread.sleep(200); final MockFlowFile successFF = runner.getFlowFilesForRelationship(PublishAMQP.REL_SUCCESS).get(0); assertNotNull(successFF); } @@ -65,25 +60,20 @@ public class ConsumeAMQPTest { } public static class LocalConsumeAMQP extends ConsumeAMQP { + private final Connection connection; - private final Connection conection; public LocalConsumeAMQP(Connection connection) { - this.conection = connection; + this.connection = connection; } @Override - public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException { - synchronized (this) { - if (this.amqpConnection == null || !this.amqpConnection.isOpen()) { - this.amqpConnection = this.conection; - this.targetResource = this.finishBuildingTargetResource(context); - } - } - this.rendezvousWithAmqp(context, session); + protected AMQPConsumer createAMQPWorker(ProcessContext context, Connection connection) { + return new AMQPConsumer(connection, context.getProperty(QUEUE).getValue()); } - public Connection getConnection() { - return this.amqpConnection; + @Override + protected Connection createConnection(ProcessContext context) { + return connection; } } } diff --git a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/PublishAMQPTest.java b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/PublishAMQPTest.java index fec3d5015f..cee44a178e 100644 --- a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/PublishAMQPTest.java +++ b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/PublishAMQPTest.java @@ -22,15 +22,13 @@ import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import java.util.Arrays; +import java.util.Collections; import java.util.Date; import java.util.HashMap; import java.util.List; import java.util.Map; -import org.apache.nifi.annotation.lifecycle.OnStopped; import org.apache.nifi.processor.ProcessContext; -import org.apache.nifi.processor.ProcessSession; -import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.util.MockFlowFile; import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunners; @@ -44,13 +42,13 @@ public class PublishAMQPTest { @Test public void validateSuccessfullPublishAndTransferToSuccess() throws Exception { - PublishAMQP pubProc = new LocalPublishAMQP(false); - TestRunner runner = TestRunners.newTestRunner(pubProc); + final PublishAMQP pubProc = new LocalPublishAMQP(); + final TestRunner runner = TestRunners.newTestRunner(pubProc); runner.setProperty(PublishAMQP.HOST, "injvm"); runner.setProperty(PublishAMQP.EXCHANGE, "myExchange"); runner.setProperty(PublishAMQP.ROUTING_KEY, "key1"); - Map attributes = new HashMap<>(); + final Map attributes = new HashMap<>(); attributes.put("foo", "bar"); attributes.put("amqp$contentType", "foo/bar"); attributes.put("amqp$contentEncoding", "foobar123"); @@ -70,20 +68,21 @@ public class PublishAMQPTest { runner.enqueue("Hello Joe".getBytes(), attributes); runner.run(); + final MockFlowFile successFF = runner.getFlowFilesForRelationship(PublishAMQP.REL_SUCCESS).get(0); assertNotNull(successFF); - Channel channel = ((LocalPublishAMQP) pubProc).getConnection().createChannel(); - GetResponse msg1 = channel.basicGet("queue1", true); + + final Channel channel = ((LocalPublishAMQP) pubProc).getConnection().createChannel(); + final GetResponse msg1 = channel.basicGet("queue1", true); assertNotNull(msg1); assertEquals("foo/bar", msg1.getProps().getContentType()); - assertEquals("foobar123", msg1.getProps().getContentEncoding()); - Map headerMap = msg1.getProps().getHeaders(); + final Map headerMap = msg1.getProps().getHeaders(); - Object foo = headerMap.get("foo"); - Object foo2 = headerMap.get("foo2"); - Object foo3 = headerMap.get("foo3"); + final Object foo = headerMap.get("foo"); + final Object foo2 = headerMap.get("foo2"); + final Object foo3 = headerMap.get("foo3"); assertEquals("bar", foo.toString()); assertEquals("bar2", foo2.toString()); @@ -115,53 +114,29 @@ public class PublishAMQPTest { runner.enqueue("Hello Joe".getBytes()); runner.run(); - Thread.sleep(200); assertTrue(runner.getFlowFilesForRelationship(PublishAMQP.REL_SUCCESS).isEmpty()); assertNotNull(runner.getFlowFilesForRelationship(PublishAMQP.REL_FAILURE).get(0)); } - public static class LocalPublishAMQP extends PublishAMQP { - private final boolean closeConnection; + public static class LocalPublishAMQP extends PublishAMQP { + private TestConnection connection; public LocalPublishAMQP() { - this(true); - } + final Map> routingMap = Collections.singletonMap("key1", Arrays.asList("queue1", "queue2")); + final Map exchangeToRoutingKeymap = Collections.singletonMap("myExchange", "key1"); - public LocalPublishAMQP(boolean closeConection) { - this.closeConnection = closeConection; + connection = new TestConnection(exchangeToRoutingKeymap, routingMap); } @Override - public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException { - synchronized (this) { - if (this.amqpConnection == null || !this.amqpConnection.isOpen()) { - Map> routingMap = new HashMap<>(); - routingMap.put("key1", Arrays.asList("queue1", "queue2")); - Map exchangeToRoutingKeymap = new HashMap<>(); - exchangeToRoutingKeymap.put("myExchange", "key1"); - this.amqpConnection = new TestConnection(exchangeToRoutingKeymap, routingMap); - this.targetResource = this.finishBuildingTargetResource(context); - } - } - this.rendezvousWithAmqp(context, session); + protected Connection createConnection(ProcessContext context) { + return connection; } public Connection getConnection() { - this.close(); - return this.amqpConnection; - } - - // since we really don't have any real connection (rather emulated one), the override is - // needed here so the call to close from TestRunner does nothing since we are - // grabbing the emulated connection later to do the assertions in some tests. - @Override - @OnStopped - public void close() { - if (this.closeConnection) { - super.close(); - } + return connection; } } } diff --git a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/TestChannel.java b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/TestChannel.java index f793084525..4ed4eeb2b9 100644 --- a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/TestChannel.java +++ b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/TestChannel.java @@ -23,6 +23,7 @@ import java.util.List; import java.util.Map; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeoutException; @@ -37,14 +38,19 @@ import com.rabbitmq.client.AMQP.Queue.PurgeOk; import com.rabbitmq.client.AMQP.Tx.CommitOk; import com.rabbitmq.client.AMQP.Tx.RollbackOk; import com.rabbitmq.client.AMQP.Tx.SelectOk; +import com.rabbitmq.client.BuiltinExchangeType; +import com.rabbitmq.client.CancelCallback; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Command; +import com.rabbitmq.client.ConfirmCallback; import com.rabbitmq.client.ConfirmListener; import com.rabbitmq.client.Connection; import com.rabbitmq.client.Consumer; -import com.rabbitmq.client.FlowListener; +import com.rabbitmq.client.ConsumerShutdownSignalCallback; +import com.rabbitmq.client.DeliverCallback; import com.rabbitmq.client.GetResponse; import com.rabbitmq.client.Method; +import com.rabbitmq.client.ReturnCallback; import com.rabbitmq.client.ReturnListener; import com.rabbitmq.client.ShutdownListener; import com.rabbitmq.client.ShutdownSignalException; @@ -55,19 +61,12 @@ import com.rabbitmq.client.ShutdownSignalException; class TestChannel implements Channel { private final ExecutorService executorService; - private final Map> enqueuedMessages; - private final Map> routingKeyToQueueMappings; - private final Map exchangeToRoutingKeyMappings; - private final List returnListeners; - private boolean open; - private boolean corrupted; - private Connection connection; public TestChannel(Map exchangeToRoutingKeyMappings, @@ -97,28 +96,24 @@ class TestChannel implements Channel { @Override public void addShutdownListener(ShutdownListener listener) { - throw new UnsupportedOperationException( - "This method is not currently supported as it is not used by current API in testing"); + throw new UnsupportedOperationException("This method is not currently supported as it is not used by current API in testing"); } @Override public void removeShutdownListener(ShutdownListener listener) { - throw new UnsupportedOperationException( - "This method is not currently supported as it is not used by current API in testing"); + throw new UnsupportedOperationException("This method is not currently supported as it is not used by current API in testing"); } @Override public ShutdownSignalException getCloseReason() { - throw new UnsupportedOperationException( - "This method is not currently supported as it is not used by current API in testing"); + throw new UnsupportedOperationException("This method is not currently supported as it is not used by current API in testing"); } @Override public void notifyListeners() { - throw new UnsupportedOperationException( - "This method is not currently supported as it is not used by current API in testing"); + throw new UnsupportedOperationException("This method is not currently supported as it is not used by current API in testing"); } @@ -129,8 +124,7 @@ class TestChannel implements Channel { @Override public int getChannelNumber() { - throw new UnsupportedOperationException( - "This method is not currently supported as it is not used by current API in testing"); + throw new UnsupportedOperationException("This method is not currently supported as it is not used by current API in testing"); } @Override @@ -145,28 +139,19 @@ class TestChannel implements Channel { @Override public void close(int closeCode, String closeMessage) throws IOException, TimeoutException { - throw new UnsupportedOperationException( - "This method is not currently supported as it is not used by current API in testing"); + throw new UnsupportedOperationException("This method is not currently supported as it is not used by current API in testing"); } - @Override - public boolean flowBlocked() { - throw new UnsupportedOperationException( - "This method is not currently supported as it is not used by current API in testing"); - } - @Override public void abort() throws IOException { - throw new UnsupportedOperationException( - "This method is not currently supported as it is not used by current API in testing"); + throw new UnsupportedOperationException("This method is not currently supported as it is not used by current API in testing"); } @Override public void abort(int closeCode, String closeMessage) throws IOException { - throw new UnsupportedOperationException( - "This method is not currently supported as it is not used by current API in testing"); + throw new UnsupportedOperationException("This method is not currently supported as it is not used by current API in testing"); } @@ -177,88 +162,58 @@ class TestChannel implements Channel { @Override public boolean removeReturnListener(ReturnListener listener) { - throw new UnsupportedOperationException( - "This method is not currently supported as it is not used by current API in testing"); + throw new UnsupportedOperationException("This method is not currently supported as it is not used by current API in testing"); } @Override public void clearReturnListeners() { - throw new UnsupportedOperationException( - "This method is not currently supported as it is not used by current API in testing"); - - } - - @Override - public void addFlowListener(FlowListener listener) { - throw new UnsupportedOperationException( - "This method is not currently supported as it is not used by current API in testing"); - - } - - @Override - public boolean removeFlowListener(FlowListener listener) { - throw new UnsupportedOperationException( - "This method is not currently supported as it is not used by current API in testing"); - } - - @Override - public void clearFlowListeners() { - throw new UnsupportedOperationException( - "This method is not currently supported as it is not used by current API in testing"); + throw new UnsupportedOperationException("This method is not currently supported as it is not used by current API in testing"); } @Override public void addConfirmListener(ConfirmListener listener) { - throw new UnsupportedOperationException( - "This method is not currently supported as it is not used by current API in testing"); + throw new UnsupportedOperationException("This method is not currently supported as it is not used by current API in testing"); } @Override public boolean removeConfirmListener(ConfirmListener listener) { - throw new UnsupportedOperationException( - "This method is not currently supported as it is not used by current API in testing"); + throw new UnsupportedOperationException("This method is not currently supported as it is not used by current API in testing"); } @Override public void clearConfirmListeners() { - throw new UnsupportedOperationException( - "This method is not currently supported as it is not used by current API in testing"); + throw new UnsupportedOperationException("This method is not currently supported as it is not used by current API in testing"); } @Override public Consumer getDefaultConsumer() { - throw new UnsupportedOperationException( - "This method is not currently supported as it is not used by current API in testing"); + throw new UnsupportedOperationException("This method is not currently supported as it is not used by current API in testing"); } @Override public void setDefaultConsumer(Consumer consumer) { - throw new UnsupportedOperationException( - "This method is not currently supported as it is not used by current API in testing"); + throw new UnsupportedOperationException("This method is not currently supported as it is not used by current API in testing"); } @Override public void basicQos(int prefetchSize, int prefetchCount, boolean global) throws IOException { - throw new UnsupportedOperationException( - "This method is not currently supported as it is not used by current API in testing"); + throw new UnsupportedOperationException("This method is not currently supported as it is not used by current API in testing"); } @Override public void basicQos(int prefetchCount, boolean global) throws IOException { - throw new UnsupportedOperationException( - "This method is not currently supported as it is not used by current API in testing"); + throw new UnsupportedOperationException("This method is not currently supported as it is not used by current API in testing"); } @Override public void basicQos(int prefetchCount) throws IOException { - throw new UnsupportedOperationException( - "This method is not currently supported as it is not used by current API in testing"); + throw new UnsupportedOperationException("This method is not currently supported as it is not used by current API in testing"); } @@ -329,198 +284,169 @@ class TestChannel implements Channel { @Override public void basicPublish(String exchange, String routingKey, boolean mandatory, boolean immediate, BasicProperties props, byte[] body) throws IOException { - throw new UnsupportedOperationException( - "This method is not currently supported as it is not used by current API in testing"); + throw new UnsupportedOperationException("This method is not currently supported as it is not used by current API in testing"); } @Override public DeclareOk exchangeDeclare(String exchange, String type) throws IOException { - throw new UnsupportedOperationException( - "This method is not currently supported as it is not used by current API in testing"); + throw new UnsupportedOperationException("This method is not currently supported as it is not used by current API in testing"); } @Override public DeclareOk exchangeDeclare(String exchange, String type, boolean durable) throws IOException { - throw new UnsupportedOperationException( - "This method is not currently supported as it is not used by current API in testing"); + throw new UnsupportedOperationException("This method is not currently supported as it is not used by current API in testing"); } @Override public DeclareOk exchangeDeclare(String exchange, String type, boolean durable, boolean autoDelete, Map arguments) throws IOException { - throw new UnsupportedOperationException( - "This method is not currently supported as it is not used by current API in testing"); + throw new UnsupportedOperationException("This method is not currently supported as it is not used by current API in testing"); } @Override public DeclareOk exchangeDeclare(String exchange, String type, boolean durable, boolean autoDelete, boolean internal, Map arguments) throws IOException { - throw new UnsupportedOperationException( - "This method is not currently supported as it is not used by current API in testing"); + throw new UnsupportedOperationException("This method is not currently supported as it is not used by current API in testing"); } @Override public void exchangeDeclareNoWait(String exchange, String type, boolean durable, boolean autoDelete, boolean internal, Map arguments) throws IOException { - throw new UnsupportedOperationException( - "This method is not currently supported as it is not used by current API in testing"); + throw new UnsupportedOperationException("This method is not currently supported as it is not used by current API in testing"); } @Override public DeclareOk exchangeDeclarePassive(String name) throws IOException { - throw new UnsupportedOperationException( - "This method is not currently supported as it is not used by current API in testing"); + throw new UnsupportedOperationException("This method is not currently supported as it is not used by current API in testing"); } @Override public DeleteOk exchangeDelete(String exchange, boolean ifUnused) throws IOException { - throw new UnsupportedOperationException( - "This method is not currently supported as it is not used by current API in testing"); + throw new UnsupportedOperationException("This method is not currently supported as it is not used by current API in testing"); } @Override public void exchangeDeleteNoWait(String exchange, boolean ifUnused) throws IOException { - throw new UnsupportedOperationException( - "This method is not currently supported as it is not used by current API in testing"); + throw new UnsupportedOperationException("This method is not currently supported as it is not used by current API in testing"); } @Override public DeleteOk exchangeDelete(String exchange) throws IOException { - throw new UnsupportedOperationException( - "This method is not currently supported as it is not used by current API in testing"); + throw new UnsupportedOperationException("This method is not currently supported as it is not used by current API in testing"); } @Override public BindOk exchangeBind(String destination, String source, String routingKey) throws IOException { - throw new UnsupportedOperationException( - "This method is not currently supported as it is not used by current API in testing"); + throw new UnsupportedOperationException("This method is not currently supported as it is not used by current API in testing"); } @Override public BindOk exchangeBind(String destination, String source, String routingKey, Map arguments) throws IOException { - throw new UnsupportedOperationException( - "This method is not currently supported as it is not used by current API in testing"); + throw new UnsupportedOperationException("This method is not currently supported as it is not used by current API in testing"); } @Override public void exchangeBindNoWait(String destination, String source, String routingKey, Map arguments) throws IOException { - throw new UnsupportedOperationException( - "This method is not currently supported as it is not used by current API in testing"); + throw new UnsupportedOperationException("This method is not currently supported as it is not used by current API in testing"); } @Override public UnbindOk exchangeUnbind(String destination, String source, String routingKey) throws IOException { - throw new UnsupportedOperationException( - "This method is not currently supported as it is not used by current API in testing"); + throw new UnsupportedOperationException("This method is not currently supported as it is not used by current API in testing"); } @Override public UnbindOk exchangeUnbind(String destination, String source, String routingKey, Map arguments) throws IOException { - throw new UnsupportedOperationException( - "This method is not currently supported as it is not used by current API in testing"); + throw new UnsupportedOperationException("This method is not currently supported as it is not used by current API in testing"); } @Override public void exchangeUnbindNoWait(String destination, String source, String routingKey, Map arguments) throws IOException { - throw new UnsupportedOperationException( - "This method is not currently supported as it is not used by current API in testing"); + throw new UnsupportedOperationException("This method is not currently supported as it is not used by current API in testing"); } @Override public com.rabbitmq.client.AMQP.Queue.DeclareOk queueDeclare() throws IOException { - throw new UnsupportedOperationException( - "This method is not currently supported as it is not used by current API in testing"); + throw new UnsupportedOperationException("This method is not currently supported as it is not used by current API in testing"); } @Override public com.rabbitmq.client.AMQP.Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map arguments) throws IOException { - throw new UnsupportedOperationException( - "This method is not currently supported as it is not used by current API in testing"); + throw new UnsupportedOperationException("This method is not currently supported as it is not used by current API in testing"); } @Override public void queueDeclareNoWait(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map arguments) throws IOException { - throw new UnsupportedOperationException( - "This method is not currently supported as it is not used by current API in testing"); + throw new UnsupportedOperationException("This method is not currently supported as it is not used by current API in testing"); } @Override public com.rabbitmq.client.AMQP.Queue.DeclareOk queueDeclarePassive(String queue) throws IOException { - throw new UnsupportedOperationException( - "This method is not currently supported as it is not used by current API in testing"); + throw new UnsupportedOperationException("This method is not currently supported as it is not used by current API in testing"); } @Override public com.rabbitmq.client.AMQP.Queue.DeleteOk queueDelete(String queue) throws IOException { - throw new UnsupportedOperationException( - "This method is not currently supported as it is not used by current API in testing"); + throw new UnsupportedOperationException("This method is not currently supported as it is not used by current API in testing"); } @Override public com.rabbitmq.client.AMQP.Queue.DeleteOk queueDelete(String queue, boolean ifUnused, boolean ifEmpty) throws IOException { - throw new UnsupportedOperationException( - "This method is not currently supported as it is not used by current API in testing"); + throw new UnsupportedOperationException("This method is not currently supported as it is not used by current API in testing"); } @Override public void queueDeleteNoWait(String queue, boolean ifUnused, boolean ifEmpty) throws IOException { - throw new UnsupportedOperationException( - "This method is not currently supported as it is not used by current API in testing"); + throw new UnsupportedOperationException("This method is not currently supported as it is not used by current API in testing"); } @Override public com.rabbitmq.client.AMQP.Queue.BindOk queueBind(String queue, String exchange, String routingKey) throws IOException { - throw new UnsupportedOperationException( - "This method is not currently supported as it is not used by current API in testing"); + throw new UnsupportedOperationException("This method is not currently supported as it is not used by current API in testing"); } @Override public com.rabbitmq.client.AMQP.Queue.BindOk queueBind(String queue, String exchange, String routingKey, Map arguments) throws IOException { - throw new UnsupportedOperationException( - "This method is not currently supported as it is not used by current API in testing"); + throw new UnsupportedOperationException("This method is not currently supported as it is not used by current API in testing"); } @Override public void queueBindNoWait(String queue, String exchange, String routingKey, Map arguments) throws IOException { - throw new UnsupportedOperationException( - "This method is not currently supported as it is not used by current API in testing"); + throw new UnsupportedOperationException("This method is not currently supported as it is not used by current API in testing"); } @Override public com.rabbitmq.client.AMQP.Queue.UnbindOk queueUnbind(String queue, String exchange, String routingKey) throws IOException { - throw new UnsupportedOperationException( - "This method is not currently supported as it is not used by current API in testing"); + throw new UnsupportedOperationException("This method is not currently supported as it is not used by current API in testing"); } @Override public com.rabbitmq.client.AMQP.Queue.UnbindOk queueUnbind(String queue, String exchange, String routingKey, Map arguments) throws IOException { - throw new UnsupportedOperationException( - "This method is not currently supported as it is not used by current API in testing"); + throw new UnsupportedOperationException("This method is not currently supported as it is not used by current API in testing"); } @Override public PurgeOk queuePurge(String queue) throws IOException { - throw new UnsupportedOperationException( - "This method is not currently supported as it is not used by current API in testing"); + throw new UnsupportedOperationException("This method is not currently supported as it is not used by current API in testing"); } @Override @@ -535,156 +461,254 @@ class TestChannel implements Channel { @Override public void basicAck(long deliveryTag, boolean multiple) throws IOException { - throw new UnsupportedOperationException( - "This method is not currently supported as it is not used by current API in testing"); + throw new UnsupportedOperationException("This method is not currently supported as it is not used by current API in testing"); } @Override public void basicNack(long deliveryTag, boolean multiple, boolean requeue) throws IOException { - throw new UnsupportedOperationException( - "This method is not currently supported as it is not used by current API in testing"); + throw new UnsupportedOperationException("This method is not currently supported as it is not used by current API in testing"); } @Override public void basicReject(long deliveryTag, boolean requeue) throws IOException { - throw new UnsupportedOperationException( - "This method is not currently supported as it is not used by current API in testing"); + throw new UnsupportedOperationException("This method is not currently supported as it is not used by current API in testing"); } @Override public String basicConsume(String queue, Consumer callback) throws IOException { - throw new UnsupportedOperationException( - "This method is not currently supported as it is not used by current API in testing"); + throw new UnsupportedOperationException("This method is not currently supported as it is not used by current API in testing"); } @Override public String basicConsume(String queue, boolean autoAck, Consumer callback) throws IOException { - throw new UnsupportedOperationException( - "This method is not currently supported as it is not used by current API in testing"); + throw new UnsupportedOperationException("This method is not currently supported as it is not used by current API in testing"); } @Override public String basicConsume(String queue, boolean autoAck, Map arguments, Consumer callback) throws IOException { - throw new UnsupportedOperationException( - "This method is not currently supported as it is not used by current API in testing"); + throw new UnsupportedOperationException("This method is not currently supported as it is not used by current API in testing"); } @Override public String basicConsume(String queue, boolean autoAck, String consumerTag, Consumer callback) throws IOException { - throw new UnsupportedOperationException( - "This method is not currently supported as it is not used by current API in testing"); + throw new UnsupportedOperationException("This method is not currently supported as it is not used by current API in testing"); } @Override public String basicConsume(String queue, boolean autoAck, String consumerTag, boolean noLocal, boolean exclusive, Map arguments, Consumer callback) throws IOException { - throw new UnsupportedOperationException( - "This method is not currently supported as it is not used by current API in testing"); + throw new UnsupportedOperationException("This method is not currently supported as it is not used by current API in testing"); } @Override public void basicCancel(String consumerTag) throws IOException { - throw new UnsupportedOperationException( - "This method is not currently supported as it is not used by current API in testing"); + throw new UnsupportedOperationException("This method is not currently supported as it is not used by current API in testing"); } @Override public RecoverOk basicRecover() throws IOException { - throw new UnsupportedOperationException( - "This method is not currently supported as it is not used by current API in testing"); + throw new UnsupportedOperationException("This method is not currently supported as it is not used by current API in testing"); } @Override public RecoverOk basicRecover(boolean requeue) throws IOException { - throw new UnsupportedOperationException( - "This method is not currently supported as it is not used by current API in testing"); + throw new UnsupportedOperationException("This method is not currently supported as it is not used by current API in testing"); } @Override public SelectOk txSelect() throws IOException { - throw new UnsupportedOperationException( - "This method is not currently supported as it is not used by current API in testing"); + throw new UnsupportedOperationException("This method is not currently supported as it is not used by current API in testing"); } @Override public CommitOk txCommit() throws IOException { - throw new UnsupportedOperationException( - "This method is not currently supported as it is not used by current API in testing"); + throw new UnsupportedOperationException("This method is not currently supported as it is not used by current API in testing"); } @Override public RollbackOk txRollback() throws IOException { - throw new UnsupportedOperationException( - "This method is not currently supported as it is not used by current API in testing"); + throw new UnsupportedOperationException("This method is not currently supported as it is not used by current API in testing"); } @Override public com.rabbitmq.client.AMQP.Confirm.SelectOk confirmSelect() throws IOException { - throw new UnsupportedOperationException( - "This method is not currently supported as it is not used by current API in testing"); + throw new UnsupportedOperationException("This method is not currently supported as it is not used by current API in testing"); } @Override public long getNextPublishSeqNo() { - throw new UnsupportedOperationException( - "This method is not currently supported as it is not used by current API in testing"); + throw new UnsupportedOperationException("This method is not currently supported as it is not used by current API in testing"); } @Override public boolean waitForConfirms() throws InterruptedException { - throw new UnsupportedOperationException( - "This method is not currently supported as it is not used by current API in testing"); + throw new UnsupportedOperationException("This method is not currently supported as it is not used by current API in testing"); } @Override public boolean waitForConfirms(long timeout) throws InterruptedException, TimeoutException { - throw new UnsupportedOperationException( - "This method is not currently supported as it is not used by current API in testing"); + throw new UnsupportedOperationException("This method is not currently supported as it is not used by current API in testing"); } @Override public void waitForConfirmsOrDie() throws IOException, InterruptedException { - throw new UnsupportedOperationException( - "This method is not currently supported as it is not used by current API in testing"); + throw new UnsupportedOperationException("This method is not currently supported as it is not used by current API in testing"); } @Override public void waitForConfirmsOrDie(long timeout) throws IOException, InterruptedException, TimeoutException { - throw new UnsupportedOperationException( - "This method is not currently supported as it is not used by current API in testing"); + throw new UnsupportedOperationException("This method is not currently supported as it is not used by current API in testing"); } @Override public void asyncRpc(Method method) throws IOException { - throw new UnsupportedOperationException( - "This method is not currently supported as it is not used by current API in testing"); + throw new UnsupportedOperationException("This method is not currently supported as it is not used by current API in testing"); } @Override public Command rpc(Method method) throws IOException { - throw new UnsupportedOperationException( - "This method is not currently supported as it is not used by current API in testing"); + throw new UnsupportedOperationException("This method is not currently supported as it is not used by current API in testing"); } @Override public long messageCount(String queue) throws IOException { - throw new UnsupportedOperationException( - "This method is not currently supported as it is not used by current API in testing"); + throw new UnsupportedOperationException("This method is not currently supported as it is not used by current API in testing"); } @Override public long consumerCount(String queue) throws IOException { - throw new UnsupportedOperationException( - "This method is not currently supported as it is not used by current API in testing"); + throw new UnsupportedOperationException("This method is not currently supported as it is not used by current API in testing"); + } + + @Override + public ReturnListener addReturnListener(ReturnCallback returnCallback) { + throw new UnsupportedOperationException("This method is not currently supported as it is not used by current API in testing"); + } + + @Override + public ConfirmListener addConfirmListener(ConfirmCallback ackCallback, ConfirmCallback nackCallback) { + throw new UnsupportedOperationException("This method is not currently supported as it is not used by current API in testing"); + } + + @Override + public DeclareOk exchangeDeclare(String exchange, BuiltinExchangeType type) throws IOException { + throw new UnsupportedOperationException("This method is not currently supported as it is not used by current API in testing"); + } + + @Override + public DeclareOk exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable) throws IOException { + throw new UnsupportedOperationException("This method is not currently supported as it is not used by current API in testing"); + } + + @Override + public DeclareOk exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete, Map arguments) throws IOException { + throw new UnsupportedOperationException("This method is not currently supported as it is not used by current API in testing"); + } + + @Override + public DeclareOk exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete, boolean internal, Map arguments) throws IOException { + throw new UnsupportedOperationException("This method is not currently supported as it is not used by current API in testing"); + } + + @Override + public void exchangeDeclareNoWait(String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete, boolean internal, Map arguments) throws IOException { + throw new UnsupportedOperationException("This method is not currently supported as it is not used by current API in testing"); + } + + @Override + public String basicConsume(String queue, DeliverCallback deliverCallback, CancelCallback cancelCallback) throws IOException { + throw new UnsupportedOperationException("This method is not currently supported as it is not used by current API in testing"); + } + + @Override + public String basicConsume(String queue, DeliverCallback deliverCallback, ConsumerShutdownSignalCallback shutdownSignalCallback) throws IOException { + throw new UnsupportedOperationException("This method is not currently supported as it is not used by current API in testing"); + } + + @Override + public String basicConsume(String queue, DeliverCallback deliverCallback, CancelCallback cancelCallback, ConsumerShutdownSignalCallback shutdownSignalCallback) throws IOException { + throw new UnsupportedOperationException("This method is not currently supported as it is not used by current API in testing"); + } + + @Override + public String basicConsume(String queue, boolean autoAck, DeliverCallback deliverCallback, CancelCallback cancelCallback) throws IOException { + throw new UnsupportedOperationException("This method is not currently supported as it is not used by current API in testing"); + } + + @Override + public String basicConsume(String queue, boolean autoAck, DeliverCallback deliverCallback, ConsumerShutdownSignalCallback shutdownSignalCallback) throws IOException { + throw new UnsupportedOperationException("This method is not currently supported as it is not used by current API in testing"); + } + + @Override + public String basicConsume(String queue, boolean autoAck, DeliverCallback deliverCallback, CancelCallback cancelCallback, ConsumerShutdownSignalCallback shutdownSignalCallback) + throws IOException { + throw new UnsupportedOperationException("This method is not currently supported as it is not used by current API in testing"); + } + + @Override + public String basicConsume(String queue, boolean autoAck, Map arguments, DeliverCallback deliverCallback, CancelCallback cancelCallback) throws IOException { + throw new UnsupportedOperationException("This method is not currently supported as it is not used by current API in testing"); + } + + @Override + public String basicConsume(String queue, boolean autoAck, Map arguments, DeliverCallback deliverCallback, ConsumerShutdownSignalCallback shutdownSignalCallback) + throws IOException { + throw new UnsupportedOperationException("This method is not currently supported as it is not used by current API in testing"); + } + + @Override + public String basicConsume(String queue, boolean autoAck, Map arguments, DeliverCallback deliverCallback, CancelCallback cancelCallback, + ConsumerShutdownSignalCallback shutdownSignalCallback) throws IOException { + throw new UnsupportedOperationException("This method is not currently supported as it is not used by current API in testing"); + } + + @Override + public String basicConsume(String queue, boolean autoAck, String consumerTag, DeliverCallback deliverCallback, CancelCallback cancelCallback) throws IOException { + throw new UnsupportedOperationException("This method is not currently supported as it is not used by current API in testing"); + } + + @Override + public String basicConsume(String queue, boolean autoAck, String consumerTag, DeliverCallback deliverCallback, ConsumerShutdownSignalCallback shutdownSignalCallback) throws IOException { + throw new UnsupportedOperationException("This method is not currently supported as it is not used by current API in testing"); + } + + @Override + public String basicConsume(String queue, boolean autoAck, String consumerTag, DeliverCallback deliverCallback, CancelCallback cancelCallback, ConsumerShutdownSignalCallback shutdownSignalCallback) + throws IOException { + throw new UnsupportedOperationException("This method is not currently supported as it is not used by current API in testing"); + } + + @Override + public String basicConsume(String queue, boolean autoAck, String consumerTag, boolean noLocal, boolean exclusive, Map arguments, DeliverCallback deliverCallback, + CancelCallback cancelCallback) throws IOException { + throw new UnsupportedOperationException("This method is not currently supported as it is not used by current API in testing"); + } + + @Override + public String basicConsume(String queue, boolean autoAck, String consumerTag, boolean noLocal, boolean exclusive, Map arguments, DeliverCallback deliverCallback, + ConsumerShutdownSignalCallback shutdownSignalCallback) throws IOException { + throw new UnsupportedOperationException("This method is not currently supported as it is not used by current API in testing"); + } + + @Override + public String basicConsume(String queue, boolean autoAck, String consumerTag, boolean noLocal, boolean exclusive, Map arguments, DeliverCallback deliverCallback, + CancelCallback cancelCallback, ConsumerShutdownSignalCallback shutdownSignalCallback) throws IOException { + throw new UnsupportedOperationException("This method is not currently supported as it is not used by current API in testing"); + } + + @Override + public CompletableFuture asyncCompletableRpc(Method method) throws IOException { + throw new UnsupportedOperationException("This method is not currently supported as it is not used by current API in testing"); } } diff --git a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/TestConnection.java b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/TestConnection.java index cb29478155..d47ac923bb 100644 --- a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/TestConnection.java +++ b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/TestConnection.java @@ -21,12 +21,14 @@ import java.net.InetAddress; import java.util.List; import java.util.Map; +import com.rabbitmq.client.BlockedCallback; import com.rabbitmq.client.BlockedListener; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ExceptionHandler; import com.rabbitmq.client.ShutdownListener; import com.rabbitmq.client.ShutdownSignalException; +import com.rabbitmq.client.UnblockedCallback; /** * Implementation of {@link Connection} to be used for testing. Will return the @@ -42,11 +44,10 @@ import com.rabbitmq.client.ShutdownSignalException; class TestConnection implements Connection { private final TestChannel channel; - private boolean open; + private String id; - public TestConnection(Map exchangeToRoutingKeyMappings, - Map> routingKeyToQueueMappings) { + public TestConnection(Map exchangeToRoutingKeyMappings, Map> routingKeyToQueueMappings) { this.channel = new TestChannel(exchangeToRoutingKeyMappings, routingKeyToQueueMappings); this.channel.setConnection(this); this.open = true; @@ -54,26 +55,22 @@ class TestConnection implements Connection { @Override public void addShutdownListener(ShutdownListener listener) { - throw new UnsupportedOperationException( - "This method is not currently supported as it is not used by current API in testing"); + throw new UnsupportedOperationException("This method is not currently supported as it is not used by current API in testing"); } @Override public void removeShutdownListener(ShutdownListener listener) { - throw new UnsupportedOperationException( - "This method is not currently supported as it is not used by current API in testing"); + throw new UnsupportedOperationException("This method is not currently supported as it is not used by current API in testing"); } @Override public ShutdownSignalException getCloseReason() { - throw new UnsupportedOperationException( - "This method is not currently supported as it is not used by current API in testing"); + throw new UnsupportedOperationException("This method is not currently supported as it is not used by current API in testing"); } @Override public void notifyListeners() { - throw new UnsupportedOperationException( - "This method is not currently supported as it is not used by current API in testing"); + throw new UnsupportedOperationException("This method is not currently supported as it is not used by current API in testing"); } @Override @@ -92,38 +89,32 @@ class TestConnection implements Connection { @Override public int getPort() { - throw new UnsupportedOperationException( - "This method is not currently supported as it is not used by current API in testing"); + throw new UnsupportedOperationException("This method is not currently supported as it is not used by current API in testing"); } @Override public int getChannelMax() { - throw new UnsupportedOperationException( - "This method is not currently supported as it is not used by current API in testing"); + throw new UnsupportedOperationException("This method is not currently supported as it is not used by current API in testing"); } @Override public int getFrameMax() { - throw new UnsupportedOperationException( - "This method is not currently supported as it is not used by current API in testing"); + throw new UnsupportedOperationException("This method is not currently supported as it is not used by current API in testing"); } @Override public int getHeartbeat() { - throw new UnsupportedOperationException( - "This method is not currently supported as it is not used by current API in testing"); + throw new UnsupportedOperationException("This method is not currently supported as it is not used by current API in testing"); } @Override public Map getClientProperties() { - throw new UnsupportedOperationException( - "This method is not currently supported as it is not used by current API in testing"); + throw new UnsupportedOperationException("This method is not currently supported as it is not used by current API in testing"); } @Override public Map getServerProperties() { - throw new UnsupportedOperationException( - "This method is not currently supported as it is not used by current API in testing"); + throw new UnsupportedOperationException("This method is not currently supported as it is not used by current API in testing"); } @Override @@ -133,8 +124,7 @@ class TestConnection implements Connection { @Override public Channel createChannel(int channelNumber) throws IOException { - throw new UnsupportedOperationException( - "This method is not currently supported as it is not used by current API in testing"); + throw new UnsupportedOperationException("This method is not currently supported as it is not used by current API in testing"); } @Override @@ -149,67 +139,76 @@ class TestConnection implements Connection { @Override public void close(int closeCode, String closeMessage) throws IOException { - throw new UnsupportedOperationException( - "This method is not currently supported as it is not used by current API in testing"); + throw new UnsupportedOperationException("This method is not currently supported as it is not used by current API in testing"); } @Override public void close(int timeout) throws IOException { - throw new UnsupportedOperationException( - "This method is not currently supported as it is not used by current API in testing"); + throw new UnsupportedOperationException("This method is not currently supported as it is not used by current API in testing"); } @Override public void close(int closeCode, String closeMessage, int timeout) throws IOException { - throw new UnsupportedOperationException( - "This method is not currently supported as it is not used by current API in testing"); + throw new UnsupportedOperationException("This method is not currently supported as it is not used by current API in testing"); } @Override public void abort() { - throw new UnsupportedOperationException( - "This method is not currently supported as it is not used by current API in testing"); + throw new UnsupportedOperationException("This method is not currently supported as it is not used by current API in testing"); } @Override public void abort(int closeCode, String closeMessage) { - throw new UnsupportedOperationException( - "This method is not currently supported as it is not used by current API in testing"); + throw new UnsupportedOperationException("This method is not currently supported as it is not used by current API in testing"); } @Override public void abort(int timeout) { - throw new UnsupportedOperationException( - "This method is not currently supported as it is not used by current API in testing"); + throw new UnsupportedOperationException("This method is not currently supported as it is not used by current API in testing"); } @Override public void abort(int closeCode, String closeMessage, int timeout) { - throw new UnsupportedOperationException( - "This method is not currently supported as it is not used by current API in testing"); + throw new UnsupportedOperationException("This method is not currently supported as it is not used by current API in testing"); } @Override public void addBlockedListener(BlockedListener listener) { - throw new UnsupportedOperationException( - "This method is not currently supported as it is not used by current API in testing"); + throw new UnsupportedOperationException("This method is not currently supported as it is not used by current API in testing"); } @Override public boolean removeBlockedListener(BlockedListener listener) { - throw new UnsupportedOperationException( - "This method is not currently supported as it is not used by current API in testing"); + throw new UnsupportedOperationException("This method is not currently supported as it is not used by current API in testing"); } @Override public void clearBlockedListeners() { - throw new UnsupportedOperationException( - "This method is not currently supported as it is not used by current API in testing"); + throw new UnsupportedOperationException("This method is not currently supported as it is not used by current API in testing"); } @Override public ExceptionHandler getExceptionHandler() { - throw new UnsupportedOperationException( - "This method is not currently supported as it is not used by current API in testing"); + throw new UnsupportedOperationException("This method is not currently supported as it is not used by current API in testing"); + } + + @Override + public String getClientProvidedName() { + return "unit-test"; + } + + @Override + public BlockedListener addBlockedListener(BlockedCallback blockedCallback, UnblockedCallback unblockedCallback) { + return null; + } + + @Override + public String getId() { + return id; + } + + @Override + public void setId(String id) { + this.id = id; } }