This commit is contained in:
Martyn Taylor 2015-09-11 15:05:19 +01:00
commit 243a94176d
19 changed files with 178 additions and 598 deletions

View File

@ -7,6 +7,12 @@ This file describes some minimum 'stuff one needs to know' to get started coding
For details about the modifying the code, building the project, running tests, IDE integration, etc. see
our [Hacking Guide](./docs/hacking-guide/en/SUMMARY.md).
## Documentation
Our documentation is always in sync with our releases at the [Apache ActiveMQ Artemis](http://activemq.apache.org/artemis/docs.html) website.
Or you can also look at the current master version on [github](https://github.com/apache/activemq-artemis/blob/master/docs/user-manual/en/SUMMARY.md).
## Examples
To run an example firstly make sure you have run

View File

@ -561,13 +561,14 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
clusterTransportConfiguration = locator.clusterTransportConfiguration;
}
private synchronized TransportConfiguration selectConnector() {
private TransportConfiguration selectConnector() {
Pair<TransportConfiguration, TransportConfiguration>[] usedTopology;
synchronized (topologyArrayGuard) {
usedTopology = topologyArray;
}
synchronized (this) {
// if the topologyArray is null, we will use the initialConnectors
if (usedTopology != null) {
if (ActiveMQClientLogger.LOGGER.isTraceEnabled()) {
@ -589,6 +590,7 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
return initialConnectors[pos];
}
}
}
public void start(Executor executor) throws Exception {
initialise();
@ -637,17 +639,24 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
}
private ClientSessionFactoryInternal connect(final boolean skipWarnings) throws ActiveMQException {
ClientSessionFactoryInternal returnFactory = null;
synchronized (this) {
// static list of initial connectors
if (getNumInitialConnectors() > 0 && discoveryGroup == null) {
ClientSessionFactoryInternal sf = (ClientSessionFactoryInternal) staticConnector.connect(skipWarnings);
addFactory(sf);
return sf;
returnFactory = (ClientSessionFactoryInternal) staticConnector.connect(skipWarnings);
}
}
if (returnFactory != null) {
addFactory(returnFactory);
return returnFactory;
}
else {
// wait for discovery group to get the list of initial connectors
return (ClientSessionFactoryInternal) createSessionFactory();
}
}
@Override
public ClientSessionFactoryInternal connectNoWarnings() throws ActiveMQException {
@ -844,12 +853,12 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
factory.cleanup();
throw ActiveMQClientMessageBundle.BUNDLE.connectionTimedOutOnReceiveTopology(discoveryGroup);
}
}
addFactory(factory);
return factory;
}
}
public boolean isHA() {
return ha;
@ -1494,10 +1503,13 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
}
public void factoryClosed(final ClientSessionFactory factory) {
boolean isEmpty;
synchronized (factories) {
factories.remove(factory);
isEmpty = factories.isEmpty();
}
if (!clusterConnection && factories.isEmpty()) {
if (!clusterConnection && isEmpty) {
// Go back to using the broadcast or static list
synchronized (topologyArrayGuard) {
receivedTopology = false;
@ -1506,7 +1518,6 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
}
}
}
}
public Topology getTopology() {
return topology;

View File

@ -88,8 +88,8 @@
</dependency>
<dependency>
<groupId>org.apache.qpid</groupId>
<artifactId>qpid-amqp-1-0-client-jms</artifactId>
<version>0.24</version>
<artifactId>qpid-jms-client</artifactId>
<version>0.5.0</version>
<scope>test</scope>
</dependency>

View File

@ -21,24 +21,22 @@ import javax.jms.ConnectionFactory;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.Queue;
import javax.jms.Session;
import java.lang.ref.WeakReference;
import org.apache.qpid.amqp_1_0.jms.impl.ConnectionFactoryImpl;
import org.apache.qpid.amqp_1_0.jms.impl.QueueImpl;
import org.apache.qpid.jms.JmsConnectionFactory;
import org.proton.plug.test.minimalserver.DumbServer;
import org.proton.plug.test.minimalserver.MinimalServer;
public class AbstractJMSTest {
protected final boolean useHawtJMS;
protected final boolean useSASL;
protected String address = "exampleQueue";
protected MinimalServer server = new MinimalServer();
public AbstractJMSTest(boolean useHawtJMS, boolean useSASL) {
this.useHawtJMS = useHawtJMS;
public AbstractJMSTest(boolean useSASL) {
this.useSASL = useSASL;
}
@ -77,34 +75,16 @@ public class AbstractJMSTest {
protected ConnectionFactory createConnectionFactory() {
if (useSASL) {
if (useHawtJMS) {
// return new JmsConnectionFactory("aaaaaaaa", "aaaaaaa", "amqp://localhost:" + Constants.PORT);
return null;
return new JmsConnectionFactory("aaaaaaaa", "aaaaaaa", "amqp://localhost:5672");
}
else {
return new ConnectionFactoryImpl("localhost", Constants.PORT, "aaaaaaaa", "aaaaaaa");
}
}
else {
if (useHawtJMS) {
// return new JmsConnectionFactory("amqp://localhost:" + Constants.PORT);
return null;
}
else {
return new ConnectionFactoryImpl("localhost", Constants.PORT, null, null);
}
return new JmsConnectionFactory( "amqp://localhost:5672");
}
}
protected Queue createQueue() {
if (useHawtJMS) {
// return new JmsQueue(address);
return null;
}
else {
return new QueueImpl(address);
}
protected Queue createQueue(Session session) throws Exception {
return session.createQueue(address);
}
}

View File

@ -32,7 +32,6 @@ import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import org.apache.qpid.amqp_1_0.jms.impl.QueueImpl;
import org.apache.qpid.proton.amqp.messaging.AmqpValue;
import org.apache.qpid.proton.amqp.messaging.Properties;
import org.apache.qpid.proton.message.Message;
@ -59,16 +58,15 @@ public class ProtonTest extends AbstractJMSTest {
protected Connection connection;
@Parameterized.Parameters(name = "useHawt={0} sasl={1}")
@Parameterized.Parameters(name = "sasl={0}")
public static Collection<Object[]> data() {
List<Object[]> list = Arrays.asList(new Object[][]{{Boolean.FALSE, Boolean.TRUE}, {Boolean.FALSE, Boolean.FALSE}});
List<Object[]> list = Arrays.asList(new Object[][]{{Boolean.TRUE}, {Boolean.FALSE}});
System.out.println("Size = " + list.size());
return list;
}
public ProtonTest(boolean useHawtJMS, boolean useSASL) {
super(useHawtJMS, useSASL);
public ProtonTest(boolean useSASL) {
super(useSASL);
}
@Before
@ -92,7 +90,6 @@ public class ProtonTest extends AbstractJMSTest {
public void testMessagesReceivedInParallel() throws Throwable {
final int numMessages = getNumberOfMessages();
long time = System.currentTimeMillis();
final Queue queue = createQueue();
final ArrayList<Throwable> exceptions = new ArrayList<>();
@ -105,6 +102,7 @@ public class ProtonTest extends AbstractJMSTest {
// connectionConsumer = connection;
connectionConsumer.start();
Session sessionConsumer = connectionConsumer.createSession(false, Session.AUTO_ACKNOWLEDGE);
final Queue queue = createQueue(sessionConsumer);
final MessageConsumer consumer = sessionConsumer.createConsumer(queue);
int count = numMessages;
@ -143,6 +141,7 @@ public class ProtonTest extends AbstractJMSTest {
Session session = connection.createSession(false, Session.DUPS_OK_ACKNOWLEDGE);
t.start();
final Queue queue = createQueue(session);
MessageProducer p = session.createProducer(queue);
p.setDeliveryMode(DeliveryMode.PERSISTENT);
@ -156,14 +155,14 @@ public class ProtonTest extends AbstractJMSTest {
}
long taken = (System.currentTimeMillis() - time);
System.out.println("taken on send = " + taken + " usehawt = " + useHawtJMS + " sasl = " + useSASL);
System.out.println("taken on send = " + taken + " sasl = " + useSASL);
t.join();
for (Throwable e : exceptions) {
throw e;
}
taken = (System.currentTimeMillis() - time);
System.out.println("taken = " + taken + " usehawt = " + useHawtJMS + " sasl = " + useSASL);
System.out.println("taken = " + taken + " sasl = " + useSASL);
connection.close();
// assertEquals(0, q.getMessageCount());
@ -171,9 +170,9 @@ public class ProtonTest extends AbstractJMSTest {
@Test
public void testSimpleCreateSessionAndClose() throws Throwable {
final QueueImpl queue = new QueueImpl(address);
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue(address);
Thread.sleep(1000);
session.close();
connection.close();
@ -183,10 +182,10 @@ public class ProtonTest extends AbstractJMSTest {
public void testSimpleBinary() throws Throwable {
final int numMessages = 5;
long time = System.currentTimeMillis();
final QueueImpl queue = new QueueImpl(address);
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = createQueue(session);
byte[] bytes = new byte[0xf + 1];
for (int i = 0; i <= 0xf; i++) {
bytes[i] = (byte) i;
@ -230,8 +229,8 @@ public class ProtonTest extends AbstractJMSTest {
@Test
public void testMapMessage() throws Exception {
Queue queue = createQueue();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = createQueue(session);
MessageProducer p = session.createProducer(queue);
for (int i = 0; i < 10; i++) {
MapMessage message = session.createMapMessage();
@ -252,8 +251,8 @@ public class ProtonTest extends AbstractJMSTest {
@Test
public void testProperties() throws Exception {
Queue queue = createQueue();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = createQueue(session);
MessageProducer p = session.createProducer(queue);
TextMessage message = session.createTextMessage();
message.setText("msg:0");
@ -310,7 +309,7 @@ public class ProtonTest extends AbstractJMSTest {
Session clientSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
connection.start();
MessageConsumer consumer = clientSession.createConsumer(createQueue());
MessageConsumer consumer = clientSession.createConsumer(createQueue(clientSession));
for (int i = 0; i < 1; i++) {
MapMessage msg = (MapMessage) consumer.receive(5000);
System.out.println("Msg " + msg);

View File

@ -354,29 +354,3 @@ straightforward. Simply set the parameter `persistence-enabled` in
Please note that if you set this parameter to false, then *zero*
persistence will occur. That means no bindings data, message data, large
message data, duplicate id caches or paging data will be persisted.
## Import/Export the Journal Data
You may want to inspect the existent records on each one of the journals
used by Apache ActiveMQ Artemis, and you can use the export/import tool for that
purpose.
you can export the journal as a text file by using this command:
`java -cp activemq-tools-jar-with-dependencies.jar export-journal <JournalDirectory> <JournalPrefix> <FileExtension> <FileSize> <FileOutput>`
To import the file as binary data on the journal (Notice you also
require netty.jar):
`java -cp activemq-tools-jar-with-dependencies.jar import-journal <JournalDirectory> <JournalPrefix> <FileExtension> <FileSize> <FileInput>`
- JournalDirectory: Use the configured folder for your selected folder. Example: ./activemq/data/journal
- JournalPrefix: Use the prefix for your selected journal, as discussed above
- FileExtension: Use the extension for your selected journal, as discussed above
- FileSize: Use the size for your selected journal, as discussed above
- FileOutput or FileInput: text file that will contain the exported data
See [Tools](tools.md) for more information.

View File

@ -48,7 +48,7 @@ under the License.
<id>release</id>
<modules>
<module>proton-cpp</module>
<module>proton-j</module>
<module>queue</module>
<module>proton-ruby</module>
</modules>
</profile>
@ -58,7 +58,7 @@ under the License.
<!-- this one to be run manually
<module>proton-cpp</module> -->
<module>proton-j</module>
<module>queue</module>
<module>proton-ruby</module>
</modules>
</profile>

View File

@ -1,17 +0,0 @@
## ---------------------------------------------------------------------------
## Licensed to the Apache Software Foundation (ASF) under one or more
## contributor license agreements. See the NOTICE file distributed with
## this work for additional information regarding copyright ownership.
## The ASF licenses this file to You under the Apache License, Version 2.0
## (the "License"); you may not use this file except in compliance with
## the License. You may obtain a copy of the License at
##
## http://www.apache.org/licenses/LICENSE-2.0
##
## Unless required by applicable law or agreed to in writing, software
## distributed under the License is distributed on an "AS IS" BASIS,
## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
## See the License for the specific language governing permissions and
## limitations under the License.
## ---------------------------------------------------------------------------
guest=guest

View File

@ -1,17 +0,0 @@
## ---------------------------------------------------------------------------
## Licensed to the Apache Software Foundation (ASF) under one or more
## contributor license agreements. See the NOTICE file distributed with
## this work for additional information regarding copyright ownership.
## The ASF licenses this file to You under the Apache License, Version 2.0
## (the "License"); you may not use this file except in compliance with
## the License. You may obtain a copy of the License at
##
## http://www.apache.org/licenses/LICENSE-2.0
##
## Unless required by applicable law or agreed to in writing, software
## distributed under the License is distributed on an "AS IS" BASIS,
## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
## See the License for the specific language governing permissions and
## limitations under the License.
## ---------------------------------------------------------------------------
guest=guest

View File

@ -1,68 +0,0 @@
<?xml version='1.0'?>
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
<configuration xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns="urn:activemq"
xsi:schemaLocation="urn:activemq /schema/artemis-server.xsd">
<jms xmlns="urn:activemq:jms">
<!--the queue used by the example-->
<queue name="exampleQueue"/>
</jms>
<core xmlns="urn:activemq:core">
<bindings-directory>${data.dir}/server0/data/messaging/bindings</bindings-directory>
<journal-directory>${data.dir}/server0/data/messaging/journal</journal-directory>
<large-messages-directory>${data.dir}/server0/data/messaging/largemessages</large-messages-directory>
<paging-directory>${data.dir}/server0/data/messaging/paging</paging-directory>
<!-- Acceptors -->
<acceptors>
<acceptor name="proton-acceptor">tcp://localhost:5672</acceptor>
<acceptor name="netty-acceptor">tcp://localhost:61616</acceptor>
</acceptors>
<queues>
<queue name="testQueue">
<address>testQueue</address>
</queue>
</queues>
<!-- Other config -->
<security-settings>
<!--security for example queue-->
<security-setting match="jms.queue.exampleQueue">
<permission type="createDurableQueue" roles="guest"/>
<permission type="deleteDurableQueue" roles="guest"/>
<permission type="createNonDurableQueue" roles="guest"/>
<permission type="deleteNonDurableQueue" roles="guest"/>
<permission type="consume" roles="guest"/>
<permission type="send" roles="guest"/>
</security-setting>
</security-settings>
</core>
</configuration>

View File

@ -27,7 +27,7 @@ under the License.
<version>1.1.1-SNAPSHOT</version>
</parent>
<artifactId>artemis-proton-j</artifactId>
<artifactId>queue</artifactId>
<packaging>jar</packaging>
<name>ActiveMQ Artemis Proton-J Example</name>
@ -38,8 +38,8 @@ under the License.
<dependencies>
<dependency>
<groupId>org.apache.qpid</groupId>
<artifactId>qpid-amqp-1-0-client</artifactId>
<version>0.22</version>
<artifactId>qpid-jms-client</artifactId>
<version>0.5.0</version>
</dependency>
</dependencies>
@ -97,7 +97,7 @@ under the License.
<dependencies>
<dependency>
<groupId>org.apache.activemq.examples.amqp</groupId>
<artifactId>artemis-proton-j</artifactId>
<artifactId>queue</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>

View File

@ -42,54 +42,5 @@ under the License.
&lt;acceptor name="proton-acceptor"&gt;tcp://localhost:5672&lt;/acceptor&gt;
</code>
</pre>
<h2>Example step-by-step</h2>
<ol>
<li> Create an amqp qpid 1.0 connection.</li>
<pre class="prettyprint">
<code>connection= new Connection("localhost", 5672, null, null);</code>
</pre>
<li>Create a session</li>
<pre class="prettyprint">
<code>Session session = connection.createSession();</code>
</pre>
<li>Create a sender</li>
<pre class="prettyprint">
<code>Sender sender = session.createSender("testQueue");</code>
</pre>
<li>send a simple message</li>
<pre class="prettyprint">
<code>sender.send(new Message("I am an amqp message"));</code>
</pre>
<li>create a moving receiver, this means the message will be removed from the queue</li>
<pre class="prettyprint">
<code>Receiver rec = session.createMovingReceiver("testQueue");</code>
</pre>
<li>set some credit so we can receive</li>
<pre class="prettyprint">
<code>rec.setCredit(UnsignedInteger.valueOf(1), false);</code>
</pre>
<li>receive the simple message</li>
<pre class="prettyprint">
<code>Message m = rec.receive(5000);
System.out.println("message = " + m.getPayload());</code>
</pre>
<li>acknowledge the message</li>
<pre class="prettyprint">
<code>rec.acknowledge(m);</code>
</pre>
<li>close the connection</li>
<pre class="prettyprint">
<code>connection.close();</code>
</pre>
</ol>
</body>
</html>

View File

@ -16,43 +16,46 @@
*/
package org.apache.activemq.artemis.jms.example;
import org.apache.qpid.amqp_1_0.client.Connection;
import org.apache.qpid.amqp_1_0.client.Message;
import org.apache.qpid.amqp_1_0.client.Receiver;
import org.apache.qpid.amqp_1_0.client.Sender;
import org.apache.qpid.amqp_1_0.client.Session;
import org.apache.qpid.amqp_1_0.type.UnsignedInteger;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.MessageProducer;
import javax.jms.*;
import org.apache.qpid.jms.JmsConnectionFactory;
public class ProtonJExample {
public static void main(String[] args) throws Exception {
Connection connection = null;
ConnectionFactory connectionFactory = new JmsConnectionFactory("amqp://localhost:5672");
try {
// Step 1. Create an amqp qpid 1.0 connection
connection = new Connection("localhost", 5672, null, null);
connection = connectionFactory.createConnection();
// Step 2. Create a session
Session session = connection.createSession();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// Step 3. Create a sender
Sender sender = session.createSender("jms.queue.exampleQueue");
Queue queue = session.createQueue("jms.queue.exampleQueue");
MessageProducer sender = session.createProducer(queue);
// Step 4. send a simple message
sender.send(new Message("I am an amqp message"));
// Step 4. send a few simple message
sender.send(session.createTextMessage("Hello world "));
connection.start();
// Step 5. create a moving receiver, this means the message will be removed from the queue
Receiver rec = session.createMovingReceiver("jms.queue.exampleQueue");
MessageConsumer consumer = session.createConsumer(queue);
// Step 6. set some credit so we can receive
rec.setCredit(UnsignedInteger.valueOf(1), false);
// Step 7. receive the simple message
Message m = rec.receive(5000);
System.out.println("message = " + m.getPayload());
TextMessage m = (TextMessage) consumer.receive(5000);
System.out.println("message = " + m.getText());
// Step 8. acknowledge the message
rec.acknowledge(m);
}
finally {
if (connection != null) {

67
pom.xml
View File

@ -33,6 +33,26 @@
<module>artemis-protocols</module>
<module>artemis-dto</module>
<module>artemis-boot</module>
<module>artemis-web</module>
<module>artemis-website</module>
<module>artemis-cli</module>
<module>artemis-commons</module>
<module>artemis-selector</module>
<module>artemis-core-client</module>
<module>artemis-server</module>
<module>artemis-jms-client</module>
<module>artemis-jms-server</module>
<module>artemis-native</module>
<module>artemis-journal</module>
<module>artemis-ra</module>
<module>artemis-rest</module>
<module>artemis-service-extensions</module>
<module>artemis-maven-plugin</module>
<module>integration/activemq-spring-integration</module>
<module>integration/activemq-aerogear-integration</module>
<module>integration/activemq-vertx-integration</module>
<module>artemis-distribution</module>
<module>tests</module>
</modules>
<name>ActiveMQ Artemis Parent</name>
@ -482,6 +502,23 @@
<javac-compiler-id>javac</javac-compiler-id>
</properties>
</profile>
<profile>
<id>jdk18</id>
<activation>
<jdk>1.8</jdk>
</activation>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-javadoc-plugin</artifactId>
<configuration>
<additionalparam>-Xdoclint:none</additionalparam>
</configuration>
</plugin>
</plugins>
</build>
</profile>
<profile>
<id>dev</id>
<modules>
@ -517,29 +554,6 @@
<activation>
<activeByDefault>true</activeByDefault>
</activation>
<modules>
<module>artemis-dto</module>
<module>artemis-web</module>
<module>artemis-website</module>
<module>artemis-cli</module>
<module>artemis-commons</module>
<module>artemis-selector</module>
<module>artemis-core-client</module>
<module>artemis-server</module>
<module>artemis-jms-client</module>
<module>artemis-jms-server</module>
<module>artemis-native</module>
<module>artemis-journal</module>
<module>artemis-ra</module>
<module>artemis-rest</module>
<module>artemis-service-extensions</module>
<module>artemis-maven-plugin</module>
<module>integration/activemq-spring-integration</module>
<module>integration/activemq-aerogear-integration</module>
<module>integration/activemq-vertx-integration</module>
<module>artemis-distribution</module>
<module>tests</module>
</modules>
</profile>
<profile>
<id>release</id>
@ -594,13 +608,6 @@
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-javadoc-plugin</artifactId>
<configuration>
<additionalparam>-Xdoclint:none</additionalparam>
</configuration>
</plugin>
</plugins>
</build>
</profile>

View File

@ -37,7 +37,6 @@
<org-apache-derby-version>10.11.1.1</org-apache-derby-version>
<commons-io-version>2.4</commons-io-version>
<commons-net-version>3.3</commons-net-version>
<qpid-jms-version>0.30</qpid-jms-version>
<xbean-version>3.18</xbean-version>
<hamcrest-version>1.3</hamcrest-version>
<slf4j-version>1.7.10</slf4j-version>
@ -195,13 +194,6 @@
<version>${commons-net-version}</version>
</dependency>
<dependency>
<groupId>org.apache.qpid</groupId>
<artifactId>qpid-amqp-1-0-client-jms</artifactId>
<version>${qpid-jms-version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.xbean</groupId>
<artifactId>xbean-spring</artifactId>

View File

@ -1,119 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.conversions;
import org.apache.activemq.CombinationTestSupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.qpid.amqp_1_0.jms.impl.ConnectionFactoryImpl;
import org.apache.qpid.amqp_1_0.jms.impl.QueueImpl;
import org.apache.qpid.amqp_1_0.jms.impl.TopicImpl;
import org.fusesource.mqtt.client.BlockingConnection;
import org.fusesource.mqtt.client.MQTT;
import org.fusesource.mqtt.client.QoS;
import javax.jms.*;
import java.io.UnsupportedEncodingException;
import java.util.Arrays;
/**
*/
public class AmqpAndMqttTest extends CombinationTestSupport {
protected BrokerService broker;
private TransportConnector amqpConnector;
private TransportConnector mqttConnector;
@Override
protected void setUp() throws Exception {
super.setUp();
broker = createBroker();
broker.start();
broker.waitUntilStarted();
}
@Override
protected void tearDown() throws Exception {
if (broker != null) {
broker.stop();
broker.waitUntilStopped();
broker = null;
}
super.tearDown();
}
protected BrokerService createBroker() throws Exception {
BrokerService broker = new BrokerService();
broker.setPersistent(false);
amqpConnector = broker.addConnector("amqp://0.0.0.0:0");
mqttConnector = broker.addConnector("mqtt://0.0.0.0:0");
return broker;
}
public void testFromMqttToAmqp() throws Exception {
Connection amqp = createAmqpConnection();
Session session = amqp.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer consumer = session.createConsumer(session.createTopic("topic://FOO"));
final BlockingConnection mqtt = createMQTTConnection().blockingConnection();
mqtt.connect();
byte[] payload = bytes("Hello World");
mqtt.publish("FOO", payload, QoS.AT_LEAST_ONCE, false);
mqtt.disconnect();
Message msg = consumer.receive(1000 * 5);
assertNotNull(msg);
assertTrue(msg instanceof BytesMessage);
BytesMessage bmsg = (BytesMessage) msg;
byte[] actual = new byte[(int) bmsg.getBodyLength()];
bmsg.readBytes(actual);
assertTrue(Arrays.equals(actual, payload));
amqp.close();
}
private byte[] bytes(String value) {
try {
return value.getBytes("UTF-8");
}
catch (UnsupportedEncodingException e) {
throw new RuntimeException(e);
}
}
protected MQTT createMQTTConnection() throws Exception {
MQTT mqtt = new MQTT();
mqtt.setConnectAttemptsMax(1);
mqtt.setReconnectAttemptsMax(0);
mqtt.setHost("localhost", mqttConnector.getConnectUri().getPort());
return mqtt;
}
public Connection createAmqpConnection() throws Exception {
final ConnectionFactoryImpl factory = new ConnectionFactoryImpl("localhost", amqpConnector.getConnectUri().getPort(), "admin", "password");
final Connection connection = factory.createConnection();
connection.setExceptionListener(new ExceptionListener() {
@Override
public void onException(JMSException exception) {
exception.printStackTrace();
}
});
connection.start();
return connection;
}
}

View File

@ -187,8 +187,8 @@
</dependency>
<dependency>
<groupId>org.apache.qpid</groupId>
<artifactId>qpid-amqp-1-0-client-jms</artifactId>
<version>0.24</version>
<artifactId>qpid-jms-client</artifactId>
<version>0.5.0</version>
</dependency>
<dependency>
<groupId>org.apache.qpid</groupId>

View File

@ -23,8 +23,8 @@ import java.util.concurrent.CountDownLatch;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.api.jms.ActiveMQJMSClient;
import org.apache.activemq.artemis.api.jms.JMSFactoryType;
import org.apache.activemq.artemis.tests.util.JMSClusteredTestBase;
import org.apache.activemq.artemis.core.remoting.impl.invm.InVMConnectorFactory;
import org.apache.activemq.artemis.tests.util.JMSClusteredTestBase;
import org.junit.Test;
public class MultipleThreadsOpeningTest extends JMSClusteredTestBase {
@ -33,7 +33,7 @@ public class MultipleThreadsOpeningTest extends JMSClusteredTestBase {
public void testMultipleOpen() throws Exception {
cf1 = ActiveMQJMSClient.createConnectionFactoryWithHA(JMSFactoryType.CF, new TransportConfiguration(InVMConnectorFactory.class.getName(), generateInVMParams(1)));
final int numberOfOpens = 2000;
final int numberOfOpens = 500;
int numberOfThreads = 20;
// I want all the threads aligned, just ready to start creating connections like in a car race
final CountDownLatch flagAlignSemaphore = new CountDownLatch(numberOfThreads);
@ -41,6 +41,10 @@ public class MultipleThreadsOpeningTest extends JMSClusteredTestBase {
class ThreadOpen extends Thread {
ThreadOpen(int i) {
super("MultipleThreadsOpeningTest/ThreadOpen::" + i);
}
int errors = 0;
public void run() {
@ -50,8 +54,8 @@ public class MultipleThreadsOpeningTest extends JMSClusteredTestBase {
flagStartRace.await();
for (int i = 0; i < numberOfOpens; i++) {
if (i % 1000 == 0)
System.out.println("tests " + i);
if (i % 100 == 0)
System.out.println("connections created on Thread " + Thread.currentThread() + " " + i);
Connection conn = cf1.createConnection();
Session sess = conn.createSession(true, Session.AUTO_ACKNOWLEDGE);
sess.close();
@ -68,18 +72,27 @@ public class MultipleThreadsOpeningTest extends JMSClusteredTestBase {
ThreadOpen[] threads = new ThreadOpen[numberOfThreads];
for (int i = 0; i < numberOfThreads; i++) {
threads[i] = new ThreadOpen();
threads[i] = new ThreadOpen(i);
threads[i].start();
}
flagAlignSemaphore.await();
flagStartRace.countDown();
try {
for (ThreadOpen t : threads) {
// 5 minutes seems long but this may take a bit of time in a slower box
t.join(300000);
t.join(60000);
assertFalse(t.isAlive());
assertEquals("There are Errors on the test thread", 0, t.errors);
}
}
finally {
for (ThreadOpen t : threads) {
if (t.isAlive()) {
t.interrupt();
}
t.join(1000);
}
}
}
}

View File

@ -40,22 +40,15 @@ import java.util.Enumeration;
import java.util.HashMap;
import java.util.Random;
import org.apache.activemq.artemis.api.core.management.ResourceNames;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.tests.util.RandomUtil;
import org.apache.qpid.amqp_1_0.client.Receiver;
import org.apache.qpid.amqp_1_0.client.Sender;
import org.apache.qpid.amqp_1_0.jms.impl.ConnectionFactoryImpl;
import org.apache.qpid.amqp_1_0.jms.impl.QueueImpl;
import org.apache.qpid.amqp_1_0.type.UnsignedInteger;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.api.jms.ActiveMQJMSClient;
import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.utils.ByteUtil;
import org.apache.qpid.jms.JmsConnectionFactory;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@ -138,6 +131,7 @@ public class ProtonTest extends ActiveMQTestBase {
@After
public void tearDown() throws Exception {
try {
Thread.sleep(250);
if (connection != null) {
connection.close();
}
@ -264,6 +258,10 @@ public class ProtonTest extends ActiveMQTestBase {
cons.close();
for (int i = 0; i < 100 && serverQueue.getConsumerCount() != 0; i++) {
Thread.sleep(500);
}
assertEquals(0, serverQueue.getConsumerCount());
session.close();
@ -337,7 +335,7 @@ public class ProtonTest extends ActiveMQTestBase {
connection = createConnection();
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer consumer = session.createConsumer(queue);
Thread.sleep(1000);
Thread.sleep(100);
consumer.close();
connection.close();
Assert.assertEquals(numMessages, getMessageCount(q));
@ -737,153 +735,20 @@ public class ProtonTest extends ActiveMQTestBase {
connection.close();
}
@Test
public void testUsingPlainAMQP() throws Exception {
if (this.protocol != 0 && protocol != 3) {
return;
}
org.apache.qpid.amqp_1_0.client.Connection connection = null;
private javax.jms.Queue createQueue(String address) throws Exception {
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
try {
// Step 1. Create an amqp qpid 1.0 connection
connection = new org.apache.qpid.amqp_1_0.client.Connection("localhost", 5672, null, null);
// Step 2. Create a session
org.apache.qpid.amqp_1_0.client.Session session = connection.createSession();
// Step 3. Create a sender
Sender sender = session.createSender("jms.queue.exampleQueue");
// Step 4. send a simple message
sender.send(new org.apache.qpid.amqp_1_0.client.Message("I am an amqp message"));
// Step 5. create a moving receiver, this means the message will be removed from the queue
Receiver rec = session.createMovingReceiver("jms.queue.exampleQueue");
// Step 6. set some credit so we can receive
rec.setCredit(UnsignedInteger.valueOf(1), false);
// Step 7. receive the simple message
org.apache.qpid.amqp_1_0.client.Message m = rec.receive(5000);
System.out.println("message = " + m.getPayload());
// Step 8. acknowledge the message
rec.acknowledge(m);
return session.createQueue(address);
}
finally {
if (connection != null) {
// Step 9. close the connection
connection.close();
}
}
}
@Test
public void testUsingPlainAMQPSenderWithNonExistentQueue() throws Exception {
if (this.protocol != 0 && protocol != 3) {
return;
}
String queue = ResourceNames.JMS_QUEUE + RandomUtil.randomString();
org.apache.qpid.amqp_1_0.client.Connection connection = null;
try {
// Step 1. Create an amqp qpid 1.0 connection
connection = new org.apache.qpid.amqp_1_0.client.Connection("localhost", 5672, null, null);
// Step 2. Create a session
org.apache.qpid.amqp_1_0.client.Session session = connection.createSession();
// Step 3. Create a sender
Sender sender = session.createSender(queue);
Assert.assertNotNull(server.locateQueue(new SimpleString(queue)));
// Step 4. send a simple message
sender.send(new org.apache.qpid.amqp_1_0.client.Message("I am an amqp message"));
// Step 5. create a moving receiver, this means the message will be removed from the queue
Receiver rec = session.createMovingReceiver(queue);
// Step 6. set some credit so we can receive
rec.setCredit(UnsignedInteger.valueOf(1), false);
// Step 7. receive the simple message
org.apache.qpid.amqp_1_0.client.Message m = rec.receive(5000);
System.out.println("message = " + m.getPayload());
// Step 8. acknowledge the message
rec.acknowledge(m);
}
finally {
if (connection != null) {
// Step 9. close the connection
connection.close();
}
}
}
@Test
public void testUsingPlainAMQPReceiverWithNonExistentQueue() throws Exception {
if (this.protocol != 0 && protocol != 3) {
return;
}
String queue = ResourceNames.JMS_QUEUE + RandomUtil.randomString();
org.apache.qpid.amqp_1_0.client.Connection connection = null;
try {
// Step 1. Create an amqp qpid 1.0 connection
connection = new org.apache.qpid.amqp_1_0.client.Connection("localhost", 5672, null, null);
// Step 2. Create a session
org.apache.qpid.amqp_1_0.client.Session session = connection.createSession();
// Step 3. create a moving receiver, this means the message will be removed from the queue
Receiver rec = session.createMovingReceiver(queue);
Assert.assertNotNull(server.locateQueue(new SimpleString(queue)));
// Step 4. Create a sender
Sender sender = session.createSender(queue);
// Step 5. send a simple message
sender.send(new org.apache.qpid.amqp_1_0.client.Message("I am an amqp message"));
// Step 6. set some credit so we can receive
rec.setCredit(UnsignedInteger.valueOf(1), false);
// Step 7. receive the simple message
org.apache.qpid.amqp_1_0.client.Message m = rec.receive(5000);
System.out.println("message = " + m.getPayload());
// Step 8. acknowledge the message
rec.acknowledge(m);
}
finally {
if (connection != null) {
// Step 9. close the connection
connection.close();
}
}
}
private javax.jms.Queue createQueue(String address) {
if (protocol == 0 || protocol == 3) {
return new QueueImpl(address);
}
else {
return ActiveMQJMSClient.createQueue(address);
session.close();
}
}
private javax.jms.Connection createConnection() throws JMSException {
Connection connection;
if (protocol == 3) {
factory = new ConnectionFactoryImpl("localhost", 5672, null, null);
factory = new JmsConnectionFactory("amqp://localhost:5672");
connection = factory.createConnection();
connection.setExceptionListener(new ExceptionListener() {
@Override
@ -894,7 +759,7 @@ public class ProtonTest extends ActiveMQTestBase {
connection.start();
}
else if (protocol == 0) {
factory = new ConnectionFactoryImpl("localhost", 5672, "guest", "guest");
factory = new JmsConnectionFactory("guest", "guest", "amqp://localhost:5672");
connection = factory.createConnection();
connection.setExceptionListener(new ExceptionListener() {
@Override
@ -909,12 +774,12 @@ public class ProtonTest extends ActiveMQTestBase {
if (protocol == 1) {
transport = new TransportConfiguration(INVM_CONNECTOR_FACTORY);
factory = new ActiveMQConnectionFactory("vm:/0");
}
else {
transport = new TransportConfiguration(NETTY_CONNECTOR_FACTORY);
factory = new ActiveMQConnectionFactory();
}
factory = new ActiveMQConnectionFactory(false, transport);
connection = factory.createConnection("guest", "guest");
connection.setExceptionListener(new ExceptionListener() {
@Override