This closes #433
This commit is contained in:
commit
35d3c50db7
|
@ -656,8 +656,8 @@ public class ActiveMQMessage implements javax.jms.Message {
|
||||||
public void setStringProperty(final String name, final String value) throws JMSException {
|
public void setStringProperty(final String name, final String value) throws JMSException {
|
||||||
checkProperty(name);
|
checkProperty(name);
|
||||||
|
|
||||||
if (MessageUtil.JMSXGROUPID.equals(name)) {
|
if (handleGroupID(name, value)) {
|
||||||
message.putStringProperty(org.apache.activemq.artemis.api.core.Message.HDR_GROUP_ID, SimpleString.toSimpleString(value));
|
return;
|
||||||
}
|
}
|
||||||
else {
|
else {
|
||||||
message.putStringProperty(new SimpleString(name), SimpleString.toSimpleString(value));
|
message.putStringProperty(new SimpleString(name), SimpleString.toSimpleString(value));
|
||||||
|
@ -666,6 +666,10 @@ public class ActiveMQMessage implements javax.jms.Message {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void setObjectProperty(final String name, final Object value) throws JMSException {
|
public void setObjectProperty(final String name, final Object value) throws JMSException {
|
||||||
|
if (handleGroupID(name, value)) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
if (ActiveMQJMSConstants.JMS_ACTIVEMQ_OUTPUT_STREAM.equals(name)) {
|
if (ActiveMQJMSConstants.JMS_ACTIVEMQ_OUTPUT_STREAM.equals(name)) {
|
||||||
setOutputStream((OutputStream) value);
|
setOutputStream((OutputStream) value);
|
||||||
|
|
||||||
|
@ -950,5 +954,17 @@ public class ActiveMQMessage implements javax.jms.Message {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private boolean handleGroupID(final String name, final Object value) {
|
||||||
|
boolean result = false;
|
||||||
|
|
||||||
|
if (MessageUtil.JMSXGROUPID.equals(name)) {
|
||||||
|
message.putStringProperty(org.apache.activemq.artemis.api.core.Message.HDR_GROUP_ID, SimpleString.toSimpleString(value.toString()));
|
||||||
|
|
||||||
|
result = true;
|
||||||
|
}
|
||||||
|
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
// Inner classes -------------------------------------------------
|
// Inner classes -------------------------------------------------
|
||||||
}
|
}
|
||||||
|
|
|
@ -29,13 +29,17 @@ import org.junit.Test;
|
||||||
|
|
||||||
import javax.jms.Connection;
|
import javax.jms.Connection;
|
||||||
import javax.jms.ConnectionFactory;
|
import javax.jms.ConnectionFactory;
|
||||||
|
import javax.jms.JMSConsumer;
|
||||||
|
import javax.jms.JMSContext;
|
||||||
import javax.jms.JMSException;
|
import javax.jms.JMSException;
|
||||||
|
import javax.jms.JMSProducer;
|
||||||
import javax.jms.Message;
|
import javax.jms.Message;
|
||||||
import javax.jms.MessageConsumer;
|
import javax.jms.MessageConsumer;
|
||||||
import javax.jms.MessageProducer;
|
import javax.jms.MessageProducer;
|
||||||
import javax.jms.Queue;
|
import javax.jms.Queue;
|
||||||
import javax.jms.Session;
|
import javax.jms.Session;
|
||||||
import javax.jms.TextMessage;
|
import javax.jms.TextMessage;
|
||||||
|
import java.util.UUID;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* GroupingTest
|
* GroupingTest
|
||||||
|
@ -113,6 +117,55 @@ public class GroupingTest extends JMSTestBase {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testGroupingWithJMS2Producer() throws Exception {
|
||||||
|
final String groupID = UUID.randomUUID().toString();
|
||||||
|
JMSContext ctx = addContext(getCF().createContext(JMSContext.SESSION_TRANSACTED));
|
||||||
|
|
||||||
|
JMSProducer producer = ctx.createProducer().setProperty("JMSXGroupID", groupID);
|
||||||
|
|
||||||
|
JMSConsumer consumer1 = ctx.createConsumer(queue);
|
||||||
|
JMSConsumer consumer2 = ctx.createConsumer(queue);
|
||||||
|
JMSConsumer consumer3 = ctx.createConsumer(queue);
|
||||||
|
|
||||||
|
ctx.start();
|
||||||
|
|
||||||
|
for (int j = 0; j < 100; j++) {
|
||||||
|
TextMessage message = ctx.createTextMessage("Message" + j);
|
||||||
|
|
||||||
|
producer.send(queue, message);
|
||||||
|
|
||||||
|
String prop = message.getStringProperty("JMSXGroupID");
|
||||||
|
|
||||||
|
assertNotNull(prop);
|
||||||
|
assertEquals(groupID, prop);
|
||||||
|
}
|
||||||
|
|
||||||
|
ctx.commit();
|
||||||
|
|
||||||
|
//All msgs should go to the first consumer
|
||||||
|
for (int j = 0; j < 100; j++) {
|
||||||
|
TextMessage tm = (TextMessage) consumer1.receive(10000);
|
||||||
|
|
||||||
|
assertNotNull(tm);
|
||||||
|
|
||||||
|
tm.acknowledge();
|
||||||
|
|
||||||
|
assertEquals("Message" + j, tm.getText());
|
||||||
|
|
||||||
|
assertEquals(tm.getStringProperty("JMSXGroupID"), groupID);
|
||||||
|
|
||||||
|
tm = (TextMessage) consumer2.receiveNoWait();
|
||||||
|
assertNull(tm);
|
||||||
|
tm = (TextMessage) consumer3.receiveNoWait();
|
||||||
|
assertNull(tm);
|
||||||
|
}
|
||||||
|
|
||||||
|
ctx.commit();
|
||||||
|
|
||||||
|
ctx.close();
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testManyGroups() throws Exception {
|
public void testManyGroups() throws Exception {
|
||||||
ConnectionFactory fact = getCF();
|
ConnectionFactory fact = getCF();
|
||||||
|
|
Loading…
Reference in New Issue