This commit is contained in:
Justin Bertram 2017-12-14 20:30:53 -06:00
commit facb6443c2
3 changed files with 152 additions and 3 deletions

View File

@ -80,7 +80,7 @@ public class OpenWireMessageConverter implements MessageConverter<OpenwireMessag
private static final String AMQ_MSG_CLUSTER = AMQ_PREFIX + "CLUSTER";
private static final String AMQ_MSG_COMMAND_ID = AMQ_PREFIX + "COMMAND_ID";
private static final String AMQ_MSG_DATASTRUCTURE = AMQ_PREFIX + "DATASTRUCTURE";
private static final String AMQ_MSG_GROUP_ID = AMQ_PREFIX + "GROUP_ID";
private static final String AMQ_MSG_GROUP_ID = org.apache.activemq.artemis.api.core.Message.HDR_GROUP_ID.toString();
private static final String AMQ_MSG_GROUP_SEQUENCE = AMQ_PREFIX + "GROUP_SEQUENCE";
private static final String AMQ_MSG_MESSAGE_ID = AMQ_PREFIX + "MESSAGE_ID";
private static final String AMQ_MSG_ORIG_DESTINATION = AMQ_PREFIX + "ORIG_DESTINATION";
@ -698,7 +698,7 @@ public class OpenWireMessageConverter implements MessageConverter<OpenwireMessag
amqMsg.setDestination(OpenWireUtil.toAMQAddress(coreMessage, actualDestination));
Object value = coreMessage.getObjectProperty(AMQ_MSG_GROUP_ID);
Object value = coreMessage.getGroupID();
if (value != null) {
String groupId = value.toString();
amqMsg.setGroupID(groupId);

View File

@ -83,7 +83,6 @@ public class TemporaryQueueClusterTest extends JMSClusteredTestBase {
}
}
// TODO: this is broken because temporary queues are no longer created with the "jms.temp-queue" prefix which means the cluster-connection won't match it
@Test
public void testTemporaryQueue() throws Exception {
jmsServer1.createQueue(false, QUEUE_NAME, null, false, "/queue/target");

View File

@ -0,0 +1,150 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.openwire;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.jms.client.ActiveMQMessage;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
@RunWith(Parameterized.class)
public class OpenWireGroupingTest extends BasicOpenWireTest {
//whether to use core send grouping messages
private boolean coreSend;
//whether to use core receive grouping messages
private boolean coreReceive;
@Parameterized.Parameters(name = "core-send={0} core-receive={1}")
public static Collection<Object[]> params() {
return Arrays.asList(new Object[][]{{true, true},
{true, false},
{false, true},
{false, false}});
}
public OpenWireGroupingTest(boolean coreSend, boolean coreReceive) {
this.coreSend = coreSend;
this.coreReceive = coreReceive;
}
@Test
public void testGrouping() throws Exception {
String jmsxgroupID = null;
ConnectionFactory sendFact = coreSend ? coreCf : factory;
ConnectionFactory receiveFact = coreReceive ? coreCf : factory;
final int num = 10;
try (Connection coreConn = sendFact.createConnection()) {
Session session = coreConn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
Queue queue = session.createQueue(queueName);
MessageProducer producer = session.createProducer(queue);
for (int j = 0; j < num; j++) {
TextMessage message = session.createTextMessage();
message.setText("Message" + j);
setProperty(message);
producer.send(message);
String prop = message.getStringProperty("JMSXGroupID");
assertNotNull(prop);
if (jmsxgroupID != null) {
assertEquals(jmsxgroupID, prop);
} else {
jmsxgroupID = prop;
}
}
}
try (Connection connection = receiveFact.createConnection()) {
Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
Queue queue = session.createQueue(queueName);
MessageConsumer consumer1 = session.createConsumer(queue);
MessageConsumer consumer2 = session.createConsumer(queue);
MessageConsumer consumer3 = session.createConsumer(queue);
connection.start();
List<MessageConsumer> otherConsumers = new ArrayList<>();
otherConsumers.add(consumer1);
otherConsumers.add(consumer2);
otherConsumers.add(consumer3);
//find out which one broker picks up
MessageConsumer groupConsumer = null;
for (MessageConsumer consumer : otherConsumers) {
TextMessage tm = (TextMessage) consumer.receive(2000);
if (tm != null) {
assertEquals("Message" + 0, tm.getText());
otherConsumers.remove(consumer);
groupConsumer = consumer;
break;
}
}
assertNotNull(groupConsumer);
//All msgs should go to the group consumer
for (int j = 1; j < num; j++) {
TextMessage tm = (TextMessage) groupConsumer.receive(2000);
assertNotNull(tm);
assertEquals("Message" + j, tm.getText());
assertEquals(tm.getStringProperty("JMSXGroupID"), jmsxgroupID);
}
for (MessageConsumer consumer : otherConsumers) {
assertNull(consumer.receive(100));
}
}
}
protected void setProperty(Message message) {
if (coreSend) {
((ActiveMQMessage) message).getCoreMessage().putStringProperty(org.apache.activemq.artemis.api.core.Message.HDR_GROUP_ID, new SimpleString("foo"));
} else {
org.apache.activemq.command.ActiveMQMessage m = (org.apache.activemq.command.ActiveMQMessage) message;
m.setGroupID("foo");
}
}
}