ARTEMIS-1019 removing vertx integration

This commit is contained in:
Clebert Suconic 2017-02-06 19:57:16 -05:00
parent 2a3885da0a
commit c0fe187666
20 changed files with 1 additions and 2333 deletions

View File

@ -77,11 +77,6 @@
<artifactId>artemis-spring-integration</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>artemis-vertx-integration</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.activemq.rest</groupId>
<artifactId>artemis-rest</artifactId>

View File

@ -52,7 +52,6 @@
* [Apache Karaf](karaf.md)
* [Spring Integration](spring-integration.md)
* [AeroGear Integration](aerogear-integration.md)
* [VertX Integration](vertx-integration.md)
* [CDI Integration](cdi-integration.md)
* [Intercepting Operations](intercepting-operations.md)
* [Protocols and Interoperability](protocols-interoperability.md)

View File

@ -1,88 +0,0 @@
# Vert.x Integration
[Vert.x](http://vertx.io/) is a lightweight, high performance
application platform for the JVM that's designed for modern mobile, web,
and enterprise applications. Vert.x provides a distributed event bus
that allows messages to be sent across vert.x instances and clients. You
can now redirect and persist any vert.x messages to Apache ActiveMQ Artemis and route
those messages to a specified vertx address by configuring Apache ActiveMQ Artemis
vertx incoming and outgoing vertx connector services.
## Configuring a Vertx Incoming Connector Service
Vertx Incoming Connector services receive messages from vertx event bus
and route them to an Apache ActiveMQ Artemis queue. Such a service can be configured as
follows:
<connector-service name="vertx-incoming-connector">
<factory-class>org.apache.activemq.integration.vertx.VertxIncomingConnectorServiceFactory</factory-class>
<param key="host" value="127.0.0.1"/>
<param key="port" value="0"/>
<param key="queue" value="jms.queue.vertxQueue"/>
<param key="vertx-address" value="vertx.in.eventaddress"/>
</connector-service>
Shown are the required params for the connector service:
- `queue`. The name of the Apache ActiveMQ Artemis queue to send message to.
As well as these required parameters there are the following optional
parameters
- `host`. The host name on which the vertx target container is
running. Default is localhost.
- `port`. The port number to which the target vertx listens. Default
is zero.
- `quorum-size`. The quorum size of the target vertx instance.
- `ha-group`. The name of the ha-group of target vertx instance.
Default is `activemq`.
- `vertx-address`. The vertx address to listen to. default is
`org.apache.activemq`.
## Configuring a Vertx Outgoing Connector Service
Vertx Outgoing Connector services fetch vertx messages from a ActiveMQ
queue and put them to vertx event bus. Such a service can be configured
as follows:
<connector-service name="vertx-outgoing-connector">
<factory-class>org.apache.activemq.integration.vertx.VertxOutgoingConnectorServiceFactory</factory-class>
<param key="host" value="127.0.0.1"/>
<param key="port" value="0"/>
<param key="queue" value="jms.queue.vertxQueue"/>
<param key="vertx-address" value="vertx.out.eventaddress"/>
<param key="publish" value="true"/>
</connector-service>
Shown are the required params for the connector service:
- `queue`. The name of the Apache ActiveMQ Artemis queue to fetch message from.
As well as these required parameters there are the following optional
parameters
- `host`. The host name on which the vertx target container is
running. Default is localhost.
- `port`. The port number to which the target vertx listens. Default
is zero.
- `quorum-size`. The quorum size of the target vertx instance.
- `ha-group`. The name of the ha-group of target vertx instance.
Default is `activemq`.
- `vertx-address`. The vertx address to put messages to. default is
org.apache.activemq.
- `publish`. How messages is sent to vertx event bus. "true" means
using publish style. "false" means using send style. Default is
false.

View File

@ -51,7 +51,6 @@ under the License.
<modules>
<module>aerogear</module>
<module>artemis-ra-rar</module>
<module>vertx</module>
</modules>
</profile>
</profiles>

View File

@ -1,142 +0,0 @@
<?xml version='1.0'?>
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.activemq.examples.modules</groupId>
<artifactId>broker-modules</artifactId>
<version>2.0.0-SNAPSHOT</version>
</parent>
<artifactId>artemis-vertx-example</artifactId>
<packaging>jar</packaging>
<name>ActiveMQ Artemis Vert.x Example</name>
<properties>
<activemq.basedir>${project.basedir}/../../../..</activemq.basedir>
<vertx.version>2.1.2</vertx.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>artemis-server</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>artemis-core-client</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>artemis-commons</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>${netty.version}</version>
</dependency>
<dependency>
<groupId>org.apache.geronimo.specs</groupId>
<artifactId>geronimo-jms_2.0_spec</artifactId>
</dependency>
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-core</artifactId>
<version>${vertx.version}</version>
</dependency>
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-platform</artifactId>
<version>${vertx.version}</version>
</dependency>
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-hazelcast</artifactId>
<version>${vertx.version}</version>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>artemis-vertx-integration</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.activemq</groupId>
<artifactId>artemis-maven-plugin</artifactId>
<executions>
<execution>
<id>create0</id>
<goals>
<goal>create</goal>
</goals>
<configuration>
<libListWithDeps>
<arg>org.apache.activemq.examples.modules:artemis-vertx-example:${project.version}</arg>
</libListWithDeps>
<instance>${basedir}/target/server0</instance>
<configuration>${basedir}/target/classes/activemq/server0</configuration>
</configuration>
</execution>
<execution>
<id>start0</id>
<goals>
<goal>cli</goal>
</goals>
<configuration>
<ignore>${noServer}</ignore>
<spawn>true</spawn>
<location>${basedir}/target/server0</location>
<testURI>tcp://localhost:61616</testURI>
<args>
<param>run</param>
</args>
<name>server0</name>
</configuration>
</execution>
<execution>
<id>runClient</id>
<goals>
<goal>runClient</goal>
</goals>
<configuration>
<clientClass>org.apache.activemq.artemis.core.example.VertxConnectorExample</clientClass>
</configuration>
</execution>
</executions>
<dependencies>
<dependency>
<groupId>org.apache.activemq.examples.modules</groupId>
<artifactId>artemis-vertx-example</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</plugin>
</plugins>
</build>
</project>

View File

@ -1,103 +0,0 @@
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
<html>
<head>
<title>ActiveMQ Artemis Vert.x Connector Service Example</title>
<link rel="stylesheet" type="text/css" href="../../../common/common.css" />
<link rel="stylesheet" type="text/css" href="../../../common/prettify.css" />
<script type="text/javascript" src="../../../common/prettify.js"></script>
</head>
<body onload="prettyPrint()">
<h1>Vert.x Connector Service Example</h1>
<p>This example shows you how to configure ActiveMQ Artemis to use the Vert.x Connector Service.</p>
<p>ActiveMQ Artemis supports 2 types of Vert.x connector, incoming and outgoing.
Incoming connector consumes from Vert.x event bus and forwards to a configurable address.
Outgoing connector consumes from a configurable address and forwards to a configurable Vert.x event bus.
</p>
<p>In this example, an incoming connector and an outgoing connector are configured. A simple java Verticle
is deployed. The verticle registers a message handler on the outgoing connector's address ("outgoing.vertx.address").
A String message is sent to Vert.x event bus on the incoming connector's address("incoming.vertx.address").
The message then will be forwarded to a ActiveMQ Artemis queue by the incoming connector. The outgoing connector listens to
the ActiveMQ Artemis queue and forwards the message from ActiveMQ Artemis to Vert.x event bus on the outgoing connector's address.
The verticle finally receives the message from it's event bus.</p>
<p>For more information on Vert.x concept please visit the <a href="http://vertx.io/">Vertx site</a></p>
<h2>Example step-by-step</h2>
<p><i>To run the server, simply type <code>mvn verify</code>
from this directory.</p>
<ol>
<li>First we need to create a Vert.x PlatformManager</li>
<pre class="prettyprint">
<code>platformManager = PlatformLocator.factory.createPlatformManager(PORT, HOST);</code>
</pre>
<li>We deploy a Verticle using the platformManager</li>
<pre class="prettyprint">
<code>String verticle = "org.apache.activemq.artemis.core.example.ExampleVerticle";
platformManager.deployVerticle(verticle, null, new URL[0], 1, null,
new Handler<AsyncResult<String>>(){
@Override
public void handle(AsyncResult<String> result)
{
if (!result.succeeded())
{
throw new RuntimeException("failed to deploy verticle", result.cause());
}
latch0.countDown();
}
});</code>
</pre>
<li>We register a message handler with the event bus in the Verticle to listen on the outgoing connector's address.</li>
<pre class="prettyprint">
<code>EventBus eventBus = vertx.eventBus();
eventBus.registerHandler(VertxConnectorExample.OUTGOING,
new Handler<Message<?>>() {
@Override
public void handle(Message<?> startMsg)
{
Object body = startMsg.body();
System.out.println("Verticle receives a message: " + body);
VertxConnectorExample.result.set(VertxConnectorExample.MSG.equals(body));
latch0.countDown();
}
});
</code>
</pre>
<li>We send a message to incoming connector's address via event bus</li>
<pre class="prettyprint">
<code>
EventBus bus = platformManager.vertx().eventBus();
bus.send(INCOMING, MSG);
</code>
</pre>
<li>The message will eventually arrives at the Verticle's message handler.</li>
</ol>
</body>
</html>

View File

@ -1,53 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.example;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.vertx.java.core.Handler;
import org.vertx.java.core.eventbus.EventBus;
import org.vertx.java.core.eventbus.Message;
import org.vertx.java.platform.Verticle;
public class ExampleVerticle extends Verticle {
@Override
public void start() {
EventBus eventBus = vertx.eventBus();
final CountDownLatch latch0 = new CountDownLatch(1);
// Register a handler on the outgoing connector's address
eventBus.registerHandler(VertxConnectorExample.OUTGOING, new Handler<Message<?>>() {
@Override
public void handle(Message<?> startMsg) {
Object body = startMsg.body();
System.out.println("Verticle receives a message: " + body);
VertxConnectorExample.result.set(VertxConnectorExample.MSG.equals(body));
latch0.countDown();
//Tell the example to finish.
VertxConnectorExample.latch.countDown();
}
});
try {
latch0.await(5000, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
}
}
}

View File

@ -1,103 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.example;
import java.net.URL;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.vertx.java.core.AsyncResult;
import org.vertx.java.core.Handler;
import org.vertx.java.core.eventbus.EventBus;
import org.vertx.java.platform.PlatformLocator;
import org.vertx.java.platform.PlatformManager;
import org.vertx.java.spi.cluster.impl.hazelcast.HazelcastClusterManagerFactory;
/**
* A simple example of using Vert.x connector service.
*/
public class VertxConnectorExample {
public static final String INCOMING = "incoming.vertx.address";
public static final String OUTGOING = "outgoing.vertx.address";
public static final String MSG = "Welcome to Vertx world!";
public static final CountDownLatch latch = new CountDownLatch(1);
public static final AtomicBoolean result = new AtomicBoolean(false);
private static final String HOST = "127.0.0.1";
private static final int PORT = 0;
public static void main(final String[] args) throws Exception {
System.setProperty("vertx.clusterManagerFactory", HazelcastClusterManagerFactory.class.getName());
PlatformManager platformManager = null;
try {
// Step 1 Create a Vert.x PlatformManager
platformManager = PlatformLocator.factory.createPlatformManager(PORT, HOST);
final CountDownLatch latch0 = new CountDownLatch(1);
// Step 2 Deploy a Verticle to receive message
String verticle = "org.apache.activemq.artemis.core.example.ExampleVerticle";
platformManager.deployVerticle(verticle, null, new URL[0], 1, null, new Handler<AsyncResult<String>>() {
@Override
public void handle(AsyncResult<String> result) {
if (!result.succeeded()) {
throw new RuntimeException("failed to deploy verticle", result.cause());
}
latch0.countDown();
}
});
latch0.await();
// Step 3 Send a message to the incoming connector's address
EventBus bus = platformManager.vertx().eventBus();
bus.send(INCOMING, MSG);
// Step 4 Waiting for the Verticle to process the message
latch.await(10000, TimeUnit.MILLISECONDS);
} finally {
if (platformManager != null) {
platformManager.undeployAll(null);
platformManager.stop();
}
reportResultAndExit();
}
}
private static void reportResultAndExit() {
if (!result.get()) {
System.err.println();
System.err.println("#####################");
System.err.println("### FAILURE! ###");
System.err.println("#####################");
System.exit(1);
} else {
System.out.println();
System.out.println("#####################");
System.out.println("### SUCCESS! ###");
System.out.println("#####################");
System.exit(0);
}
}
}

View File

@ -1,82 +0,0 @@
<?xml version="1.0" encoding="UTF-8" standalone="no"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
--><configuration xmlns="urn:activemq" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="urn:activemq /schema/artemis-server.xsd">
<core xmlns="urn:activemq:core">
<bindings-directory>target/server0/data/messaging/bindings</bindings-directory>
<journal-directory>target/server0/data/messaging/journal</journal-directory>
<large-messages-directory>target/server0/data/messaging/largemessages</large-messages-directory>
<paging-directory>target/server0/data/messaging/paging</paging-directory>
<!-- Connectors -->
<connectors>
<connector name="netty-connector">tcp://localhost:61616</connector>
</connectors>
<!-- Acceptors -->
<acceptors>
<acceptor name="netty-acceptor">tcp://localhost:61616</acceptor>
</acceptors>
<!-- Other config -->
<security-settings>
<!--security for example queue-->
<security-setting match="queue.vertxQueue">
<permission roles="guest" type="consume"/>
<permission roles="guest" type="send"/>
</security-setting>
</security-settings>
<connector-services>
<connector-service name="my-incoming-vertx">
<factory-class>org.apache.activemq.artemis.integration.vertx.VertxIncomingConnectorServiceFactory</factory-class>
<param key="queue" value="queue.vertxQueue"/>
<param key="host" value="localhost"/>
<param key="port" value="0"/>
<param key="vertx-address" value="incoming.vertx.address"/>
</connector-service>
<connector-service name="my-outgoing-vertx">
<factory-class>org.apache.activemq.artemis.integration.vertx.VertxOutgoingConnectorServiceFactory</factory-class>
<param key="queue" value="queue.vertxQueue"/>
<param key="host" value="localhost"/>
<param key="port" value="0"/>
<param key="vertx-address" value="outgoing.vertx.address"/>
</connector-service>
</connector-services>
<addresses>
<address name="queue.vertxQueue">
<multicast>
<queue name="queue.vertxQueue"/>
</multicast>
</address>
<address name="exampleQueue">
<anycast>
<queue name="jms.queue.exampleQueue"/>
</anycast>
</address>
</addresses>
</core>
</configuration>

View File

@ -1,144 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.activemq</groupId>
<artifactId>artemis-pom</artifactId>
<version>2.0.0-SNAPSHOT</version>
<relativePath>../../pom.xml</relativePath>
</parent>
<artifactId>artemis-vertx-integration</artifactId>
<packaging>jar</packaging>
<name>ActiveMQ Artemis Vert.x Integration</name>
<properties>
<activemq.basedir>${project.basedir}/../..</activemq.basedir>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<!-- Set pullInDeps to true if you want any modules specified in the 'includes' and 'deploys' fields
in your mod.json to be automatically pulled in during packaging and added inside your module. Doing this means your
module won't download and install those dependencies at run-time when they're first requested. -->
<vertx.pullInDeps>false</vertx.pullInDeps>
<!-- Set createFatJar to true if you want to create a fat executable jar which contains the Vert.x binaries
along with the module so it can be run with java -jar <jarname> -->
<vertx.createFatJar>false</vertx.createFatJar>
<!--Vertx module name-->
<module.name>${project.groupId}~${project.artifactId}~${project.version}</module.name>
<!-- The directory where the module will be assembled - you can override this on the command line
with -Dmods.directory=mydir -->
<mods.directory>target/mods</mods.directory>
<!--Dependency versions-->
<vertx.version>2.1.2</vertx.version>
<vertx.testtools.version>2.0.3-final</vertx.testtools.version>
<junit.version>4.11</junit.version>
<!--Plugin versions-->
<maven.compiler.plugin.version>3.0</maven.compiler.plugin.version>
<maven.resources.plugin.version>2.6</maven.resources.plugin.version>
<maven.clean.plugin.version>2.5</maven.clean.plugin.version>
<maven.vertx.plugin.version>2.0.8-final</maven.vertx.plugin.version>
<maven.surefire.plugin.version>2.14</maven.surefire.plugin.version>
<maven.failsafe.plugin.version>2.14</maven.failsafe.plugin.version>
<maven.surefire.report.plugin.version>2.14</maven.surefire.report.plugin.version>
<maven.javadoc.plugin.version>2.9</maven.javadoc.plugin.version>
<maven.dependency.plugin.version>2.7</maven.dependency.plugin.version>
</properties>
<repositories>
<repository>
<id>sonatype-nexus-snapshots</id>
<url>https://oss.sonatype.org/content/repositories/public</url>
</repository>
</repositories>
<dependencies>
<dependency>
<groupId>org.jboss.logging</groupId>
<artifactId>jboss-logging-processor</artifactId>
<scope>provided</scope>
<optional>true</optional>
</dependency>
<!--
JBoss Logging
-->
<dependency>
<groupId>org.jboss.logging</groupId>
<artifactId>jboss-logging</artifactId>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>artemis-server</artifactId>
<version>${project.version}</version>
</dependency>
<!--Vertx provided dependencies-->
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-core</artifactId>
<version>${vertx.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-platform</artifactId>
<version>${vertx.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-hazelcast</artifactId>
<version>${vertx.version}</version>
<scope>provided</scope>
</dependency>
<!--Test dependencies-->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.11</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.vertx</groupId>
<artifactId>testtools</artifactId>
<version>${vertx.testtools.version}</version>
<scope>test</scope>
</dependency>
<!-- Add any other dependencies that you want packaged into your module (in the lib dir) here
as 'compile' dependencies. Here is an example
<dependency>
<groupId>org.hamcrest</groupId>
<artifactId>hamcrest-core</artifactId>
<version>1.3</version>
<scope>compile</scope>
</dependency>
-->
</dependencies>
</project>

View File

@ -1,55 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.integration.vertx;
import org.apache.activemq.artemis.core.server.ServerMessage;
import org.jboss.logging.BasicLogger;
import org.jboss.logging.Logger;
import org.jboss.logging.annotations.LogMessage;
import org.jboss.logging.annotations.Message;
import org.jboss.logging.annotations.MessageLogger;
/**
* Logger Code 19
*
* each message id must be 6 digits long starting with 19, the 3rd digit donates the level so
*
* INF0 1
* WARN 2
* DEBUG 3
* ERROR 4
* TRACE 5
* FATAL 6
*
* so an INFO message would be 191000 to 191999
*/
@MessageLogger(projectCode = "AMQ")
interface ActiveMQVertxLogger extends BasicLogger {
/**
* The vertx logger.
*/
ActiveMQVertxLogger LOGGER = Logger.getMessageLogger(ActiveMQVertxLogger.class, ActiveMQVertxLogger.class.getPackage().getName());
@LogMessage(level = Logger.Level.WARN)
@Message(id = 192001, value = "Non vertx message: {0}", format = Message.Format.MESSAGE_FORMAT)
void nonVertxMessage(ServerMessage message);
@LogMessage(level = Logger.Level.WARN)
@Message(id = 192002, value = "Invalid vertx type: {0}", format = Message.Format.MESSAGE_FORMAT)
void invalidVertxType(Integer type);
}

View File

@ -1,265 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.integration.vertx;
import java.util.Map;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.postoffice.Binding;
import org.apache.activemq.artemis.core.postoffice.PostOffice;
import org.apache.activemq.artemis.core.server.ConnectorService;
import org.apache.activemq.artemis.core.server.ServerMessage;
import org.apache.activemq.artemis.core.server.impl.ServerMessageImpl;
import org.apache.activemq.artemis.utils.ConfigurationHelper;
import org.vertx.java.core.Handler;
import org.vertx.java.core.buffer.Buffer;
import org.vertx.java.core.eventbus.EventBus;
import org.vertx.java.core.eventbus.Message;
import org.vertx.java.core.eventbus.ReplyException;
import org.vertx.java.core.eventbus.impl.PingMessage;
import org.vertx.java.core.json.JsonArray;
import org.vertx.java.core.json.JsonObject;
import org.vertx.java.platform.PlatformLocator;
import org.vertx.java.platform.PlatformManager;
import org.vertx.java.spi.cluster.impl.hazelcast.HazelcastClusterManagerFactory;
class IncomingVertxEventHandler implements ConnectorService {
private final String connectorName;
private final String queueName;
private final int port;
private final String host;
private final int quorumSize;
private final String haGroup;
private final String vertxAddress;
private EventBus eventBus;
private PlatformManager platformManager;
private EventHandler handler;
private final StorageManager storageManager;
private final PostOffice postOffice;
private boolean isStarted = false;
IncomingVertxEventHandler(String connectorName,
Map<String, Object> configuration,
StorageManager storageManager,
PostOffice postOffice) {
this.connectorName = connectorName;
this.queueName = ConfigurationHelper.getStringProperty(VertxConstants.QUEUE_NAME, null, configuration);
this.port = ConfigurationHelper.getIntProperty(VertxConstants.PORT, 0, configuration);
this.host = ConfigurationHelper.getStringProperty(VertxConstants.HOST, "localhost", configuration);
this.quorumSize = ConfigurationHelper.getIntProperty(VertxConstants.VERTX_QUORUM_SIZE, -1, configuration);
this.haGroup = ConfigurationHelper.getStringProperty(VertxConstants.VERTX_HA_GROUP, "activemq", configuration);
this.vertxAddress = ConfigurationHelper.getStringProperty(VertxConstants.VERTX_ADDRESS, "org.apache.activemq", configuration);
this.storageManager = storageManager;
this.postOffice = postOffice;
}
@Override
public void start() throws Exception {
if (this.isStarted) {
return;
}
System.setProperty("vertx.clusterManagerFactory", HazelcastClusterManagerFactory.class.getName());
if (quorumSize != -1) {
platformManager = PlatformLocator.factory.createPlatformManager(port, host, quorumSize, haGroup);
} else {
platformManager = PlatformLocator.factory.createPlatformManager(port, host);
}
eventBus = platformManager.vertx().eventBus();
Binding b = postOffice.getBinding(new SimpleString(queueName));
if (b == null) {
throw new Exception(connectorName + ": queue " + queueName + " not found");
}
handler = new EventHandler();
eventBus.registerHandler(vertxAddress, handler);
isStarted = true;
ActiveMQVertxLogger.LOGGER.debug(connectorName + ": started");
}
@Override
public void stop() throws Exception {
if (!isStarted) {
return;
}
eventBus.unregisterHandler(vertxAddress, handler);
platformManager.stop();
System.clearProperty("vertx.clusterManagerFactory");
isStarted = false;
ActiveMQVertxLogger.LOGGER.debug(connectorName + ": stopped");
}
@Override
public boolean isStarted() {
return isStarted;
}
@Override
public String getName() {
return connectorName;
}
private class EventHandler implements Handler<Message<?>> {
@Override
public void handle(Message<?> message) {
ServerMessage msg = new ServerMessageImpl(storageManager.generateID(), VertxConstants.INITIAL_MESSAGE_BUFFER_SIZE);
msg.setAddress(new SimpleString(queueName));
msg.setDurable(true);
msg.encodeMessageIDToBuffer();
String replyAddress = message.replyAddress();
if (replyAddress != null) {
msg.putStringProperty(VertxConstants.VERTX_MESSAGE_REPLYADDRESS, replyAddress);
}
// it'd be better that Message expose its type information
int type = getMessageType(message);
msg.putIntProperty(VertxConstants.VERTX_MESSAGE_TYPE, type);
manualEncodeVertxMessageBody(msg.getBodyBuffer(), message.body(), type);
try {
postOffice.route(msg, false);
} catch (Exception e) {
ActiveMQVertxLogger.LOGGER.error("failed to route msg " + msg, e);
}
}
private void manualEncodeVertxMessageBody(ActiveMQBuffer bodyBuffer, Object body, int type) {
switch (type) {
case VertxConstants.TYPE_BOOLEAN:
bodyBuffer.writeBoolean(((Boolean) body));
break;
case VertxConstants.TYPE_BUFFER:
Buffer buff = (Buffer) body;
int len = buff.length();
bodyBuffer.writeInt(len);
bodyBuffer.writeBytes(((Buffer) body).getBytes());
break;
case VertxConstants.TYPE_BYTEARRAY:
byte[] bytes = (byte[]) body;
bodyBuffer.writeInt(bytes.length);
bodyBuffer.writeBytes(bytes);
break;
case VertxConstants.TYPE_BYTE:
bodyBuffer.writeByte((byte) body);
break;
case VertxConstants.TYPE_CHARACTER:
bodyBuffer.writeChar((Character) body);
break;
case VertxConstants.TYPE_DOUBLE:
bodyBuffer.writeDouble((double) body);
break;
case VertxConstants.TYPE_FLOAT:
bodyBuffer.writeFloat((Float) body);
break;
case VertxConstants.TYPE_INT:
bodyBuffer.writeInt((Integer) body);
break;
case VertxConstants.TYPE_LONG:
bodyBuffer.writeLong((Long) body);
break;
case VertxConstants.TYPE_SHORT:
bodyBuffer.writeShort((Short) body);
break;
case VertxConstants.TYPE_STRING:
case VertxConstants.TYPE_PING:
bodyBuffer.writeString((String) body);
break;
case VertxConstants.TYPE_JSON_OBJECT:
bodyBuffer.writeString(((JsonObject) body).encode());
break;
case VertxConstants.TYPE_JSON_ARRAY:
bodyBuffer.writeString(((JsonArray) body).encode());
break;
case VertxConstants.TYPE_REPLY_FAILURE:
ReplyException except = (ReplyException) body;
bodyBuffer.writeInt(except.failureType().toInt());
bodyBuffer.writeInt(except.failureCode());
bodyBuffer.writeString(except.getMessage());
break;
default:
throw new IllegalArgumentException("Invalid body type: " + type);
}
}
private int getMessageType(Message<?> message) {
Object body = message.body();
if (message instanceof PingMessage) {
return VertxConstants.TYPE_PING;
} else if (body instanceof Buffer) {
return VertxConstants.TYPE_BUFFER;
} else if (body instanceof Boolean) {
return VertxConstants.TYPE_BOOLEAN;
} else if (body instanceof byte[]) {
return VertxConstants.TYPE_BYTEARRAY;
} else if (body instanceof Byte) {
return VertxConstants.TYPE_BYTE;
} else if (body instanceof Character) {
return VertxConstants.TYPE_CHARACTER;
} else if (body instanceof Double) {
return VertxConstants.TYPE_DOUBLE;
} else if (body instanceof Float) {
return VertxConstants.TYPE_FLOAT;
} else if (body instanceof Integer) {
return VertxConstants.TYPE_INT;
} else if (body instanceof Long) {
return VertxConstants.TYPE_LONG;
} else if (body instanceof Short) {
return VertxConstants.TYPE_SHORT;
} else if (body instanceof String) {
return VertxConstants.TYPE_STRING;
} else if (body instanceof JsonArray) {
return VertxConstants.TYPE_JSON_ARRAY;
} else if (body instanceof JsonObject) {
return VertxConstants.TYPE_JSON_OBJECT;
} else if (body instanceof ReplyException) {
return VertxConstants.TYPE_REPLY_FAILURE;
}
throw new IllegalArgumentException("Type not supported: " + message);
}
}
@Override
public String toString() {
return "[IncomingVertxEventHandler(" + connectorName + "), queueName: " + queueName + " host: " + host + " port: " + port + " vertxAddress: " + vertxAddress + "]";
}
}

View File

@ -1,290 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.integration.vertx;
import java.util.List;
import java.util.Map;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.filter.Filter;
import org.apache.activemq.artemis.core.postoffice.Binding;
import org.apache.activemq.artemis.core.postoffice.PostOffice;
import org.apache.activemq.artemis.core.server.ConnectorService;
import org.apache.activemq.artemis.core.server.Consumer;
import org.apache.activemq.artemis.core.server.HandleStatus;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.ServerMessage;
import org.apache.activemq.artemis.utils.ConfigurationHelper;
import org.vertx.java.core.buffer.Buffer;
import org.vertx.java.core.eventbus.EventBus;
import org.vertx.java.core.eventbus.ReplyException;
import org.vertx.java.core.eventbus.ReplyFailure;
import org.vertx.java.core.json.JsonArray;
import org.vertx.java.core.json.JsonObject;
import org.vertx.java.platform.PlatformLocator;
import org.vertx.java.platform.PlatformManager;
import org.vertx.java.spi.cluster.impl.hazelcast.HazelcastClusterManagerFactory;
class OutgoingVertxEventHandler implements Consumer, ConnectorService {
private final String connectorName;
private final String queueName;
private final int port;
private final String host;
private final int quorumSize;
private final String haGroup;
private final String vertxAddress;
private final boolean publish;
private final PostOffice postOffice;
private Queue queue = null;
private Filter filter = null;
private EventBus eventBus;
private PlatformManager platformManager;
private boolean isStarted = false;
OutgoingVertxEventHandler(String connectorName, Map<String, Object> configuration, PostOffice postOffice) {
this.connectorName = connectorName;
this.queueName = ConfigurationHelper.getStringProperty(VertxConstants.QUEUE_NAME, null, configuration);
this.postOffice = postOffice;
this.port = ConfigurationHelper.getIntProperty(VertxConstants.PORT, 0, configuration);
this.host = ConfigurationHelper.getStringProperty(VertxConstants.HOST, "localhost", configuration);
this.quorumSize = ConfigurationHelper.getIntProperty(VertxConstants.VERTX_QUORUM_SIZE, -1, configuration);
this.haGroup = ConfigurationHelper.getStringProperty(VertxConstants.VERTX_HA_GROUP, "activemq", configuration);
this.vertxAddress = ConfigurationHelper.getStringProperty(VertxConstants.VERTX_ADDRESS, "org.apache.activemq", configuration);
this.publish = ConfigurationHelper.getBooleanProperty(VertxConstants.VERTX_PUBLISH, false, configuration);
}
@Override
public void start() throws Exception {
if (this.isStarted) {
return;
}
System.setProperty("vertx.clusterManagerFactory", HazelcastClusterManagerFactory.class.getName());
if (quorumSize != -1) {
platformManager = PlatformLocator.factory.createPlatformManager(port, host, quorumSize, haGroup);
} else {
platformManager = PlatformLocator.factory.createPlatformManager(port, host);
}
eventBus = platformManager.vertx().eventBus();
if (this.connectorName == null || this.connectorName.trim().equals("")) {
throw new Exception("invalid connector name: " + this.connectorName);
}
if (this.queueName == null || this.queueName.trim().equals("")) {
throw new Exception("invalid queue name: " + queueName);
}
SimpleString name = new SimpleString(this.queueName);
Binding b = this.postOffice.getBinding(name);
if (b == null) {
throw new Exception(connectorName + ": queue " + queueName + " not found");
}
this.queue = (Queue) b.getBindable();
this.queue.addConsumer(this);
this.queue.deliverAsync();
this.isStarted = true;
ActiveMQVertxLogger.LOGGER.debug(connectorName + ": started");
}
@Override
public void stop() throws Exception {
if (!this.isStarted) {
return;
}
ActiveMQVertxLogger.LOGGER.debug(connectorName + ": receive shutdown request");
this.queue.removeConsumer(this);
this.platformManager.stop();
System.clearProperty("vertx.clusterManagerFactory");
this.isStarted = false;
ActiveMQVertxLogger.LOGGER.debug(connectorName + ": stopped");
}
@Override
public boolean isStarted() {
return this.isStarted;
}
@Override
public String getName() {
return this.connectorName;
}
@Override
public HandleStatus handle(MessageReference ref) throws Exception {
if (filter != null && !filter.match(ref.getMessage())) {
return HandleStatus.NO_MATCH;
}
synchronized (this) {
ref.handled();
ServerMessage message = ref.getMessage();
Object vertxMsgBody;
// extract information from message
Integer type = message.getIntProperty(VertxConstants.VERTX_MESSAGE_TYPE);
if (type == null) {
// log a warning and default to raw bytes
ActiveMQVertxLogger.LOGGER.nonVertxMessage(message);
type = VertxConstants.TYPE_RAWBYTES;
}
// from vertx
vertxMsgBody = extractMessageBody(message, type);
if (vertxMsgBody == null) {
return HandleStatus.NO_MATCH;
}
// send to bus
if (!publish) {
eventBus.send(vertxAddress, vertxMsgBody);
} else {
eventBus.publish(vertxAddress, vertxMsgBody);
}
queue.acknowledge(ref);
ActiveMQVertxLogger.LOGGER.debug(connectorName + ": forwarded to vertx: " + message.getMessageID());
return HandleStatus.HANDLED;
}
}
private Object extractMessageBody(ServerMessage message, Integer type) throws Exception {
Object vertxMsgBody = null;
ActiveMQBuffer bodyBuffer = message.getBodyBuffer();
switch (type) {
case VertxConstants.TYPE_PING:
case VertxConstants.TYPE_STRING:
bodyBuffer.resetReaderIndex();
vertxMsgBody = bodyBuffer.readString();
break;
case VertxConstants.TYPE_BUFFER:
int len = bodyBuffer.readInt();
byte[] bytes = new byte[len];
bodyBuffer.readBytes(bytes);
vertxMsgBody = new Buffer(bytes);
break;
case VertxConstants.TYPE_BOOLEAN:
vertxMsgBody = bodyBuffer.readBoolean();
break;
case VertxConstants.TYPE_BYTEARRAY:
int length = bodyBuffer.readInt();
byte[] byteArray = new byte[length];
bodyBuffer.readBytes(byteArray);
vertxMsgBody = byteArray;
break;
case VertxConstants.TYPE_BYTE:
vertxMsgBody = bodyBuffer.readByte();
break;
case VertxConstants.TYPE_CHARACTER:
vertxMsgBody = bodyBuffer.readChar();
break;
case VertxConstants.TYPE_DOUBLE:
vertxMsgBody = bodyBuffer.readDouble();
break;
case VertxConstants.TYPE_FLOAT:
vertxMsgBody = bodyBuffer.readFloat();
break;
case VertxConstants.TYPE_INT:
vertxMsgBody = bodyBuffer.readInt();
break;
case VertxConstants.TYPE_LONG:
vertxMsgBody = bodyBuffer.readLong();
break;
case VertxConstants.TYPE_SHORT:
vertxMsgBody = bodyBuffer.readShort();
break;
case VertxConstants.TYPE_JSON_OBJECT:
vertxMsgBody = new JsonObject(bodyBuffer.readString());
break;
case VertxConstants.TYPE_JSON_ARRAY:
vertxMsgBody = new JsonArray(bodyBuffer.readString());
break;
case VertxConstants.TYPE_REPLY_FAILURE:
int failureType = bodyBuffer.readInt();
int failureCode = bodyBuffer.readInt();
String errMsg = bodyBuffer.readString();
vertxMsgBody = new ReplyException(ReplyFailure.fromInt(failureType), failureCode, errMsg);
break;
case VertxConstants.TYPE_RAWBYTES:
int size = bodyBuffer.readableBytes();
byte[] rawBytes = new byte[size];
bodyBuffer.readBytes(rawBytes);
vertxMsgBody = rawBytes;
break;
default:
ActiveMQVertxLogger.LOGGER.invalidVertxType(type);
break;
}
return vertxMsgBody;
}
@Override
public void proceedDeliver(MessageReference reference) throws Exception {
// no op
}
@Override
public Filter getFilter() {
return this.filter;
}
@Override
public String debug() {
return null;
}
@Override
public String toManagementString() {
return null;
}
@Override
public List<MessageReference> getDeliveringMessages() {
return null;
}
@Override
public void disconnect() {
}
}

View File

@ -1,82 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.integration.vertx;
import java.util.HashSet;
import java.util.Set;
public class VertxConstants {
// org.vertx.java.core.eventbus.impl.MessageFactory
public static final int TYPE_PING = 0;
public static final int TYPE_BUFFER = 1;
public static final int TYPE_BOOLEAN = 2;
public static final int TYPE_BYTEARRAY = 3;
public static final int TYPE_BYTE = 4;
public static final int TYPE_CHARACTER = 5;
public static final int TYPE_DOUBLE = 6;
public static final int TYPE_FLOAT = 7;
public static final int TYPE_INT = 8;
public static final int TYPE_LONG = 9;
public static final int TYPE_SHORT = 10;
public static final int TYPE_STRING = 11;
public static final int TYPE_JSON_OBJECT = 12;
public static final int TYPE_JSON_ARRAY = 13;
public static final int TYPE_REPLY_FAILURE = 100;
public static final int TYPE_RAWBYTES = 200;
public static final String PORT = "port";
public static final String HOST = "host";
public static final String QUEUE_NAME = "queue";
public static final String VERTX_ADDRESS = "vertx-address";
public static final String VERTX_PUBLISH = "publish";
public static final String VERTX_QUORUM_SIZE = "quorum-size";
public static final String VERTX_HA_GROUP = "ha-group";
public static final Set<String> ALLOWABLE_INCOMING_CONNECTOR_KEYS;
public static final Set<String> REQUIRED_INCOMING_CONNECTOR_KEYS;
public static final Set<String> ALLOWABLE_OUTGOING_CONNECTOR_KEYS;
public static final Set<String> REQUIRED_OUTGOING_CONNECTOR_KEYS;
public static final int INITIAL_MESSAGE_BUFFER_SIZE = 50;
public static final String VERTX_MESSAGE_REPLYADDRESS = "VertxMessageReplyAddress";
public static final String VERTX_MESSAGE_TYPE = "VertxMessageType";
static {
ALLOWABLE_INCOMING_CONNECTOR_KEYS = new HashSet<>();
ALLOWABLE_INCOMING_CONNECTOR_KEYS.add(PORT);
ALLOWABLE_INCOMING_CONNECTOR_KEYS.add(HOST);
ALLOWABLE_INCOMING_CONNECTOR_KEYS.add(QUEUE_NAME);
ALLOWABLE_INCOMING_CONNECTOR_KEYS.add(VERTX_ADDRESS);
ALLOWABLE_INCOMING_CONNECTOR_KEYS.add(VERTX_QUORUM_SIZE);
ALLOWABLE_INCOMING_CONNECTOR_KEYS.add(VERTX_HA_GROUP);
REQUIRED_INCOMING_CONNECTOR_KEYS = new HashSet<>();
REQUIRED_INCOMING_CONNECTOR_KEYS.add(QUEUE_NAME);
ALLOWABLE_OUTGOING_CONNECTOR_KEYS = new HashSet<>();
ALLOWABLE_OUTGOING_CONNECTOR_KEYS.add(PORT);
ALLOWABLE_OUTGOING_CONNECTOR_KEYS.add(HOST);
ALLOWABLE_OUTGOING_CONNECTOR_KEYS.add(QUEUE_NAME);
ALLOWABLE_OUTGOING_CONNECTOR_KEYS.add(VERTX_ADDRESS);
ALLOWABLE_OUTGOING_CONNECTOR_KEYS.add(VERTX_PUBLISH);
ALLOWABLE_OUTGOING_CONNECTOR_KEYS.add(VERTX_QUORUM_SIZE);
ALLOWABLE_OUTGOING_CONNECTOR_KEYS.add(VERTX_HA_GROUP);
REQUIRED_OUTGOING_CONNECTOR_KEYS = new HashSet<>();
REQUIRED_OUTGOING_CONNECTOR_KEYS.add(QUEUE_NAME);
}
}

View File

@ -1,51 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.integration.vertx;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ScheduledExecutorService;
import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.postoffice.PostOffice;
import org.apache.activemq.artemis.core.server.ConnectorService;
import org.apache.activemq.artemis.core.server.ConnectorServiceFactory;
public class VertxIncomingConnectorServiceFactory implements ConnectorServiceFactory {
@Override
public ConnectorService createConnectorService(String connectorName,
Map<String, Object> configuration,
StorageManager storageManager,
PostOffice postOffice,
ScheduledExecutorService scheduledThreadPool) {
return new IncomingVertxEventHandler(connectorName, configuration, storageManager, postOffice);
}
@Override
public Set<String> getAllowableProperties() {
return VertxConstants.ALLOWABLE_INCOMING_CONNECTOR_KEYS;
}
@Override
public Set<String> getRequiredProperties() {
return VertxConstants.REQUIRED_INCOMING_CONNECTOR_KEYS;
}
}

View File

@ -1,49 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.integration.vertx;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ScheduledExecutorService;
import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.postoffice.PostOffice;
import org.apache.activemq.artemis.core.server.ConnectorService;
import org.apache.activemq.artemis.core.server.ConnectorServiceFactory;
public class VertxOutgoingConnectorServiceFactory implements ConnectorServiceFactory {
@Override
public ConnectorService createConnectorService(String connectorName,
Map<String, Object> configuration,
StorageManager storageManager,
PostOffice postOffice,
ScheduledExecutorService scheduledThreadPool) {
return new OutgoingVertxEventHandler(connectorName, configuration, postOffice);
}
@Override
public Set<String> getAllowableProperties() {
return VertxConstants.ALLOWABLE_OUTGOING_CONNECTOR_KEYS;
}
@Override
public Set<String> getRequiredProperties() {
return VertxConstants.REQUIRED_OUTGOING_CONNECTOR_KEYS;
}
}

View File

@ -54,7 +54,6 @@
<module>artemis-server-osgi</module>
<module>integration/activemq-spring-integration</module>
<module>integration/activemq-aerogear-integration</module>
<module>integration/activemq-vertx-integration</module>
<module>artemis-distribution</module>
<module>artemis-tools</module>
<module>tests</module>
@ -736,7 +735,6 @@
<module>artemis-jdbc-store</module>
<module>integration/activemq-spring-integration</module>
<module>integration/activemq-aerogear-integration</module>
<module>integration/activemq-vertx-integration</module>
<module>tests</module>
</modules>
<properties>
@ -772,7 +770,6 @@
<module>artemis-maven-plugin</module>
<module>integration/activemq-spring-integration</module>
<module>integration/activemq-aerogear-integration</module>
<module>integration/activemq-vertx-integration</module>
<module>examples</module>
<module>tests</module>
<module>artemis-distribution</module>
@ -831,7 +828,6 @@
<module>artemis-maven-plugin</module>
<module>integration/activemq-spring-integration</module>
<module>integration/activemq-aerogear-integration</module>
<module>integration/activemq-vertx-integration</module>
<module>tests</module>
</modules>
<properties>
@ -874,7 +870,6 @@
<module>artemis-maven-plugin</module>
<module>integration/activemq-spring-integration</module>
<module>integration/activemq-aerogear-integration</module>
<module>integration/activemq-vertx-integration</module>
<module>tests</module>
</modules>
<properties>
@ -909,7 +904,6 @@
<module>artemis-maven-plugin</module>
<module>integration/activemq-spring-integration</module>
<module>integration/activemq-aerogear-integration</module>
<module>integration/activemq-vertx-integration</module>
<module>tests</module>
<module>examples</module>
</modules>

View File

@ -30,8 +30,6 @@
<activemq.basedir>${project.basedir}/../..</activemq.basedir>
<karaf.version>4.0.6</karaf.version>
<pax.exam.version>4.9.1</pax.exam.version>
<vertx.version>2.1.6</vertx.version>
<vertx.testtools.version>2.0.3-final</vertx.testtools.version>
</properties>
<repositories>
@ -121,11 +119,6 @@
<artifactId>artemis-spring-integration</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>artemis-vertx-integration</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>artemis-journal</artifactId>
@ -248,37 +241,6 @@
<scope>test</scope>
</dependency>
<!--Vertx provided dependencies-->
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-core</artifactId>
<version>${vertx.version}</version>
<scope>provided</scope>
<exclusions>
<exclusion>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-platform</artifactId>
<version>${vertx.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-hazelcast</artifactId>
<version>${vertx.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>io.vertx</groupId>
<artifactId>testtools</artifactId>
<version>${vertx.testtools.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-client</artifactId>

View File

@ -43,6 +43,7 @@ import org.apache.activemq.artemis.core.protocol.mqtt.MQTTConnectionManager;
import org.apache.activemq.artemis.core.protocol.mqtt.MQTTSession;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.tests.util.Wait;
import org.apache.activemq.artemis.utils.ConcurrentHashSet;
import org.fusesource.mqtt.client.BlockingConnection;
import org.fusesource.mqtt.client.MQTT;
import org.fusesource.mqtt.client.MQTTException;
@ -57,7 +58,6 @@ import org.junit.Ignore;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.vertx.java.core.impl.ConcurrentHashSet;
/**
* QT

View File

@ -1,774 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.vertx;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.client.ClientConsumer;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.config.ConnectorServiceConfiguration;
import org.apache.activemq.artemis.core.config.CoreQueueConfiguration;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.integration.vertx.VertxConstants;
import org.apache.activemq.artemis.integration.vertx.VertxIncomingConnectorServiceFactory;
import org.apache.activemq.artemis.integration.vertx.VertxOutgoingConnectorServiceFactory;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.junit.After;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import org.vertx.java.core.Handler;
import org.vertx.java.core.Vertx;
import org.vertx.java.core.buffer.Buffer;
import org.vertx.java.core.eventbus.impl.BaseMessage;
import org.vertx.java.core.json.JsonArray;
import org.vertx.java.core.json.JsonObject;
import org.vertx.java.platform.PlatformLocator;
import org.vertx.java.platform.PlatformManager;
import org.vertx.java.spi.cluster.impl.hazelcast.HazelcastClusterManagerFactory;
/**
* This class tests the basics of ActiveMQ
* vertx inte
* gration
*/
@Ignore
public class ActiveMQVertxUnitTest extends ActiveMQTestBase {
private PlatformManager vertxManager;
private ActiveMQServer server;
private String host = "localhost";
private String port = "0";
private String incomingQueue1 = "vertxTestIncomingQueue1";
private String incomingVertxAddress1 = "org.apache.activemq.test.incoming1";
//outgoing using send
private String inOutQueue1 = "vertxTestInOutQueue1";
private String incomingVertxAddress2 = "org.apache.activemq.test.incoming2";
private String outgoingVertxAddress1 = "org.apache.activemq.test.outgoing1";
//outgoing using publish
private String inOutQueue2 = "vertxTestInOutQueue2";
private String incomingVertxAddress3 = "org.apache.activemq.test.incoming3";
private String outgoingVertxAddress2 = "org.apache.activemq.test.outgoing2";
// Vertx is changing the classLoader to null.. this will preserve the original classloader
private ClassLoader contextClassLoader;
//subclasses may override this method
//in order to get a server with different connector services
@Before
@Override
public void setUp() throws Exception {
contextClassLoader = Thread.currentThread().getContextClassLoader();
createVertxService();
super.setUp();
//all queues
CoreQueueConfiguration qc1 = createCoreQueueConfiguration(incomingQueue1);
CoreQueueConfiguration qc2 = createCoreQueueConfiguration(inOutQueue1);
CoreQueueConfiguration qc3 = createCoreQueueConfiguration(inOutQueue2);
//incoming
HashMap<String, Object> config1 = createIncomingConnectionConfig(incomingVertxAddress1, incomingQueue1);
ConnectorServiceConfiguration inconf1 = createIncomingConnectorServiceConfiguration(config1, "test-vertx-incoming-connector1");
//outgoing send style
HashMap<String, Object> config2 = createOutgoingConnectionConfig(inOutQueue1, incomingVertxAddress2);
ConnectorServiceConfiguration inconf2 = createIncomingConnectorServiceConfiguration(config2, "test-vertx-incoming-connector2");
HashMap<String, Object> config3 = createOutgoingConnectionConfig(inOutQueue1, outgoingVertxAddress1);
ConnectorServiceConfiguration outconf1 = createOutgoingConnectorServiceConfiguration(config3, "test-vertx-outgoing-connector1");
//outgoing publish style
HashMap<String, Object> config4 = createOutgoingConnectionConfig(inOutQueue2, incomingVertxAddress3);
ConnectorServiceConfiguration inconf3 = createIncomingConnectorServiceConfiguration(config4, "test-vertx-incoming-connector3");
HashMap<String, Object> config5 = createOutgoingConnectionConfig(inOutQueue2, outgoingVertxAddress2);
config5.put(VertxConstants.VERTX_PUBLISH, "true");
ConnectorServiceConfiguration outconf2 = createOutgoingConnectorServiceConfiguration(config5, "test-vertx-outgoing-connector2");
Configuration configuration = createDefaultInVMConfig().addQueueConfiguration(qc1).addQueueConfiguration(qc2).addQueueConfiguration(qc3).addConnectorServiceConfiguration(inconf1).addConnectorServiceConfiguration(inconf2).addConnectorServiceConfiguration(outconf1).addConnectorServiceConfiguration(inconf3).addConnectorServiceConfiguration(outconf2);
server = createServer(false, configuration);
server.start();
}
/**
* (vertx events) ===> (incomingQueue1) ===> (activemq consumer)
*/
@Test
public void testIncomingEvents() throws Exception {
Vertx vertx = vertxManager.vertx();
//send a string message
String greeting = "Hello World!";
vertx.eventBus().send(incomingVertxAddress1, greeting);
ClientMessage msg = receiveFromQueue(incomingQueue1);
assertNotNull(msg);
System.out.println("==== received msg: " + msg);
int vertxType = msg.getIntProperty(VertxConstants.VERTX_MESSAGE_TYPE);
assertEquals(VertxConstants.TYPE_STRING, vertxType);
String body = msg.getBodyBuffer().readString();
System.out.println("==== body: " + body);
assertEquals(greeting, body);
//send a Buffer message
final byte[] content = greeting.getBytes(StandardCharsets.UTF_8);
Buffer buffer = new Buffer(content);
vertx.eventBus().send(incomingVertxAddress1, buffer);
msg = receiveFromQueue(incomingQueue1);
assertNotNull(msg);
vertxType = msg.getIntProperty(VertxConstants.VERTX_MESSAGE_TYPE);
assertEquals(VertxConstants.TYPE_BUFFER, vertxType);
ActiveMQBuffer activeMQBuffer = msg.getBodyBuffer();
int len = activeMQBuffer.readInt();
System.out.println("==== len is: " + len);
assertEquals(content.length, len);
byte[] bytes = new byte[len];
activeMQBuffer.readBytes(bytes);
//bytes must match
for (int i = 0; i < len; i++) {
assertEquals(content[i], bytes[i]);
}
//send a boolean
vertx.eventBus().send(incomingVertxAddress1, Boolean.TRUE);
msg = receiveFromQueue(incomingQueue1);
assertNotNull(msg);
vertxType = msg.getIntProperty(VertxConstants.VERTX_MESSAGE_TYPE);
assertEquals(VertxConstants.TYPE_BOOLEAN, vertxType);
Boolean booleanValue = msg.getBodyBuffer().readBoolean();
assertEquals(Boolean.TRUE, booleanValue);
//send a byte array
vertx.eventBus().send(incomingVertxAddress1, content);
msg = receiveFromQueue(incomingQueue1);
assertNotNull(msg);
vertxType = msg.getIntProperty(VertxConstants.VERTX_MESSAGE_TYPE);
assertEquals(VertxConstants.TYPE_BYTEARRAY, vertxType);
len = msg.getBodyBuffer().readInt();
byte[] recvBytes = new byte[len];
msg.getBodyBuffer().readBytes(recvBytes);
//bytes must match
for (int i = 0; i < len; i++) {
assertEquals(content[i], recvBytes[i]);
}
//send a byte
Byte aByte = (byte) 15;
vertx.eventBus().send(incomingVertxAddress1, aByte);
msg = receiveFromQueue(incomingQueue1);
assertNotNull(msg);
vertxType = msg.getIntProperty(VertxConstants.VERTX_MESSAGE_TYPE);
assertEquals(VertxConstants.TYPE_BYTE, vertxType);
Byte recvByte = msg.getBodyBuffer().readByte();
assertEquals(aByte, recvByte);
//send a Character
Character aChar = 'a';
vertx.eventBus().send(incomingVertxAddress1, aChar);
msg = receiveFromQueue(incomingQueue1);
assertNotNull(msg);
vertxType = msg.getIntProperty(VertxConstants.VERTX_MESSAGE_TYPE);
assertEquals(VertxConstants.TYPE_CHARACTER, vertxType);
Character recvChar = msg.getBodyBuffer().readChar();
assertEquals(aChar, recvChar);
//send a Double
Double aDouble = 1234.56d;
vertx.eventBus().send(incomingVertxAddress1, aDouble);
msg = receiveFromQueue(incomingQueue1);
assertNotNull(msg);
vertxType = msg.getIntProperty(VertxConstants.VERTX_MESSAGE_TYPE);
assertEquals(VertxConstants.TYPE_DOUBLE, vertxType);
Double recvDouble = msg.getBodyBuffer().readDouble();
assertEquals(aDouble, recvDouble);
//send a Float
Float aFloat = 1234.56f;
vertx.eventBus().send(incomingVertxAddress1, aFloat);
msg = receiveFromQueue(incomingQueue1);
assertNotNull(msg);
vertxType = msg.getIntProperty(VertxConstants.VERTX_MESSAGE_TYPE);
assertEquals(VertxConstants.TYPE_FLOAT, vertxType);
Float recvFloat = msg.getBodyBuffer().readFloat();
assertEquals(aFloat, recvFloat);
//send an Integer
Integer aInt = 1234;
vertx.eventBus().send(incomingVertxAddress1, aInt);
msg = receiveFromQueue(incomingQueue1);
assertNotNull(msg);
vertxType = msg.getIntProperty(VertxConstants.VERTX_MESSAGE_TYPE);
assertEquals(VertxConstants.TYPE_INT, vertxType);
Integer recvInt = msg.getBodyBuffer().readInt();
assertEquals(aInt, recvInt);
//send a Long
Long aLong = 12345678L;
vertx.eventBus().send(incomingVertxAddress1, aLong);
msg = receiveFromQueue(incomingQueue1);
assertNotNull(msg);
vertxType = msg.getIntProperty(VertxConstants.VERTX_MESSAGE_TYPE);
assertEquals(VertxConstants.TYPE_LONG, vertxType);
Long recvLong = msg.getBodyBuffer().readLong();
assertEquals(aLong, recvLong);
//send a Short
Short aShort = (short) 321;
vertx.eventBus().send(incomingVertxAddress1, aShort);
msg = receiveFromQueue(incomingQueue1);
assertNotNull(msg);
vertxType = msg.getIntProperty(VertxConstants.VERTX_MESSAGE_TYPE);
assertEquals(VertxConstants.TYPE_SHORT, vertxType);
Short recvShort = msg.getBodyBuffer().readShort();
assertEquals(aShort, recvShort);
//send a JsonObject
String jsonObjectString = "{\n" +
"\"Image\": {\n" +
"\"Width\": 800,\n" +
"\"Height\": 600,\n" +
"\"Title\": \"View from 15th Floor\",\n" +
"\"Thumbnail\": {\n" +
"\"Url\": \"http://www.example.com/image/481989943\",\n" +
"\"Height\": 125,\n" +
"\"Width\": 100\n" +
"},\n" +
"\"IDs\": [116, 943, 234, 38793]\n" +
"}\n" +
"}";
JsonObject aJsonObj = new JsonObject(jsonObjectString);
vertx.eventBus().send(incomingVertxAddress1, aJsonObj);
msg = receiveFromQueue(incomingQueue1);
assertNotNull(msg);
vertxType = msg.getIntProperty(VertxConstants.VERTX_MESSAGE_TYPE);
assertEquals(VertxConstants.TYPE_JSON_OBJECT, vertxType);
String recvJsonString = msg.getBodyBuffer().readString();
System.out.println("==== received json: " + recvJsonString);
assertEquals(aJsonObj, new JsonObject(recvJsonString));
//send a JsonArray
String jsonArrayString = "[\n" +
"{\n" +
"\"precision\": \"zip\",\n" +
"\"Latitude\": 37.7668,\n" +
"\"Longitude\": -122.3959,\n" +
"\"Address\": \"\",\n" +
"\"City\": \"SAN FRANCISCO\",\n" +
"\"State\": \"CA\",\n" +
"\"Zip\": \"94107\",\n" +
"\"Country\": \"US\"\n" +
"},\n" +
"{\n" +
"\"precision\": \"zip\",\n" +
"\"Latitude\": 37.371991,\n" +
"\"Longitude\": -122.026020,\n" +
"\"Address\": \"\",\n" +
"\"City\": \"SUNNYVALE\",\n" +
"\"State\": \"CA\",\n" +
"\"Zip\": \"94085\",\n" +
"\"Country\": \"US\"\n" +
"}\n" +
"]";
JsonArray aJsonArray = new JsonArray(jsonArrayString);
System.out.println("a json array string: " + aJsonArray);
vertx.eventBus().send(incomingVertxAddress1, aJsonArray);
msg = receiveFromQueue(incomingQueue1);
assertNotNull(msg);
vertxType = msg.getIntProperty(VertxConstants.VERTX_MESSAGE_TYPE);
assertEquals(VertxConstants.TYPE_JSON_ARRAY, vertxType);
recvJsonString = msg.getBodyBuffer().readString();
System.out.println("==== received json: " + recvJsonString);
assertEquals(aJsonArray, new JsonArray(recvJsonString));
}
/**
* vertx events (incomingVertxAddress2)
* ===> (inOutQueue1)
* ===> (outgoing handler)
* ===> send to vertx (outgoingVertxAddress1)
*/
@Test
public void testOutgoingEvents() throws Exception {
Vertx vertx = vertxManager.vertx();
//regiser a handler to receive outgoing messages
VertxTestHandler handler = new VertxTestHandler();
vertx.eventBus().registerHandler(outgoingVertxAddress1, handler);
//send a string message
String greeting = "Hello World!";
vertx.eventBus().send(incomingVertxAddress2, greeting);
//check message in handler
handler.checkStringMessageReceived(greeting);
//send a Buffer message
final byte[] content = greeting.getBytes(StandardCharsets.UTF_8);
Buffer buffer = new Buffer(content);
vertx.eventBus().send(incomingVertxAddress2, buffer);
handler.checkBufferMessageReceived(buffer);
//send a boolean
Boolean boolValue = Boolean.TRUE;
vertx.eventBus().send(incomingVertxAddress2, boolValue);
handler.checkBooleanMessageReceived(boolValue);
byte[] byteArray = greeting.getBytes(StandardCharsets.UTF_8);
vertx.eventBus().send(incomingVertxAddress2, byteArray);
handler.checkByteArrayMessageReceived(byteArray);
//send a byte
Byte aByte = (byte) 15;
vertx.eventBus().send(incomingVertxAddress2, aByte);
handler.checkByteMessageReceived(aByte);
//send a Character
Character aChar = 'a';
vertx.eventBus().send(incomingVertxAddress2, aChar);
handler.checkCharacterMessageReceived(aChar);
//send a Double
Double aDouble = 1234.56d;
vertx.eventBus().send(incomingVertxAddress2, aDouble);
handler.checkDoubleMessageReceived(aDouble);
//send a Float
Float aFloat = 1234.56f;
vertx.eventBus().send(incomingVertxAddress2, aFloat);
handler.checkFloatMessageReceived(aFloat);
//send an Integer
Integer aInt = 1234;
vertx.eventBus().send(incomingVertxAddress2, aInt);
handler.checkIntegerMessageReceived(aInt);
//send a Long
Long aLong = 12345678L;
vertx.eventBus().send(incomingVertxAddress2, aLong);
handler.checkLongMessageReceived(aLong);
//send a Short
Short aShort = (short) 321;
vertx.eventBus().send(incomingVertxAddress2, aShort);
handler.checkShortMessageReceived(aShort);
//send a JsonObject
String jsonObjectString = "{\n" +
"\"Image\": {\n" +
"\"Width\": 800,\n" +
"\"Height\": 600,\n" +
"\"Title\": \"View from 15th Floor\",\n" +
"\"Thumbnail\": {\n" +
"\"Url\": \"http://www.example.com/image/481989943\",\n" +
"\"Height\": 125,\n" +
"\"Width\": 100\n" +
"},\n" +
"\"IDs\": [116, 943, 234, 38793]\n" +
"}\n" +
"}";
JsonObject aJsonObj = new JsonObject(jsonObjectString);
vertx.eventBus().send(incomingVertxAddress2, aJsonObj);
handler.checkJsonObjectMessageReceived(aJsonObj);
//send a JsonArray
String jsonArrayString = "[\n" +
"{\n" +
"\"precision\": \"zip\",\n" +
"\"Latitude\": 37.7668,\n" +
"\"Longitude\": -122.3959,\n" +
"\"Address\": \"\",\n" +
"\"City\": \"SAN FRANCISCO\",\n" +
"\"State\": \"CA\",\n" +
"\"Zip\": \"94107\",\n" +
"\"Country\": \"US\"\n" +
"},\n" +
"{\n" +
"\"precision\": \"zip\",\n" +
"\"Latitude\": 37.371991,\n" +
"\"Longitude\": -122.026020,\n" +
"\"Address\": \"\",\n" +
"\"City\": \"SUNNYVALE\",\n" +
"\"State\": \"CA\",\n" +
"\"Zip\": \"94085\",\n" +
"\"Country\": \"US\"\n" +
"}\n" +
"]";
JsonArray aJsonArray = new JsonArray(jsonArrayString);
vertx.eventBus().send(incomingVertxAddress2, aJsonArray);
handler.checkJsonArrayMessageReceived(aJsonArray);
}
/**
* vertx events (incomingVertxAddress3)
* ===> (inOutQueue2)
* ===> (outgoing handler)
* ===> public to vertx (outgoingVertxAddress2)
*/
@Test
public void testOutgoingEvents2() throws Exception {
Vertx vertx = vertxManager.vertx();
//regiser two handlers to receive outgoing messages
VertxTestHandler handler1 = new VertxTestHandler();
vertx.eventBus().registerHandler(outgoingVertxAddress2, handler1);
VertxTestHandler handler2 = new VertxTestHandler();
vertx.eventBus().registerHandler(outgoingVertxAddress2, handler2);
//send a string message
String greeting = "Hello World!";
vertx.eventBus().send(incomingVertxAddress3, greeting);
//check message in handler
handler1.checkStringMessageReceived(greeting);
handler2.checkStringMessageReceived(greeting);
//send a Buffer message
final byte[] content = greeting.getBytes(StandardCharsets.UTF_8);
Buffer buffer = new Buffer(content);
vertx.eventBus().send(incomingVertxAddress3, buffer);
handler1.checkBufferMessageReceived(buffer);
handler2.checkBufferMessageReceived(buffer);
//send a boolean
Boolean boolValue = Boolean.TRUE;
vertx.eventBus().send(incomingVertxAddress3, boolValue);
handler1.checkBooleanMessageReceived(boolValue);
handler2.checkBooleanMessageReceived(boolValue);
byte[] byteArray = greeting.getBytes(StandardCharsets.UTF_8);
vertx.eventBus().send(incomingVertxAddress3, byteArray);
handler1.checkByteArrayMessageReceived(byteArray);
handler2.checkByteArrayMessageReceived(byteArray);
//send a byte
Byte aByte = (byte) 15;
vertx.eventBus().send(incomingVertxAddress3, aByte);
handler1.checkByteMessageReceived(aByte);
handler2.checkByteMessageReceived(aByte);
//send a Character
Character aChar = 'a';
vertx.eventBus().send(incomingVertxAddress3, aChar);
handler1.checkCharacterMessageReceived(aChar);
handler2.checkCharacterMessageReceived(aChar);
//send a Double
Double aDouble = 1234.56d;
vertx.eventBus().send(incomingVertxAddress3, aDouble);
handler1.checkDoubleMessageReceived(aDouble);
handler2.checkDoubleMessageReceived(aDouble);
//send a Float
Float aFloat = 1234.56f;
vertx.eventBus().send(incomingVertxAddress3, aFloat);
handler1.checkFloatMessageReceived(aFloat);
handler2.checkFloatMessageReceived(aFloat);
//send an Integer
Integer aInt = 1234;
vertx.eventBus().send(incomingVertxAddress3, aInt);
handler1.checkIntegerMessageReceived(aInt);
handler2.checkIntegerMessageReceived(aInt);
//send a Long
Long aLong = 12345678L;
vertx.eventBus().send(incomingVertxAddress3, aLong);
handler1.checkLongMessageReceived(aLong);
handler2.checkLongMessageReceived(aLong);
//send a Short
Short aShort = (short) 321;
vertx.eventBus().send(incomingVertxAddress3, aShort);
handler1.checkShortMessageReceived(aShort);
handler2.checkShortMessageReceived(aShort);
//send a JsonObject
String jsonObjectString = "{\n" +
"\"Image\": {\n" +
"\"Width\": 800,\n" +
"\"Height\": 600,\n" +
"\"Title\": \"View from 15th Floor\",\n" +
"\"Thumbnail\": {\n" +
"\"Url\": \"http://www.example.com/image/481989943\",\n" +
"\"Height\": 125,\n" +
"\"Width\": 100\n" +
"},\n" +
"\"IDs\": [116, 943, 234, 38793]\n" +
"}\n" +
"}";
JsonObject aJsonObj = new JsonObject(jsonObjectString);
vertx.eventBus().send(incomingVertxAddress3, aJsonObj);
handler1.checkJsonObjectMessageReceived(aJsonObj);
handler2.checkJsonObjectMessageReceived(aJsonObj);
//send a JsonArray
String jsonArrayString = "[\n" +
"{\n" +
"\"precision\": \"zip\",\n" +
"\"Latitude\": 37.7668,\n" +
"\"Longitude\": -122.3959,\n" +
"\"Address\": \"\",\n" +
"\"City\": \"SAN FRANCISCO\",\n" +
"\"State\": \"CA\",\n" +
"\"Zip\": \"94107\",\n" +
"\"Country\": \"US\"\n" +
"},\n" +
"{\n" +
"\"precision\": \"zip\",\n" +
"\"Latitude\": 37.371991,\n" +
"\"Longitude\": -122.026020,\n" +
"\"Address\": \"\",\n" +
"\"City\": \"SUNNYVALE\",\n" +
"\"State\": \"CA\",\n" +
"\"Zip\": \"94085\",\n" +
"\"Country\": \"US\"\n" +
"}\n" +
"]";
JsonArray aJsonArray = new JsonArray(jsonArrayString);
vertx.eventBus().send(incomingVertxAddress3, aJsonArray);
handler1.checkJsonArrayMessageReceived(aJsonArray);
handler2.checkJsonArrayMessageReceived(aJsonArray);
}
private ClientMessage receiveFromQueue(String queueName) throws Exception {
ClientMessage msg = null;
try (ServerLocator locator = createInVMNonHALocator(); ClientSessionFactory sf = createSessionFactory(locator); ClientSession session = sf.createSession(false, true, true)) {
ClientConsumer consumer = session.createConsumer(queueName);
session.start();
msg = consumer.receive(60 * 1000);
msg.acknowledge();
session.commit();
}
return msg;
}
private void createVertxService() {
System.setProperty("vertx.clusterManagerFactory", HazelcastClusterManagerFactory.class.getName());
vertxManager = PlatformLocator.factory.createPlatformManager(Integer.valueOf(port), host);
}
private class VertxTestHandler implements Handler<BaseMessage<?>> {
private volatile BaseMessage<?> vertxMsg = null;
private final Object lock = new Object();
@Override
public void handle(BaseMessage<?> arg0) {
synchronized (lock) {
vertxMsg = arg0;
lock.notify();
}
}
void checkJsonArrayMessageReceived(JsonArray aJsonArray) {
BaseMessage<?> msg = waitMessage();
JsonArray body = (JsonArray) msg.body();
assertEquals(aJsonArray, body);
}
void checkJsonObjectMessageReceived(final JsonObject aJsonObj) {
BaseMessage<?> msg = waitMessage();
JsonObject body = (JsonObject) msg.body();
assertEquals(aJsonObj, body);
}
void checkShortMessageReceived(final Short aShort) {
BaseMessage<?> msg = waitMessage();
Short body = (Short) msg.body();
assertEquals(aShort, body);
}
void checkLongMessageReceived(final Long aLong) {
BaseMessage<?> msg = waitMessage();
Long body = (Long) msg.body();
assertEquals(aLong, body);
}
void checkIntegerMessageReceived(final Integer aInt) {
BaseMessage<?> msg = waitMessage();
Integer body = (Integer) msg.body();
assertEquals(aInt, body);
}
void checkFloatMessageReceived(final Float aFloat) {
BaseMessage<?> msg = waitMessage();
Float body = (Float) msg.body();
assertEquals(aFloat, body);
}
void checkDoubleMessageReceived(final Double aDouble) {
BaseMessage<?> msg = waitMessage();
Double body = (Double) msg.body();
assertEquals(aDouble, body);
}
void checkCharacterMessageReceived(final Character aChar) {
BaseMessage<?> msg = waitMessage();
Character body = (Character) msg.body();
assertEquals(aChar, body);
}
void checkByteMessageReceived(final Byte aByte) {
BaseMessage<?> msg = waitMessage();
Byte body = (Byte) msg.body();
assertEquals(aByte, body);
}
void checkByteArrayMessageReceived(final byte[] byteArray) {
BaseMessage<?> msg = waitMessage();
byte[] body = (byte[]) msg.body();
assertEquals(byteArray.length, body.length);
for (int i = 0; i < byteArray.length; i++) {
assertEquals(byteArray[i], body[i]);
}
}
void checkBooleanMessageReceived(final Boolean boolValue) {
BaseMessage<?> msg = waitMessage();
Boolean body = (Boolean) msg.body();
assertEquals(boolValue, body);
}
void checkStringMessageReceived(final String str) {
BaseMessage<?> msg = waitMessage();
String body = (String) msg.body();
assertEquals(str, body);
}
void checkBufferMessageReceived(final Buffer buffer) {
byte[] source = buffer.getBytes();
BaseMessage<?> msg = waitMessage();
Buffer body = (Buffer) msg.body();
byte[] bytes = body.getBytes();
assertEquals(source.length, bytes.length);
for (int i = 0; i < bytes.length; i++) {
assertEquals(source[i], bytes[i]);
}
}
private BaseMessage<?> waitMessage() {
BaseMessage<?> msg = null;
synchronized (lock) {
long timeout = System.currentTimeMillis() + 10000;
while (vertxMsg == null && timeout > System.currentTimeMillis()) {
try {
lock.wait(1000);
} catch (InterruptedException e) {
}
}
msg = vertxMsg;
vertxMsg = null;
}
assertNotNull("Message didn't arrive after 10 seconds.", msg);
return msg;
}
}
@After
@Override
public void tearDown() throws Exception {
vertxManager.stop();
server.stop();
server = null;
// Something on vertx is setting the TCL to null what would break subsequent tests
Thread.currentThread().setContextClassLoader(contextClassLoader);
super.tearDown();
}
private CoreQueueConfiguration createCoreQueueConfiguration(String queueName) {
return new CoreQueueConfiguration().setAddress(queueName).setName(queueName);
}
private ConnectorServiceConfiguration createOutgoingConnectorServiceConfiguration(HashMap<String, Object> config,
String name) {
return new ConnectorServiceConfiguration().setFactoryClassName(VertxOutgoingConnectorServiceFactory.class.getName()).setParams(config).setName(name);
}
private ConnectorServiceConfiguration createIncomingConnectorServiceConfiguration(HashMap<String, Object> config,
String name) {
return new ConnectorServiceConfiguration().setFactoryClassName(VertxIncomingConnectorServiceFactory.class.getName()).setParams(config).setName(name);
}
private HashMap<String, Object> createIncomingConnectionConfig(String vertxAddress, String incomingQueue) {
HashMap<String, Object> config1 = new HashMap<>();
config1.put(VertxConstants.HOST, host);
config1.put(VertxConstants.PORT, port);
config1.put(VertxConstants.VERTX_ADDRESS, vertxAddress);
config1.put(VertxConstants.QUEUE_NAME, incomingQueue);
return config1;
}
private HashMap<String, Object> createOutgoingConnectionConfig(String queueName, String vertxAddress) {
HashMap<String, Object> config1 = new HashMap<>();
config1.put(VertxConstants.HOST, host);
config1.put(VertxConstants.PORT, port);
config1.put(VertxConstants.QUEUE_NAME, queueName);
config1.put(VertxConstants.VERTX_ADDRESS, vertxAddress);
return config1;
}
}