ARTEMIS-1348 Support LVQ for AMQP
Add support for LVQ, using the same property key as core "_AMQ_LVQ_NAME" Add test case for AMQP LVQ.
This commit is contained in:
parent
69e345f291
commit
26752a7aaf
|
@ -62,6 +62,7 @@ import io.netty.buffer.Unpooled;
|
|||
// see https://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-messaging-v1.0-os.html#section-message-format
|
||||
public class AMQPMessage extends RefCountMessage {
|
||||
|
||||
public static final String HDR_LAST_VALUE_NAME = org.apache.activemq.artemis.api.core.Message.HDR_LAST_VALUE_NAME.toString();
|
||||
public static final int DEFAULT_MESSAGE_PRIORITY = 4;
|
||||
public static final int MAX_MESSAGE_PRIORITY = 9;
|
||||
|
||||
|
@ -1000,6 +1001,11 @@ public class AMQPMessage extends RefCountMessage {
|
|||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public SimpleString getLastValueProperty() {
|
||||
return getSimpleStringProperty(HDR_LAST_VALUE_NAME);
|
||||
}
|
||||
|
||||
@Override
|
||||
public SimpleString getReplyTo() {
|
||||
if (getProperties() != null) {
|
||||
|
|
|
@ -23,6 +23,7 @@ import javax.jms.Connection;
|
|||
import javax.jms.ExceptionListener;
|
||||
import javax.jms.JMSException;
|
||||
|
||||
import org.apache.activemq.ActiveMQConnectionFactory;
|
||||
import org.apache.activemq.artemis.jms.client.ActiveMQJMSConnectionFactory;
|
||||
import org.apache.qpid.jms.JmsConnectionFactory;
|
||||
import org.jboss.logging.Logger;
|
||||
|
@ -206,4 +207,56 @@ public abstract class JMSClientTestSupport extends AmqpClientTestSupport {
|
|||
|
||||
return connection;
|
||||
}
|
||||
|
||||
protected String getBrokerOpenWireJMSConnectionString() {
|
||||
|
||||
try {
|
||||
int port = AMQP_PORT;
|
||||
|
||||
String uri = null;
|
||||
|
||||
if (isUseSSL()) {
|
||||
uri = "tcp://127.0.0.1:" + port;
|
||||
} else {
|
||||
uri = "tcp://127.0.0.1:" + port;
|
||||
}
|
||||
|
||||
if (!getJmsConnectionURIOptions().isEmpty()) {
|
||||
uri = uri + "?" + getJmsConnectionURIOptions();
|
||||
} else {
|
||||
uri = uri + "?wireFormat.cacheEnabled=true";
|
||||
}
|
||||
|
||||
return uri;
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException();
|
||||
}
|
||||
}
|
||||
|
||||
protected Connection createOpenWireConnection() throws JMSException {
|
||||
return createCoreConnection(getBrokerOpenWireJMSConnectionString(), null, null, null, true);
|
||||
}
|
||||
|
||||
private Connection createOpenWireConnection(String connectionString, String username, String password, String clientId, boolean start) throws JMSException {
|
||||
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(connectionString);
|
||||
|
||||
Connection connection = trackJMSConnection(factory.createConnection(username, password));
|
||||
|
||||
connection.setExceptionListener(new ExceptionListener() {
|
||||
@Override
|
||||
public void onException(JMSException exception) {
|
||||
exception.printStackTrace();
|
||||
}
|
||||
});
|
||||
|
||||
if (clientId != null && !clientId.isEmpty()) {
|
||||
connection.setClientID(clientId);
|
||||
}
|
||||
|
||||
if (start) {
|
||||
connection.start();
|
||||
}
|
||||
|
||||
return connection;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,152 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.artemis.tests.integration.amqp;
|
||||
|
||||
import javax.jms.Connection;
|
||||
import javax.jms.Message;
|
||||
import javax.jms.MessageConsumer;
|
||||
import javax.jms.MessageProducer;
|
||||
import javax.jms.Queue;
|
||||
import javax.jms.Session;
|
||||
import javax.jms.TextMessage;
|
||||
|
||||
import org.apache.activemq.artemis.api.core.RoutingType;
|
||||
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||
import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
||||
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
|
||||
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
|
||||
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage;
|
||||
import org.junit.Test;
|
||||
|
||||
public class JMSLVQTest extends JMSClientTestSupport {
|
||||
|
||||
private static final String LVQ_QUEUE_NAME = "LVQ";
|
||||
|
||||
@Override
|
||||
protected String getConfiguredProtocols() {
|
||||
return "AMQP,OPENWIRE,CORE";
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void addConfiguration(ActiveMQServer server) {
|
||||
server.getAddressSettingsRepository().addMatch(LVQ_QUEUE_NAME, new AddressSettings().setLastValueQueue(true));
|
||||
}
|
||||
@Override
|
||||
protected void createAddressAndQueues(ActiveMQServer server) throws Exception {
|
||||
super.createAddressAndQueues(server);
|
||||
server.addAddressInfo(new AddressInfo(SimpleString.toSimpleString(LVQ_QUEUE_NAME), RoutingType.ANYCAST));
|
||||
server.createQueue(SimpleString.toSimpleString(LVQ_QUEUE_NAME), RoutingType.ANYCAST, SimpleString.toSimpleString("LVQ"), null, true, false, -1, false, true);
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void testLVQAMQPProducerAMQPConsumer() throws Exception {
|
||||
Connection producerConnection = createConnection();
|
||||
Connection consumerConnection = createConnection();
|
||||
testLVQ(producerConnection, consumerConnection);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testLVQCoreProducerCoreConsumer() throws Exception {
|
||||
Connection producerConnection = createCoreConnection();
|
||||
Connection consumerConnection = createCoreConnection();
|
||||
testLVQ(producerConnection, consumerConnection);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testLVQCoreProducerAMQPConsumer() throws Exception {
|
||||
Connection producerConnection = createCoreConnection();
|
||||
Connection consumerConnection = createConnection();
|
||||
testLVQ(producerConnection, consumerConnection);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testLVQAMQPProducerCoreConsumer() throws Exception {
|
||||
Connection producerConnection = createConnection();
|
||||
Connection consumerConnection = createCoreConnection();
|
||||
testLVQ(producerConnection, consumerConnection);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testLVQOpenWireProducerOpenWireConsumer() throws Exception {
|
||||
Connection producerConnection = createOpenWireConnection();
|
||||
Connection consumerConnection = createOpenWireConnection();
|
||||
testLVQ(producerConnection, consumerConnection);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testLVQCoreProducerOpenWireConsumer() throws Exception {
|
||||
Connection producerConnection = createCoreConnection();
|
||||
Connection consumerConnection = createOpenWireConnection();
|
||||
testLVQ(producerConnection, consumerConnection);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testLVQOpenWireProducerCoreConsumer() throws Exception {
|
||||
Connection producerConnection = createOpenWireConnection();
|
||||
Connection consumerConnection = createCoreConnection();
|
||||
testLVQ(producerConnection, consumerConnection);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testLVQAMQPProducerOpenWireConsumer() throws Exception {
|
||||
Connection producerConnection = createConnection();
|
||||
Connection consumerConnection = createOpenWireConnection();
|
||||
testLVQ(producerConnection, consumerConnection);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testLVQOpenWireProducerAMQPConsumer() throws Exception {
|
||||
Connection producerConnection = createOpenWireConnection();
|
||||
Connection consumerConnection = createConnection();
|
||||
testLVQ(producerConnection, consumerConnection);
|
||||
}
|
||||
|
||||
public void testLVQ(Connection producerConnection, Connection consumerConnection) throws Exception {
|
||||
|
||||
try {
|
||||
Session producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
Queue queue1 = producerSession.createQueue(LVQ_QUEUE_NAME);
|
||||
MessageProducer p = producerSession.createProducer(null);
|
||||
|
||||
TextMessage message1 = producerSession.createTextMessage();
|
||||
message1.setStringProperty(AMQPMessage.HDR_LAST_VALUE_NAME, "KEY");
|
||||
message1.setText("hello");
|
||||
p.send(queue1, message1);
|
||||
|
||||
TextMessage message2 = producerSession.createTextMessage();
|
||||
message2.setStringProperty(AMQPMessage.HDR_LAST_VALUE_NAME, "KEY");
|
||||
message2.setText("how are you");
|
||||
p.send(queue1, message2);
|
||||
|
||||
|
||||
Session consumerSession = consumerConnection.createSession();
|
||||
Queue consumerQueue = consumerSession.createQueue(LVQ_QUEUE_NAME);
|
||||
MessageConsumer consumer = consumerSession.createConsumer(consumerQueue);
|
||||
Message msg = consumer.receive(1000);
|
||||
assertNotNull(msg);
|
||||
assertEquals("KEY", msg.getStringProperty(AMQPMessage.HDR_LAST_VALUE_NAME));
|
||||
assertTrue(msg instanceof TextMessage);
|
||||
assertEquals("how are you", ((TextMessage)msg).getText());
|
||||
consumer.close();
|
||||
|
||||
} finally {
|
||||
producerConnection.close();
|
||||
consumerConnection.close();
|
||||
}
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue