This commit is contained in:
Clebert Suconic 2017-02-20 08:18:17 -05:00
commit cd47873f25
4 changed files with 170 additions and 0 deletions

View File

@ -156,6 +156,8 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
private ConnectionState state; private ConnectionState state;
private volatile boolean noLocal;
/** /**
* Openwire doesn't sen transactions associated with any sessions. * Openwire doesn't sen transactions associated with any sessions.
* It will however send beingTX / endTX as it would be doing it with XA Transactions. * It will however send beingTX / endTX as it would be doing it with XA Transactions.
@ -836,6 +838,14 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
disableTtl.set(false); disableTtl.set(false);
} }
public boolean isNoLocal() {
return noLocal;
}
public void setNoLocal(boolean noLocal) {
this.noLocal = noLocal;
}
class SlowConsumerDetection implements SlowConsumerDetectionListener { class SlowConsumerDetection implements SlowConsumerDetectionListener {
@Override @Override

View File

@ -25,6 +25,7 @@ import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.advisory.AdvisorySupport;
import org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException; import org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException;
import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.client.impl.ClientConsumerImpl; import org.apache.activemq.artemis.core.client.impl.ClientConsumerImpl;
@ -39,6 +40,7 @@ import org.apache.activemq.artemis.core.server.SlowConsumerDetectionListener;
import org.apache.activemq.artemis.core.server.impl.AddressInfo; import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.core.transaction.Transaction; import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.reader.MessageUtil;
import org.apache.activemq.command.ConsumerControl; import org.apache.activemq.command.ConsumerControl;
import org.apache.activemq.command.ConsumerId; import org.apache.activemq.command.ConsumerId;
import org.apache.activemq.command.ConsumerInfo; import org.apache.activemq.command.ConsumerInfo;
@ -79,6 +81,18 @@ public class AMQConsumer {
public void init(SlowConsumerDetectionListener slowConsumerDetectionListener, long nativeId) throws Exception { public void init(SlowConsumerDetectionListener slowConsumerDetectionListener, long nativeId) throws Exception {
SimpleString selector = info.getSelector() == null ? null : new SimpleString(info.getSelector()); SimpleString selector = info.getSelector() == null ? null : new SimpleString(info.getSelector());
if (info.isNoLocal()) {
if (!AdvisorySupport.isAdvisoryTopic(openwireDestination)) {
//tell the connection to add the property
this.session.getConnection().setNoLocal(true);
}
String noLocalSelector = MessageUtil.CONNECTION_ID_PROPERTY_NAME.toString() + "<>'" + this.getId().getConnectionId() + "'";
if (selector == null) {
selector = new SimpleString(noLocalSelector);
} else {
selector = new SimpleString(info.getSelector() + " AND " + noLocalSelector);
}
}
String physicalName = session.convertWildcard(openwireDestination.getPhysicalName()); String physicalName = session.convertWildcard(openwireDestination.getPhysicalName());
@ -201,6 +215,9 @@ public class AMQConsumer {
return 0; return 0;
} }
if (session.getConnection().isNoLocal()) {
message.removeProperty(MessageUtil.CONNECTION_ID_PROPERTY_NAME);
}
dispatch = OpenWireMessageConverter.createMessageDispatch(reference, message, this); dispatch = OpenWireMessageConverter.createMessageDispatch(reference, message, this);
int size = dispatch.getMessage().getSize(); int size = dispatch.getMessage().getSize();
reference.setProtocolData(dispatch.getMessage().getMessageId()); reference.setProtocolData(dispatch.getMessage().getMessageId());

View File

@ -39,6 +39,7 @@ import org.apache.activemq.artemis.core.server.ServerConsumer;
import org.apache.activemq.artemis.core.server.ServerMessage; import org.apache.activemq.artemis.core.server.ServerMessage;
import org.apache.activemq.artemis.core.server.ServerSession; import org.apache.activemq.artemis.core.server.ServerSession;
import org.apache.activemq.artemis.core.server.SlowConsumerDetectionListener; import org.apache.activemq.artemis.core.server.SlowConsumerDetectionListener;
import org.apache.activemq.artemis.reader.MessageUtil;
import org.apache.activemq.artemis.spi.core.protocol.SessionCallback; import org.apache.activemq.artemis.spi.core.protocol.SessionCallback;
import org.apache.activemq.artemis.spi.core.remoting.ReadyListener; import org.apache.activemq.artemis.spi.core.remoting.ReadyListener;
import org.apache.activemq.artemis.utils.IDGenerator; import org.apache.activemq.artemis.utils.IDGenerator;
@ -297,6 +298,11 @@ public class AMQSession implements SessionCallback {
ServerMessage originalCoreMsg = getConverter().inbound(messageSend); ServerMessage originalCoreMsg = getConverter().inbound(messageSend);
if (connection.isNoLocal() || sessInfo.getSessionId().getValue() == -1) {
//Internal session is used to send advisory messages, which are always noLocal
originalCoreMsg.putStringProperty(MessageUtil.CONNECTION_ID_PROPERTY_NAME.toString(), this.connection.getState().getInfo().getConnectionId().getValue());
}
/* ActiveMQ failover transport will attempt to reconnect after connection failure. Any sent messages that did /* ActiveMQ failover transport will attempt to reconnect after connection failure. Any sent messages that did
* not receive acks will be resent. (ActiveMQ broker handles this by returning a last sequence id received to * not receive acks will be resent. (ActiveMQ broker handles this by returning a last sequence id received to
* the client). To handle this in Artemis we use a duplicate ID cache. To do this we check to see if the * the client). To handle this in Artemis we use a duplicate ID cache. To do this we check to see if the

View File

@ -31,6 +31,8 @@ import javax.jms.Session;
import javax.jms.TemporaryQueue; import javax.jms.TemporaryQueue;
import javax.jms.TemporaryTopic; import javax.jms.TemporaryTopic;
import javax.jms.TextMessage; import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicSession;
import javax.jms.XAConnection; import javax.jms.XAConnection;
import javax.jms.XASession; import javax.jms.XASession;
import javax.transaction.xa.XAResource; import javax.transaction.xa.XAResource;
@ -355,6 +357,141 @@ public class SimpleOpenWireTest extends BasicOpenWireTest {
session.close(); session.close();
} }
@Test
public void testTopicNoLocal() throws Exception {
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
System.out.println("creating queue: " + topicName);
Destination dest = new ActiveMQTopic(topicName);
MessageConsumer nolocalConsumer = session.createConsumer(dest, null, true);
MessageConsumer consumer = session.createConsumer(dest, null, false);
MessageConsumer selectorConsumer = session.createConsumer(dest,"TESTKEY = 'test'", false);
MessageProducer producer = session.createProducer(dest);
final String body1 = "MfromAMQ-1";
final String body2 = "MfromAMQ-2";
TextMessage msg = session.createTextMessage(body1);
producer.send(msg);
msg = session.createTextMessage(body2);
msg.setStringProperty("TESTKEY", "test");
producer.send(msg);
//receive nolocal
TextMessage receivedMsg = (TextMessage) nolocalConsumer.receive(1000);
assertNull("nolocal consumer got: " + receivedMsg, receivedMsg);
//receive normal consumer
receivedMsg = (TextMessage) consumer.receive(1000);
assertNotNull(receivedMsg);
assertEquals(body1, receivedMsg.getText());
receivedMsg = (TextMessage) consumer.receive(1000);
assertNotNull(receivedMsg);
assertEquals(body2, receivedMsg.getText());
assertNull(consumer.receiveNoWait());
//selector should only receive one
receivedMsg = (TextMessage) selectorConsumer.receive(1000);
assertNotNull(receivedMsg);
assertEquals(body2, receivedMsg.getText());
assertEquals("test", receivedMsg.getStringProperty("TESTKEY"));
assertNull(selectorConsumer.receiveNoWait());
//send from another connection
Connection anotherConn = this.factory.createConnection();
try {
anotherConn.start();
Session anotherSession = anotherConn.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer anotherProducer = anotherSession.createProducer(dest);
TextMessage anotherMsg = anotherSession.createTextMessage(body1);
anotherProducer.send(anotherMsg);
assertNotNull(consumer.receive(1000));
assertNull(selectorConsumer.receive(1000));
assertNotNull(nolocalConsumer.receive(1000));
} finally {
anotherConn.close();
}
session.close();
}
@Test
public void testTopicNoLocalDurable() throws Exception {
connection.setClientID("forNoLocal-1");
connection.start();
TopicSession session = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
System.out.println("creating queue: " + topicName);
Topic dest = new ActiveMQTopic(topicName);
MessageConsumer nolocalConsumer = session.createDurableSubscriber(dest, "nolocal-subscriber1", "", true);
MessageConsumer consumer = session.createDurableSubscriber(dest, "normal-subscriber", null, false);
MessageConsumer selectorConsumer = session.createDurableSubscriber(dest, "selector-subscriber", "TESTKEY = 'test'", false);
MessageProducer producer = session.createProducer(dest);
final String body1 = "MfromAMQ-1";
final String body2 = "MfromAMQ-2";
TextMessage msg = session.createTextMessage(body1);
producer.send(msg);
msg = session.createTextMessage(body2);
msg.setStringProperty("TESTKEY", "test");
producer.send(msg);
//receive nolocal
TextMessage receivedMsg = (TextMessage) nolocalConsumer.receive(1000);
assertNull("nolocal consumer got: " + receivedMsg, receivedMsg);
//receive normal consumer
receivedMsg = (TextMessage) consumer.receive(1000);
assertNotNull(receivedMsg);
assertEquals(body1, receivedMsg.getText());
receivedMsg = (TextMessage) consumer.receive(1000);
assertNotNull(receivedMsg);
assertEquals(body2, receivedMsg.getText());
assertNull(consumer.receiveNoWait());
//selector should only receive one
receivedMsg = (TextMessage) selectorConsumer.receive(1000);
assertNotNull(receivedMsg);
assertEquals(body2, receivedMsg.getText());
assertEquals("test", receivedMsg.getStringProperty("TESTKEY"));
assertNull(selectorConsumer.receiveNoWait());
//send from another connection
Connection anotherConn = this.factory.createConnection();
try {
anotherConn.start();
Session anotherSession = anotherConn.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer anotherProducer = anotherSession.createProducer(dest);
TextMessage anotherMsg = anotherSession.createTextMessage(body1);
anotherProducer.send(anotherMsg);
assertNotNull(consumer.receive(1000));
assertNull(selectorConsumer.receive(1000));
assertNotNull(nolocalConsumer.receive(1000));
} finally {
anotherConn.close();
}
session.close();
}
@Test @Test
public void testSimpleTempTopic() throws Exception { public void testSimpleTempTopic() throws Exception {
connection.start(); connection.start();