ARTEMIS-1711 - Fix openwire exlusive divert

Fixing the failure on send from an OpenWire producer when an exclusive
divert exists
This commit is contained in:
Christopher L. Shannon (cshannon) 2018-02-28 09:30:36 -05:00 committed by Clebert Suconic
parent d61f058ff2
commit 8e9ee80892
3 changed files with 136 additions and 21 deletions

View File

@ -16,14 +16,17 @@
*/
package org.apache.activemq.artemis.core.protocol.openwire.amq;
import javax.jms.InvalidDestinationException;
import javax.jms.ResourceAllocationException;
import static org.apache.activemq.artemis.core.protocol.openwire.util.OpenWireUtil.OPENWIRE_WILDCARD;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import javax.jms.InvalidDestinationException;
import javax.jms.ResourceAllocationException;
import org.apache.activemq.advisory.AdvisorySupport;
import org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException;
import org.apache.activemq.artemis.api.core.RoutingType;
@ -31,7 +34,6 @@ import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.io.IOCallback;
import org.apache.activemq.artemis.core.message.impl.CoreMessageObjectPools;
import org.apache.activemq.artemis.core.paging.PagingStore;
import org.apache.activemq.artemis.core.postoffice.RoutingStatus;
import org.apache.activemq.artemis.core.protocol.openwire.OpenWireConnection;
import org.apache.activemq.artemis.core.protocol.openwire.OpenWireMessageConverter;
import org.apache.activemq.artemis.core.protocol.openwire.OpenWireProtocolManager;
@ -62,8 +64,6 @@ import org.apache.activemq.command.SessionInfo;
import org.apache.activemq.openwire.OpenWireFormat;
import org.jboss.logging.Logger;
import static org.apache.activemq.artemis.core.protocol.openwire.util.OpenWireUtil.OPENWIRE_WILDCARD;
public class AMQSession implements SessionCallback {
private final Logger logger = Logger.getLogger(AMQSession.class);
@ -422,10 +422,7 @@ public class AMQSession implements SessionCallback {
throw new ResourceAllocationException("Queue is full " + address);
}
final RoutingStatus result = getCoreSession().send(coreMsg, false, dest.isTemporary());
if (result == RoutingStatus.NO_BINDINGS && dest.isQueue()) {
throw new InvalidDestinationException("Cannot publish to a non-existent Destination: " + dest);
}
getCoreSession().send(coreMsg, false, dest.isTemporary());
if (count == null || count.decrementAndGet() == 0) {
if (sendProducerAck) {
@ -449,13 +446,8 @@ public class AMQSession implements SessionCallback {
Exception exceptionToSend = null;
try {
RoutingStatus result = getCoreSession().send(coreMsg, false, dest.isTemporary());
if (result == RoutingStatus.NO_BINDINGS && dest.isQueue()) {
throw new InvalidDestinationException("Cannot publish to a non-existent Destination: " + dest);
}
getCoreSession().send(coreMsg, false, dest.isTemporary());
} catch (Exception e) {
logger.warn(e.getMessage(), e);
exceptionToSend = e;
}

View File

@ -16,6 +16,13 @@
*/
package org.apache.activemq.artemis.tests.integration.openwire;
import javax.jms.Connection;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
@ -28,12 +35,6 @@ import org.apache.activemq.artemis.utils.CompositeAddress;
import org.junit.Assert;
import org.junit.Test;
import javax.jms.Connection;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Queue;
import javax.jms.Session;
public class OpenWireDivertExclusiveTest extends OpenWireDivertTestBase {
@Override
@ -112,4 +113,61 @@ public class OpenWireDivertExclusiveTest extends OpenWireDivertTestBase {
}
}
}
@Test
public void testSingleExclusiveDivertOpenWirePublisher() throws Exception {
ServerLocator locator = createInVMNonHALocator();
ClientSessionFactory sf = createSessionFactory(locator);
ClientSession coreSession = sf.createSession(false, true, true);
final SimpleString queueName1 = new SimpleString("queue1");
final SimpleString queueName2 = new SimpleString("queue2");
coreSession.createQueue(new SimpleString(forwardAddress), RoutingType.ANYCAST, queueName1, null, false);
coreSession.createQueue(new SimpleString(testAddress), RoutingType.ANYCAST, queueName2, null, false);
coreSession.close();
factory = new ActiveMQConnectionFactory(urlString);
Connection openwireConnection = factory.createConnection();
try {
openwireConnection.start();
Session session = openwireConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer producer = session.createProducer(session.createQueue(testAddress));
final int numMessages = 10;
final String propKey = "testkey";
for (int i = 0; i < numMessages; i++) {
Message message = session.createMessage();
message.setIntProperty(propKey, i);
producer.send(message);
}
Queue q1 = session.createQueue(CompositeAddress.toFullQN(forwardAddress, "queue1"));
Queue q2 = session.createQueue(CompositeAddress.toFullQN(testAddress, "queue2"));
MessageConsumer consumer1 = session.createConsumer(q1);
MessageConsumer consumer2 = session.createConsumer(q2);
System.out.println("receiving ...");
for (int i = 0; i < numMessages; i++) {
Message message = consumer1.receive(TIMEOUT);
Assert.assertNotNull(message);
Assert.assertEquals(i, message.getObjectProperty(propKey.toString()));
message.acknowledge();
}
Assert.assertNull(consumer1.receive(50));
Assert.assertNull(consumer2.receive(50));
} finally {
if (openwireConnection != null) {
openwireConnection.close();
}
}
}
}

View File

@ -31,6 +31,7 @@ import org.junit.Test;
import javax.jms.Connection;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
@ -119,4 +120,68 @@ public class OpenWireDivertNonExclusiveTest extends OpenWireDivertTestBase {
}
}
@Test
//openwire sending, openwire receiving
public void testSingleNonExclusiveDivertOpenWirePublisher() throws Exception {
ServerLocator locator = createInVMNonHALocator();
ClientSessionFactory sf = createSessionFactory(locator);
ClientSession coreSession = sf.createSession(false, true, true);
final SimpleString queueName1 = new SimpleString("queue1");
final SimpleString queueName2 = new SimpleString("queue2");
coreSession.createQueue(new SimpleString(forwardAddress), RoutingType.ANYCAST, queueName1, null, false);
coreSession.createQueue(new SimpleString(testAddress), RoutingType.ANYCAST, queueName2, null, false);
coreSession.close();
//use openwire to receive
factory = new ActiveMQConnectionFactory(urlString);
Connection openwireConnection = factory.createConnection();
try {
Session session = openwireConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
openwireConnection.start();
MessageProducer producer = session.createProducer(session.createQueue(testAddress));
final int numMessages = 10;
final String propKey = "testkey";
for (int i = 0; i < numMessages; i++) {
Message message = session.createMessage();
message.setIntProperty(propKey, i);
producer.send(message);
}
Queue q1 = session.createQueue(CompositeAddress.toFullQN(testAddress, "queue1"));
Queue q2 = session.createQueue(CompositeAddress.toFullQN(forwardAddress, "queue2"));
MessageConsumer consumer1 = session.createConsumer(q1);
MessageConsumer consumer2 = session.createConsumer(q2);
System.out.println("receiving ...");
for (int i = 0; i < numMessages; i++) {
Message message = consumer1.receive(TIMEOUT);
Assert.assertNotNull(message);
Assert.assertEquals(i, message.getObjectProperty(propKey.toString()));
message.acknowledge();
}
Assert.assertNull(consumer1.receive(50));
for (int i = 0; i < numMessages; i++) {
Message message = consumer2.receive(TIMEOUT);
Assert.assertNotNull(message);
Assert.assertEquals(i, message.getObjectProperty(propKey.toString()));
message.acknowledge();
}
Assert.assertNull(consumer2.receive(50));
} finally {
if (openwireConnection != null) {
openwireConnection.close();
}
}
}
}