diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java index 508bac95cd..88f90ee831 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java @@ -423,10 +423,10 @@ public class OpenWireMessageConverter implements MessageConverter, Cl private long maxInactivityDurationInitalDelay = 10 * 1000L; private boolean useKeepAlive = true; - private final OpenWireMessageConverter messageConverter; + private final OpenWireMessageConverter internalConverter; private final Map prefixes = new HashMap<>(); @@ -131,7 +131,7 @@ public class OpenWireProtocolManager implements ProtocolManager, Cl wireFactory.setCacheEnabled(false); advisoryProducerId.setConnectionId(ID_GENERATOR.generateId()); scheduledPool = server.getScheduledPool(); - this.messageConverter = new OpenWireMessageConverter(wireFactory.createWireFormat()); + this.internalConverter = new OpenWireMessageConverter(wireFactory.createWireFormat()); final ClusterManager clusterManager = this.server.getClusterManager(); @@ -142,10 +142,6 @@ public class OpenWireProtocolManager implements ProtocolManager, Cl } } - public OpenWireFormat getNewWireFormat() { - return (OpenWireFormat) wireFactory.createWireFormat(); - } - @Override public void nodeUP(TopologyMember member, boolean last) { if (topologyMap.put(member.getNodeId(), member) == null) { @@ -583,4 +579,8 @@ public class OpenWireProtocolManager implements ProtocolManager, Cl } return total; } + + public OpenWireMessageConverter getInternalConverter() { + return internalConverter; + } } diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java index 969d9aea68..57506a265e 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java @@ -51,7 +51,6 @@ import org.apache.activemq.command.MessageDispatch; import org.apache.activemq.command.MessageId; import org.apache.activemq.command.MessagePull; import org.apache.activemq.command.RemoveInfo; -import org.apache.activemq.wireformat.WireFormat; public class AMQConsumer { private AMQSession session; @@ -186,10 +185,6 @@ public class AMQConsumer { return info.getConsumerId(); } - public WireFormat getMarshaller() { - return this.session.getMarshaller(); - } - public void acquireCredit(int n) throws Exception { if (messagePullHandler != null) { //don't acquire any credits when the pull handler controls it!! @@ -217,7 +212,7 @@ public class AMQConsumer { //so we need to remove this property too. message.removeProperty(MessageUtil.CONNECTION_ID_PROPERTY_NAME); } - dispatch = OpenWireMessageConverter.createMessageDispatch(reference, message, this); + dispatch = session.getConverter().createMessageDispatch(reference, message, this); int size = dispatch.getMessage().getSize(); reference.setProtocolData(dispatch.getMessage().getMessageId()); session.deliverMessage(dispatch); diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java index 9b6670e423..330ac3509d 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java @@ -54,7 +54,6 @@ import org.apache.activemq.command.ProducerInfo; import org.apache.activemq.command.Response; import org.apache.activemq.command.SessionInfo; import org.apache.activemq.openwire.OpenWireFormat; -import org.apache.activemq.wireformat.WireFormat; import org.jboss.logging.Logger; import static org.apache.activemq.artemis.core.protocol.openwire.util.OpenWireUtil.OPENWIRE_WILDCARD; @@ -104,7 +103,7 @@ public class AMQSession implements SessionCallback { } public OpenWireMessageConverter getConverter() { - return converter; + return protocolManager.getInternalConverter(); } public void initialize() { @@ -436,11 +435,11 @@ public class AMQSession implements SessionCallback { public ActiveMQServer getCoreServer() { return this.server; } - +/* public WireFormat getMarshaller() { return this.connection.getMarshaller(); } - +*/ public ConnectionInfo getConnectionInfo() { return this.connInfo; } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/BasicOpenWireTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/BasicOpenWireTest.java index e24b63237f..7e82764c0a 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/BasicOpenWireTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/BasicOpenWireTest.java @@ -45,7 +45,9 @@ public class BasicOpenWireTest extends OpenWireTestBase { public TestName name = new TestName(); protected static final String urlString = "tcp://" + OWHOST + ":" + OWPORT + "?wireFormat.cacheEnabled=true"; + protected static final String urlStringLoose = "tcp://" + OWHOST + ":" + OWPORT + "?wireFormat.tightEncodingEnabled=false"; protected ActiveMQConnectionFactory factory; + protected ActiveMQConnectionFactory looseFactory; protected ActiveMQXAConnectionFactory xaFactory; protected ActiveMQConnection connection; @@ -85,6 +87,7 @@ public class BasicOpenWireTest extends OpenWireTestBase { protected void createFactories() { factory = new ActiveMQConnectionFactory(getConnectionUrl()); + looseFactory = new ActiveMQConnectionFactory(urlStringLoose); xaFactory = new ActiveMQXAConnectionFactory(getConnectionUrl()); } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/SimpleOpenWireTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/SimpleOpenWireTest.java index 5521814520..9e3af509be 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/SimpleOpenWireTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/SimpleOpenWireTest.java @@ -367,6 +367,73 @@ public class SimpleOpenWireTest extends BasicOpenWireTest { session.close(); } + @Test + public void testSendReceiveDifferentEncoding() throws Exception { + connection.start(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + System.out.println("creating queue: " + queueName); + Destination dest = new ActiveMQQueue(queueName); + + System.out.println("creating producer..."); + MessageProducer producer = session.createProducer(dest); + + final int num = 10; + final String msgBase = "MfromAMQ-"; + for (int i = 0; i < num; i++) { + TextMessage msg = session.createTextMessage(msgBase + i); + producer.send(msg); + System.out.println("sent: "); + } + + //receive loose + ActiveMQConnection looseConn = (ActiveMQConnection) looseFactory.createConnection(); + try { + looseConn.start(); + Session looseSession = looseConn.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageConsumer looseConsumer = looseSession.createConsumer(dest); + + System.out.println("receiving messages..."); + for (int i = 0; i < num; i++) { + TextMessage msg = (TextMessage) looseConsumer.receive(5000); + System.out.println("received: " + msg); + String content = msg.getText(); + System.out.println("content: " + content); + assertEquals(msgBase + i, content); + } + + assertNull(looseConsumer.receive(1000)); + looseConsumer.close(); + + //now reverse + + MessageProducer looseProducer = looseSession.createProducer(dest); + for (int i = 0; i < num; i++) { + TextMessage msg = looseSession.createTextMessage(msgBase + i); + looseProducer.send(msg); + System.out.println("sent: "); + } + + MessageConsumer consumer = session.createConsumer(dest); + System.out.println("receiving messages..."); + for (int i = 0; i < num; i++) { + TextMessage msg = (TextMessage) consumer.receive(5000); + System.out.println("received: " + msg); + assertNotNull(msg); + String content = msg.getText(); + System.out.println("content: " + content); + assertEquals(msgBase + i, content); + } + + assertNull(consumer.receive(1000)); + + session.close(); + looseSession.close(); + } finally { + looseConn.close(); + } + } + // @Test -- ignored for now public void testKeepAlive() throws Exception { connection.start();