This closes #1908
This commit is contained in:
commit
66dbc9e3b4
|
@ -16,14 +16,17 @@
|
|||
*/
|
||||
package org.apache.activemq.artemis.core.protocol.openwire.amq;
|
||||
|
||||
import javax.jms.InvalidDestinationException;
|
||||
import javax.jms.ResourceAllocationException;
|
||||
import static org.apache.activemq.artemis.core.protocol.openwire.util.OpenWireUtil.OPENWIRE_WILDCARD;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import javax.jms.InvalidDestinationException;
|
||||
import javax.jms.ResourceAllocationException;
|
||||
|
||||
import org.apache.activemq.advisory.AdvisorySupport;
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException;
|
||||
import org.apache.activemq.artemis.api.core.RoutingType;
|
||||
|
@ -31,7 +34,6 @@ import org.apache.activemq.artemis.api.core.SimpleString;
|
|||
import org.apache.activemq.artemis.core.io.IOCallback;
|
||||
import org.apache.activemq.artemis.core.message.impl.CoreMessageObjectPools;
|
||||
import org.apache.activemq.artemis.core.paging.PagingStore;
|
||||
import org.apache.activemq.artemis.core.postoffice.RoutingStatus;
|
||||
import org.apache.activemq.artemis.core.protocol.openwire.OpenWireConnection;
|
||||
import org.apache.activemq.artemis.core.protocol.openwire.OpenWireMessageConverter;
|
||||
import org.apache.activemq.artemis.core.protocol.openwire.OpenWireProtocolManager;
|
||||
|
@ -62,8 +64,6 @@ import org.apache.activemq.command.SessionInfo;
|
|||
import org.apache.activemq.openwire.OpenWireFormat;
|
||||
import org.jboss.logging.Logger;
|
||||
|
||||
import static org.apache.activemq.artemis.core.protocol.openwire.util.OpenWireUtil.OPENWIRE_WILDCARD;
|
||||
|
||||
public class AMQSession implements SessionCallback {
|
||||
private final Logger logger = Logger.getLogger(AMQSession.class);
|
||||
|
||||
|
@ -422,10 +422,7 @@ public class AMQSession implements SessionCallback {
|
|||
throw new ResourceAllocationException("Queue is full " + address);
|
||||
}
|
||||
|
||||
final RoutingStatus result = getCoreSession().send(coreMsg, false, dest.isTemporary());
|
||||
if (result == RoutingStatus.NO_BINDINGS && dest.isQueue()) {
|
||||
throw new InvalidDestinationException("Cannot publish to a non-existent Destination: " + dest);
|
||||
}
|
||||
getCoreSession().send(coreMsg, false, dest.isTemporary());
|
||||
|
||||
if (count == null || count.decrementAndGet() == 0) {
|
||||
if (sendProducerAck) {
|
||||
|
@ -449,13 +446,8 @@ public class AMQSession implements SessionCallback {
|
|||
Exception exceptionToSend = null;
|
||||
|
||||
try {
|
||||
RoutingStatus result = getCoreSession().send(coreMsg, false, dest.isTemporary());
|
||||
|
||||
if (result == RoutingStatus.NO_BINDINGS && dest.isQueue()) {
|
||||
throw new InvalidDestinationException("Cannot publish to a non-existent Destination: " + dest);
|
||||
}
|
||||
getCoreSession().send(coreMsg, false, dest.isTemporary());
|
||||
} catch (Exception e) {
|
||||
|
||||
logger.warn(e.getMessage(), e);
|
||||
exceptionToSend = e;
|
||||
}
|
||||
|
|
|
@ -16,6 +16,13 @@
|
|||
*/
|
||||
package org.apache.activemq.artemis.tests.integration.openwire;
|
||||
|
||||
import javax.jms.Connection;
|
||||
import javax.jms.Message;
|
||||
import javax.jms.MessageConsumer;
|
||||
import javax.jms.MessageProducer;
|
||||
import javax.jms.Queue;
|
||||
import javax.jms.Session;
|
||||
|
||||
import org.apache.activemq.ActiveMQConnectionFactory;
|
||||
import org.apache.activemq.artemis.api.core.RoutingType;
|
||||
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||
|
@ -28,12 +35,6 @@ import org.apache.activemq.artemis.utils.CompositeAddress;
|
|||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
|
||||
import javax.jms.Connection;
|
||||
import javax.jms.Message;
|
||||
import javax.jms.MessageConsumer;
|
||||
import javax.jms.Queue;
|
||||
import javax.jms.Session;
|
||||
|
||||
public class OpenWireDivertExclusiveTest extends OpenWireDivertTestBase {
|
||||
|
||||
@Override
|
||||
|
@ -112,4 +113,61 @@ public class OpenWireDivertExclusiveTest extends OpenWireDivertTestBase {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSingleExclusiveDivertOpenWirePublisher() throws Exception {
|
||||
ServerLocator locator = createInVMNonHALocator();
|
||||
ClientSessionFactory sf = createSessionFactory(locator);
|
||||
|
||||
ClientSession coreSession = sf.createSession(false, true, true);
|
||||
|
||||
final SimpleString queueName1 = new SimpleString("queue1");
|
||||
final SimpleString queueName2 = new SimpleString("queue2");
|
||||
|
||||
coreSession.createQueue(new SimpleString(forwardAddress), RoutingType.ANYCAST, queueName1, null, false);
|
||||
coreSession.createQueue(new SimpleString(testAddress), RoutingType.ANYCAST, queueName2, null, false);
|
||||
coreSession.close();
|
||||
|
||||
factory = new ActiveMQConnectionFactory(urlString);
|
||||
Connection openwireConnection = factory.createConnection();
|
||||
|
||||
try {
|
||||
openwireConnection.start();
|
||||
Session session = openwireConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
MessageProducer producer = session.createProducer(session.createQueue(testAddress));
|
||||
|
||||
final int numMessages = 10;
|
||||
|
||||
final String propKey = "testkey";
|
||||
|
||||
for (int i = 0; i < numMessages; i++) {
|
||||
Message message = session.createMessage();
|
||||
message.setIntProperty(propKey, i);
|
||||
producer.send(message);
|
||||
}
|
||||
|
||||
Queue q1 = session.createQueue(CompositeAddress.toFullQN(forwardAddress, "queue1"));
|
||||
Queue q2 = session.createQueue(CompositeAddress.toFullQN(testAddress, "queue2"));
|
||||
|
||||
MessageConsumer consumer1 = session.createConsumer(q1);
|
||||
MessageConsumer consumer2 = session.createConsumer(q2);
|
||||
|
||||
System.out.println("receiving ...");
|
||||
for (int i = 0; i < numMessages; i++) {
|
||||
Message message = consumer1.receive(TIMEOUT);
|
||||
|
||||
Assert.assertNotNull(message);
|
||||
|
||||
Assert.assertEquals(i, message.getObjectProperty(propKey.toString()));
|
||||
|
||||
message.acknowledge();
|
||||
}
|
||||
Assert.assertNull(consumer1.receive(50));
|
||||
Assert.assertNull(consumer2.receive(50));
|
||||
} finally {
|
||||
if (openwireConnection != null) {
|
||||
openwireConnection.close();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -31,6 +31,7 @@ import org.junit.Test;
|
|||
import javax.jms.Connection;
|
||||
import javax.jms.Message;
|
||||
import javax.jms.MessageConsumer;
|
||||
import javax.jms.MessageProducer;
|
||||
import javax.jms.Queue;
|
||||
import javax.jms.Session;
|
||||
|
||||
|
@ -119,4 +120,68 @@ public class OpenWireDivertNonExclusiveTest extends OpenWireDivertTestBase {
|
|||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
//openwire sending, openwire receiving
|
||||
public void testSingleNonExclusiveDivertOpenWirePublisher() throws Exception {
|
||||
ServerLocator locator = createInVMNonHALocator();
|
||||
ClientSessionFactory sf = createSessionFactory(locator);
|
||||
ClientSession coreSession = sf.createSession(false, true, true);
|
||||
|
||||
final SimpleString queueName1 = new SimpleString("queue1");
|
||||
final SimpleString queueName2 = new SimpleString("queue2");
|
||||
|
||||
coreSession.createQueue(new SimpleString(forwardAddress), RoutingType.ANYCAST, queueName1, null, false);
|
||||
coreSession.createQueue(new SimpleString(testAddress), RoutingType.ANYCAST, queueName2, null, false);
|
||||
coreSession.close();
|
||||
|
||||
//use openwire to receive
|
||||
factory = new ActiveMQConnectionFactory(urlString);
|
||||
Connection openwireConnection = factory.createConnection();
|
||||
|
||||
try {
|
||||
Session session = openwireConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
openwireConnection.start();
|
||||
|
||||
MessageProducer producer = session.createProducer(session.createQueue(testAddress));
|
||||
|
||||
final int numMessages = 10;
|
||||
final String propKey = "testkey";
|
||||
|
||||
for (int i = 0; i < numMessages; i++) {
|
||||
Message message = session.createMessage();
|
||||
message.setIntProperty(propKey, i);
|
||||
producer.send(message);
|
||||
}
|
||||
|
||||
Queue q1 = session.createQueue(CompositeAddress.toFullQN(testAddress, "queue1"));
|
||||
Queue q2 = session.createQueue(CompositeAddress.toFullQN(forwardAddress, "queue2"));
|
||||
|
||||
MessageConsumer consumer1 = session.createConsumer(q1);
|
||||
MessageConsumer consumer2 = session.createConsumer(q2);
|
||||
|
||||
System.out.println("receiving ...");
|
||||
for (int i = 0; i < numMessages; i++) {
|
||||
Message message = consumer1.receive(TIMEOUT);
|
||||
Assert.assertNotNull(message);
|
||||
Assert.assertEquals(i, message.getObjectProperty(propKey.toString()));
|
||||
message.acknowledge();
|
||||
}
|
||||
|
||||
Assert.assertNull(consumer1.receive(50));
|
||||
|
||||
for (int i = 0; i < numMessages; i++) {
|
||||
Message message = consumer2.receive(TIMEOUT);
|
||||
Assert.assertNotNull(message);
|
||||
Assert.assertEquals(i, message.getObjectProperty(propKey.toString()));
|
||||
message.acknowledge();
|
||||
}
|
||||
|
||||
Assert.assertNull(consumer2.receive(50));
|
||||
} finally {
|
||||
if (openwireConnection != null) {
|
||||
openwireConnection.close();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue