NIFI-4902: This closes #2485. Updated ConsumeAMQP, PublishAMQP to use one connection per concurrent task instead of a single connection shared by all concurrent tasks. This offers far better throughput when the network latency is non-trivial. Also refactored to simplify code

Signed-off-by: joewitt <joewitt@apache.org>
This commit is contained in:
Mark Payne 2018-02-21 09:31:36 -05:00 committed by joewitt
parent 5bdb7cf6e7
commit 39556e3513
16 changed files with 633 additions and 888 deletions

View File

@ -20,7 +20,7 @@ language governing permissions and limitations under the License. -->
<packaging>jar</packaging>
<properties>
<amqp-client.version>3.6.0</amqp-client.version>
<amqp-client.version>5.2.0</amqp-client.version>
</properties>
<dependencies>

View File

@ -33,14 +33,8 @@ import com.rabbitmq.client.GetResponse;
final class AMQPConsumer extends AMQPWorker {
private final static Logger logger = LoggerFactory.getLogger(AMQPConsumer.class);
private final String queueName;
/**
* Creates an instance of this consumer
* @param connection instance of AMQP {@link Connection}
* @param queueName name of the queue from which messages will be consumed.
*/
AMQPConsumer(Connection connection, String queueName) {
super(connection);
this.validateStringProperty("queueName", queueName);
@ -60,7 +54,7 @@ final class AMQPConsumer extends AMQPWorker {
*/
public GetResponse consume() {
try {
return this.channel.basicGet(this.queueName, true);
return getChannel().basicGet(this.queueName, true);
} catch (IOException e) {
logger.error("Failed to receive message from AMQP; " + this + ". Possible reasons: Queue '" + this.queueName
+ "' may not have been defined", e);
@ -68,9 +62,6 @@ final class AMQPConsumer extends AMQPWorker {
}
}
/**
*
*/
@Override
public String toString() {
return super.toString() + ", QUEUE:" + this.queueName;

View File

@ -21,6 +21,7 @@ import java.io.IOException;
import org.apache.nifi.logging.ComponentLog;
import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ReturnListener;
@ -31,19 +32,17 @@ import com.rabbitmq.client.ReturnListener;
final class AMQPPublisher extends AMQPWorker {
private final ComponentLog processLog;
private final String connectionString;
/**
* Creates an instance of this publisher
*
* @param connection
* instance of AMQP {@link Connection}
* @param connection instance of AMQP {@link Connection}
*/
AMQPPublisher(Connection connection, ComponentLog processLog) {
super(connection);
this.processLog = processLog;
this.channel.addReturnListener(new UndeliverableMessageLogger());
getChannel().addReturnListener(new UndeliverableMessageLogger());
this.connectionString = connection.toString();
}
@ -51,15 +50,11 @@ final class AMQPPublisher extends AMQPWorker {
* Publishes message with provided AMQP properties (see
* {@link BasicProperties}) to a pre-defined AMQP Exchange.
*
* @param bytes
* bytes representing a message.
* @param properties
* instance of {@link BasicProperties}
* @param exchange
* the name of AMQP exchange to which messages will be published.
* @param bytes bytes representing a message.
* @param properties instance of {@link BasicProperties}
* @param exchange the name of AMQP exchange to which messages will be published.
* If not provided 'default' exchange will be used.
* @param routingKey
* (required) the name of the routingKey to be used by AMQP-based
* @param routingKey (required) the name of the routingKey to be used by AMQP-based
* system to route messages to its final destination (queue).
*/
void publish(byte[] bytes, BasicProperties properties, String routingKey, String exchange) {
@ -71,22 +66,18 @@ final class AMQPPublisher extends AMQPWorker {
processLog.info("Successfully connected AMQPPublisher to " + this.connectionString + " and '" + exchange
+ "' exchange with '" + routingKey + "' as a routing key.");
if (this.channel.isOpen()) {
final Channel channel = getChannel();
if (channel.isOpen()) {
try {
this.channel.basicPublish(exchange, routingKey, true, properties, bytes);
channel.basicPublish(exchange, routingKey, true, properties, bytes);
} catch (Exception e) {
throw new IllegalStateException("Failed to publish to '" +
exchange + "' with '" + routingKey + "'.", e);
throw new IllegalStateException("Failed to publish to Exchange '" + exchange + "' with Routing Key '" + routingKey + "'.", e);
}
} else {
throw new IllegalStateException("This instance of AMQPPublisher is invalid since "
+ "its publishingChannel is closed");
throw new IllegalStateException("This instance of AMQPPublisher is invalid since its publishingChannel is closed");
}
}
/**
*
*/
@Override
public String toString() {
return this.connectionString;
@ -106,8 +97,7 @@ final class AMQPPublisher extends AMQPWorker {
*/
private final class UndeliverableMessageLogger implements ReturnListener {
@Override
public void handleReturn(int replyCode, String replyText, String exchangeName, String routingKey, BasicProperties properties, byte[] message)
throws IOException {
public void handleReturn(int replyCode, String replyText, String exchangeName, String routingKey, BasicProperties properties, byte[] message) throws IOException {
String logMessage = "Message destined for '" + exchangeName + "' exchange with '" + routingKey
+ "' as routing key came back with replyCode=" + replyCode + " and replyText=" + replyText + ".";
processLog.warn(logMessage);

View File

@ -0,0 +1,70 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.amqp.processors;
import java.io.Closeable;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.Connection;
public class AMQPResource<T extends AMQPWorker> implements Closeable {
private final Connection connection;
private final T worker;
public AMQPResource(final Connection connection, final T worker) {
this.connection = connection;
this.worker = worker;
}
public Connection getConnection() {
return connection;
}
public T getWorker() {
return worker;
}
@Override
public void close() throws IOException {
IOException ioe = null;
try {
worker.close();
} catch (final IOException e) {
ioe = e;
} catch (final TimeoutException e) {
ioe = new IOException(e);
}
try {
connection.close();
} catch (final IOException e) {
if (ioe == null) {
ioe = e;
} else {
ioe.addSuppressed(e);
}
}
if (ioe != null) {
throw ioe;
}
}
}

View File

@ -1,240 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.amqp.processors;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.processor.ProcessSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.rabbitmq.client.AMQP.BasicProperties;
/**
* Utility helper class simplify interactions with target AMQP API and NIFI API.
*/
abstract class AMQPUtils {
public final static String AMQP_PROP_DELIMITER = "$";
public final static String AMQP_PROP_PREFIX = "amqp" + AMQP_PROP_DELIMITER;
private final static Logger logger = LoggerFactory.getLogger(AMQPUtils.class);
public enum PropertyNames {
CONTENT_TYPE(AMQP_PROP_PREFIX + "contentType"),
CONTENT_ENCODING(AMQP_PROP_PREFIX + "contentEncoding"),
HEADERS(AMQP_PROP_PREFIX + "headers"),
DELIVERY_MODE(AMQP_PROP_PREFIX + "deliveryMode"),
PRIORITY(AMQP_PROP_PREFIX + "priority"),
CORRELATION_ID(AMQP_PROP_PREFIX + "correlationId"),
REPLY_TO(AMQP_PROP_PREFIX + "replyTo"),
EXPIRATION(AMQP_PROP_PREFIX + "expiration"),
MESSAGE_ID(AMQP_PROP_PREFIX + "messageId"),
TIMESTAMP(AMQP_PROP_PREFIX + "timestamp"),
TYPE(AMQP_PROP_PREFIX + "type"),
USER_ID(AMQP_PROP_PREFIX + "userId"),
APP_ID(AMQP_PROP_PREFIX + "appId"),
CLUSTER_ID(AMQP_PROP_PREFIX + "clusterId");
PropertyNames(String value) {
this.value = value;
}
private final String value;
private static final Map<String, PropertyNames> lookup = new HashMap<>();
public static PropertyNames fromValue(String s) {
return lookup.get(s);
}
static {
for (PropertyNames propertyNames : PropertyNames.values()) {
lookup.put(propertyNames.getValue(), propertyNames);
}
}
public String getValue() {
return value;
}
@Override
public String toString() {
return value;
}
}
/**
* Updates {@link FlowFile} with attributes representing AMQP properties
*
* @param amqpProperties instance of {@link BasicProperties}
* @param flowFile instance of target {@link FlowFile}
* @param processSession instance of {@link ProcessSession}
*/
public static FlowFile updateFlowFileAttributesWithAmqpProperties(BasicProperties amqpProperties, FlowFile flowFile, ProcessSession processSession) {
if (amqpProperties != null) {
try {
Method[] methods = BasicProperties.class.getDeclaredMethods();
Map<String, String> attributes = new HashMap<>();
for (Method method : methods) {
if (Modifier.isPublic(method.getModifiers()) && method.getName().startsWith("get")) {
Object amqpPropertyValue = method.invoke(amqpProperties);
if (amqpPropertyValue != null) {
String propertyName = extractPropertyNameFromMethod(method);
if (isValidAmqpPropertyName(propertyName)) {
if (propertyName.equals(PropertyNames.CONTENT_TYPE.getValue())) {
attributes.put(CoreAttributes.MIME_TYPE.key(), amqpPropertyValue.toString());
}
attributes.put(propertyName, amqpPropertyValue.toString());
}
}
}
}
flowFile = processSession.putAllAttributes(flowFile, attributes);
} catch (Exception e) {
logger.warn("Failed to update FlowFile with AMQP attributes", e);
}
}
return flowFile;
}
/**
* Will validate if provided name corresponds to valid AMQP property.
*
* @param name the name of the property
* @return 'true' if valid otherwise 'false'
*/
public static boolean isValidAmqpPropertyName(String name) {
return PropertyNames.fromValue(name) != null;
}
/**
*
*/
private static String extractPropertyNameFromMethod(Method method) {
char c[] = method.getName().substring(3).toCharArray();
c[0] = Character.toLowerCase(c[0]);
return AMQP_PROP_PREFIX + new String(c);
}
/**
* Will validate if provided amqpPropValue can be converted to a {@link Map}.
* Should be passed in the format: amqp$headers=key=value,key=value etc.
*
* @param amqpPropValue the value of the property
* @return {@link Map} if valid otherwise null
*/
public static Map<String, Object> validateAMQPHeaderProperty(String amqpPropValue) {
String[] strEntries = amqpPropValue.split(",");
Map<String, Object> headers = new HashMap<>();
for (String strEntry : strEntries) {
String[] kv = strEntry.split("=");
if (kv.length == 2) {
headers.put(kv[0].trim(), kv[1].trim());
} else {
logger.warn("Malformed key value pair for AMQP header property: " + amqpPropValue);
}
}
return headers;
}
/**
* Will validate if provided amqpPropValue can be converted to an {@link Integer}, and that its
* value is 1 or 2.
*
* @param amqpPropValue the value of the property
* @return {@link Integer} if valid otherwise null
*/
public static Integer validateAMQPDeliveryModeProperty(String amqpPropValue) {
Integer deliveryMode = toInt(amqpPropValue);
if (deliveryMode == null || !(deliveryMode == 1 || deliveryMode == 2)) {
logger.warn("Invalid value for AMQP deliveryMode property: " + amqpPropValue);
}
return deliveryMode;
}
/**
* Will validate if provided amqpPropValue can be converted to an {@link Integer} and that its
* value is between 0 and 9 (inclusive).
*
* @param amqpPropValue the value of the property
* @return {@link Integer} if valid otherwise null
*/
public static Integer validateAMQPPriorityProperty(String amqpPropValue) {
Integer priority = toInt(amqpPropValue);
if (priority == null || !(priority >= 0 && priority <= 9)) {
logger.warn("Invalid value for AMQP priority property: " + amqpPropValue);
}
return priority;
}
/**
* Will validate if provided amqpPropValue can be converted to a {@link Date}.
*
* @param amqpPropValue the value of the property
* @return {@link Date} if valid otherwise null
*/
public static Date validateAMQPTimestampProperty(String amqpPropValue) {
Long timestamp = toLong(amqpPropValue);
if (timestamp == null) {
logger.warn("Invalid value for AMQP timestamp property: " + amqpPropValue);
return null;
}
//milliseconds are lost when sending to AMQP
return new Date(timestamp);
}
/**
* Takes a {@link String} and tries to convert to an {@link Integer}.
*
* @param strVal the value to be converted
* @return {@link Integer} if valid otherwise null
*/
private static Integer toInt(String strVal) {
try {
return Integer.parseInt(strVal);
} catch (NumberFormatException aE) {
return null;
}
}
/**
* Takes a {@link String} and tries to convert to a {@link Long}.
*
* @param strVal the value to be converted
* @return {@link Long} if valid otherwise null
*/
private static Long toLong(String strVal) {
try {
return Long.parseLong(strVal);
} catch (NumberFormatException aE) {
return null;
}
}
}

View File

@ -34,19 +34,18 @@ import com.rabbitmq.client.Connection;
abstract class AMQPWorker implements AutoCloseable {
private final static Logger logger = LoggerFactory.getLogger(AMQPWorker.class);
protected final Channel channel;
private final Channel channel;
/**
* Creates an instance of this worker initializing it with AMQP
* {@link Connection} and creating a target {@link Channel} used by
* sub-classes to interact with AMQP-based messaging system.
*
* @param connection
* instance of {@link Connection}
* @param connection instance of {@link Connection}
*/
public AMQPWorker(Connection connection) {
this.validateConnection(connection);
public AMQPWorker(final Connection connection) {
validateConnection(connection);
try {
this.channel = connection.createChannel();
} catch (IOException e) {
@ -55,6 +54,10 @@ abstract class AMQPWorker implements AutoCloseable {
}
}
protected Channel getChannel() {
return channel;
}
/**
* Closes {@link Channel} created when instance of this class was created.
*/
@ -91,8 +94,7 @@ abstract class AMQPWorker implements AutoCloseable {
/**
* Validates that {@link Connection} is not null and open.
*
* @param connection
* instance of {@link Connection}
* @param connection instance of {@link Connection}
*/
private void validateConnection(Connection connection) {
if (connection == null) {

View File

@ -16,9 +16,11 @@
*/
package org.apache.nifi.amqp.processors;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import javax.net.ssl.SSLContext;
@ -29,7 +31,6 @@ import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Processor;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.security.util.SslContextFactory;
@ -119,112 +120,95 @@ abstract class AbstractAMQPProcessor<T extends AMQPWorker> extends AbstractProce
.defaultValue("REQUIRED")
.build();
static List<PropertyDescriptor> descriptors = new ArrayList<>();
private static final List<PropertyDescriptor> propertyDescriptors;
/*
* Will ensure that list of PropertyDescriptors is build only once, since
* all other lifecycle methods are invoked multiple times.
*/
static {
descriptors.add(HOST);
descriptors.add(PORT);
descriptors.add(V_HOST);
descriptors.add(USER);
descriptors.add(PASSWORD);
descriptors.add(AMQP_VERSION);
descriptors.add(SSL_CONTEXT_SERVICE);
descriptors.add(USE_CERT_AUTHENTICATION);
descriptors.add(CLIENT_AUTH);
final List<PropertyDescriptor> properties = new ArrayList<>();
properties.add(HOST);
properties.add(PORT);
properties.add(V_HOST);
properties.add(USER);
properties.add(PASSWORD);
properties.add(AMQP_VERSION);
properties.add(SSL_CONTEXT_SERVICE);
properties.add(USE_CERT_AUTHENTICATION);
properties.add(CLIENT_AUTH);
propertyDescriptors = Collections.unmodifiableList(properties);
}
protected volatile Connection amqpConnection;
protected static List<PropertyDescriptor> getCommonPropertyDescriptors() {
return propertyDescriptors;
}
protected volatile T targetResource;
private final BlockingQueue<AMQPResource<T>> resourceQueue = new LinkedBlockingQueue<>();
/**
* Will builds target resource ({@link AMQPPublisher} or
* {@link AMQPConsumer}) upon first invocation and will delegate to the
* implementation of
* {@link #rendezvousWithAmqp(ProcessContext, ProcessSession)} method for
* further processing.
* Will builds target resource ({@link AMQPPublisher} or {@link AMQPConsumer}) upon first invocation and will delegate to the
* implementation of {@link #processResource(ProcessContext, ProcessSession)} method for further processing.
*/
@Override
public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
synchronized (this) {
this.buildTargetResource(context);
public final void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
AMQPResource<T> resource = resourceQueue.poll();
if (resource == null) {
resource = createResource(context);
}
try {
processResource(resource.getConnection(), resource.getWorker(), context, session);
resourceQueue.offer(resource);
} catch (final Exception e) {
try {
resource.close();
} catch (final Exception e2) {
e.addSuppressed(e2);
}
throw e;
}
this.rendezvousWithAmqp(context, session);
}
/**
* Will close current AMQP connection.
*/
@OnStopped
public void close() {
try {
if (this.targetResource != null) {
this.targetResource.close();
AMQPResource<T> resource;
while ((resource = resourceQueue.poll()) != null) {
try {
resource.close();
} catch (final Exception e) {
getLogger().warn("Failed to close AMQP Connection", e);
}
} catch (Exception e) {
this.getLogger().warn("Failure while closing target resource " + this.targetResource, e);
}
try {
if (this.amqpConnection != null) {
this.amqpConnection.close();
}
} catch (IOException e) {
this.getLogger().warn("Failure while closing connection", e);
}
this.amqpConnection = null;
}
/**
* Delegate method to supplement
* {@link #onTrigger(ProcessContext, ProcessSession)}. It is implemented by
* sub-classes to perform {@link Processor} specific functionality.
*
* @param context
* instance of {@link ProcessContext}
* @param session
* instance of {@link ProcessSession}
* Performs functionality of the Processor, given the appropriate connection and worker
*/
protected abstract void rendezvousWithAmqp(ProcessContext context, ProcessSession session) throws ProcessException;
protected abstract void processResource(Connection connection, T worker, ProcessContext context, ProcessSession session) throws ProcessException;
/**
* Delegate method to supplement building of target {@link AMQPWorker} (see
* {@link AMQPPublisher} or {@link AMQPConsumer}) and is implemented by
* sub-classes.
* Builds the appropriate AMQP Worker for the implementing processor
*
* @param context
* instance of {@link ProcessContext}
* @param context instance of {@link ProcessContext}
* @return new instance of {@link AMQPWorker}
*/
protected abstract T finishBuildingTargetResource(ProcessContext context);
protected abstract T createAMQPWorker(ProcessContext context, Connection connection);
/**
* Builds target resource ({@link AMQPPublisher} or {@link AMQPConsumer}).
* It does so by making a {@link Connection} and then delegating to the
* implementation of {@link #finishBuildingTargetResource(ProcessContext)}
* which will build {@link AMQPWorker} (see {@link AMQPPublisher} or
* {@link AMQPConsumer}).
*/
private void buildTargetResource(ProcessContext context) {
if (this.amqpConnection == null || !this.amqpConnection.isOpen()) {
this.amqpConnection = this.createConnection(context);
this.targetResource = this.finishBuildingTargetResource(context);
}
private AMQPResource<T> createResource(final ProcessContext context) {
final Connection connection = createConnection(context);
final T worker = createAMQPWorker(context, connection);
return new AMQPResource<>(connection, worker);
}
/**
* Creates {@link Connection} to AMQP system.
*/
private Connection createConnection(ProcessContext context) {
ConnectionFactory cf = new ConnectionFactory();
protected Connection createConnection(ProcessContext context) {
final ConnectionFactory cf = new ConnectionFactory();
cf.setHost(context.getProperty(HOST).getValue());
cf.setPort(Integer.parseInt(context.getProperty(PORT).getValue()));
cf.setUsername(context.getProperty(USER).getValue());
cf.setPassword(context.getProperty(PASSWORD).getValue());
String vHost = context.getProperty(V_HOST).getValue();
final String vHost = context.getProperty(V_HOST).getValue();
if (vHost != null) {
cf.setVirtualHost(vHost);
}

View File

@ -16,16 +16,18 @@
*/
package org.apache.nifi.amqp.processors;
import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
@ -33,24 +35,34 @@ import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.OutputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.GetResponse;
/**
* Consuming AMQP processor which upon each invocation of
* {@link #onTrigger(ProcessContext, ProcessSession)} method will construct a
* {@link FlowFile} containing the body of the consumed AMQP message and AMQP
* properties that came with message which are added to a {@link FlowFile} as
* attributes.
*/
@Tags({ "amqp", "rabbit", "get", "message", "receive", "consume" })
@InputRequirement(Requirement.INPUT_FORBIDDEN)
@CapabilityDescription("Consumes AMQP Message transforming its content to a FlowFile and transitioning it to 'success' relationship")
@CapabilityDescription("Consumes AMQP Messages from an AMQP Broker using the AMQP 0.9.1 protocol. Each message that is received from the AMQP Broker will be "
+ "emitted as its own FlowFile to the 'success' relationship.")
@WritesAttributes({
@WritesAttribute(attribute = "amqp$appId", description = "The App ID field from the AMQP Message"),
@WritesAttribute(attribute = "amqp$contentEncoding", description = "The Content Encoding reported by the AMQP Message"),
@WritesAttribute(attribute = "amqp$contentType", description = "The Content Type reported by the AMQP Message"),
@WritesAttribute(attribute = "amqp$headers", description = "The headers present on the AMQP Message"),
@WritesAttribute(attribute = "amqp$deliveryMode", description = "The numeric indicator for the Message's Delivery Mode"),
@WritesAttribute(attribute = "amqp$priority", description = "The Message priority"),
@WritesAttribute(attribute = "amqp$correlationId", description = "The Message's Correlation ID"),
@WritesAttribute(attribute = "amqp$replyTo", description = "The value of the Message's Reply-To field"),
@WritesAttribute(attribute = "amqp$expiration", description = "The Message Expiration"),
@WritesAttribute(attribute = "amqp$messageId", description = "The unique ID of the Message"),
@WritesAttribute(attribute = "amqp$timestamp", description = "The timestamp of the Message, as the number of milliseconds since epoch"),
@WritesAttribute(attribute = "amqp$type", description = "The type of message"),
@WritesAttribute(attribute = "amqp$userId", description = "The ID of the user"),
@WritesAttribute(attribute = "amqp$clusterId", description = "The ID of the AMQP Cluster"),
})
public class ConsumeAMQP extends AbstractAMQPProcessor<AMQPConsumer> {
private static final String ATTRIBUTES_PREFIX = "amqp$";
public static final PropertyDescriptor QUEUE = new PropertyDescriptor.Builder()
.name("Queue")
@ -64,73 +76,82 @@ public class ConsumeAMQP extends AbstractAMQPProcessor<AMQPConsumer> {
.description("All FlowFiles that are received from the AMQP queue are routed to this relationship")
.build();
private final static List<PropertyDescriptor> propertyDescriptors;
private static final List<PropertyDescriptor> propertyDescriptors;
private static final Set<Relationship> relationships;
private final static Set<Relationship> relationships;
/*
* Will ensure that the list of property descriptors is build only once.
* Will also create a Set of relationships
*/
static {
List<PropertyDescriptor> _propertyDescriptors = new ArrayList<>();
_propertyDescriptors.add(QUEUE);
_propertyDescriptors.addAll(descriptors);
propertyDescriptors = Collections.unmodifiableList(_propertyDescriptors);
List<PropertyDescriptor> properties = new ArrayList<>();
properties.add(QUEUE);
properties.addAll(getCommonPropertyDescriptors());
propertyDescriptors = Collections.unmodifiableList(properties);
Set<Relationship> _relationships = new HashSet<>();
_relationships.add(REL_SUCCESS);
relationships = Collections.unmodifiableSet(_relationships);
Set<Relationship> rels = new HashSet<>();
rels.add(REL_SUCCESS);
relationships = Collections.unmodifiableSet(rels);
}
/**
* Will construct a {@link FlowFile} containing the body of the consumed
* AMQP message (if {@link GetResponse} returned by {@link AMQPConsumer} is
* not null) and AMQP properties that came with message which are added to a
* {@link FlowFile} as attributes, transferring {@link FlowFile} to
* Will construct a {@link FlowFile} containing the body of the consumed AMQP message (if {@link GetResponse} returned by {@link AMQPConsumer} is
* not null) and AMQP properties that came with message which are added to a {@link FlowFile} as attributes, transferring {@link FlowFile} to
* 'success' {@link Relationship}.
*/
@Override
protected void rendezvousWithAmqp(ProcessContext context, ProcessSession processSession) throws ProcessException {
final GetResponse response = this.targetResource.consume();
if (response != null){
FlowFile flowFile = processSession.create();
flowFile = processSession.write(flowFile, new OutputStreamCallback() {
@Override
public void process(final OutputStream out) throws IOException {
out.write(response.getBody());
}
});
BasicProperties amqpProperties = response.getProps();
flowFile = AMQPUtils.updateFlowFileAttributesWithAmqpProperties(amqpProperties, flowFile, processSession);
processSession.getProvenanceReporter().receive(flowFile,
this.amqpConnection.toString() + "/" + context.getProperty(QUEUE).getValue());
processSession.transfer(flowFile, REL_SUCCESS);
} else {
protected void processResource(final Connection connection, final AMQPConsumer consumer, final ProcessContext context, final ProcessSession session) {
final GetResponse response = consumer.consume();
if (response == null) {
context.yield();
return;
}
FlowFile flowFile = session.create();
flowFile = session.write(flowFile, out -> out.write(response.getBody()));
final BasicProperties amqpProperties = response.getProps();
final Map<String, String> attributes = buildAttributes(amqpProperties);
flowFile = session.putAllAttributes(flowFile, attributes);
session.getProvenanceReporter().receive(flowFile, connection.toString() + "/" + context.getProperty(QUEUE).getValue());
session.transfer(flowFile, REL_SUCCESS);
}
private Map<String, String> buildAttributes(final BasicProperties properties) {
final Map<String, String> attributes = new HashMap<>();
addAttribute(attributes, ATTRIBUTES_PREFIX + "appId", properties.getAppId());
addAttribute(attributes, ATTRIBUTES_PREFIX + "contentEncoding", properties.getContentEncoding());
addAttribute(attributes, ATTRIBUTES_PREFIX + "contentType", properties.getContentType());
addAttribute(attributes, ATTRIBUTES_PREFIX + "headers", properties.getHeaders());
addAttribute(attributes, ATTRIBUTES_PREFIX + "deliveryMode", properties.getDeliveryMode());
addAttribute(attributes, ATTRIBUTES_PREFIX + "priority", properties.getPriority());
addAttribute(attributes, ATTRIBUTES_PREFIX + "correlationId", properties.getCorrelationId());
addAttribute(attributes, ATTRIBUTES_PREFIX + "replyTo", properties.getReplyTo());
addAttribute(attributes, ATTRIBUTES_PREFIX + "expiration", properties.getExpiration());
addAttribute(attributes, ATTRIBUTES_PREFIX + "messageId", properties.getMessageId());
addAttribute(attributes, ATTRIBUTES_PREFIX + "timestamp", properties.getTimestamp() == null ? null : properties.getTimestamp().getTime());
addAttribute(attributes, ATTRIBUTES_PREFIX + "type", properties.getType());
addAttribute(attributes, ATTRIBUTES_PREFIX + "userId", properties.getUserId());
addAttribute(attributes, ATTRIBUTES_PREFIX + "clusterId", properties.getClusterId());
return attributes;
}
private void addAttribute(final Map<String, String> attributes, final String attributeName, final Object value) {
if (value == null) {
return;
}
attributes.put(attributeName, value.toString());
}
/**
* Will create an instance of {@link AMQPConsumer}
*/
@Override
protected AMQPConsumer finishBuildingTargetResource(ProcessContext context) {
String queueName = context.getProperty(QUEUE).getValue();
return new AMQPConsumer(this.amqpConnection, queueName);
protected AMQPConsumer createAMQPWorker(final ProcessContext context, final Connection connection) {
final String queueName = context.getProperty(QUEUE).getValue();
return new AMQPConsumer(connection, queueName);
}
/**
*
*/
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return propertyDescriptors;
}
/**
*
*/
@Override
public Set<Relationship> getRelationships() {
return relationships;

View File

@ -20,16 +20,20 @@ import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.function.Consumer;
import org.apache.nifi.annotation.behavior.SystemResourceConsideration;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.SystemResource;
import org.apache.nifi.annotation.behavior.ReadsAttribute;
import org.apache.nifi.annotation.behavior.ReadsAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
@ -45,27 +49,34 @@ import org.apache.nifi.stream.io.StreamUtils;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Connection;
/**
* Publishing AMQP processor which upon each invocation of
* {@link #onTrigger(ProcessContext, ProcessSession)} method will construct an
* AMQP message sending it to an exchange identified during construction of this
* class while transferring the incoming {@link FlowFile} to 'success'
* {@link Relationship}.
*
* Expects that queues, exchanges and bindings are pre-defined by an AMQP
* administrator
*/
@Tags({ "amqp", "rabbit", "put", "message", "send", "publish" })
@InputRequirement(Requirement.INPUT_REQUIRED)
@CapabilityDescription("Creates a AMQP Message from the contents of a FlowFile and sends the message to an AMQP Exchange."
+ "In a typical AMQP exchange model, the message that is sent to the AMQP Exchange will be routed based on the 'Routing Key' "
+ "to its final destination in the queue (the binding). If due to some misconfiguration the binding between the Exchange, Routing Key "
+ "and Queue is not set up, the message will have no final destination and will return (i.e., the data will not make it to the queue). If "
+ "that happens you will see a log in both app-log and bulletin stating to that effect. Fixing the binding "
+ "(normally done by AMQP administrator) will resolve the issue.")
@CapabilityDescription("Creates an AMQP Message from the contents of a FlowFile and sends the message to an AMQP Exchange. "
+ "In a typical AMQP exchange model, the message that is sent to the AMQP Exchange will be routed based on the 'Routing Key' "
+ "to its final destination in the queue (the binding). If due to some misconfiguration the binding between the Exchange, Routing Key "
+ "and Queue is not set up, the message will have no final destination and will return (i.e., the data will not make it to the queue). If "
+ "that happens you will see a log in both app-log and bulletin stating to that effect, and the FlowFile will be routed to the 'failure' relationship.")
@SystemResourceConsideration(resource = SystemResource.MEMORY)
@ReadsAttributes({
@ReadsAttribute(attribute = "amqp$appId", description = "The App ID field to set on the AMQP Message"),
@ReadsAttribute(attribute = "amqp$contentEncoding", description = "The Content Encoding to set on the AMQP Message"),
@ReadsAttribute(attribute = "amqp$contentType", description = "The Content Type to set on the AMQP Message"),
@ReadsAttribute(attribute = "amqp$headers", description = "The headers to set on the AMQP Message"),
@ReadsAttribute(attribute = "amqp$deliveryMode", description = "The numeric indicator for the Message's Delivery Mode"),
@ReadsAttribute(attribute = "amqp$priority", description = "The Message priority"),
@ReadsAttribute(attribute = "amqp$correlationId", description = "The Message's Correlation ID"),
@ReadsAttribute(attribute = "amqp$replyTo", description = "The value of the Message's Reply-To field"),
@ReadsAttribute(attribute = "amqp$expiration", description = "The Message Expiration"),
@ReadsAttribute(attribute = "amqp$messageId", description = "The unique ID of the Message"),
@ReadsAttribute(attribute = "amqp$timestamp", description = "The timestamp of the Message, as the number of milliseconds since epoch"),
@ReadsAttribute(attribute = "amqp$type", description = "The type of message"),
@ReadsAttribute(attribute = "amqp$userId", description = "The ID of the user"),
@ReadsAttribute(attribute = "amqp$clusterId", description = "The ID of the AMQP Cluster"),
})
public class PublishAMQP extends AbstractAMQPProcessor<AMQPPublisher> {
private static final String ATTRIBUTES_PREFIX = "amqp$";
public static final PropertyDescriptor EXCHANGE = new PropertyDescriptor.Builder()
.name("Exchange Name")
@ -100,84 +111,71 @@ public class PublishAMQP extends AbstractAMQPProcessor<AMQPPublisher> {
private final static Set<Relationship> relationships;
/*
* Will ensure that the list of property descriptors is build only once.
* Will also create a Set of relationships
*/
static {
List<PropertyDescriptor> _propertyDescriptors = new ArrayList<>();
_propertyDescriptors.add(EXCHANGE);
_propertyDescriptors.add(ROUTING_KEY);
_propertyDescriptors.addAll(descriptors);
propertyDescriptors = Collections.unmodifiableList(_propertyDescriptors);
List<PropertyDescriptor> properties = new ArrayList<>();
properties.add(EXCHANGE);
properties.add(ROUTING_KEY);
properties.addAll(getCommonPropertyDescriptors());
propertyDescriptors = Collections.unmodifiableList(properties);
Set<Relationship> _relationships = new HashSet<>();
_relationships.add(REL_SUCCESS);
_relationships.add(REL_FAILURE);
relationships = Collections.unmodifiableSet(_relationships);
Set<Relationship> rels = new HashSet<>();
rels.add(REL_SUCCESS);
rels.add(REL_FAILURE);
relationships = Collections.unmodifiableSet(rels);
}
/**
* Will construct AMQP message by extracting its body from the incoming
* {@link FlowFile}. AMQP Properties will be extracted from the
* {@link FlowFile} and converted to {@link BasicProperties} to be sent
* along with the message. Upon success the incoming {@link FlowFile} is
* transferred to 'success' {@link Relationship} and upon failure FlowFile is
* penalized and transferred to the 'failure' {@link Relationship}
* Will construct AMQP message by extracting its body from the incoming {@link FlowFile}. AMQP Properties will be extracted from the
* {@link FlowFile} and converted to {@link BasicProperties} to be sent along with the message. Upon success the incoming {@link FlowFile} is
* transferred to 'success' {@link Relationship} and upon failure FlowFile is penalized and transferred to the 'failure' {@link Relationship}
* <br>
*
* NOTE: Attributes extracted from {@link FlowFile} are considered
* candidates for AMQP properties if their names are prefixed with
* {@link AMQPUtils#AMQP_PROP_PREFIX} (e.g., amqp$contentType=text/xml)
*
*/
@Override
protected void rendezvousWithAmqp(ProcessContext context, ProcessSession processSession) throws ProcessException {
FlowFile flowFile = processSession.get();
if (flowFile != null) {
BasicProperties amqpProperties = this.extractAmqpPropertiesFromFlowFile(flowFile);
String routingKey = context.getProperty(ROUTING_KEY).evaluateAttributeExpressions(flowFile).getValue();
if (routingKey == null){
throw new IllegalArgumentException("Failed to determine 'routing key' with provided value '"
+ context.getProperty(ROUTING_KEY) + "' after evaluating it as expression against incoming FlowFile.");
}
String exchange = context.getProperty(EXCHANGE).evaluateAttributeExpressions(flowFile).getValue();
protected void processResource(final Connection connection, final AMQPPublisher publisher, ProcessContext context, ProcessSession session) throws ProcessException {
FlowFile flowFile = session.get();
if (flowFile == null) {
return;
}
byte[] messageContent = this.extractMessage(flowFile, processSession);
final BasicProperties amqpProperties = extractAmqpPropertiesFromFlowFile(flowFile);
final String routingKey = context.getProperty(ROUTING_KEY).evaluateAttributeExpressions(flowFile).getValue();
if (routingKey == null) {
throw new IllegalArgumentException("Failed to determine 'routing key' with provided value '"
+ context.getProperty(ROUTING_KEY) + "' after evaluating it as expression against incoming FlowFile.");
}
try {
this.targetResource.publish(messageContent, amqpProperties, routingKey, exchange);
processSession.transfer(flowFile, REL_SUCCESS);
processSession.getProvenanceReporter().send(flowFile, this.amqpConnection.toString() + "/E:" + exchange + "/RK:" + routingKey);
} catch (Exception e) {
processSession.transfer(processSession.penalize(flowFile), REL_FAILURE);
this.getLogger().error("Failed while sending message to AMQP via " + this.targetResource, e);
context.yield();
}
final String exchange = context.getProperty(EXCHANGE).evaluateAttributeExpressions(flowFile).getValue();
final byte[] messageContent = extractMessage(flowFile, session);
try {
publisher.publish(messageContent, amqpProperties, routingKey, exchange);
session.transfer(flowFile, REL_SUCCESS);
session.getProvenanceReporter().send(flowFile, connection.toString() + "/E:" + exchange + "/RK:" + routingKey);
} catch (Exception e) {
session.transfer(session.penalize(flowFile), REL_FAILURE);
getLogger().error("Failed while sending message to AMQP via " + publisher, e);
}
}
/**
*
*/
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return propertyDescriptors;
}
/**
*
*/
@Override
public Set<Relationship> getRelationships() {
return relationships;
}
/**
* Will create an instance of {@link AMQPPublisher}
*/
@Override
protected AMQPPublisher finishBuildingTargetResource(ProcessContext context) {
return new AMQPPublisher(this.amqpConnection, this.getLogger());
protected AMQPPublisher createAMQPWorker(final ProcessContext context, final Connection connection) {
return new AMQPPublisher(connection, getLogger());
}
/**
@ -194,6 +192,20 @@ public class PublishAMQP extends AbstractAMQPProcessor<AMQPPublisher> {
return messageContent;
}
private void updateBuilderFromAttribute(final FlowFile flowFile, final String attribute, final Consumer<String> updater) {
final String attributeValue = flowFile.getAttribute(ATTRIBUTES_PREFIX + attribute);
if (attributeValue == null) {
return;
}
try {
updater.accept(attributeValue);
} catch (final Exception e) {
getLogger().warn("Failed to update AMQP Message Property " + attribute, e);
}
}
/**
* Extracts AMQP properties from the {@link FlowFile} attributes. Attributes
* extracted from {@link FlowFile} are considered candidates for AMQP
@ -208,66 +220,45 @@ public class PublishAMQP extends AbstractAMQPProcessor<AMQPPublisher> {
* {@link AMQPUtils#validateAMQPTimestampProperty}
*/
private BasicProperties extractAmqpPropertiesFromFlowFile(FlowFile flowFile) {
Map<String, String> attributes = flowFile.getAttributes();
AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();
for (Entry<String, String> attributeEntry : attributes.entrySet()) {
if (attributeEntry.getKey().startsWith(AMQPUtils.AMQP_PROP_PREFIX)) {
String amqpPropName = attributeEntry.getKey();
String amqpPropValue = attributeEntry.getValue();
final AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();
AMQPUtils.PropertyNames propertyNames = AMQPUtils.PropertyNames.fromValue(amqpPropName);
updateBuilderFromAttribute(flowFile, "contentType", builder::contentType);
updateBuilderFromAttribute(flowFile, "contentEncoding", builder::contentEncoding);
updateBuilderFromAttribute(flowFile, "deliveryMode", mode -> builder.deliveryMode(Integer.parseInt(mode)));
updateBuilderFromAttribute(flowFile, "priority", pri -> builder.priority(Integer.parseInt(pri)));
updateBuilderFromAttribute(flowFile, "correlationId", builder::correlationId);
updateBuilderFromAttribute(flowFile, "replyTo", builder::replyTo);
updateBuilderFromAttribute(flowFile, "expiration", builder::expiration);
updateBuilderFromAttribute(flowFile, "messageId", builder::messageId);
updateBuilderFromAttribute(flowFile, "timestamp", ts -> builder.timestamp(new Date(Long.parseLong(ts))));
updateBuilderFromAttribute(flowFile, "type", builder::type);
updateBuilderFromAttribute(flowFile, "userId", builder::userId);
updateBuilderFromAttribute(flowFile, "appId", builder::appId);
updateBuilderFromAttribute(flowFile, "clusterId", builder::clusterId);
updateBuilderFromAttribute(flowFile, "headers", headers -> builder.headers(validateAMQPHeaderProperty(headers)));
if (propertyNames != null) {
switch (propertyNames){
case CONTENT_TYPE:
builder.contentType(amqpPropValue);
break;
case CONTENT_ENCODING:
builder.contentEncoding(amqpPropValue);
break;
case HEADERS:
builder.headers(AMQPUtils.validateAMQPHeaderProperty(amqpPropValue));
break;
case DELIVERY_MODE:
builder.deliveryMode(AMQPUtils.validateAMQPDeliveryModeProperty(amqpPropValue));
break;
case PRIORITY:
builder.priority(AMQPUtils.validateAMQPPriorityProperty(amqpPropValue));
break;
case CORRELATION_ID:
builder.correlationId(amqpPropValue);
break;
case REPLY_TO:
builder.replyTo(amqpPropValue);
break;
case EXPIRATION:
builder.expiration(amqpPropValue);
break;
case MESSAGE_ID:
builder.messageId(amqpPropValue);
break;
case TIMESTAMP:
builder.timestamp(AMQPUtils.validateAMQPTimestampProperty(amqpPropValue));
break;
case TYPE:
builder.type(amqpPropValue);
break;
case USER_ID:
builder.userId(amqpPropValue);
break;
case APP_ID:
builder.appId(amqpPropValue);
break;
case CLUSTER_ID:
builder.clusterId(amqpPropValue);
break;
}
} else {
getLogger().warn("Unrecognised AMQP property '" + amqpPropName + "', will ignore.");
}
}
}
return builder.build();
}
/**
* Will validate if provided amqpPropValue can be converted to a {@link Map}.
* Should be passed in the format: amqp$headers=key=value,key=value etc.
*
* @param amqpPropValue the value of the property
* @return {@link Map} if valid otherwise null
*/
private Map<String, Object> validateAMQPHeaderProperty(String amqpPropValue) {
String[] strEntries = amqpPropValue.split(",");
Map<String, Object> headers = new HashMap<>();
for (String strEntry : strEntries) {
String[] kv = strEntry.split("=");
if (kv.length == 2) {
headers.put(kv[0].trim(), kv[1].trim());
} else {
getLogger().warn("Malformed key value pair for AMQP header property: " + amqpPropValue);
}
}
return headers;
}
}

View File

@ -72,7 +72,6 @@ public class AMQPPublisherTest {
try (AMQPPublisher sender = new AMQPPublisher(connection, mock(ComponentLog.class))) {
sender.publish("hello".getBytes(), null, "key1", "myExchange");
Thread.sleep(200);
}
assertNotNull(connection.createChannel().basicGet("queue1", true));
@ -95,9 +94,8 @@ public class AMQPPublisherTest {
try (AMQPPublisher sender = new AMQPPublisher(connection, new MockComponentLog("foo", ""))) {
sender.publish("hello".getBytes(), null, "key1", "myExchange");
Thread.sleep(1000);
}
Thread.sleep(200);
verify(retListener, atMost(1)).handleReturn(Mockito.anyInt(), Mockito.anyString(), Mockito.anyString(),
Mockito.anyString(), Mockito.any(BasicProperties.class), (byte[]) Mockito.any());
connection.close();

View File

@ -1,52 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.amqp.processors;
import static org.junit.Assert.assertEquals;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.util.MockProcessSession;
import org.apache.nifi.util.SharedSessionState;
import org.junit.Test;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.AMQP.BasicProperties;
public class AMQPUtilsTest {
@Test
public void validateUpdateFlowFileAttributesWithAmqpProperties() {
PublishAMQP processor = new PublishAMQP();
ProcessSession processSession = new MockProcessSession(new SharedSessionState(processor, new AtomicLong()),
processor);
FlowFile sourceFlowFile = processSession.create();
BasicProperties amqpProperties = new AMQP.BasicProperties.Builder()
.contentType("text/plain").deliveryMode(2)
.priority(1).userId("joe")
.build();
FlowFile f2 = AMQPUtils.updateFlowFileAttributesWithAmqpProperties(amqpProperties, sourceFlowFile,
processSession);
assertEquals("text/plain", f2.getAttributes().get(AMQPUtils.AMQP_PROP_PREFIX + "contentType"));
assertEquals("joe", f2.getAttributes().get(AMQPUtils.AMQP_PROP_PREFIX + "userId"));
assertEquals("2", f2.getAttributes().get(AMQPUtils.AMQP_PROP_PREFIX + "deliveryMode"));
}
}

View File

@ -29,6 +29,8 @@ import org.apache.nifi.util.TestRunners;
import org.junit.Before;
import org.junit.Test;
import com.rabbitmq.client.Connection;
/**
* Unit tests for the AbstractAMQPProcessor class
@ -77,12 +79,12 @@ public class AbstractAMQPProcessorTest {
*/
public static class MockAbstractAMQPProcessor extends AbstractAMQPProcessor<AMQPConsumer> {
@Override
protected void rendezvousWithAmqp(ProcessContext context, ProcessSession session) throws ProcessException {
protected void processResource(Connection connection, AMQPConsumer consumer, ProcessContext context, ProcessSession session) throws ProcessException {
// nothing to do
}
@Override
protected AMQPConsumer finishBuildingTargetResource(ProcessContext context) {
protected AMQPConsumer createAMQPWorker(ProcessContext context, Connection connection) {
return null;
}
}

View File

@ -20,14 +20,12 @@ import static org.junit.Assert.assertNotNull;
import static org.mockito.Mockito.mock;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
@ -41,12 +39,10 @@ public class ConsumeAMQPTest {
@Test
public void validateSuccessfullConsumeAndTransferToSuccess() throws Exception {
Map<String, List<String>> routingMap = new HashMap<>();
routingMap.put("key1", Arrays.asList("queue1", "queue2"));
Map<String, String> exchangeToRoutingKeymap = new HashMap<>();
exchangeToRoutingKeymap.put("myExchange", "key1");
final Map<String, List<String>> routingMap = Collections.singletonMap("key1", Arrays.asList("queue1", "queue2"));
final Map<String, String> exchangeToRoutingKeymap = Collections.singletonMap("myExchange", "key1");
Connection connection = new TestConnection(exchangeToRoutingKeymap, routingMap);
final Connection connection = new TestConnection(exchangeToRoutingKeymap, routingMap);
try (AMQPPublisher sender = new AMQPPublisher(connection, mock(ComponentLog.class))) {
sender.publish("hello".getBytes(), MessageProperties.PERSISTENT_TEXT_PLAIN, "key1", "myExchange");
@ -57,7 +53,6 @@ public class ConsumeAMQPTest {
runner.setProperty(ConsumeAMQP.QUEUE, "queue1");
runner.run();
Thread.sleep(200);
final MockFlowFile successFF = runner.getFlowFilesForRelationship(PublishAMQP.REL_SUCCESS).get(0);
assertNotNull(successFF);
}
@ -65,25 +60,20 @@ public class ConsumeAMQPTest {
}
public static class LocalConsumeAMQP extends ConsumeAMQP {
private final Connection connection;
private final Connection conection;
public LocalConsumeAMQP(Connection connection) {
this.conection = connection;
this.connection = connection;
}
@Override
public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
synchronized (this) {
if (this.amqpConnection == null || !this.amqpConnection.isOpen()) {
this.amqpConnection = this.conection;
this.targetResource = this.finishBuildingTargetResource(context);
}
}
this.rendezvousWithAmqp(context, session);
protected AMQPConsumer createAMQPWorker(ProcessContext context, Connection connection) {
return new AMQPConsumer(connection, context.getProperty(QUEUE).getValue());
}
public Connection getConnection() {
return this.amqpConnection;
@Override
protected Connection createConnection(ProcessContext context) {
return connection;
}
}
}

View File

@ -22,15 +22,13 @@ import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import java.util.Arrays;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
@ -44,13 +42,13 @@ public class PublishAMQPTest {
@Test
public void validateSuccessfullPublishAndTransferToSuccess() throws Exception {
PublishAMQP pubProc = new LocalPublishAMQP(false);
TestRunner runner = TestRunners.newTestRunner(pubProc);
final PublishAMQP pubProc = new LocalPublishAMQP();
final TestRunner runner = TestRunners.newTestRunner(pubProc);
runner.setProperty(PublishAMQP.HOST, "injvm");
runner.setProperty(PublishAMQP.EXCHANGE, "myExchange");
runner.setProperty(PublishAMQP.ROUTING_KEY, "key1");
Map<String, String> attributes = new HashMap<>();
final Map<String, String> attributes = new HashMap<>();
attributes.put("foo", "bar");
attributes.put("amqp$contentType", "foo/bar");
attributes.put("amqp$contentEncoding", "foobar123");
@ -70,20 +68,21 @@ public class PublishAMQPTest {
runner.enqueue("Hello Joe".getBytes(), attributes);
runner.run();
final MockFlowFile successFF = runner.getFlowFilesForRelationship(PublishAMQP.REL_SUCCESS).get(0);
assertNotNull(successFF);
Channel channel = ((LocalPublishAMQP) pubProc).getConnection().createChannel();
GetResponse msg1 = channel.basicGet("queue1", true);
final Channel channel = ((LocalPublishAMQP) pubProc).getConnection().createChannel();
final GetResponse msg1 = channel.basicGet("queue1", true);
assertNotNull(msg1);
assertEquals("foo/bar", msg1.getProps().getContentType());
assertEquals("foobar123", msg1.getProps().getContentEncoding());
Map<String, Object> headerMap = msg1.getProps().getHeaders();
final Map<String, Object> headerMap = msg1.getProps().getHeaders();
Object foo = headerMap.get("foo");
Object foo2 = headerMap.get("foo2");
Object foo3 = headerMap.get("foo3");
final Object foo = headerMap.get("foo");
final Object foo2 = headerMap.get("foo2");
final Object foo3 = headerMap.get("foo3");
assertEquals("bar", foo.toString());
assertEquals("bar2", foo2.toString());
@ -115,53 +114,29 @@ public class PublishAMQPTest {
runner.enqueue("Hello Joe".getBytes());
runner.run();
Thread.sleep(200);
assertTrue(runner.getFlowFilesForRelationship(PublishAMQP.REL_SUCCESS).isEmpty());
assertNotNull(runner.getFlowFilesForRelationship(PublishAMQP.REL_FAILURE).get(0));
}
public static class LocalPublishAMQP extends PublishAMQP {
private final boolean closeConnection;
public static class LocalPublishAMQP extends PublishAMQP {
private TestConnection connection;
public LocalPublishAMQP() {
this(true);
}
final Map<String, List<String>> routingMap = Collections.singletonMap("key1", Arrays.asList("queue1", "queue2"));
final Map<String, String> exchangeToRoutingKeymap = Collections.singletonMap("myExchange", "key1");
public LocalPublishAMQP(boolean closeConection) {
this.closeConnection = closeConection;
connection = new TestConnection(exchangeToRoutingKeymap, routingMap);
}
@Override
public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
synchronized (this) {
if (this.amqpConnection == null || !this.amqpConnection.isOpen()) {
Map<String, List<String>> routingMap = new HashMap<>();
routingMap.put("key1", Arrays.asList("queue1", "queue2"));
Map<String, String> exchangeToRoutingKeymap = new HashMap<>();
exchangeToRoutingKeymap.put("myExchange", "key1");
this.amqpConnection = new TestConnection(exchangeToRoutingKeymap, routingMap);
this.targetResource = this.finishBuildingTargetResource(context);
}
}
this.rendezvousWithAmqp(context, session);
protected Connection createConnection(ProcessContext context) {
return connection;
}
public Connection getConnection() {
this.close();
return this.amqpConnection;
}
// since we really don't have any real connection (rather emulated one), the override is
// needed here so the call to close from TestRunner does nothing since we are
// grabbing the emulated connection later to do the assertions in some tests.
@Override
@OnStopped
public void close() {
if (this.closeConnection) {
super.close();
}
return connection;
}
}
}

View File

@ -23,6 +23,7 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeoutException;
@ -37,14 +38,19 @@ import com.rabbitmq.client.AMQP.Queue.PurgeOk;
import com.rabbitmq.client.AMQP.Tx.CommitOk;
import com.rabbitmq.client.AMQP.Tx.RollbackOk;
import com.rabbitmq.client.AMQP.Tx.SelectOk;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Command;
import com.rabbitmq.client.ConfirmCallback;
import com.rabbitmq.client.ConfirmListener;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.FlowListener;
import com.rabbitmq.client.ConsumerShutdownSignalCallback;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmq.client.GetResponse;
import com.rabbitmq.client.Method;
import com.rabbitmq.client.ReturnCallback;
import com.rabbitmq.client.ReturnListener;
import com.rabbitmq.client.ShutdownListener;
import com.rabbitmq.client.ShutdownSignalException;
@ -55,19 +61,12 @@ import com.rabbitmq.client.ShutdownSignalException;
class TestChannel implements Channel {
private final ExecutorService executorService;
private final Map<String, BlockingQueue<GetResponse>> enqueuedMessages;
private final Map<String, List<String>> routingKeyToQueueMappings;
private final Map<String, String> exchangeToRoutingKeyMappings;
private final List<ReturnListener> returnListeners;
private boolean open;
private boolean corrupted;
private Connection connection;
public TestChannel(Map<String, String> exchangeToRoutingKeyMappings,
@ -97,28 +96,24 @@ class TestChannel implements Channel {
@Override
public void addShutdownListener(ShutdownListener listener) {
throw new UnsupportedOperationException(
"This method is not currently supported as it is not used by current API in testing");
throw new UnsupportedOperationException("This method is not currently supported as it is not used by current API in testing");
}
@Override
public void removeShutdownListener(ShutdownListener listener) {
throw new UnsupportedOperationException(
"This method is not currently supported as it is not used by current API in testing");
throw new UnsupportedOperationException("This method is not currently supported as it is not used by current API in testing");
}
@Override
public ShutdownSignalException getCloseReason() {
throw new UnsupportedOperationException(
"This method is not currently supported as it is not used by current API in testing");
throw new UnsupportedOperationException("This method is not currently supported as it is not used by current API in testing");
}
@Override
public void notifyListeners() {
throw new UnsupportedOperationException(
"This method is not currently supported as it is not used by current API in testing");
throw new UnsupportedOperationException("This method is not currently supported as it is not used by current API in testing");
}
@ -129,8 +124,7 @@ class TestChannel implements Channel {
@Override
public int getChannelNumber() {
throw new UnsupportedOperationException(
"This method is not currently supported as it is not used by current API in testing");
throw new UnsupportedOperationException("This method is not currently supported as it is not used by current API in testing");
}
@Override
@ -145,28 +139,19 @@ class TestChannel implements Channel {
@Override
public void close(int closeCode, String closeMessage) throws IOException, TimeoutException {
throw new UnsupportedOperationException(
"This method is not currently supported as it is not used by current API in testing");
throw new UnsupportedOperationException("This method is not currently supported as it is not used by current API in testing");
}
@Override
public boolean flowBlocked() {
throw new UnsupportedOperationException(
"This method is not currently supported as it is not used by current API in testing");
}
@Override
public void abort() throws IOException {
throw new UnsupportedOperationException(
"This method is not currently supported as it is not used by current API in testing");
throw new UnsupportedOperationException("This method is not currently supported as it is not used by current API in testing");
}
@Override
public void abort(int closeCode, String closeMessage) throws IOException {
throw new UnsupportedOperationException(
"This method is not currently supported as it is not used by current API in testing");
throw new UnsupportedOperationException("This method is not currently supported as it is not used by current API in testing");
}
@ -177,88 +162,58 @@ class TestChannel implements Channel {
@Override
public boolean removeReturnListener(ReturnListener listener) {
throw new UnsupportedOperationException(
"This method is not currently supported as it is not used by current API in testing");
throw new UnsupportedOperationException("This method is not currently supported as it is not used by current API in testing");
}
@Override
public void clearReturnListeners() {
throw new UnsupportedOperationException(
"This method is not currently supported as it is not used by current API in testing");
}
@Override
public void addFlowListener(FlowListener listener) {
throw new UnsupportedOperationException(
"This method is not currently supported as it is not used by current API in testing");
}
@Override
public boolean removeFlowListener(FlowListener listener) {
throw new UnsupportedOperationException(
"This method is not currently supported as it is not used by current API in testing");
}
@Override
public void clearFlowListeners() {
throw new UnsupportedOperationException(
"This method is not currently supported as it is not used by current API in testing");
throw new UnsupportedOperationException("This method is not currently supported as it is not used by current API in testing");
}
@Override
public void addConfirmListener(ConfirmListener listener) {
throw new UnsupportedOperationException(
"This method is not currently supported as it is not used by current API in testing");
throw new UnsupportedOperationException("This method is not currently supported as it is not used by current API in testing");
}
@Override
public boolean removeConfirmListener(ConfirmListener listener) {
throw new UnsupportedOperationException(
"This method is not currently supported as it is not used by current API in testing");
throw new UnsupportedOperationException("This method is not currently supported as it is not used by current API in testing");
}
@Override
public void clearConfirmListeners() {
throw new UnsupportedOperationException(
"This method is not currently supported as it is not used by current API in testing");
throw new UnsupportedOperationException("This method is not currently supported as it is not used by current API in testing");
}
@Override
public Consumer getDefaultConsumer() {
throw new UnsupportedOperationException(
"This method is not currently supported as it is not used by current API in testing");
throw new UnsupportedOperationException("This method is not currently supported as it is not used by current API in testing");
}
@Override
public void setDefaultConsumer(Consumer consumer) {
throw new UnsupportedOperationException(
"This method is not currently supported as it is not used by current API in testing");
throw new UnsupportedOperationException("This method is not currently supported as it is not used by current API in testing");
}
@Override
public void basicQos(int prefetchSize, int prefetchCount, boolean global) throws IOException {
throw new UnsupportedOperationException(
"This method is not currently supported as it is not used by current API in testing");
throw new UnsupportedOperationException("This method is not currently supported as it is not used by current API in testing");
}
@Override
public void basicQos(int prefetchCount, boolean global) throws IOException {
throw new UnsupportedOperationException(
"This method is not currently supported as it is not used by current API in testing");
throw new UnsupportedOperationException("This method is not currently supported as it is not used by current API in testing");
}
@Override
public void basicQos(int prefetchCount) throws IOException {
throw new UnsupportedOperationException(
"This method is not currently supported as it is not used by current API in testing");
throw new UnsupportedOperationException("This method is not currently supported as it is not used by current API in testing");
}
@ -329,198 +284,169 @@ class TestChannel implements Channel {
@Override
public void basicPublish(String exchange, String routingKey, boolean mandatory, boolean immediate,
BasicProperties props, byte[] body) throws IOException {
throw new UnsupportedOperationException(
"This method is not currently supported as it is not used by current API in testing");
throw new UnsupportedOperationException("This method is not currently supported as it is not used by current API in testing");
}
@Override
public DeclareOk exchangeDeclare(String exchange, String type) throws IOException {
throw new UnsupportedOperationException(
"This method is not currently supported as it is not used by current API in testing");
throw new UnsupportedOperationException("This method is not currently supported as it is not used by current API in testing");
}
@Override
public DeclareOk exchangeDeclare(String exchange, String type, boolean durable) throws IOException {
throw new UnsupportedOperationException(
"This method is not currently supported as it is not used by current API in testing");
throw new UnsupportedOperationException("This method is not currently supported as it is not used by current API in testing");
}
@Override
public DeclareOk exchangeDeclare(String exchange, String type, boolean durable, boolean autoDelete,
Map<String, Object> arguments) throws IOException {
throw new UnsupportedOperationException(
"This method is not currently supported as it is not used by current API in testing");
throw new UnsupportedOperationException("This method is not currently supported as it is not used by current API in testing");
}
@Override
public DeclareOk exchangeDeclare(String exchange, String type, boolean durable, boolean autoDelete,
boolean internal, Map<String, Object> arguments) throws IOException {
throw new UnsupportedOperationException(
"This method is not currently supported as it is not used by current API in testing");
throw new UnsupportedOperationException("This method is not currently supported as it is not used by current API in testing");
}
@Override
public void exchangeDeclareNoWait(String exchange, String type, boolean durable, boolean autoDelete,
boolean internal, Map<String, Object> arguments) throws IOException {
throw new UnsupportedOperationException(
"This method is not currently supported as it is not used by current API in testing");
throw new UnsupportedOperationException("This method is not currently supported as it is not used by current API in testing");
}
@Override
public DeclareOk exchangeDeclarePassive(String name) throws IOException {
throw new UnsupportedOperationException(
"This method is not currently supported as it is not used by current API in testing");
throw new UnsupportedOperationException("This method is not currently supported as it is not used by current API in testing");
}
@Override
public DeleteOk exchangeDelete(String exchange, boolean ifUnused) throws IOException {
throw new UnsupportedOperationException(
"This method is not currently supported as it is not used by current API in testing");
throw new UnsupportedOperationException("This method is not currently supported as it is not used by current API in testing");
}
@Override
public void exchangeDeleteNoWait(String exchange, boolean ifUnused) throws IOException {
throw new UnsupportedOperationException(
"This method is not currently supported as it is not used by current API in testing");
throw new UnsupportedOperationException("This method is not currently supported as it is not used by current API in testing");
}
@Override
public DeleteOk exchangeDelete(String exchange) throws IOException {
throw new UnsupportedOperationException(
"This method is not currently supported as it is not used by current API in testing");
throw new UnsupportedOperationException("This method is not currently supported as it is not used by current API in testing");
}
@Override
public BindOk exchangeBind(String destination, String source, String routingKey) throws IOException {
throw new UnsupportedOperationException(
"This method is not currently supported as it is not used by current API in testing");
throw new UnsupportedOperationException("This method is not currently supported as it is not used by current API in testing");
}
@Override
public BindOk exchangeBind(String destination, String source, String routingKey, Map<String, Object> arguments)
throws IOException {
throw new UnsupportedOperationException(
"This method is not currently supported as it is not used by current API in testing");
throw new UnsupportedOperationException("This method is not currently supported as it is not used by current API in testing");
}
@Override
public void exchangeBindNoWait(String destination, String source, String routingKey, Map<String, Object> arguments)
throws IOException {
throw new UnsupportedOperationException(
"This method is not currently supported as it is not used by current API in testing");
throw new UnsupportedOperationException("This method is not currently supported as it is not used by current API in testing");
}
@Override
public UnbindOk exchangeUnbind(String destination, String source, String routingKey) throws IOException {
throw new UnsupportedOperationException(
"This method is not currently supported as it is not used by current API in testing");
throw new UnsupportedOperationException("This method is not currently supported as it is not used by current API in testing");
}
@Override
public UnbindOk exchangeUnbind(String destination, String source, String routingKey, Map<String, Object> arguments)
throws IOException {
throw new UnsupportedOperationException(
"This method is not currently supported as it is not used by current API in testing");
throw new UnsupportedOperationException("This method is not currently supported as it is not used by current API in testing");
}
@Override
public void exchangeUnbindNoWait(String destination, String source, String routingKey,
Map<String, Object> arguments) throws IOException {
throw new UnsupportedOperationException(
"This method is not currently supported as it is not used by current API in testing");
throw new UnsupportedOperationException("This method is not currently supported as it is not used by current API in testing");
}
@Override
public com.rabbitmq.client.AMQP.Queue.DeclareOk queueDeclare() throws IOException {
throw new UnsupportedOperationException(
"This method is not currently supported as it is not used by current API in testing");
throw new UnsupportedOperationException("This method is not currently supported as it is not used by current API in testing");
}
@Override
public com.rabbitmq.client.AMQP.Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive,
boolean autoDelete, Map<String, Object> arguments) throws IOException {
throw new UnsupportedOperationException(
"This method is not currently supported as it is not used by current API in testing");
throw new UnsupportedOperationException("This method is not currently supported as it is not used by current API in testing");
}
@Override
public void queueDeclareNoWait(String queue, boolean durable, boolean exclusive, boolean autoDelete,
Map<String, Object> arguments) throws IOException {
throw new UnsupportedOperationException(
"This method is not currently supported as it is not used by current API in testing");
throw new UnsupportedOperationException("This method is not currently supported as it is not used by current API in testing");
}
@Override
public com.rabbitmq.client.AMQP.Queue.DeclareOk queueDeclarePassive(String queue) throws IOException {
throw new UnsupportedOperationException(
"This method is not currently supported as it is not used by current API in testing");
throw new UnsupportedOperationException("This method is not currently supported as it is not used by current API in testing");
}
@Override
public com.rabbitmq.client.AMQP.Queue.DeleteOk queueDelete(String queue) throws IOException {
throw new UnsupportedOperationException(
"This method is not currently supported as it is not used by current API in testing");
throw new UnsupportedOperationException("This method is not currently supported as it is not used by current API in testing");
}
@Override
public com.rabbitmq.client.AMQP.Queue.DeleteOk queueDelete(String queue, boolean ifUnused, boolean ifEmpty)
throws IOException {
throw new UnsupportedOperationException(
"This method is not currently supported as it is not used by current API in testing");
throw new UnsupportedOperationException("This method is not currently supported as it is not used by current API in testing");
}
@Override
public void queueDeleteNoWait(String queue, boolean ifUnused, boolean ifEmpty) throws IOException {
throw new UnsupportedOperationException(
"This method is not currently supported as it is not used by current API in testing");
throw new UnsupportedOperationException("This method is not currently supported as it is not used by current API in testing");
}
@Override
public com.rabbitmq.client.AMQP.Queue.BindOk queueBind(String queue, String exchange, String routingKey)
throws IOException {
throw new UnsupportedOperationException(
"This method is not currently supported as it is not used by current API in testing");
throw new UnsupportedOperationException("This method is not currently supported as it is not used by current API in testing");
}
@Override
public com.rabbitmq.client.AMQP.Queue.BindOk queueBind(String queue, String exchange, String routingKey,
Map<String, Object> arguments) throws IOException {
throw new UnsupportedOperationException(
"This method is not currently supported as it is not used by current API in testing");
throw new UnsupportedOperationException("This method is not currently supported as it is not used by current API in testing");
}
@Override
public void queueBindNoWait(String queue, String exchange, String routingKey, Map<String, Object> arguments)
throws IOException {
throw new UnsupportedOperationException(
"This method is not currently supported as it is not used by current API in testing");
throw new UnsupportedOperationException("This method is not currently supported as it is not used by current API in testing");
}
@Override
public com.rabbitmq.client.AMQP.Queue.UnbindOk queueUnbind(String queue, String exchange, String routingKey)
throws IOException {
throw new UnsupportedOperationException(
"This method is not currently supported as it is not used by current API in testing");
throw new UnsupportedOperationException("This method is not currently supported as it is not used by current API in testing");
}
@Override
public com.rabbitmq.client.AMQP.Queue.UnbindOk queueUnbind(String queue, String exchange, String routingKey,
Map<String, Object> arguments) throws IOException {
throw new UnsupportedOperationException(
"This method is not currently supported as it is not used by current API in testing");
throw new UnsupportedOperationException("This method is not currently supported as it is not used by current API in testing");
}
@Override
public PurgeOk queuePurge(String queue) throws IOException {
throw new UnsupportedOperationException(
"This method is not currently supported as it is not used by current API in testing");
throw new UnsupportedOperationException("This method is not currently supported as it is not used by current API in testing");
}
@Override
@ -535,156 +461,254 @@ class TestChannel implements Channel {
@Override
public void basicAck(long deliveryTag, boolean multiple) throws IOException {
throw new UnsupportedOperationException(
"This method is not currently supported as it is not used by current API in testing");
throw new UnsupportedOperationException("This method is not currently supported as it is not used by current API in testing");
}
@Override
public void basicNack(long deliveryTag, boolean multiple, boolean requeue) throws IOException {
throw new UnsupportedOperationException(
"This method is not currently supported as it is not used by current API in testing");
throw new UnsupportedOperationException("This method is not currently supported as it is not used by current API in testing");
}
@Override
public void basicReject(long deliveryTag, boolean requeue) throws IOException {
throw new UnsupportedOperationException(
"This method is not currently supported as it is not used by current API in testing");
throw new UnsupportedOperationException("This method is not currently supported as it is not used by current API in testing");
}
@Override
public String basicConsume(String queue, Consumer callback) throws IOException {
throw new UnsupportedOperationException(
"This method is not currently supported as it is not used by current API in testing");
throw new UnsupportedOperationException("This method is not currently supported as it is not used by current API in testing");
}
@Override
public String basicConsume(String queue, boolean autoAck, Consumer callback) throws IOException {
throw new UnsupportedOperationException(
"This method is not currently supported as it is not used by current API in testing");
throw new UnsupportedOperationException("This method is not currently supported as it is not used by current API in testing");
}
@Override
public String basicConsume(String queue, boolean autoAck, Map<String, Object> arguments, Consumer callback)
throws IOException {
throw new UnsupportedOperationException(
"This method is not currently supported as it is not used by current API in testing");
throw new UnsupportedOperationException("This method is not currently supported as it is not used by current API in testing");
}
@Override
public String basicConsume(String queue, boolean autoAck, String consumerTag, Consumer callback)
throws IOException {
throw new UnsupportedOperationException(
"This method is not currently supported as it is not used by current API in testing");
throw new UnsupportedOperationException("This method is not currently supported as it is not used by current API in testing");
}
@Override
public String basicConsume(String queue, boolean autoAck, String consumerTag, boolean noLocal, boolean exclusive,
Map<String, Object> arguments, Consumer callback) throws IOException {
throw new UnsupportedOperationException(
"This method is not currently supported as it is not used by current API in testing");
throw new UnsupportedOperationException("This method is not currently supported as it is not used by current API in testing");
}
@Override
public void basicCancel(String consumerTag) throws IOException {
throw new UnsupportedOperationException(
"This method is not currently supported as it is not used by current API in testing");
throw new UnsupportedOperationException("This method is not currently supported as it is not used by current API in testing");
}
@Override
public RecoverOk basicRecover() throws IOException {
throw new UnsupportedOperationException(
"This method is not currently supported as it is not used by current API in testing");
throw new UnsupportedOperationException("This method is not currently supported as it is not used by current API in testing");
}
@Override
public RecoverOk basicRecover(boolean requeue) throws IOException {
throw new UnsupportedOperationException(
"This method is not currently supported as it is not used by current API in testing");
throw new UnsupportedOperationException("This method is not currently supported as it is not used by current API in testing");
}
@Override
public SelectOk txSelect() throws IOException {
throw new UnsupportedOperationException(
"This method is not currently supported as it is not used by current API in testing");
throw new UnsupportedOperationException("This method is not currently supported as it is not used by current API in testing");
}
@Override
public CommitOk txCommit() throws IOException {
throw new UnsupportedOperationException(
"This method is not currently supported as it is not used by current API in testing");
throw new UnsupportedOperationException("This method is not currently supported as it is not used by current API in testing");
}
@Override
public RollbackOk txRollback() throws IOException {
throw new UnsupportedOperationException(
"This method is not currently supported as it is not used by current API in testing");
throw new UnsupportedOperationException("This method is not currently supported as it is not used by current API in testing");
}
@Override
public com.rabbitmq.client.AMQP.Confirm.SelectOk confirmSelect() throws IOException {
throw new UnsupportedOperationException(
"This method is not currently supported as it is not used by current API in testing");
throw new UnsupportedOperationException("This method is not currently supported as it is not used by current API in testing");
}
@Override
public long getNextPublishSeqNo() {
throw new UnsupportedOperationException(
"This method is not currently supported as it is not used by current API in testing");
throw new UnsupportedOperationException("This method is not currently supported as it is not used by current API in testing");
}
@Override
public boolean waitForConfirms() throws InterruptedException {
throw new UnsupportedOperationException(
"This method is not currently supported as it is not used by current API in testing");
throw new UnsupportedOperationException("This method is not currently supported as it is not used by current API in testing");
}
@Override
public boolean waitForConfirms(long timeout) throws InterruptedException, TimeoutException {
throw new UnsupportedOperationException(
"This method is not currently supported as it is not used by current API in testing");
throw new UnsupportedOperationException("This method is not currently supported as it is not used by current API in testing");
}
@Override
public void waitForConfirmsOrDie() throws IOException, InterruptedException {
throw new UnsupportedOperationException(
"This method is not currently supported as it is not used by current API in testing");
throw new UnsupportedOperationException("This method is not currently supported as it is not used by current API in testing");
}
@Override
public void waitForConfirmsOrDie(long timeout) throws IOException, InterruptedException, TimeoutException {
throw new UnsupportedOperationException(
"This method is not currently supported as it is not used by current API in testing");
throw new UnsupportedOperationException("This method is not currently supported as it is not used by current API in testing");
}
@Override
public void asyncRpc(Method method) throws IOException {
throw new UnsupportedOperationException(
"This method is not currently supported as it is not used by current API in testing");
throw new UnsupportedOperationException("This method is not currently supported as it is not used by current API in testing");
}
@Override
public Command rpc(Method method) throws IOException {
throw new UnsupportedOperationException(
"This method is not currently supported as it is not used by current API in testing");
throw new UnsupportedOperationException("This method is not currently supported as it is not used by current API in testing");
}
@Override
public long messageCount(String queue) throws IOException {
throw new UnsupportedOperationException(
"This method is not currently supported as it is not used by current API in testing");
throw new UnsupportedOperationException("This method is not currently supported as it is not used by current API in testing");
}
@Override
public long consumerCount(String queue) throws IOException {
throw new UnsupportedOperationException(
"This method is not currently supported as it is not used by current API in testing");
throw new UnsupportedOperationException("This method is not currently supported as it is not used by current API in testing");
}
@Override
public ReturnListener addReturnListener(ReturnCallback returnCallback) {
throw new UnsupportedOperationException("This method is not currently supported as it is not used by current API in testing");
}
@Override
public ConfirmListener addConfirmListener(ConfirmCallback ackCallback, ConfirmCallback nackCallback) {
throw new UnsupportedOperationException("This method is not currently supported as it is not used by current API in testing");
}
@Override
public DeclareOk exchangeDeclare(String exchange, BuiltinExchangeType type) throws IOException {
throw new UnsupportedOperationException("This method is not currently supported as it is not used by current API in testing");
}
@Override
public DeclareOk exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable) throws IOException {
throw new UnsupportedOperationException("This method is not currently supported as it is not used by current API in testing");
}
@Override
public DeclareOk exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete, Map<String, Object> arguments) throws IOException {
throw new UnsupportedOperationException("This method is not currently supported as it is not used by current API in testing");
}
@Override
public DeclareOk exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete, boolean internal, Map<String, Object> arguments) throws IOException {
throw new UnsupportedOperationException("This method is not currently supported as it is not used by current API in testing");
}
@Override
public void exchangeDeclareNoWait(String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete, boolean internal, Map<String, Object> arguments) throws IOException {
throw new UnsupportedOperationException("This method is not currently supported as it is not used by current API in testing");
}
@Override
public String basicConsume(String queue, DeliverCallback deliverCallback, CancelCallback cancelCallback) throws IOException {
throw new UnsupportedOperationException("This method is not currently supported as it is not used by current API in testing");
}
@Override
public String basicConsume(String queue, DeliverCallback deliverCallback, ConsumerShutdownSignalCallback shutdownSignalCallback) throws IOException {
throw new UnsupportedOperationException("This method is not currently supported as it is not used by current API in testing");
}
@Override
public String basicConsume(String queue, DeliverCallback deliverCallback, CancelCallback cancelCallback, ConsumerShutdownSignalCallback shutdownSignalCallback) throws IOException {
throw new UnsupportedOperationException("This method is not currently supported as it is not used by current API in testing");
}
@Override
public String basicConsume(String queue, boolean autoAck, DeliverCallback deliverCallback, CancelCallback cancelCallback) throws IOException {
throw new UnsupportedOperationException("This method is not currently supported as it is not used by current API in testing");
}
@Override
public String basicConsume(String queue, boolean autoAck, DeliverCallback deliverCallback, ConsumerShutdownSignalCallback shutdownSignalCallback) throws IOException {
throw new UnsupportedOperationException("This method is not currently supported as it is not used by current API in testing");
}
@Override
public String basicConsume(String queue, boolean autoAck, DeliverCallback deliverCallback, CancelCallback cancelCallback, ConsumerShutdownSignalCallback shutdownSignalCallback)
throws IOException {
throw new UnsupportedOperationException("This method is not currently supported as it is not used by current API in testing");
}
@Override
public String basicConsume(String queue, boolean autoAck, Map<String, Object> arguments, DeliverCallback deliverCallback, CancelCallback cancelCallback) throws IOException {
throw new UnsupportedOperationException("This method is not currently supported as it is not used by current API in testing");
}
@Override
public String basicConsume(String queue, boolean autoAck, Map<String, Object> arguments, DeliverCallback deliverCallback, ConsumerShutdownSignalCallback shutdownSignalCallback)
throws IOException {
throw new UnsupportedOperationException("This method is not currently supported as it is not used by current API in testing");
}
@Override
public String basicConsume(String queue, boolean autoAck, Map<String, Object> arguments, DeliverCallback deliverCallback, CancelCallback cancelCallback,
ConsumerShutdownSignalCallback shutdownSignalCallback) throws IOException {
throw new UnsupportedOperationException("This method is not currently supported as it is not used by current API in testing");
}
@Override
public String basicConsume(String queue, boolean autoAck, String consumerTag, DeliverCallback deliverCallback, CancelCallback cancelCallback) throws IOException {
throw new UnsupportedOperationException("This method is not currently supported as it is not used by current API in testing");
}
@Override
public String basicConsume(String queue, boolean autoAck, String consumerTag, DeliverCallback deliverCallback, ConsumerShutdownSignalCallback shutdownSignalCallback) throws IOException {
throw new UnsupportedOperationException("This method is not currently supported as it is not used by current API in testing");
}
@Override
public String basicConsume(String queue, boolean autoAck, String consumerTag, DeliverCallback deliverCallback, CancelCallback cancelCallback, ConsumerShutdownSignalCallback shutdownSignalCallback)
throws IOException {
throw new UnsupportedOperationException("This method is not currently supported as it is not used by current API in testing");
}
@Override
public String basicConsume(String queue, boolean autoAck, String consumerTag, boolean noLocal, boolean exclusive, Map<String, Object> arguments, DeliverCallback deliverCallback,
CancelCallback cancelCallback) throws IOException {
throw new UnsupportedOperationException("This method is not currently supported as it is not used by current API in testing");
}
@Override
public String basicConsume(String queue, boolean autoAck, String consumerTag, boolean noLocal, boolean exclusive, Map<String, Object> arguments, DeliverCallback deliverCallback,
ConsumerShutdownSignalCallback shutdownSignalCallback) throws IOException {
throw new UnsupportedOperationException("This method is not currently supported as it is not used by current API in testing");
}
@Override
public String basicConsume(String queue, boolean autoAck, String consumerTag, boolean noLocal, boolean exclusive, Map<String, Object> arguments, DeliverCallback deliverCallback,
CancelCallback cancelCallback, ConsumerShutdownSignalCallback shutdownSignalCallback) throws IOException {
throw new UnsupportedOperationException("This method is not currently supported as it is not used by current API in testing");
}
@Override
public CompletableFuture<Command> asyncCompletableRpc(Method method) throws IOException {
throw new UnsupportedOperationException("This method is not currently supported as it is not used by current API in testing");
}
}

View File

@ -21,12 +21,14 @@ import java.net.InetAddress;
import java.util.List;
import java.util.Map;
import com.rabbitmq.client.BlockedCallback;
import com.rabbitmq.client.BlockedListener;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ExceptionHandler;
import com.rabbitmq.client.ShutdownListener;
import com.rabbitmq.client.ShutdownSignalException;
import com.rabbitmq.client.UnblockedCallback;
/**
* Implementation of {@link Connection} to be used for testing. Will return the
@ -42,11 +44,10 @@ import com.rabbitmq.client.ShutdownSignalException;
class TestConnection implements Connection {
private final TestChannel channel;
private boolean open;
private String id;
public TestConnection(Map<String, String> exchangeToRoutingKeyMappings,
Map<String, List<String>> routingKeyToQueueMappings) {
public TestConnection(Map<String, String> exchangeToRoutingKeyMappings, Map<String, List<String>> routingKeyToQueueMappings) {
this.channel = new TestChannel(exchangeToRoutingKeyMappings, routingKeyToQueueMappings);
this.channel.setConnection(this);
this.open = true;
@ -54,26 +55,22 @@ class TestConnection implements Connection {
@Override
public void addShutdownListener(ShutdownListener listener) {
throw new UnsupportedOperationException(
"This method is not currently supported as it is not used by current API in testing");
throw new UnsupportedOperationException("This method is not currently supported as it is not used by current API in testing");
}
@Override
public void removeShutdownListener(ShutdownListener listener) {
throw new UnsupportedOperationException(
"This method is not currently supported as it is not used by current API in testing");
throw new UnsupportedOperationException("This method is not currently supported as it is not used by current API in testing");
}
@Override
public ShutdownSignalException getCloseReason() {
throw new UnsupportedOperationException(
"This method is not currently supported as it is not used by current API in testing");
throw new UnsupportedOperationException("This method is not currently supported as it is not used by current API in testing");
}
@Override
public void notifyListeners() {
throw new UnsupportedOperationException(
"This method is not currently supported as it is not used by current API in testing");
throw new UnsupportedOperationException("This method is not currently supported as it is not used by current API in testing");
}
@Override
@ -92,38 +89,32 @@ class TestConnection implements Connection {
@Override
public int getPort() {
throw new UnsupportedOperationException(
"This method is not currently supported as it is not used by current API in testing");
throw new UnsupportedOperationException("This method is not currently supported as it is not used by current API in testing");
}
@Override
public int getChannelMax() {
throw new UnsupportedOperationException(
"This method is not currently supported as it is not used by current API in testing");
throw new UnsupportedOperationException("This method is not currently supported as it is not used by current API in testing");
}
@Override
public int getFrameMax() {
throw new UnsupportedOperationException(
"This method is not currently supported as it is not used by current API in testing");
throw new UnsupportedOperationException("This method is not currently supported as it is not used by current API in testing");
}
@Override
public int getHeartbeat() {
throw new UnsupportedOperationException(
"This method is not currently supported as it is not used by current API in testing");
throw new UnsupportedOperationException("This method is not currently supported as it is not used by current API in testing");
}
@Override
public Map<String, Object> getClientProperties() {
throw new UnsupportedOperationException(
"This method is not currently supported as it is not used by current API in testing");
throw new UnsupportedOperationException("This method is not currently supported as it is not used by current API in testing");
}
@Override
public Map<String, Object> getServerProperties() {
throw new UnsupportedOperationException(
"This method is not currently supported as it is not used by current API in testing");
throw new UnsupportedOperationException("This method is not currently supported as it is not used by current API in testing");
}
@Override
@ -133,8 +124,7 @@ class TestConnection implements Connection {
@Override
public Channel createChannel(int channelNumber) throws IOException {
throw new UnsupportedOperationException(
"This method is not currently supported as it is not used by current API in testing");
throw new UnsupportedOperationException("This method is not currently supported as it is not used by current API in testing");
}
@Override
@ -149,67 +139,76 @@ class TestConnection implements Connection {
@Override
public void close(int closeCode, String closeMessage) throws IOException {
throw new UnsupportedOperationException(
"This method is not currently supported as it is not used by current API in testing");
throw new UnsupportedOperationException("This method is not currently supported as it is not used by current API in testing");
}
@Override
public void close(int timeout) throws IOException {
throw new UnsupportedOperationException(
"This method is not currently supported as it is not used by current API in testing");
throw new UnsupportedOperationException("This method is not currently supported as it is not used by current API in testing");
}
@Override
public void close(int closeCode, String closeMessage, int timeout) throws IOException {
throw new UnsupportedOperationException(
"This method is not currently supported as it is not used by current API in testing");
throw new UnsupportedOperationException("This method is not currently supported as it is not used by current API in testing");
}
@Override
public void abort() {
throw new UnsupportedOperationException(
"This method is not currently supported as it is not used by current API in testing");
throw new UnsupportedOperationException("This method is not currently supported as it is not used by current API in testing");
}
@Override
public void abort(int closeCode, String closeMessage) {
throw new UnsupportedOperationException(
"This method is not currently supported as it is not used by current API in testing");
throw new UnsupportedOperationException("This method is not currently supported as it is not used by current API in testing");
}
@Override
public void abort(int timeout) {
throw new UnsupportedOperationException(
"This method is not currently supported as it is not used by current API in testing");
throw new UnsupportedOperationException("This method is not currently supported as it is not used by current API in testing");
}
@Override
public void abort(int closeCode, String closeMessage, int timeout) {
throw new UnsupportedOperationException(
"This method is not currently supported as it is not used by current API in testing");
throw new UnsupportedOperationException("This method is not currently supported as it is not used by current API in testing");
}
@Override
public void addBlockedListener(BlockedListener listener) {
throw new UnsupportedOperationException(
"This method is not currently supported as it is not used by current API in testing");
throw new UnsupportedOperationException("This method is not currently supported as it is not used by current API in testing");
}
@Override
public boolean removeBlockedListener(BlockedListener listener) {
throw new UnsupportedOperationException(
"This method is not currently supported as it is not used by current API in testing");
throw new UnsupportedOperationException("This method is not currently supported as it is not used by current API in testing");
}
@Override
public void clearBlockedListeners() {
throw new UnsupportedOperationException(
"This method is not currently supported as it is not used by current API in testing");
throw new UnsupportedOperationException("This method is not currently supported as it is not used by current API in testing");
}
@Override
public ExceptionHandler getExceptionHandler() {
throw new UnsupportedOperationException(
"This method is not currently supported as it is not used by current API in testing");
throw new UnsupportedOperationException("This method is not currently supported as it is not used by current API in testing");
}
@Override
public String getClientProvidedName() {
return "unit-test";
}
@Override
public BlockedListener addBlockedListener(BlockedCallback blockedCallback, UnblockedCallback unblockedCallback) {
return null;
}
@Override
public String getId() {
return id;
}
@Override
public void setId(String id) {
this.id = id;
}
}