NIFI-865 Added processors to Get and Put to/from AMQP-based messaging systems

This closes #200

Signed-off-by: Bryan Bende <bbende@apache.org>
This commit is contained in:
Oleg Zhurakousky 2016-01-31 13:20:14 -05:00 committed by Bryan Bende
parent 75af3a2ebe
commit dc8b62c3a6
26 changed files with 2965 additions and 0 deletions

View File

@ -272,6 +272,11 @@ language governing permissions and limitations under the License. -->
<artifactId>nifi-elasticsearch-nar</artifactId>
<type>nar</type>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-amqp-nar</artifactId>
<type>nar</type>
</dependency>
</dependencies>
<properties>

View File

@ -0,0 +1,32 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-amqp-bundle</artifactId>
<version>0.4.2-SNAPSHOT</version>
</parent>
<artifactId>nifi-amqp-nar</artifactId>
<version>0.4.2-SNAPSHOT</version>
<packaging>nar</packaging>
<dependencies>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-amqp-processors</artifactId>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,212 @@
Apache License
Version 2.0, January 2004
http://www.apache.org/licenses/
TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
1. Definitions.
"License" shall mean the terms and conditions for use, reproduction,
and distribution as defined by Sections 1 through 9 of this document.
"Licensor" shall mean the copyright owner or entity authorized by
the copyright owner that is granting the License.
"Legal Entity" shall mean the union of the acting entity and all
other entities that control, are controlled by, or are under common
control with that entity. For the purposes of this definition,
"control" means (i) the power, direct or indirect, to cause the
direction or management of such entity, whether by contract or
otherwise, or (ii) ownership of fifty percent (50%) or more of the
outstanding shares, or (iii) beneficial ownership of such entity.
"You" (or "Your") shall mean an individual or Legal Entity
exercising permissions granted by this License.
"Source" form shall mean the preferred form for making modifications,
including but not limited to software source code, documentation
source, and configuration files.
"Object" form shall mean any form resulting from mechanical
transformation or translation of a Source form, including but
not limited to compiled object code, generated documentation,
and conversions to other media types.
"Work" shall mean the work of authorship, whether in Source or
Object form, made available under the License, as indicated by a
copyright notice that is included in or attached to the work
(an example is provided in the Appendix below).
"Derivative Works" shall mean any work, whether in Source or Object
form, that is based on (or derived from) the Work and for which the
editorial revisions, annotations, elaborations, or other modifications
represent, as a whole, an original work of authorship. For the purposes
of this License, Derivative Works shall not include works that remain
separable from, or merely link (or bind by name) to the interfaces of,
the Work and Derivative Works thereof.
"Contribution" shall mean any work of authorship, including
the original version of the Work and any modifications or additions
to that Work or Derivative Works thereof, that is intentionally
submitted to Licensor for inclusion in the Work by the copyright owner
or by an individual or Legal Entity authorized to submit on behalf of
the copyright owner. For the purposes of this definition, "submitted"
means any form of electronic, verbal, or written communication sent
to the Licensor or its representatives, including but not limited to
communication on electronic mailing lists, source code control systems,
and issue tracking systems that are managed by, or on behalf of, the
Licensor for the purpose of discussing and improving the Work, but
excluding communication that is conspicuously marked or otherwise
designated in writing by the copyright owner as "Not a Contribution."
"Contributor" shall mean Licensor and any individual or Legal Entity
on behalf of whom a Contribution has been received by Licensor and
subsequently incorporated within the Work.
2. Grant of Copyright License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
copyright license to reproduce, prepare Derivative Works of,
publicly display, publicly perform, sublicense, and distribute the
Work and such Derivative Works in Source or Object form.
3. Grant of Patent License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
(except as stated in this section) patent license to make, have made,
use, offer to sell, sell, import, and otherwise transfer the Work,
where such license applies only to those patent claims licensable
by such Contributor that are necessarily infringed by their
Contribution(s) alone or by combination of their Contribution(s)
with the Work to which such Contribution(s) was submitted. If You
institute patent litigation against any entity (including a
cross-claim or counterclaim in a lawsuit) alleging that the Work
or a Contribution incorporated within the Work constitutes direct
or contributory patent infringement, then any patent licenses
granted to You under this License for that Work shall terminate
as of the date such litigation is filed.
4. Redistribution. You may reproduce and distribute copies of the
Work or Derivative Works thereof in any medium, with or without
modifications, and in Source or Object form, provided that You
meet the following conditions:
(a) You must give any other recipients of the Work or
Derivative Works a copy of this License; and
(b) You must cause any modified files to carry prominent notices
stating that You changed the files; and
(c) You must retain, in the Source form of any Derivative Works
that You distribute, all copyright, patent, trademark, and
attribution notices from the Source form of the Work,
excluding those notices that do not pertain to any part of
the Derivative Works; and
(d) If the Work includes a "NOTICE" text file as part of its
distribution, then any Derivative Works that You distribute must
include a readable copy of the attribution notices contained
within such NOTICE file, excluding those notices that do not
pertain to any part of the Derivative Works, in at least one
of the following places: within a NOTICE text file distributed
as part of the Derivative Works; within the Source form or
documentation, if provided along with the Derivative Works; or,
within a display generated by the Derivative Works, if and
wherever such third-party notices normally appear. The contents
of the NOTICE file are for informational purposes only and
do not modify the License. You may add Your own attribution
notices within Derivative Works that You distribute, alongside
or as an addendum to the NOTICE text from the Work, provided
that such additional attribution notices cannot be construed
as modifying the License.
You may add Your own copyright statement to Your modifications and
may provide additional or different license terms and conditions
for use, reproduction, or distribution of Your modifications, or
for any such Derivative Works as a whole, provided Your use,
reproduction, and distribution of the Work otherwise complies with
the conditions stated in this License.
5. Submission of Contributions. Unless You explicitly state otherwise,
any Contribution intentionally submitted for inclusion in the Work
by You to the Licensor shall be under the terms and conditions of
this License, without any additional terms or conditions.
Notwithstanding the above, nothing herein shall supersede or modify
the terms of any separate license agreement you may have executed
with Licensor regarding such Contributions.
6. Trademarks. This License does not grant permission to use the trade
names, trademarks, service marks, or product names of the Licensor,
except as required for reasonable and customary use in describing the
origin of the Work and reproducing the content of the NOTICE file.
7. Disclaimer of Warranty. Unless required by applicable law or
agreed to in writing, Licensor provides the Work (and each
Contributor provides its Contributions) on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
implied, including, without limitation, any warranties or conditions
of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
PARTICULAR PURPOSE. You are solely responsible for determining the
appropriateness of using or redistributing the Work and assume any
risks associated with Your exercise of permissions under this License.
8. Limitation of Liability. In no event and under no legal theory,
whether in tort (including negligence), contract, or otherwise,
unless required by applicable law (such as deliberate and grossly
negligent acts) or agreed to in writing, shall any Contributor be
liable to You for damages, including any direct, indirect, special,
incidental, or consequential damages of any character arising as a
result of this License or out of the use or inability to use the
Work (including but not limited to damages for loss of goodwill,
work stoppage, computer failure or malfunction, or any and all
other commercial damages or losses), even if such Contributor
has been advised of the possibility of such damages.
9. Accepting Warranty or Additional Liability. While redistributing
the Work or Derivative Works thereof, You may choose to offer,
and charge a fee for, acceptance of support, warranty, indemnity,
or other liability obligations and/or rights consistent with this
License. However, in accepting such obligations, You may act only
on Your own behalf and on Your sole responsibility, not on behalf
of any other Contributor, and only if You agree to indemnify,
defend, and hold each Contributor harmless for any liability
incurred by, or claims asserted against, such Contributor by reason
of your accepting any such warranty or additional liability.
END OF TERMS AND CONDITIONS
APPENDIX: How to apply the Apache License to your work.
To apply the Apache License to your work, attach the following
boilerplate notice, with the fields enclosed by brackets "[]"
replaced with your own identifying information. (Don't include
the brackets!) The text should be enclosed in the appropriate
comment syntax for the file format. We also recommend that a
file or class name and description of purpose be included on the
same "printed page" as the copyright notice for easier
identification within third-party archives.
Copyright [yyyy] [name of copyright owner]
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
APACHE NIFI SUBCOMPONENTS:
The Apache NiFi project contains subcomponents with separate copyright
notices and license terms. Your use of the source code for the these
subcomponents is subject to the terms and conditions of the following
licenses.
The binary distribution of this product bundles 'RabbitMQ Client API' which is
licensed under Apache Public License 2.0.

View File

@ -0,0 +1,16 @@
nifi-amqp-nar
Copyright 2015 The Apache Software Foundation
This product includes software developed at
The Apache Software Foundation (http://www.apache.org/).
******************
Apache Software License v2
******************
The following binary components are provided under the Apache Software License v2
(ASLv2) RabbitMQ Client API
The following NOTICE information applies:
RabbitMQ Client API
Copyright 2001-2015 The Apache Software Foundation

View File

@ -0,0 +1,72 @@
<?xml version="1.0" encoding="UTF-8"?>
<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor
license agreements. See the NOTICE file distributed with this work for additional
information regarding copyright ownership. The ASF licenses this file to
You under the Apache License, Version 2.0 (the "License"); you may not use
this file except in compliance with the License. You may obtain a copy of
the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required
by applicable law or agreed to in writing, software distributed under the
License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS
OF ANY KIND, either express or implied. See the License for the specific
language governing permissions and limitations under the License. -->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-amqp-bundle</artifactId>
<version>0.4.2-SNAPSHOT</version>
</parent>
<artifactId>nifi-amqp-processors</artifactId>
<packaging>jar</packaging>
<properties>
<amqp-client.version>3.6.0</amqp-client.version>
</properties>
<dependencies>
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>${amqp-client.version}</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-api</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-processor-utils</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-mock</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.rat</groupId>
<artifactId>apache-rat-plugin</artifactId>
<configuration>
<excludes>
<!-- test data -->
<exclude>src/test/resources/testdata/*</exclude>
</excludes>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<configuration>
<systemPropertyVariables>
<java.io.tmpdir>${project.build.directory}</java.io.tmpdir>
</systemPropertyVariables>
</configuration>
</plugin>
</plugins>
</build>
</project>

View File

@ -0,0 +1,78 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.amqp.processors;
import java.io.IOException;
import org.apache.nifi.processor.exception.ProcessException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.GetResponse;
/**
* Generic consumer of messages from AMQP-based messaging system. It is based on
* RabbitMQ client API (https://www.rabbitmq.com/api-guide.html)
*/
final class AMQPConsumer extends AMQPWorker {
private final static Logger logger = LoggerFactory.getLogger(AMQPConsumer.class);
private final String queueName;
/**
* Creates an instance of this consumer
* @param connection instance of AMQP {@link Connection}
* @param queueName name of the queue from which messages will be consumed.
*/
AMQPConsumer(Connection connection, String queueName) {
super(connection);
this.validateStringProperty("queueName", queueName);
this.queueName = queueName;
logger.info("Successfully connected AMQPConsumer to " + connection.toString() + " and '" + queueName + "' queue");
}
/**
* Consumes message from the queue identified by 'queueName' on each
* invocation via {@link Channel#basicGet(String, boolean)} operation
* returning instance of {@link GetResponse} In the event there are no
* messages in the queue it will return null. In the event queue is not
* defined in the system exception is raised by the target API. It will be
* logged and propagated as {@link ProcessException}
*
* @return instance of {@link GetResponse}
*/
public GetResponse consume() {
try {
return this.channel.basicGet(this.queueName, true);
} catch (IOException e) {
logger.error("Failed to receive message from AMQP; " + this + ". Possible reasons: Queue '" + this.queueName
+ "' may not have been defined", e);
throw new ProcessException(e);
}
}
/**
*
*/
@Override
public String toString() {
return super.toString() + ", QUEUE:" + this.queueName;
}
}

View File

@ -0,0 +1,134 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.amqp.processors;
import java.io.IOException;
import org.apache.nifi.logging.ProcessorLog;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ReturnListener;
/**
* Generic publisher of messages to AMQP-based messaging system. It is based on
* RabbitMQ client API (https://www.rabbitmq.com/api-guide.html)
*/
final class AMQPPublisher extends AMQPWorker {
private final static Logger logger = LoggerFactory.getLogger(AMQPPublisher.class);
private final String exchangeName;
private final String routingKey;
private final ProcessorLog processLog;
/**
* Creates an instance of this publisher
*
* @param connection
* instance of AMQP {@link Connection}
* @param exchangeName
* the name of AMQP exchange to which messages will be published.
* If not provided 'default' exchange will be used.
* @param routingKey
* (required) the name of the routingKey to be used by AMQP-based
* system to route messages to its final destination (queue).
*/
AMQPPublisher(Connection connection, String exchangeName, String routingKey, ProcessorLog processLog) {
super(connection);
this.processLog = processLog;
this.validateStringProperty("routingKey", routingKey);
this.exchangeName = exchangeName == null ? "" : exchangeName.trim();
if (this.exchangeName.length() == 0) {
logger.info("The 'exchangeName' is not specified. Messages will be sent to default exchange");
}
this.routingKey = routingKey;
this.channel.addReturnListener(new UndeliverableMessageLogger());
logger.info("Successfully connected AMQPPublisher to " + connection.toString() + " and '" + this.exchangeName
+ "' exchange with '" + routingKey + "' as a routing key.");
}
/**
* Publishes message without any AMQP properties (see
* {@link BasicProperties}) to a pre-defined AMQP Exchange.
*
* @param bytes
* bytes representing a message.
*/
void publish(byte[] bytes) {
this.publish(bytes, null);
}
/**
* Publishes message with provided AMQP properties (see
* {@link BasicProperties}) to a pre-defined AMQP Exchange.
*
* @param bytes
* bytes representing a message.
* @param properties
* instance of {@link BasicProperties}
*/
void publish(byte[] bytes, BasicProperties properties) {
if (this.channel.isOpen()) {
try {
this.channel.basicPublish(this.exchangeName, this.routingKey, true, properties, bytes);
} catch (Exception e) {
throw new IllegalStateException("Failed to publish to '" +
this.exchangeName + "' with '" + this.routingKey + "'.", e);
}
} else {
throw new IllegalStateException("This instance of AMQPPublisher is invalid since "
+ "its publishigChannel is closed");
}
}
/**
*
*/
@Override
public String toString() {
return super.toString() + ", EXCHANGE:" + this.exchangeName + ", ROUTING_KEY:" + this.routingKey;
}
/**
* Listener to listen and WARN-log undeliverable messages which are returned
* back to the sender. Since in the current implementation messages are sent
* with 'mandatory' bit set, such messages must have final destination
* otherwise they are silently dropped which could cause a confusion
* especially during early stages of flow development. This implies that
* bindings between exchange -> routingKey -> queue must exist and are
* typically done by AMQP administrator. This logger simply helps to monitor
* for such conditions by logging such messages as warning. In the future
* this can be extended to provide other type of functionality (e.g., fail
* processor etc.)
*/
private final class UndeliverableMessageLogger implements ReturnListener {
@Override
public void handleReturn(int replyCode, String replyText, String exchangeName, String routingKey, BasicProperties properties, byte[] message)
throws IOException {
String logMessage = "Message destined for '" + exchangeName + "' exchange with '" + routingKey
+ "' as routing key came back with replyCode=" + replyCode + " and replyText=" + replyText + ".";
logger.warn(logMessage);
AMQPPublisher.this.processLog.warn(logMessage);
}
}
}

View File

@ -0,0 +1,116 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.amqp.processors;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.processor.ProcessSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.rabbitmq.client.AMQP.BasicProperties;
/**
* Utility helper class simplify interactions with target AMQP API and NIFI API.
*
*/
abstract class AMQPUtils {
public final static String AMQP_PROP_DELIMITER = "$";
public final static String AMQP_PROP_PREFIX = "amqp" + AMQP_PROP_DELIMITER;
private final static Logger logger = LoggerFactory.getLogger(AMQPUtils.class);
private final static List<String> propertyNames = Arrays.asList("amqp$contentType", "amqp$contentEncoding",
"amqp$headers", "amqp$deliveryMode", "amqp$priority", "amqp$correlationId", "amqp$replyTo",
"amqp$expiration", "amqp$messageId", "amqp$timestamp", "amqp$type", "amqp$userId", "amqp$appId",
"amqp$clusterId");
/**
* Returns a {@link List} of AMQP property names defined in
* {@link BasicProperties}
*/
public static List<String> getAmqpPropertyNames() {
return propertyNames;
}
/**
* Updates {@link FlowFile} with attributes representing AMQP properties
*
* @param amqpProperties
* instance of {@link BasicProperties}
* @param flowFile
* instance of target {@link FlowFile}
* @param processSession
* instance of {@link ProcessSession}
*/
public static FlowFile updateFlowFileAttributesWithAmqpProperties(BasicProperties amqpProperties, FlowFile flowFile, ProcessSession processSession) {
if (amqpProperties != null){
try {
Method[] methods = BasicProperties.class.getDeclaredMethods();
Map<String, String> attributes = new HashMap<String, String>();
for (Method method : methods) {
if (Modifier.isPublic(method.getModifiers()) && method.getName().startsWith("get")) {
Object amqpPropertyValue = method.invoke(amqpProperties);
if (amqpPropertyValue != null) {
String propertyName = extractPropertyNameFromMethod(method);
if (isValidAmqpPropertyName(propertyName)) {
if (propertyName.equals(AMQP_PROP_PREFIX + "contentType")) {
attributes.put(CoreAttributes.MIME_TYPE.key(), amqpPropertyValue.toString());
}
attributes.put(propertyName, amqpPropertyValue.toString());
}
}
}
}
flowFile = processSession.putAllAttributes(flowFile, attributes);
} catch (Exception e) {
logger.warn("Failed to update FlowFile with AMQP attributes", e);
}
}
return flowFile;
}
/**
* Will validate if provided name corresponds to valid AMQP property.
*
* @see AMQPUtils#getAmqpPropertyNames()
*
* @param name
* the name of the property
* @return 'true' if valid otherwise 'false'
*/
public static boolean isValidAmqpPropertyName(String name) {
return propertyNames.contains(name);
}
/**
*
*/
private static String extractPropertyNameFromMethod(Method method) {
char c[] = method.getName().substring(3).toCharArray();
c[0] = Character.toLowerCase(c[0]);
return AMQP_PROP_PREFIX + new String(c);
}
}

View File

@ -0,0 +1,105 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.amqp.processors;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
/**
* Base class for implementing publishing and consuming AMQP workers.
*
* @see AMQPPublisher
* @see AMQPConsumer
*/
abstract class AMQPWorker implements AutoCloseable {
private final static Logger logger = LoggerFactory.getLogger(AMQPWorker.class);
protected final Channel channel;
/**
* Creates an instance of this worker initializing it with AMQP
* {@link Connection} and creating a target {@link Channel} used by
* sub-classes to interact with AMQP-based messaging system.
*
* @param connection
* instance of {@link Connection}
*/
public AMQPWorker(Connection connection) {
this.validateConnection(connection);
try {
this.channel = connection.createChannel();
} catch (IOException e) {
logger.error("Failed to create Channel for " + connection, e);
throw new IllegalStateException(e);
}
}
/**
* Closes {@link Channel} created when instance of this class was created.
*/
@Override
public void close() throws TimeoutException, IOException {
if (logger.isDebugEnabled()) {
logger.debug("Closing AMQP channel for " + this.channel.getConnection().toString());
}
this.channel.close();
}
/**
*
*/
@Override
public String toString() {
return this.getClass().getSimpleName() + ":" + this.channel.getConnection().toString();
}
/**
* Validates that a String property has value (not null nor empty)
*
* @param propertyName
* the name of the property
* @param value
* the value of the property
*/
void validateStringProperty(String propertyName, String value) {
if (value == null || value.trim().length() == 0) {
throw new IllegalArgumentException("'" + propertyName + "' must not be null or empty");
}
}
/**
* Validates that {@link Connection} is not null and open.
*
* @param connection
* instance of {@link Connection}
*/
private void validateConnection(Connection connection) {
if (connection == null) {
throw new IllegalArgumentException("'connection' must not be null!");
}
if (!connection.isOpen()) {
throw new IllegalStateException("'connection' must be open!");
}
}
}

View File

@ -0,0 +1,202 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.amqp.processors;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Processor;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/**
* Base processor that uses RabbitMQ client API
* (https://www.rabbitmq.com/api-guide.html) to rendezvous with AMQP-based
* messaging systems version 0.9.1
*
* @param <T> the type of {@link AMQPWorker}. Please see {@link AMQPPublisher}
* and {@link AMQPConsumer}
*/
abstract class AbstractAMQPProcessor<T extends AMQPWorker> extends AbstractProcessor {
public static final PropertyDescriptor HOST = new PropertyDescriptor.Builder()
.name("Host Name")
.description("Network address of AMQP broker (e.g., localhost)")
.required(true)
.defaultValue("localhost")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
public static final PropertyDescriptor PORT = new PropertyDescriptor.Builder()
.name("Port")
.description("Numeric value identifying Port of AMQP broker (e.g., 5671)")
.required(true)
.defaultValue("5672")
.addValidator(StandardValidators.PORT_VALIDATOR)
.build();
public static final PropertyDescriptor V_HOST = new PropertyDescriptor.Builder()
.name("Virtual Host")
.description("Virtual Host name which segregates AMQP system for enhanced security.")
.required(false)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
public static final PropertyDescriptor USER = new PropertyDescriptor.Builder()
.name("User Name")
.description("User Name used for authentication and authorization.")
.required(true)
.defaultValue("guest")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
public static final PropertyDescriptor PASSWORD = new PropertyDescriptor.Builder()
.name("Password")
.description("Password used for authentication and authorization.")
.required(true)
.defaultValue("guest")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.sensitive(true)
.build();
public static final PropertyDescriptor AMQP_VERSION = new PropertyDescriptor.Builder()
.name("AMQP Version")
.description("AMQP Version. Currently only supports AMQP v0.9.1.")
.required(true)
.allowableValues("0.9.1")
.defaultValue("0.9.1")
.build();
static List<PropertyDescriptor> descriptors = new ArrayList<>();
/*
* Will ensure that list of PropertyDescriptors is build only once, since
* all other lifecycle methods are invoked multiple times.
*/
static {
descriptors.add(HOST);
descriptors.add(PORT);
descriptors.add(V_HOST);
descriptors.add(USER);
descriptors.add(PASSWORD);
descriptors.add(AMQP_VERSION);
}
protected volatile Connection amqpConnection;
protected volatile T targetResource;
/**
* Will builds target resource ({@link AMQPPublisher} or
* {@link AMQPConsumer}) upon first invocation and will delegate to the
* implementation of
* {@link #rendezvousWithAmqp(ProcessContext, ProcessSession)} method for
* further processing.
*/
@Override
public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
synchronized (this) {
this.buildTargetResource(context);
}
this.rendezvousWithAmqp(context, session);
}
/**
* Will close current AMQP connection.
*/
@OnStopped
public void close() {
try {
if (this.targetResource != null) {
this.targetResource.close();
}
} catch (Exception e) {
this.getLogger().warn("Failure while closing target resource " + this.targetResource, e);
}
try {
if (this.amqpConnection != null) {
this.amqpConnection.close();
}
} catch (IOException e) {
this.getLogger().warn("Failure while closing connection", e);
}
this.amqpConnection = null;
}
/**
* Delegate method to supplement
* {@link #onTrigger(ProcessContext, ProcessSession)}. It is implemented by
* sub-classes to perform {@link Processor} specific functionality.
*
* @param context
* instance of {@link ProcessContext}
* @param session
* instance of {@link ProcessSession}
*/
protected abstract void rendezvousWithAmqp(ProcessContext context, ProcessSession session) throws ProcessException;
/**
* Delegate method to supplement building of target {@link AMQPWorker} (see
* {@link AMQPPublisher} or {@link AMQPConsumer}) and is implemented by
* sub-classes.
*
* @param context
* instance of {@link ProcessContext}
* @return new instance of {@link AMQPWorker}
*/
protected abstract T finishBuildingTargetResource(ProcessContext context);
/**
* Builds target resource ({@link AMQPPublisher} or {@link AMQPConsumer}).
* It does so by making a {@link Connection} and then delegating to the
* implementation of {@link #finishBuildingTargetResource(ProcessContext)}
* which will build {@link AMQPWorker} (see {@link AMQPPublisher} or
* {@link AMQPConsumer}).
*/
private void buildTargetResource(ProcessContext context) {
if (this.amqpConnection == null || !this.amqpConnection.isOpen()) {
this.amqpConnection = this.createConnection(context);
this.targetResource = this.finishBuildingTargetResource(context);
}
}
/**
* Creates {@link Connection} to AMQP system.
*/
private Connection createConnection(ProcessContext context) {
ConnectionFactory cf = new ConnectionFactory();
cf.setHost(context.getProperty(HOST).getValue());
cf.setPort(Integer.parseInt(context.getProperty(PORT).getValue()));
cf.setUsername(context.getProperty(USER).getValue());
cf.setPassword(context.getProperty(PASSWORD).getValue());
String vHost = context.getProperty(V_HOST).getValue();
if (vHost != null) {
cf.setVirtualHost(vHost);
}
try {
Connection connection = cf.newConnection();
return connection;
} catch (Exception e) {
throw new IllegalStateException("Failed to establish connection with AMQP Broker: " + cf.toString(), e);
}
}
}

View File

@ -0,0 +1,138 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.amqp.processors;
import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.OutputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.GetResponse;
/**
* Consuming AMQP processor which upon each invocation of
* {@link #onTrigger(ProcessContext, ProcessSession)} method will construct a
* {@link FlowFile} containing the body of the consumed AMQP message and AMQP
* properties that came with message which are added to a {@link FlowFile} as
* attributes.
*/
@Tags({ "amqp", "rabbit", "get", "message", "receive", "consume" })
@InputRequirement(Requirement.INPUT_FORBIDDEN)
@CapabilityDescription("Consumes AMQP Message transforming its content to a FlowFile and transitioning it to 'success' relationship")
public class ConsumeAMQP extends AbstractAMQPProcessor<AMQPConsumer> {
public static final PropertyDescriptor QUEUE = new PropertyDescriptor.Builder()
.name("Queue")
.description("The name of the existing AMQP Queue from which messages will be consumed. Usually pre-defined by AMQP administrator. ")
.required(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
public static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
.description("All FlowFiles that are received from the AMQP queue are routed to this relationship")
.build();
private final static List<PropertyDescriptor> propertyDescriptors;
private final static Set<Relationship> relationships;
/*
* Will ensure that the list of property descriptors is build only once.
* Will also create a Set of relationships
*/
static {
List<PropertyDescriptor> _propertyDescriptors = new ArrayList<>();
_propertyDescriptors.add(QUEUE);
_propertyDescriptors.addAll(descriptors);
propertyDescriptors = Collections.unmodifiableList(_propertyDescriptors);
Set<Relationship> _relationships = new HashSet<>();
_relationships.add(REL_SUCCESS);
relationships = Collections.unmodifiableSet(_relationships);
}
/**
* Will construct a {@link FlowFile} containing the body of the consumed
* AMQP message (if {@link GetResponse} returned by {@link AMQPConsumer} is
* not null) and AMQP properties that came with message which are added to a
* {@link FlowFile} as attributes, transferring {@link FlowFile} to
* 'success' {@link Relationship}.
*/
@Override
protected void rendezvousWithAmqp(ProcessContext context, ProcessSession processSession) throws ProcessException {
final GetResponse response = this.targetResource.consume();
if (response != null){
FlowFile flowFile = processSession.create();
flowFile = processSession.write(flowFile, new OutputStreamCallback() {
@Override
public void process(final OutputStream out) throws IOException {
out.write(response.getBody());
}
});
BasicProperties amqpProperties = response.getProps();
flowFile = AMQPUtils.updateFlowFileAttributesWithAmqpProperties(amqpProperties, flowFile, processSession);
processSession.getProvenanceReporter().receive(flowFile,
this.amqpConnection.toString() + "/" + context.getProperty(QUEUE).getValue());
processSession.transfer(flowFile, REL_SUCCESS);
} else {
context.yield();
}
}
/**
* Will create an instance of {@link AMQPConsumer}
*/
@Override
protected AMQPConsumer finishBuildingTargetResource(ProcessContext context) {
String queueName = context.getProperty(QUEUE).getValue();
return new AMQPConsumer(this.amqpConnection, queueName);
}
/**
*
*/
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return propertyDescriptors;
}
/**
*
*/
@Override
public Set<Relationship> getRelationships() {
return relationships;
}
}

View File

@ -0,0 +1,223 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.amqp.processors;
import java.io.IOException;
import java.io.InputStream;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Properties;
import java.util.Set;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.Validator;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.stream.io.StreamUtils;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.AMQP.BasicProperties;
/**
* Publishing AMQP processor which upon each invocation of
* {@link #onTrigger(ProcessContext, ProcessSession)} method will construct an
* AMQP message sending it to an exchange identified during construction of this
* class while transferring the incoming {@link FlowFile} to 'success'
* {@link Relationship}.
*
* Expects that queues, exchanges and bindings are pre-defined by an AMQP
* administrator
*/
@Tags({ "amqp", "rabbit", "put", "message", "send", "publish" })
@InputRequirement(Requirement.INPUT_REQUIRED)
@CapabilityDescription("Creates a AMQP Message from the contents of a FlowFile and sends the message to an AMQP Exchange."
+ "In a typical AMQP exchange model, the message that is sent to the AMQP Exchange will be routed based on the 'Routing Key' "
+ "to its final destination in the queue (the binding). If due to some misconfiguration the binding between the Exchange, Routing Key "
+ "and Queue is not set up, the message will have no final destination and will return (i.e., the data will not make it to the queue). If "
+ "that happens you will see a log in both app-log and bulletin stating to that effect. Fixing the binding "
+ "(normally done by AMQP administrator) will resolve the issue.")
public class PublishAMQP extends AbstractAMQPProcessor<AMQPPublisher> {
public static final PropertyDescriptor EXCHANGE = new PropertyDescriptor.Builder()
.name("Exchange Name")
.description("The name of the AMQP Exchange the messages will be sent to. Usually provided by the AMQP administrator (e.g., 'amq.direct'). "
+ "It is an optional property. If kept empty the messages will be sent to a default AMQP exchange.")
.required(true)
.defaultValue("")
.addValidator(Validator.VALID)
.build();
public static final PropertyDescriptor ROUTING_KEY = new PropertyDescriptor.Builder()
.name("Routing Key")
.description("The name of the Routing Key that will be used by AMQP to route messages from the exchange to a destination queue(s). "
+ "Usually provided by the administrator (e.g., 'myKey')In the event when messages are sent to a default exchange this property "
+ "corresponds to a destination queue name, otherwise a binding from the Exchange to a Queue via Routing Key must be set "
+ "(usually by the AMQP administrator)")
.required(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
public static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
.description("All FlowFiles that are sent to the AMQP destination are routed to this relationship")
.build();
public static final Relationship REL_FAILURE = new Relationship.Builder()
.name("failure")
.description("All FlowFiles that cannot be routed to the AMQP destination are routed to this relationship")
.build();
private final static List<PropertyDescriptor> propertyDescriptors;
private final static Set<Relationship> relationships;
private final static List<String> amqpPropertyNames = AMQPUtils.getAmqpPropertyNames();
/*
* Will ensure that the list of property descriptors is build only once.
* Will also create a Set of relationships
*/
static {
List<PropertyDescriptor> _propertyDescriptors = new ArrayList<>();
_propertyDescriptors.add(EXCHANGE);
_propertyDescriptors.add(ROUTING_KEY);
_propertyDescriptors.addAll(descriptors);
propertyDescriptors = Collections.unmodifiableList(_propertyDescriptors);
Set<Relationship> _relationships = new HashSet<>();
_relationships.add(REL_SUCCESS);
_relationships.add(REL_FAILURE);
relationships = Collections.unmodifiableSet(_relationships);
}
/**
* Will construct AMQP message by extracting its body from the incoming
* {@link FlowFile}. AMQP {@link Properties} will be extracted from the
* {@link FlowFile} and converted to {@link BasicProperties} to be sent
* along with the message. Upon success the incoming {@link FlowFile} is
* transfered to 'success' {@link Relationship} and upon failure FlowFile is
* penalized and transfered to the 'failure' {@link Relationship}
* <br>
* NOTE: Attributes extracted from {@link FlowFile} are considered
* candidates for AMQP properties if their names are prefixed with
* {@link AMQPUtils#AMQP_PROP_PREFIX} (e.g., amqp$contentType=text/xml)
*
*/
@Override
protected void rendezvousWithAmqp(ProcessContext context, ProcessSession processSession) throws ProcessException {
FlowFile flowFile = processSession.get();
if (flowFile != null) {
BasicProperties amqpProperties = this.extractAmqpPropertiesFromFlowFile(flowFile);
byte[] messageContent = this.extractMessage(flowFile, processSession);
try {
this.targetResource.publish(messageContent, amqpProperties);
processSession.transfer(flowFile, REL_SUCCESS);
processSession.getProvenanceReporter().send(flowFile,
this.amqpConnection.toString() + "/E:" + context.getProperty(EXCHANGE).getValue() + "/RK:"
+ context.getProperty(ROUTING_KEY).getValue());
} catch (Exception e) {
processSession.transfer(processSession.penalize(flowFile), REL_FAILURE);
this.getLogger().error("Failed while sending message to AMQP via " + this.targetResource, e);
context.yield();
}
}
}
/**
*
*/
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return propertyDescriptors;
}
/**
*
*/
@Override
public Set<Relationship> getRelationships() {
return relationships;
}
/**
* Will create an instance of {@link AMQPPublisher}
*/
@Override
protected AMQPPublisher finishBuildingTargetResource(ProcessContext context) {
String exchangeName = context.getProperty(EXCHANGE).getValue();
String routingKey = context.getProperty(ROUTING_KEY).getValue();
return new AMQPPublisher(this.amqpConnection, exchangeName, routingKey, this.getLogger());
}
/**
* Extracts contents of the {@link FlowFile} as byte array.
*/
private byte[] extractMessage(FlowFile flowFile, ProcessSession session){
final byte[] messageContent = new byte[(int) flowFile.getSize()];
session.read(flowFile, new InputStreamCallback() {
@Override
public void process(final InputStream in) throws IOException {
StreamUtils.fillBuffer(in, messageContent, true);
}
});
return messageContent;
}
/**
* Extracts AMQP properties from the {@link FlowFile} attributes. Attributes
* extracted from {@link FlowFile} are considered candidates for AMQP
* properties if their names are prefixed with
* {@link AMQPUtils#AMQP_PROP_PREFIX} (e.g., amqp$contentType=text/xml)
*/
private BasicProperties extractAmqpPropertiesFromFlowFile(FlowFile flowFile) {
Map<String, String> attributes = flowFile.getAttributes();
AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();
for (Entry<String, String> attributeEntry : attributes.entrySet()) {
if (attributeEntry.getKey().startsWith(AMQPUtils.AMQP_PROP_PREFIX)) {
String amqpPropName = attributeEntry.getKey().split("\\" + AMQPUtils.AMQP_PROP_DELIMITER)[1];
String amqpPropValue = attributeEntry.getValue();
try {
if (amqpPropertyNames.contains(AMQPUtils.AMQP_PROP_PREFIX + amqpPropName)) {
Method m = builder.getClass().getDeclaredMethod(amqpPropName, String.class);
m.invoke(builder, amqpPropValue);
} else {
getLogger().warn("Unrecogninsed AMQP property '" + amqpPropName + "', will ignore.");
}
} catch (Exception e) {
// should really never happen since it should be caught by
// the above IF.
getLogger().warn("Failed while trying to build AMQP Properties.", e);
}
}
}
return builder.build();
}
}

View File

@ -0,0 +1,16 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
org.apache.nifi.amqp.processors.PublishAMQP
org.apache.nifi.amqp.processors.ConsumeAMQP

View File

@ -0,0 +1,72 @@
<!DOCTYPE html>
<html lang="en">
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<head>
<meta charset="utf-8" />
<title>ConsumeAMQP</title>
<link rel="stylesheet" href="../../css/component-usage.css" type="text/css" />
</head>
<body>
<h2>Summary</h2>
<p>
This processor consumes messages from AMQP messaging queue and converts them to a FlowFile to be routed to the next component in the flow.
At the time of writing this document the supported AMQP protocol version is v0.9.1.
</p>
<p>
The component is based on <a href="https://www.rabbitmq.com/api-guide.html">RabbitMQ Client API</a>
<a href="https://www.rabbitmq.com/getstarted.html">The following guide and tutorial</a> may also help you to brush up on some of the AMQP basics.
</p>
<p>
This processor does two things. It constructs FlwFile by extracting information from the consumed AMQP message (both body and attributes).
Once message is consumed a FlowFile is constructed. The message body is written to a FlowFile and its <i>com.rabbitmq.client.AMQP.BasicProperties</i>
are transfered into the FlowFile as attributes. AMQP attribute names are prefixed with <i>amqp$</i> prefix.
</p>
<h2>AMQP Properties</h2>
<p>
The following is the list of available standard AMQP properties which may come with the message: <i>("amqp$contentType", "amqp$contentEncoding",
"amqp$headers", "amqp$deliveryMode", "amqp$priority", "amqp$correlationId", "amqp$replyTo",
"amqp$expiration", "amqp$messageId", "amqp$timestamp", "amqp$type", "amqp$userId", "amqp$appId",
"amqp$clusterId")</i>
</p>
<h2>Configuration Details</h2>
<p>
At the time of writing this document it only defines the essential configuration properties which are suitable for most cases.
Other properties will be defined later as this component progresses.
Configuring PublishAMQP:
</p>
<ol>
<li><b>Queue</b> - [REQUIRED] the name of AMQP queue the messages will retrieved from. Usually provided by administrator (e.g., 'amq.direct')
</li>
<li><b>Host Name</b> - [REQUIRED] the name of the host where AMQP broker is running.
Usually provided by administrator (e.g., 'myhost.com'). Defaults to 'localhost'.
</li>
<li><b>Port</b> - [REQUIRED] the port number where AMQP broker is running.
Usually provided by the administrator (e.g., '2453'). Defaults to '5672'.
</li>
<li><b>User Name</b> - [REQUIRED] user name to connect to AMQP broker.
Usually provided by the administrator (e.g., 'me'). Defaults to 'guest'.
</li>
<li><b>Password</b> - [REQUIRED] password to use with user name to connect to AMQP broker.
Usually provided by the administrator. Defaults to 'guest'.
</li>
<li><b>Virtual Host</b> - [OPTIONAL] Virtual Host name which segregates AMQP system for enhanced security.
Please refer to <a href="http://blog.dtzq.com/2012/06/rabbitmq-users-and-virtual-hosts.html">this blog</a> for more details on Virtual Host.
</li>
</ol>
</body>
</html>

View File

@ -0,0 +1,94 @@
<!DOCTYPE html>
<html lang="en">
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<head>
<meta charset="utf-8" />
<title>PublishAMQP</title>
<link rel="stylesheet" href="../../css/component-usage.css" type="text/css" />
</head>
<body>
<h2>Summary</h2>
<p>
This processor publishes the contents of the incoming FlowFile to an AMQP-based messaging system.
At the time of writing this document the supported AMQP protocol version is v0.9.1.
</p>
<p>
The component is based on <a href="https://www.rabbitmq.com/api-guide.html">RabbitMQ Client API</a>
<a href="https://www.rabbitmq.com/getstarted.html">The following guide and tutorial</a> may also help you to brush up on some of the AMQP basics.
</p>
<p>
This processor does two things. It constructs AMQP Message by extracting FlowFile contents (both body and attributes).
Once message is constructed it is sent to an AMQP Exchange.
AMQP Properties will be extracted from the FlowFile and converted to <i>com.rabbitmq.client.AMQP.BasicProperties</i> to be sent
along with the message. Upon success the incoming FlowFile is transfered to <i>success</i> Relationship and upon failure FlowFile is
penalized and transfered to the <i>failure</i> Relationship.
</p>
<h2>Where did my message go?</h2>
<p>
In a typical AMQP exchange model, the message that is sent to an AMQP Exchange will be routed based on the <i>Routing Key</i>
to its final destination in the <i>Queue</i>. It's called <b>Binding</b>. If due to some misconfiguration the binding between the <i>Exchange, Routing Key
and the Queue</i> is not set up, the message will have no final destination and will return (i.e., the data will not make it to the queue).
If that happens you will see a log in both app-log and bulletin stating to that effect. Fixing the binding (normally done by AMQP administrator)
will resolve the issue.
</p>
<h2>AMQP Properties</h2>
<p>
Attributes extracted from the FlowFile are considered candidates for AMQP
properties if their names are prefixed with <i>amqp$</i> (e.g., amqp$contentType=text/xml). To enrich message with additional AMQP properties
you may use <b>UpdateAttribute</b> processor between the source processor and PublishAMQP processor.
The following is the list of available standard AMQP properties: <i>("amqp$contentType", "amqp$contentEncoding",
"amqp$headers", "amqp$deliveryMode", "amqp$priority", "amqp$correlationId", "amqp$replyTo",
"amqp$expiration", "amqp$messageId", "amqp$timestamp", "amqp$type", "amqp$userId", "amqp$appId",
"amqp$clusterId")</i>
</p>
<h2>Configuration Details</h2>
<p>
At the time of writing this document it only defines the essential configuration properties which are suitable for most cases.
Other properties will be defined later as this component progresses.
Configuring PublishAMQP:
</p>
<ol>
<li><b>Exchange Name</b> - [OPTIONAL] the name of AMQP exchange the messages will be sent to. Usually provided by the administrator (e.g., 'amq.direct')
It is an optional property. If kept empty the messages will be sent to a default AMQP exchange. More on AMQP Exchanges could be found
<a href="https://www.rabbitmq.com/tutorials/amqp-concepts.html">here</a>.
</li>
<li><b>Routing Key</b> - [REQUIRED] the name of the routing key that will be used by AMQP to route messages from the exchange to destination queue(s).
Usually provided by administrator (e.g., 'myKey')
In the event when messages are sent to a default exchange this property corresponds to a destination queue name, otherwise a binding from the Exchange to a Queue via
Routing Key must be set (usually by the AMQP administrator). More on AMQP Exchanges and Bindings could be found
<a href="https://www.rabbitmq.com/tutorials/amqp-concepts.html">here</a>.
</li>
<li><b>Host Name</b> - [REQUIRED] the name of the host where AMQP broker is running.
Usually provided by administrator (e.g., 'myhost.com'). Defaults to 'localhost'.
</li>
<li><b>Port</b> - [REQUIRED] the port number where AMQP broker is running.
Usually provided by the administrator (e.g., '2453'). Defaults to '5672'.
</li>
<li><b>User Name</b> - [REQUIRED] user name to connect to AMQP broker.
Usually provided by the administrator (e.g., 'me'). Defaults to 'guest'.
</li>
<li><b>Password</b> - [REQUIRED] password to use with user name to connect to AMQP broker.
Usually provided by the administrator. Defaults to 'guest'.
</li>
<li><b>Virtual Host</b> - [OPTIONAL] Virtual Host name which segregates AMQP system for enhanced security.
Please refer to <a href="http://blog.dtzq.com/2012/06/rabbitmq-users-and-virtual-hosts.html">this blog</a> for more details on Virtual Host.
</li>
</ol>
</body>
</html>

View File

@ -0,0 +1,91 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.amqp.processors;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.nifi.processor.exception.ProcessException;
import org.junit.Test;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.GetResponse;
public class AMQPConsumerTest {
@SuppressWarnings("resource")
@Test(expected = IllegalArgumentException.class)
public void failOnNullConnection() {
new AMQPConsumer(null, null);
}
@SuppressWarnings("resource")
@Test(expected = IllegalArgumentException.class)
public void failOnNullQueueName() throws Exception {
Connection conn = new TestConnection(null, null);
new AMQPConsumer(conn, null);
}
@SuppressWarnings("resource")
@Test(expected = IllegalArgumentException.class)
public void failOnEmptyQueueName() throws Exception {
Connection conn = new TestConnection(null, null);
new AMQPConsumer(conn, " ");
}
@Test(expected = ProcessException.class)
public void failOnNonExistingQueue() throws Exception {
Connection conn = new TestConnection(null, null);
try (AMQPConsumer consumer = new AMQPConsumer(conn, "hello")) {
consumer.consume();
}
}
@Test
public void validateSuccessfullConsumeWithEmptyQueueDefaultExchange() throws Exception {
Map<String, List<String>> routingMap = new HashMap<>();
routingMap.put("queue1", Arrays.asList("queue1"));
Map<String, String> exchangeToRoutingKeymap = new HashMap<>();
exchangeToRoutingKeymap.put("", "queue1");
Connection conn = new TestConnection(exchangeToRoutingKeymap, routingMap);
try (AMQPConsumer consumer = new AMQPConsumer(conn, "queue1")) {
GetResponse response = consumer.consume();
assertNull(response);
}
}
@Test
public void validateSuccessfullConsumeWithEmptyQueue() throws Exception {
Map<String, List<String>> routingMap = new HashMap<>();
routingMap.put("key1", Arrays.asList("queue1"));
Map<String, String> exchangeToRoutingKeymap = new HashMap<>();
exchangeToRoutingKeymap.put("myExchange", "key1");
Connection conn = new TestConnection(exchangeToRoutingKeymap, routingMap);
conn.createChannel().basicPublish("myExchange", "key1", null, "hello Joe".getBytes());
try (AMQPConsumer consumer = new AMQPConsumer(conn, "queue1")) {
GetResponse response = consumer.consume();
assertNotNull(response);
}
}
}

View File

@ -0,0 +1,122 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.amqp.processors;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.atMost;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.nifi.util.MockProcessorLog;
import org.junit.Test;
import org.mockito.Mockito;
import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ReturnListener;
public class AMQPPublisherTest {
@SuppressWarnings("resource")
@Test(expected = IllegalArgumentException.class)
public void failOnNullConnection() {
new AMQPPublisher(null, null, null, null);
}
@SuppressWarnings("resource")
@Test(expected = IllegalArgumentException.class)
public void failOnMissingRoutingKey() throws Exception {
Connection conn = new TestConnection(null, null);
new AMQPPublisher(conn, null, "", null);
}
@Test(expected = IllegalStateException.class)
public void failPublishIfChannelClosed() throws Exception {
Connection conn = new TestConnection(null, null);
try (AMQPPublisher sender = new AMQPPublisher(conn, null, "foo", null)) {
conn.close();
sender.publish("oleg".getBytes());
}
}
@Test(expected = IllegalStateException.class)
public void failPublishIfChannelFails() throws Exception {
TestConnection conn = new TestConnection(null, null);
try (AMQPPublisher sender = new AMQPPublisher(conn, null, "foo", null)) {
((TestChannel) conn.createChannel()).corruptChannel();
sender.publish("oleg".getBytes());
}
}
@Test
public void validateSuccessfullPublishingAndRouting() throws Exception {
Map<String, List<String>> routingMap = new HashMap<>();
routingMap.put("key1", Arrays.asList("queue1", "queue2"));
Map<String, String> exchangeToRoutingKeymap = new HashMap<>();
exchangeToRoutingKeymap.put("myExchange", "key1");
Connection connection = new TestConnection(exchangeToRoutingKeymap, routingMap);
try (AMQPPublisher sender = new AMQPPublisher(connection, "myExchange", "key1", null)) {
sender.publish("hello".getBytes());
Thread.sleep(200);
}
assertNotNull(connection.createChannel().basicGet("queue1", true));
assertNotNull(connection.createChannel().basicGet("queue2", true));
connection.close();
}
@Test
public void validateSuccessfullPublishingAndUndeliverableRoutingKey() throws Exception {
Map<String, List<String>> routingMap = new HashMap<>();
routingMap.put("key1", Arrays.asList("queue1", "queue2"));
Map<String, String> exchangeToRoutingKeymap = new HashMap<>();
exchangeToRoutingKeymap.put("myExchange", "key1");
Connection connection = new TestConnection(exchangeToRoutingKeymap, routingMap);
ReturnListener retListener = mock(ReturnListener.class);
connection.createChannel().addReturnListener(retListener);
try (AMQPPublisher sender = new AMQPPublisher(connection, "myExchange", "key2",
new MockProcessorLog("foo", ""))) {
sender.publish("hello".getBytes());
Thread.sleep(1000);
}
Thread.sleep(200);
verify(retListener, atMost(1)).handleReturn(Mockito.anyInt(), Mockito.anyString(), Mockito.anyString(),
Mockito.anyString(), Mockito.any(BasicProperties.class), (byte[]) Mockito.any());
connection.close();
}
@Test
public void validateToString() throws Exception {
TestConnection conn = new TestConnection(null, null);
try (AMQPPublisher sender = new AMQPPublisher(conn, "myExchange", "key1", null)) {
String toString = sender.toString();
assertTrue(toString.contains("EXCHANGE:myExchange, ROUTING_KEY:key1"));
}
}
}

View File

@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.amqp.processors;
import static org.junit.Assert.assertEquals;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.util.MockProcessSession;
import org.apache.nifi.util.SharedSessionState;
import org.junit.Test;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.AMQP.BasicProperties;
public class AMQPUtilsTest {
@Test
public void validateUpdateFlowFileAttributesWithAmqpProperties() {
PublishAMQP processor = new PublishAMQP();
ProcessSession processSession = new MockProcessSession(new SharedSessionState(processor, new AtomicLong()),
processor);
FlowFile sourceFlowFile = processSession.create();
BasicProperties amqpProperties = new AMQP.BasicProperties.Builder()
.contentType("text/plain").deliveryMode(2)
.priority(1).userId("joe")
.build();
FlowFile f2 = AMQPUtils.updateFlowFileAttributesWithAmqpProperties(amqpProperties, sourceFlowFile,
processSession);
assertEquals("text/plain", f2.getAttributes().get(AMQPUtils.AMQP_PROP_PREFIX + "contentType"));
assertEquals("joe", f2.getAttributes().get(AMQPUtils.AMQP_PROP_PREFIX + "userId"));
assertEquals("2", f2.getAttributes().get(AMQPUtils.AMQP_PROP_PREFIX + "deliveryMode"));
}
}

View File

@ -0,0 +1,87 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.amqp.processors;
import static org.junit.Assert.assertNotNull;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.Test;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.MessageProperties;
public class ConsumeAMQPTest {
@Test
public void validateSuccessfullConsumeAndTransferToSuccess() throws Exception {
Map<String, List<String>> routingMap = new HashMap<>();
routingMap.put("key1", Arrays.asList("queue1", "queue2"));
Map<String, String> exchangeToRoutingKeymap = new HashMap<>();
exchangeToRoutingKeymap.put("myExchange", "key1");
Connection connection = new TestConnection(exchangeToRoutingKeymap, routingMap);
try (AMQPPublisher sender = new AMQPPublisher(connection, "myExchange", "key1", null)) {
sender.publish("hello".getBytes(), MessageProperties.PERSISTENT_TEXT_PLAIN);
ConsumeAMQP pubProc = new LocalConsumeAMQP(connection);
TestRunner runner = TestRunners.newTestRunner(pubProc);
runner.setProperty(ConsumeAMQP.HOST, "injvm");
runner.setProperty(ConsumeAMQP.QUEUE, "queue1");
runner.run();
Thread.sleep(200);
final MockFlowFile successFF = runner.getFlowFilesForRelationship(PublishAMQP.REL_SUCCESS).get(0);
assertNotNull(successFF);
}
}
public static class LocalConsumeAMQP extends ConsumeAMQP {
private final Connection conection;
public LocalConsumeAMQP(Connection connection) {
this.conection = connection;
}
@Override
public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
synchronized (this) {
if (this.amqpConnection == null || !this.amqpConnection.isOpen()) {
this.amqpConnection = this.conection;
this.targetResource = this.finishBuildingTargetResource(context);
}
}
this.rendezvousWithAmqp(context, session);
}
public Connection getConnection() {
return this.amqpConnection;
}
}
}

View File

@ -0,0 +1,126 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.amqp.processors;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.Test;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.GetResponse;
public class PublishAMQPTest {
@Test
public void validateSuccessfullPublishAndTransferToSuccess() throws Exception {
PublishAMQP pubProc = new LocalPublishAMQP(false);
TestRunner runner = TestRunners.newTestRunner(pubProc);
runner.setProperty(PublishAMQP.HOST, "injvm");
runner.setProperty(PublishAMQP.EXCHANGE, "myExchange");
runner.setProperty(PublishAMQP.ROUTING_KEY, "key1");
Map<String, String> attributes = new HashMap<>();
attributes.put("foo", "bar");
attributes.put("amqp$contentType", "foo/bar");
runner.enqueue("Hello Joe".getBytes(), attributes);
runner.run();
final MockFlowFile successFF = runner.getFlowFilesForRelationship(PublishAMQP.REL_SUCCESS).get(0);
assertNotNull(successFF);
Channel channel = ((LocalPublishAMQP) pubProc).getConnection().createChannel();
GetResponse msg1 = channel.basicGet("queue1", true);
assertNotNull(msg1);
assertEquals("foo/bar", msg1.getProps().getContentType());
assertNotNull(channel.basicGet("queue2", true));
}
@Test
public void validateFailedPublishAndTransferToFailure() throws Exception {
PublishAMQP pubProc = new LocalPublishAMQP();
TestRunner runner = TestRunners.newTestRunner(pubProc);
runner.setProperty(PublishAMQP.HOST, "injvm");
runner.setProperty(PublishAMQP.EXCHANGE, "badToTheBone");
runner.setProperty(PublishAMQP.ROUTING_KEY, "key1");
runner.enqueue("Hello Joe".getBytes());
runner.run();
Thread.sleep(200);
assertTrue(runner.getFlowFilesForRelationship(PublishAMQP.REL_SUCCESS).isEmpty());
assertNotNull(runner.getFlowFilesForRelationship(PublishAMQP.REL_FAILURE).get(0));
}
public static class LocalPublishAMQP extends PublishAMQP {
private final boolean closeConnection;
public LocalPublishAMQP() {
this(true);
}
public LocalPublishAMQP(boolean closeConection) {
this.closeConnection = closeConection;
}
@Override
public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
synchronized (this) {
if (this.amqpConnection == null || !this.amqpConnection.isOpen()) {
Map<String, List<String>> routingMap = new HashMap<>();
routingMap.put("key1", Arrays.asList("queue1", "queue2"));
Map<String, String> exchangeToRoutingKeymap = new HashMap<>();
exchangeToRoutingKeymap.put("myExchange", "key1");
this.amqpConnection = new TestConnection(exchangeToRoutingKeymap, routingMap);
this.targetResource = this.finishBuildingTargetResource(context);
}
}
this.rendezvousWithAmqp(context, session);
}
public Connection getConnection() {
this.close();
return this.amqpConnection;
}
// since we really don't have any real connection (rather emulated one), the override is
// needed here so the call to close from TestRunner does nothing since we are
// grabbing the emulated connection later to do the assertions in some tests.
@Override
@OnStopped
public void close() {
if (this.closeConnection) {
super.close();
}
}
}
}

View File

@ -0,0 +1,690 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.amqp.processors;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.AMQP.Basic.RecoverOk;
import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.AMQP.Exchange.BindOk;
import com.rabbitmq.client.AMQP.Exchange.DeclareOk;
import com.rabbitmq.client.AMQP.Exchange.DeleteOk;
import com.rabbitmq.client.AMQP.Exchange.UnbindOk;
import com.rabbitmq.client.AMQP.Queue.PurgeOk;
import com.rabbitmq.client.AMQP.Tx.CommitOk;
import com.rabbitmq.client.AMQP.Tx.RollbackOk;
import com.rabbitmq.client.AMQP.Tx.SelectOk;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Command;
import com.rabbitmq.client.ConfirmListener;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.FlowListener;
import com.rabbitmq.client.GetResponse;
import com.rabbitmq.client.Method;
import com.rabbitmq.client.ReturnListener;
import com.rabbitmq.client.ShutdownListener;
import com.rabbitmq.client.ShutdownSignalException;
/**
* Implementation of {@link Channel} to be used during testing
*/
class TestChannel implements Channel {
private final ExecutorService executorService;
private final Map<String, BlockingQueue<GetResponse>> enqueuedMessages;
private final Map<String, List<String>> routingKeyToQueueMappings;
private final Map<String, String> exchangeToRoutingKeyMappings;
private final List<ReturnListener> returnListeners;
private boolean open;
private boolean corrupted;
private Connection connection;
public TestChannel(Map<String, String> exchangeToRoutingKeyMappings,
Map<String, List<String>> routingKeyToQueueMappings) {
this.enqueuedMessages = new HashMap<>();
this.routingKeyToQueueMappings = routingKeyToQueueMappings;
if (this.routingKeyToQueueMappings != null) {
for (List<String> queues : routingKeyToQueueMappings.values()) {
for (String queue : queues) {
this.enqueuedMessages.put(queue, new ArrayBlockingQueue<GetResponse>(100));
}
}
}
this.exchangeToRoutingKeyMappings = exchangeToRoutingKeyMappings;
this.executorService = Executors.newCachedThreadPool();
this.returnListeners = new ArrayList<>();
this.open = true;
}
void corruptChannel() {
this.corrupted = true;
}
void setConnection(Connection connection) {
this.connection = connection;
}
@Override
public void addShutdownListener(ShutdownListener listener) {
throw new UnsupportedOperationException(
"This method is not currently supported as it is not used by current API in testing");
}
@Override
public void removeShutdownListener(ShutdownListener listener) {
throw new UnsupportedOperationException(
"This method is not currently supported as it is not used by current API in testing");
}
@Override
public ShutdownSignalException getCloseReason() {
throw new UnsupportedOperationException(
"This method is not currently supported as it is not used by current API in testing");
}
@Override
public void notifyListeners() {
throw new UnsupportedOperationException(
"This method is not currently supported as it is not used by current API in testing");
}
@Override
public boolean isOpen() {
return this.open;
}
@Override
public int getChannelNumber() {
throw new UnsupportedOperationException(
"This method is not currently supported as it is not used by current API in testing");
}
@Override
public Connection getConnection() {
return this.connection;
}
@Override
public void close() throws IOException, TimeoutException {
this.open = false;
}
@Override
public void close(int closeCode, String closeMessage) throws IOException, TimeoutException {
throw new UnsupportedOperationException(
"This method is not currently supported as it is not used by current API in testing");
}
@Override
public boolean flowBlocked() {
throw new UnsupportedOperationException(
"This method is not currently supported as it is not used by current API in testing");
}
@Override
public void abort() throws IOException {
throw new UnsupportedOperationException(
"This method is not currently supported as it is not used by current API in testing");
}
@Override
public void abort(int closeCode, String closeMessage) throws IOException {
throw new UnsupportedOperationException(
"This method is not currently supported as it is not used by current API in testing");
}
@Override
public void addReturnListener(ReturnListener listener) {
this.returnListeners.add(listener);
}
@Override
public boolean removeReturnListener(ReturnListener listener) {
throw new UnsupportedOperationException(
"This method is not currently supported as it is not used by current API in testing");
}
@Override
public void clearReturnListeners() {
throw new UnsupportedOperationException(
"This method is not currently supported as it is not used by current API in testing");
}
@Override
public void addFlowListener(FlowListener listener) {
throw new UnsupportedOperationException(
"This method is not currently supported as it is not used by current API in testing");
}
@Override
public boolean removeFlowListener(FlowListener listener) {
throw new UnsupportedOperationException(
"This method is not currently supported as it is not used by current API in testing");
}
@Override
public void clearFlowListeners() {
throw new UnsupportedOperationException(
"This method is not currently supported as it is not used by current API in testing");
}
@Override
public void addConfirmListener(ConfirmListener listener) {
throw new UnsupportedOperationException(
"This method is not currently supported as it is not used by current API in testing");
}
@Override
public boolean removeConfirmListener(ConfirmListener listener) {
throw new UnsupportedOperationException(
"This method is not currently supported as it is not used by current API in testing");
}
@Override
public void clearConfirmListeners() {
throw new UnsupportedOperationException(
"This method is not currently supported as it is not used by current API in testing");
}
@Override
public Consumer getDefaultConsumer() {
throw new UnsupportedOperationException(
"This method is not currently supported as it is not used by current API in testing");
}
@Override
public void setDefaultConsumer(Consumer consumer) {
throw new UnsupportedOperationException(
"This method is not currently supported as it is not used by current API in testing");
}
@Override
public void basicQos(int prefetchSize, int prefetchCount, boolean global) throws IOException {
throw new UnsupportedOperationException(
"This method is not currently supported as it is not used by current API in testing");
}
@Override
public void basicQos(int prefetchCount, boolean global) throws IOException {
throw new UnsupportedOperationException(
"This method is not currently supported as it is not used by current API in testing");
}
@Override
public void basicQos(int prefetchCount) throws IOException {
throw new UnsupportedOperationException(
"This method is not currently supported as it is not used by current API in testing");
}
@Override
public void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)
throws IOException {
this.basicPublish(exchange, routingKey, true, props, body);
}
@Override
public void basicPublish(final String exchange, final String routingKey, boolean mandatory,
final BasicProperties props, final byte[] body) throws IOException {
if (this.corrupted) {
throw new IOException("Channel is corrupted");
}
if (exchange.equals("")){ // default exchange; routingKey corresponds to a queue.
BlockingQueue<GetResponse> messages = this.getMessageQueue(routingKey);
GetResponse response = new GetResponse(null, props, body, messages.size());
messages.offer(response);
} else {
String rKey = this.exchangeToRoutingKeyMappings.get(exchange);
if (rKey.equals(routingKey)) {
List<String> queueNames = this.routingKeyToQueueMappings.get(routingKey);
if (queueNames == null || queueNames.isEmpty()) {
this.discard(exchange, routingKey, mandatory, props, body);
} else {
for (String queueName : queueNames) {
BlockingQueue<GetResponse> messages = this.getMessageQueue(queueName);
GetResponse response = new GetResponse(null, props, body, messages.size());
messages.offer(response);
}
}
} else {
this.discard(exchange, routingKey, mandatory, props, body);
}
}
}
private void discard(final String exchange, final String routingKey, boolean mandatory, final BasicProperties props,
final byte[] body) {
// NO ROUTE. Invoke return listener async
for (final ReturnListener listener : returnListeners) {
this.executorService.execute(new Runnable() {
@Override
public void run() {
try {
listener.handleReturn(-9, "Rejecting", exchange, routingKey, props, body);
} catch (Exception e) {
throw new IllegalStateException("Failed to send return message", e);
}
}
});
}
}
private BlockingQueue<GetResponse> getMessageQueue(String name) {
BlockingQueue<GetResponse> messages = this.enqueuedMessages.get(name);
if (messages == null) {
messages = new ArrayBlockingQueue<>(100);
this.enqueuedMessages.put(name, messages);
}
return messages;
}
@Override
public void basicPublish(String exchange, String routingKey, boolean mandatory, boolean immediate,
BasicProperties props, byte[] body) throws IOException {
throw new UnsupportedOperationException(
"This method is not currently supported as it is not used by current API in testing");
}
@Override
public DeclareOk exchangeDeclare(String exchange, String type) throws IOException {
throw new UnsupportedOperationException(
"This method is not currently supported as it is not used by current API in testing");
}
@Override
public DeclareOk exchangeDeclare(String exchange, String type, boolean durable) throws IOException {
throw new UnsupportedOperationException(
"This method is not currently supported as it is not used by current API in testing");
}
@Override
public DeclareOk exchangeDeclare(String exchange, String type, boolean durable, boolean autoDelete,
Map<String, Object> arguments) throws IOException {
throw new UnsupportedOperationException(
"This method is not currently supported as it is not used by current API in testing");
}
@Override
public DeclareOk exchangeDeclare(String exchange, String type, boolean durable, boolean autoDelete,
boolean internal, Map<String, Object> arguments) throws IOException {
throw new UnsupportedOperationException(
"This method is not currently supported as it is not used by current API in testing");
}
@Override
public void exchangeDeclareNoWait(String exchange, String type, boolean durable, boolean autoDelete,
boolean internal, Map<String, Object> arguments) throws IOException {
throw new UnsupportedOperationException(
"This method is not currently supported as it is not used by current API in testing");
}
@Override
public DeclareOk exchangeDeclarePassive(String name) throws IOException {
throw new UnsupportedOperationException(
"This method is not currently supported as it is not used by current API in testing");
}
@Override
public DeleteOk exchangeDelete(String exchange, boolean ifUnused) throws IOException {
throw new UnsupportedOperationException(
"This method is not currently supported as it is not used by current API in testing");
}
@Override
public void exchangeDeleteNoWait(String exchange, boolean ifUnused) throws IOException {
throw new UnsupportedOperationException(
"This method is not currently supported as it is not used by current API in testing");
}
@Override
public DeleteOk exchangeDelete(String exchange) throws IOException {
throw new UnsupportedOperationException(
"This method is not currently supported as it is not used by current API in testing");
}
@Override
public BindOk exchangeBind(String destination, String source, String routingKey) throws IOException {
throw new UnsupportedOperationException(
"This method is not currently supported as it is not used by current API in testing");
}
@Override
public BindOk exchangeBind(String destination, String source, String routingKey, Map<String, Object> arguments)
throws IOException {
throw new UnsupportedOperationException(
"This method is not currently supported as it is not used by current API in testing");
}
@Override
public void exchangeBindNoWait(String destination, String source, String routingKey, Map<String, Object> arguments)
throws IOException {
throw new UnsupportedOperationException(
"This method is not currently supported as it is not used by current API in testing");
}
@Override
public UnbindOk exchangeUnbind(String destination, String source, String routingKey) throws IOException {
throw new UnsupportedOperationException(
"This method is not currently supported as it is not used by current API in testing");
}
@Override
public UnbindOk exchangeUnbind(String destination, String source, String routingKey, Map<String, Object> arguments)
throws IOException {
throw new UnsupportedOperationException(
"This method is not currently supported as it is not used by current API in testing");
}
@Override
public void exchangeUnbindNoWait(String destination, String source, String routingKey,
Map<String, Object> arguments) throws IOException {
throw new UnsupportedOperationException(
"This method is not currently supported as it is not used by current API in testing");
}
@Override
public com.rabbitmq.client.AMQP.Queue.DeclareOk queueDeclare() throws IOException {
throw new UnsupportedOperationException(
"This method is not currently supported as it is not used by current API in testing");
}
@Override
public com.rabbitmq.client.AMQP.Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive,
boolean autoDelete, Map<String, Object> arguments) throws IOException {
throw new UnsupportedOperationException(
"This method is not currently supported as it is not used by current API in testing");
}
@Override
public void queueDeclareNoWait(String queue, boolean durable, boolean exclusive, boolean autoDelete,
Map<String, Object> arguments) throws IOException {
throw new UnsupportedOperationException(
"This method is not currently supported as it is not used by current API in testing");
}
@Override
public com.rabbitmq.client.AMQP.Queue.DeclareOk queueDeclarePassive(String queue) throws IOException {
throw new UnsupportedOperationException(
"This method is not currently supported as it is not used by current API in testing");
}
@Override
public com.rabbitmq.client.AMQP.Queue.DeleteOk queueDelete(String queue) throws IOException {
throw new UnsupportedOperationException(
"This method is not currently supported as it is not used by current API in testing");
}
@Override
public com.rabbitmq.client.AMQP.Queue.DeleteOk queueDelete(String queue, boolean ifUnused, boolean ifEmpty)
throws IOException {
throw new UnsupportedOperationException(
"This method is not currently supported as it is not used by current API in testing");
}
@Override
public void queueDeleteNoWait(String queue, boolean ifUnused, boolean ifEmpty) throws IOException {
throw new UnsupportedOperationException(
"This method is not currently supported as it is not used by current API in testing");
}
@Override
public com.rabbitmq.client.AMQP.Queue.BindOk queueBind(String queue, String exchange, String routingKey)
throws IOException {
throw new UnsupportedOperationException(
"This method is not currently supported as it is not used by current API in testing");
}
@Override
public com.rabbitmq.client.AMQP.Queue.BindOk queueBind(String queue, String exchange, String routingKey,
Map<String, Object> arguments) throws IOException {
throw new UnsupportedOperationException(
"This method is not currently supported as it is not used by current API in testing");
}
@Override
public void queueBindNoWait(String queue, String exchange, String routingKey, Map<String, Object> arguments)
throws IOException {
throw new UnsupportedOperationException(
"This method is not currently supported as it is not used by current API in testing");
}
@Override
public com.rabbitmq.client.AMQP.Queue.UnbindOk queueUnbind(String queue, String exchange, String routingKey)
throws IOException {
throw new UnsupportedOperationException(
"This method is not currently supported as it is not used by current API in testing");
}
@Override
public com.rabbitmq.client.AMQP.Queue.UnbindOk queueUnbind(String queue, String exchange, String routingKey,
Map<String, Object> arguments) throws IOException {
throw new UnsupportedOperationException(
"This method is not currently supported as it is not used by current API in testing");
}
@Override
public PurgeOk queuePurge(String queue) throws IOException {
throw new UnsupportedOperationException(
"This method is not currently supported as it is not used by current API in testing");
}
@Override
public GetResponse basicGet(String queue, boolean autoAck) throws IOException {
BlockingQueue<GetResponse> messages = this.enqueuedMessages.get(queue);
if (messages == null) {
throw new IOException("Queue is not defined");
} else {
return messages.poll();
}
}
@Override
public void basicAck(long deliveryTag, boolean multiple) throws IOException {
throw new UnsupportedOperationException(
"This method is not currently supported as it is not used by current API in testing");
}
@Override
public void basicNack(long deliveryTag, boolean multiple, boolean requeue) throws IOException {
throw new UnsupportedOperationException(
"This method is not currently supported as it is not used by current API in testing");
}
@Override
public void basicReject(long deliveryTag, boolean requeue) throws IOException {
throw new UnsupportedOperationException(
"This method is not currently supported as it is not used by current API in testing");
}
@Override
public String basicConsume(String queue, Consumer callback) throws IOException {
throw new UnsupportedOperationException(
"This method is not currently supported as it is not used by current API in testing");
}
@Override
public String basicConsume(String queue, boolean autoAck, Consumer callback) throws IOException {
throw new UnsupportedOperationException(
"This method is not currently supported as it is not used by current API in testing");
}
@Override
public String basicConsume(String queue, boolean autoAck, Map<String, Object> arguments, Consumer callback)
throws IOException {
throw new UnsupportedOperationException(
"This method is not currently supported as it is not used by current API in testing");
}
@Override
public String basicConsume(String queue, boolean autoAck, String consumerTag, Consumer callback)
throws IOException {
throw new UnsupportedOperationException(
"This method is not currently supported as it is not used by current API in testing");
}
@Override
public String basicConsume(String queue, boolean autoAck, String consumerTag, boolean noLocal, boolean exclusive,
Map<String, Object> arguments, Consumer callback) throws IOException {
throw new UnsupportedOperationException(
"This method is not currently supported as it is not used by current API in testing");
}
@Override
public void basicCancel(String consumerTag) throws IOException {
throw new UnsupportedOperationException(
"This method is not currently supported as it is not used by current API in testing");
}
@Override
public RecoverOk basicRecover() throws IOException {
throw new UnsupportedOperationException(
"This method is not currently supported as it is not used by current API in testing");
}
@Override
public RecoverOk basicRecover(boolean requeue) throws IOException {
throw new UnsupportedOperationException(
"This method is not currently supported as it is not used by current API in testing");
}
@Override
public SelectOk txSelect() throws IOException {
throw new UnsupportedOperationException(
"This method is not currently supported as it is not used by current API in testing");
}
@Override
public CommitOk txCommit() throws IOException {
throw new UnsupportedOperationException(
"This method is not currently supported as it is not used by current API in testing");
}
@Override
public RollbackOk txRollback() throws IOException {
throw new UnsupportedOperationException(
"This method is not currently supported as it is not used by current API in testing");
}
@Override
public com.rabbitmq.client.AMQP.Confirm.SelectOk confirmSelect() throws IOException {
throw new UnsupportedOperationException(
"This method is not currently supported as it is not used by current API in testing");
}
@Override
public long getNextPublishSeqNo() {
throw new UnsupportedOperationException(
"This method is not currently supported as it is not used by current API in testing");
}
@Override
public boolean waitForConfirms() throws InterruptedException {
throw new UnsupportedOperationException(
"This method is not currently supported as it is not used by current API in testing");
}
@Override
public boolean waitForConfirms(long timeout) throws InterruptedException, TimeoutException {
throw new UnsupportedOperationException(
"This method is not currently supported as it is not used by current API in testing");
}
@Override
public void waitForConfirmsOrDie() throws IOException, InterruptedException {
throw new UnsupportedOperationException(
"This method is not currently supported as it is not used by current API in testing");
}
@Override
public void waitForConfirmsOrDie(long timeout) throws IOException, InterruptedException, TimeoutException {
throw new UnsupportedOperationException(
"This method is not currently supported as it is not used by current API in testing");
}
@Override
public void asyncRpc(Method method) throws IOException {
throw new UnsupportedOperationException(
"This method is not currently supported as it is not used by current API in testing");
}
@Override
public Command rpc(Method method) throws IOException {
throw new UnsupportedOperationException(
"This method is not currently supported as it is not used by current API in testing");
}
@Override
public long messageCount(String queue) throws IOException {
throw new UnsupportedOperationException(
"This method is not currently supported as it is not used by current API in testing");
}
@Override
public long consumerCount(String queue) throws IOException {
throw new UnsupportedOperationException(
"This method is not currently supported as it is not used by current API in testing");
}
}

View File

@ -0,0 +1,215 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.amqp.processors;
import java.io.IOException;
import java.net.InetAddress;
import java.util.List;
import java.util.Map;
import com.rabbitmq.client.BlockedListener;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ExceptionHandler;
import com.rabbitmq.client.ShutdownListener;
import com.rabbitmq.client.ShutdownSignalException;
/**
* Implementation of {@link Connection} to be used for testing. Will return the
* same instance of {@link Channel} when {@link #createChannel()} is called.
*
* This class essentially emulates AMQP system and attempts to ensure the same
* behavior on publish/subscribe and other core operations used by the NIFI AMQP
* component.
*
* NOTE: Only methods that are used by the framework are implemented. More
* could/should be added later
*/
class TestConnection implements Connection {
private final TestChannel channel;
private boolean open;
public TestConnection(Map<String, String> exchangeToRoutingKeyMappings,
Map<String, List<String>> routingKeyToQueueMappings) {
this.channel = new TestChannel(exchangeToRoutingKeyMappings, routingKeyToQueueMappings);
this.channel.setConnection(this);
this.open = true;
}
@Override
public void addShutdownListener(ShutdownListener listener) {
throw new UnsupportedOperationException(
"This method is not currently supported as it is not used by current API in testing");
}
@Override
public void removeShutdownListener(ShutdownListener listener) {
throw new UnsupportedOperationException(
"This method is not currently supported as it is not used by current API in testing");
}
@Override
public ShutdownSignalException getCloseReason() {
throw new UnsupportedOperationException(
"This method is not currently supported as it is not used by current API in testing");
}
@Override
public void notifyListeners() {
throw new UnsupportedOperationException(
"This method is not currently supported as it is not used by current API in testing");
}
@Override
public boolean isOpen() {
return this.open;
}
@Override
public InetAddress getAddress() {
try {
return InetAddress.getByName("localhost");
} catch (Exception e) {
throw new IllegalStateException(e);
}
}
@Override
public int getPort() {
throw new UnsupportedOperationException(
"This method is not currently supported as it is not used by current API in testing");
}
@Override
public int getChannelMax() {
throw new UnsupportedOperationException(
"This method is not currently supported as it is not used by current API in testing");
}
@Override
public int getFrameMax() {
throw new UnsupportedOperationException(
"This method is not currently supported as it is not used by current API in testing");
}
@Override
public int getHeartbeat() {
throw new UnsupportedOperationException(
"This method is not currently supported as it is not used by current API in testing");
}
@Override
public Map<String, Object> getClientProperties() {
throw new UnsupportedOperationException(
"This method is not currently supported as it is not used by current API in testing");
}
@Override
public Map<String, Object> getServerProperties() {
throw new UnsupportedOperationException(
"This method is not currently supported as it is not used by current API in testing");
}
@Override
public Channel createChannel() throws IOException {
return this.channel;
}
@Override
public Channel createChannel(int channelNumber) throws IOException {
throw new UnsupportedOperationException(
"This method is not currently supported as it is not used by current API in testing");
}
@Override
public void close() throws IOException {
this.open = false;
try {
this.channel.close();
} catch (Exception e) {
e.printStackTrace();
}
}
@Override
public void close(int closeCode, String closeMessage) throws IOException {
throw new UnsupportedOperationException(
"This method is not currently supported as it is not used by current API in testing");
}
@Override
public void close(int timeout) throws IOException {
throw new UnsupportedOperationException(
"This method is not currently supported as it is not used by current API in testing");
}
@Override
public void close(int closeCode, String closeMessage, int timeout) throws IOException {
throw new UnsupportedOperationException(
"This method is not currently supported as it is not used by current API in testing");
}
@Override
public void abort() {
throw new UnsupportedOperationException(
"This method is not currently supported as it is not used by current API in testing");
}
@Override
public void abort(int closeCode, String closeMessage) {
throw new UnsupportedOperationException(
"This method is not currently supported as it is not used by current API in testing");
}
@Override
public void abort(int timeout) {
throw new UnsupportedOperationException(
"This method is not currently supported as it is not used by current API in testing");
}
@Override
public void abort(int closeCode, String closeMessage, int timeout) {
throw new UnsupportedOperationException(
"This method is not currently supported as it is not used by current API in testing");
}
@Override
public void addBlockedListener(BlockedListener listener) {
throw new UnsupportedOperationException(
"This method is not currently supported as it is not used by current API in testing");
}
@Override
public boolean removeBlockedListener(BlockedListener listener) {
throw new UnsupportedOperationException(
"This method is not currently supported as it is not used by current API in testing");
}
@Override
public void clearBlockedListeners() {
throw new UnsupportedOperationException(
"This method is not currently supported as it is not used by current API in testing");
}
@Override
public ExceptionHandler getExceptionHandler() {
throw new UnsupportedOperationException(
"This method is not currently supported as it is not used by current API in testing");
}
}

View File

@ -0,0 +1,20 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
log4j.rootLogger=INFO, CONSOLE
log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender
log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
log4j.appender.CONSOLE.layout.ConversionPattern=%-4r [%t] %-5p %c %x \u2013 %m%n

View File

@ -0,0 +1,40 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-nar-bundles</artifactId>
<version>0.4.2-SNAPSHOT</version>
</parent>
<artifactId>nifi-amqp-bundle</artifactId>
<version>0.4.2-SNAPSHOT</version>
<packaging>pom</packaging>
<description>A bundle of processors that run Flume sources/sinks</description>
<modules>
<module>nifi-amqp-processors</module>
<module>nifi-amqp-nar</module>
</modules>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-amqp-processors</artifactId>
<version>0.4.2-SNAPSHOT</version>
</dependency>
</dependencies>
</dependencyManagement>
</project>

View File

@ -53,6 +53,7 @@
<module>nifi-html-bundle</module>
<module>nifi-scripting-bundle</module>
<module>nifi-elasticsearch-bundle</module>
<module>nifi-amqp-bundle</module>
</modules>
<dependencyManagement>
<dependencies>

View File

@ -1002,6 +1002,12 @@ language governing permissions and limitations under the License. -->
<version>0.4.2-SNAPSHOT</version>
<type>nar</type>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-amqp-nar</artifactId>
<version>0.4.2-SNAPSHOT</version>
<type>nar</type>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-hbase_1_1_2-client-service-nar</artifactId>