This commit is contained in:
Clebert Suconic 2017-02-23 12:55:28 -05:00
commit f6ed811c15
14 changed files with 214 additions and 63 deletions

View File

@ -42,7 +42,7 @@ public class Browse extends DestAbstract {
ActiveMQConnectionFactory factory = createConnectionFactory();
Destination dest = ActiveMQDestination.createDestination(this.destination, ActiveMQDestination.QUEUE_TYPE);
Destination dest = ActiveMQDestination.createDestination(this.destination, ActiveMQDestination.TYPE.QUEUE);
try (Connection connection = factory.createConnection()) {
ConsumerThread[] threadsArray = new ConsumerThread[threads];
for (int i = 0; i < threads; i++) {

View File

@ -51,7 +51,7 @@ public class Consumer extends DestAbstract {
ActiveMQConnectionFactory factory = createConnectionFactory();
Destination dest = ActiveMQDestination.createDestination(this.destination, ActiveMQDestination.QUEUE_TYPE);
Destination dest = ActiveMQDestination.createDestination(this.destination, ActiveMQDestination.TYPE.QUEUE);
try (Connection connection = factory.createConnection()) {
ConsumerThread[] threadsArray = new ConsumerThread[threads];
for (int i = 0; i < threads; i++) {

View File

@ -52,7 +52,7 @@ public class Producer extends DestAbstract {
ActiveMQConnectionFactory factory = createConnectionFactory();
Destination dest = ActiveMQDestination.createDestination(this.destination, ActiveMQDestination.QUEUE_TYPE);
Destination dest = ActiveMQDestination.createDestination(this.destination, ActiveMQDestination.TYPE.QUEUE);
try (Connection connection = factory.createConnection()) {
ProducerThread[] threadsArray = new ProducerThread[threads];
for (int i = 0; i < threads; i++) {

View File

@ -571,7 +571,7 @@ public class ArtemisTest extends CliTestBase {
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("tcp://localhost:61616");
Connection connection = cf.createConnection("admin", "admin");
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
MessageProducer producer = session.createProducer(ActiveMQDestination.createDestination("queue://TEST", ActiveMQDestination.QUEUE_TYPE));
MessageProducer producer = session.createProducer(ActiveMQDestination.createDestination("queue://TEST", ActiveMQDestination.TYPE.QUEUE));
TextMessage message = session.createTextMessage("Banana");
message.setStringProperty("fruit", "banana");

View File

@ -38,28 +38,12 @@ public class ActiveMQDestination implements Destination, Serializable, Reference
// Static --------------------------------------------------------
/**
*
*/
private static final long serialVersionUID = 5027962425462382883L;
// public static final String JMS_QUEUE_ADDRESS_PREFIX = "jms.queue.";
// public static final String JMS_TEMP_QUEUE_ADDRESS_PREFIX = "jms.tempqueue.";
// public static final String JMS_TOPIC_ADDRESS_PREFIX = "jms.topic.";
// public static final String JMS_TEMP_TOPIC_ADDRESS_PREFIX = "jms.temptopic.";
public static final String QUEUE_QUALIFIED_PREFIX = "queue://";
public static final String TOPIC_QUALIFIED_PREFIX = "topic://";
public static final String TEMP_QUEUE_QUALIFED_PREFIX = "temp-queue://";
public static final String TEMP_TOPIC_QUALIFED_PREFIX = "temp-topic://";
public static final byte QUEUE_TYPE = 0x01;
public static final byte TOPIC_TYPE = 0x02;
public static final byte TEMP_MASK = 0x04;
public static final byte TEMP_TOPIC_TYPE = TOPIC_TYPE | TEMP_MASK;
public static final byte TEMP_QUEUE_TYPE = QUEUE_TYPE | TEMP_MASK;
private static final char SEPARATOR = '.';
@ -73,7 +57,7 @@ public class ActiveMQDestination implements Destination, Serializable, Reference
/**
* Static helper method for working with destinations.
*/
public static ActiveMQDestination createDestination(String name, byte defaultType) {
public static ActiveMQDestination createDestination(String name, TYPE defaultType) {
if (name.startsWith(QUEUE_QUALIFIED_PREFIX)) {
return new ActiveMQQueue(name.substring(QUEUE_QUALIFIED_PREFIX.length()));
} else if (name.startsWith(TOPIC_QUALIFIED_PREFIX)) {
@ -85,14 +69,16 @@ public class ActiveMQDestination implements Destination, Serializable, Reference
}
switch (defaultType) {
case QUEUE_TYPE:
case QUEUE:
return new ActiveMQQueue(name);
case TOPIC_TYPE:
case TOPIC:
return new ActiveMQTopic(name);
case TEMP_QUEUE_TYPE:
case TEMP_QUEUE:
return new ActiveMQQueue(name, true);
case TEMP_TOPIC_TYPE:
case TEMP_TOPIC:
return new ActiveMQTopic(name, true);
case DESTINATION:
return new ActiveMQDestination(name, name, TYPE.DESTINATION, null);
default:
throw new IllegalArgumentException("Invalid default destination type: " + defaultType);
}
@ -101,22 +87,18 @@ public class ActiveMQDestination implements Destination, Serializable, Reference
public static Destination fromPrefixedName(final String address) {
if (address.startsWith(ActiveMQDestination.QUEUE_QUALIFIED_PREFIX)) {
String name = address.substring(ActiveMQDestination.QUEUE_QUALIFIED_PREFIX.length());
return createQueue(name);
} else if (address.startsWith(ActiveMQDestination.TOPIC_QUALIFIED_PREFIX)) {
String name = address.substring(ActiveMQDestination.TOPIC_QUALIFIED_PREFIX.length());
return createTopic(name);
} else if (address.startsWith(ActiveMQDestination.TEMP_QUEUE_QUALIFED_PREFIX)) {
String name = address.substring(ActiveMQDestination.TEMP_QUEUE_QUALIFED_PREFIX.length());
return new ActiveMQTemporaryQueue(name, name, null);
} else if (address.startsWith(ActiveMQDestination.TEMP_TOPIC_QUALIFED_PREFIX)) {
String name = address.substring(ActiveMQDestination.TEMP_TOPIC_QUALIFED_PREFIX.length());
return new ActiveMQTemporaryTopic(name, name, null);
} else {
throw new JMSRuntimeException("Invalid address " + address);
return new ActiveMQDestination(address, address, TYPE.DESTINATION, null);
}
}
@ -222,7 +204,7 @@ public class ActiveMQDestination implements Destination, Serializable, Reference
}
public static ActiveMQTemporaryQueue createTemporaryQueue(final String name) {
return createTemporaryQueue(/*TEMP_QUEUE_QUALIFED_PREFIX + */name, null);
return createTemporaryQueue(name, null);
}
public static ActiveMQTemporaryQueue createTemporaryQueue(final ActiveMQSession session) {
@ -238,7 +220,7 @@ public class ActiveMQDestination implements Destination, Serializable, Reference
}
public static ActiveMQTemporaryTopic createTemporaryTopic(String name, final ActiveMQSession session) {
return new ActiveMQTemporaryTopic(/*TEMP_TOPIC_QUALIFED_PREFIX + */name, name, session);
return new ActiveMQTemporaryTopic(name, name, session);
}
public static ActiveMQTemporaryTopic createTemporaryTopic(String name) {
@ -262,9 +244,7 @@ public class ActiveMQDestination implements Destination, Serializable, Reference
*/
private final SimpleString simpleAddress;
private final boolean temporary;
private final boolean queue;
private final TYPE type;
private final transient ActiveMQSession session;
@ -272,8 +252,7 @@ public class ActiveMQDestination implements Destination, Serializable, Reference
protected ActiveMQDestination(final String address,
final String name,
final boolean temporary,
final boolean queue,
final TYPE type,
final ActiveMQSession session) {
this.address = address;
@ -281,9 +260,7 @@ public class ActiveMQDestination implements Destination, Serializable, Reference
simpleAddress = new SimpleString(address);
this.temporary = temporary;
this.queue = queue;
this.type = type;
this.session = session;
}
@ -301,7 +278,7 @@ public class ActiveMQDestination implements Destination, Serializable, Reference
// Temporary queues will be deleted when the connection is closed.. nothing to be done then!
return;
}
if (queue) {
if (isQueue()) {
session.deleteTemporaryQueue(this);
} else {
session.deleteTemporaryTopic(this);
@ -310,7 +287,7 @@ public class ActiveMQDestination implements Destination, Serializable, Reference
}
public boolean isQueue() {
return queue;
return TYPE.isQueue(type);
}
// Public --------------------------------------------------------
@ -328,7 +305,11 @@ public class ActiveMQDestination implements Destination, Serializable, Reference
}
public boolean isTemporary() {
return temporary;
return TYPE.isTemporary(type);
}
public TYPE getType() {
return type;
}
@Override
@ -358,4 +339,66 @@ public class ActiveMQDestination implements Destination, Serializable, Reference
// Private -------------------------------------------------------
// Inner classes -------------------------------------------------
public enum TYPE {
QUEUE,
TOPIC,
TEMP_QUEUE,
TEMP_TOPIC,
DESTINATION; // unknown
public byte getType() {
switch (this) {
case QUEUE:
return 0;
case TOPIC:
return 1;
case TEMP_QUEUE:
return 2;
case TEMP_TOPIC:
return 3;
case DESTINATION:
return 4;
default:
return -1;
}
}
public static TYPE getType(byte type) {
switch (type) {
case 0:
return QUEUE;
case 1:
return TOPIC;
case 2:
return TEMP_QUEUE;
case 3:
return TEMP_TOPIC;
case 4:
return DESTINATION;
default:
return null;
}
}
public static boolean isQueue(TYPE type) {
boolean result = false;
if (type.equals(QUEUE) || type.equals(TEMP_QUEUE)) {
result = true;
}
return result;
}
public static boolean isTemporary(TYPE type) {
boolean result = false;
if (type.equals(TEMP_TOPIC) || type.equals(TEMP_QUEUE)) {
result = true;
}
return result;
}
}
}

View File

@ -41,11 +41,11 @@ public class ActiveMQQueue extends ActiveMQDestination implements Queue {
// Constructors --------------------------------------------------
public ActiveMQQueue(final String name) {
super(name, name, false, true, null);
super(name, name, TYPE.QUEUE, null);
}
public ActiveMQQueue(final String name, boolean temporary) {
super(name, name, temporary, true, null);
super(name, name, temporary ? TYPE.TEMP_QUEUE : TYPE.QUEUE, null);
}
/**
@ -55,11 +55,11 @@ public class ActiveMQQueue extends ActiveMQDestination implements Queue {
* @param session
*/
public ActiveMQQueue(String address, String name, boolean temporary, ActiveMQSession session) {
super(address, name, temporary, true, session);
super(address, name, temporary ? TYPE.TEMP_QUEUE : TYPE.QUEUE, session);
}
public ActiveMQQueue(final String address, final String name) {
super(address, name, false, true, null);
super(address, name, TYPE.QUEUE, null);
}
// Queue implementation ------------------------------------------

View File

@ -44,7 +44,7 @@ public class ActiveMQTopic extends ActiveMQDestination implements Topic {
}
public ActiveMQTopic(final String name, boolean temporary) {
super(name, name, temporary, false, null);
super(name, name, TYPE.TOPIC, null);
}
/**
@ -54,7 +54,7 @@ public class ActiveMQTopic extends ActiveMQDestination implements Topic {
* @param session
*/
protected ActiveMQTopic(String address, String name, boolean temporary, ActiveMQSession session) {
super(address, name, temporary, false, session);
super(address, name, temporary ? TYPE.TEMP_TOPIC : TYPE.TOPIC, session);
}
// Topic implementation ------------------------------------------

View File

@ -333,7 +333,7 @@ public class EmbeddedJMSResource extends ExternalResource {
*/
public Queue getDestinationQueue(String destinationName) {
Queue queue = null;
ActiveMQDestination destination = ActiveMQDestination.createDestination(destinationName, ActiveMQDestination.QUEUE_TYPE);
ActiveMQDestination destination = ActiveMQDestination.createDestination(destinationName, ActiveMQDestination.TYPE.QUEUE);
String address = destination.getAddress();
String name = destination.getName();
if (destination.isQueue()) {
@ -368,7 +368,7 @@ public class EmbeddedJMSResource extends ExternalResource {
*/
public List<Queue> getTopicQueues(String topicName) {
List<Queue> queues = new LinkedList<>();
ActiveMQDestination destination = ActiveMQDestination.createDestination(topicName, ActiveMQDestination.TOPIC_TYPE);
ActiveMQDestination destination = ActiveMQDestination.createDestination(topicName, ActiveMQDestination.TYPE.TOPIC);
if (!destination.isQueue()) {
BindingQueryResult bindingQueryResult = null;
try {
@ -405,7 +405,7 @@ public class EmbeddedJMSResource extends ExternalResource {
*/
public long getMessageCount(String destinationName) {
long count = 0;
ActiveMQDestination destination = ActiveMQDestination.createDestination(destinationName, ActiveMQDestination.QUEUE_TYPE);
ActiveMQDestination destination = ActiveMQDestination.createDestination(destinationName, ActiveMQDestination.TYPE.QUEUE);
if (destination.isQueue()) {
Queue queue = getDestinationQueue(destinationName);
if (queue == null) {
@ -529,7 +529,7 @@ public class EmbeddedJMSResource extends ExternalResource {
} else if (message == null) {
throw new IllegalArgumentException("sendMessage failure - a Message is required");
}
ActiveMQDestination destination = ActiveMQDestination.createDestination(destinationName, ActiveMQDestination.QUEUE_TYPE);
ActiveMQDestination destination = ActiveMQDestination.createDestination(destinationName, ActiveMQDestination.TYPE.QUEUE);
getInternalClient().pushMessage(destination, message);
}

View File

@ -58,7 +58,7 @@ public class EmbeddedJMSResourceMultipleFileConfigurationTest {
connectionFactory = new ActiveMQConnectionFactory(jmsServer.getVmURL());
connection = connectionFactory.createConnection();
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
consumer = (ActiveMQMessageConsumer) session.createConsumer(ActiveMQDestination.createDestination(TEST_TOPIC, ActiveMQDestination.TOPIC_TYPE));
consumer = (ActiveMQMessageConsumer) session.createConsumer(ActiveMQDestination.createDestination(TEST_TOPIC, ActiveMQDestination.TYPE.TOPIC));
connection.start();
}

View File

@ -58,7 +58,7 @@ public class EmbeddedJMSResourceSingleFileConfigurationTest {
connectionFactory = new ActiveMQConnectionFactory(jmsServer.getVmURL());
connection = connectionFactory.createConnection();
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
consumer = (ActiveMQMessageConsumer) session.createConsumer(ActiveMQDestination.createDestination(TEST_TOPIC, ActiveMQDestination.TOPIC_TYPE));
consumer = (ActiveMQMessageConsumer) session.createConsumer(ActiveMQDestination.createDestination(TEST_TOPIC, ActiveMQDestination.TYPE.TOPIC));
connection.start();
}

View File

@ -73,7 +73,7 @@ public class EmbeddedJMSResourceTopicTest {
connectionFactory = new ActiveMQConnectionFactory(jmsServer.getVmURL());
connection = connectionFactory.createConnection();
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
consumer = session.createConsumer(ActiveMQDestination.createDestination(TEST_DESTINATION_NAME, ActiveMQDestination.TOPIC_TYPE));
consumer = session.createConsumer(ActiveMQDestination.createDestination(TEST_DESTINATION_NAME, ActiveMQDestination.TYPE.TOPIC));
connection.start();
}

View File

@ -28,7 +28,7 @@ import org.apache.activemq.artemis.jms.client.ActiveMQDestination;
public class ServerDestination extends ActiveMQDestination implements Queue {
public ServerDestination(String name) {
super(name, name, false, false, null);
super(name, name, TYPE.DESTINATION, null);
}
@Override

View File

@ -0,0 +1,111 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.crossprotocol;
import javax.jms.Connection;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import java.net.URI;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.jms.ActiveMQJMSClient;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
import org.apache.activemq.artemis.jms.client.ActiveMQDestination;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.transport.amqp.client.AmqpClient;
import org.apache.activemq.transport.amqp.client.AmqpConnection;
import org.apache.activemq.transport.amqp.client.AmqpMessage;
import org.apache.activemq.transport.amqp.client.AmqpSender;
import org.apache.activemq.transport.amqp.client.AmqpSession;
import org.apache.qpid.proton.amqp.UnsignedInteger;
import org.apache.qpid.proton.amqp.messaging.Header;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
public class AMQPToJMSCore extends ActiveMQTestBase {
private ActiveMQServer server;
protected String queueName = "amqTestQueue1";
private SimpleString coreQueue;
@Override
@Before
public void setUp() throws Exception {
super.setUp();
server = createServer(true, true);
server.start();
server.waitForActivation(10, TimeUnit.SECONDS);
Configuration serverConfig = server.getConfiguration();
serverConfig.getAddressesSettings().put("#", new AddressSettings().setAutoCreateQueues(false)
.setAutoCreateAddresses(false)
.setDeadLetterAddress(new SimpleString("ActiveMQ.DLQ")));
serverConfig.setSecurityEnabled(false);
coreQueue = new SimpleString(queueName);
server.createQueue(coreQueue, RoutingType.ANYCAST, coreQueue, null, false, false);
}
@Override
@After
public void tearDown() throws Exception {
server.stop();
super.tearDown();
}
@Test
public void testMessageDestination() throws Exception {
System.out.println("foo");
AmqpClient client = new AmqpClient(new URI("tcp://127.0.0.1:61616"), null, null);
AmqpConnection amqpconnection = client.connect();
try {
AmqpSession session = amqpconnection.createSession();
AmqpSender sender = session.createSender(queueName);
AmqpMessage message = new AmqpMessage();
message.setMessageId("MessageID:" + 0);
// message.setApplicationProperty("_AMQ_ROUTING_TYPE", (byte) 1);
message.getWrappedMessage().setHeader(new Header());
message.getWrappedMessage().getHeader().setDeliveryCount(new UnsignedInteger(2));
sender.send(message);
} finally {
amqpconnection.close();
}
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://127.0.0.1:61616");
Connection connection = null;
try {
connection = factory.createConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer consumer = session.createConsumer(ActiveMQJMSClient.createQueue(queueName));
connection.start();
Message message = consumer.receive(2000);
Assert.assertNotNull(message);
ActiveMQDestination jmsDestination = (ActiveMQDestination) message.getJMSDestination();
Assert.assertEquals(queueName, jmsDestination.getAddress());
} finally {
if (connection != null) {
connection.close();
}
}
}
}

View File

@ -16,7 +16,7 @@
*/
package org.apache.activemq.artemis.tests.unit.jms;
import javax.jms.JMSRuntimeException;
import javax.jms.Destination;
import javax.jms.Queue;
import javax.jms.Topic;
@ -77,11 +77,8 @@ public class ActiveMQDestinationTest extends ActiveMQTestBase {
String invalidPrefix = "junk";
String destinationName = RandomUtil.randomString();
String address = invalidPrefix + destinationName;
try {
ActiveMQDestination.fromPrefixedName(address);
Assert.fail("IllegalArgumentException");
} catch (JMSRuntimeException e) {
}
ActiveMQDestination destination = (ActiveMQDestination) ActiveMQDestination.fromPrefixedName(address);
Assert.assertTrue(destination instanceof Destination);
}
// Package protected ---------------------------------------------