From f84d26ebb26b0c424ae90f994fc1aa673bbad88f Mon Sep 17 00:00:00 2001 From: Howard Gao Date: Tue, 19 Sep 2017 07:47:51 +0800 Subject: [PATCH] ARTEMIS-1424 Openwire not work with different tightEncoding options If message senders and receivers uses different wireformat.tightEncodingEnabled options, broker will get marshalling problem. This is because when openwire messages are converted to core messages, and later these core messages converted to openwire messages, the broker uses a mashaller that comes with the connection used to carry the messages. For example, if a producer sents a message using option "wireformat .tightEncodingEnabled=false" and a receiver tries to receive it using 'true' for the same option, it'll never get it because the broker will fail to use a "tight encoding" marshaller to decode a 'loose encoded' message. To fix the problem, we always use 'tight encoding' for internal message converters. --- .../openwire/OpenWireMessageConverter.java | 7 +- .../openwire/OpenWireProtocolManager.java | 12 ++-- .../protocol/openwire/amq/AMQConsumer.java | 7 +- .../protocol/openwire/amq/AMQSession.java | 7 +- .../openwire/BasicOpenWireTest.java | 3 + .../openwire/SimpleOpenWireTest.java | 67 +++++++++++++++++++ 6 files changed, 83 insertions(+), 20 deletions(-) diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java index 508bac95cd..88f90ee831 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java @@ -423,10 +423,10 @@ public class OpenWireMessageConverter implements MessageConverter, Cl private long maxInactivityDurationInitalDelay = 10 * 1000L; private boolean useKeepAlive = true; - private final OpenWireMessageConverter messageConverter; + private final OpenWireMessageConverter internalConverter; private final Map prefixes = new HashMap<>(); @@ -131,7 +131,7 @@ public class OpenWireProtocolManager implements ProtocolManager, Cl wireFactory.setCacheEnabled(false); advisoryProducerId.setConnectionId(ID_GENERATOR.generateId()); scheduledPool = server.getScheduledPool(); - this.messageConverter = new OpenWireMessageConverter(wireFactory.createWireFormat()); + this.internalConverter = new OpenWireMessageConverter(wireFactory.createWireFormat()); final ClusterManager clusterManager = this.server.getClusterManager(); @@ -142,10 +142,6 @@ public class OpenWireProtocolManager implements ProtocolManager, Cl } } - public OpenWireFormat getNewWireFormat() { - return (OpenWireFormat) wireFactory.createWireFormat(); - } - @Override public void nodeUP(TopologyMember member, boolean last) { if (topologyMap.put(member.getNodeId(), member) == null) { @@ -583,4 +579,8 @@ public class OpenWireProtocolManager implements ProtocolManager, Cl } return total; } + + public OpenWireMessageConverter getInternalConverter() { + return internalConverter; + } } diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java index 969d9aea68..57506a265e 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java @@ -51,7 +51,6 @@ import org.apache.activemq.command.MessageDispatch; import org.apache.activemq.command.MessageId; import org.apache.activemq.command.MessagePull; import org.apache.activemq.command.RemoveInfo; -import org.apache.activemq.wireformat.WireFormat; public class AMQConsumer { private AMQSession session; @@ -186,10 +185,6 @@ public class AMQConsumer { return info.getConsumerId(); } - public WireFormat getMarshaller() { - return this.session.getMarshaller(); - } - public void acquireCredit(int n) throws Exception { if (messagePullHandler != null) { //don't acquire any credits when the pull handler controls it!! @@ -217,7 +212,7 @@ public class AMQConsumer { //so we need to remove this property too. message.removeProperty(MessageUtil.CONNECTION_ID_PROPERTY_NAME); } - dispatch = OpenWireMessageConverter.createMessageDispatch(reference, message, this); + dispatch = session.getConverter().createMessageDispatch(reference, message, this); int size = dispatch.getMessage().getSize(); reference.setProtocolData(dispatch.getMessage().getMessageId()); session.deliverMessage(dispatch); diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java index 9b6670e423..330ac3509d 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java @@ -54,7 +54,6 @@ import org.apache.activemq.command.ProducerInfo; import org.apache.activemq.command.Response; import org.apache.activemq.command.SessionInfo; import org.apache.activemq.openwire.OpenWireFormat; -import org.apache.activemq.wireformat.WireFormat; import org.jboss.logging.Logger; import static org.apache.activemq.artemis.core.protocol.openwire.util.OpenWireUtil.OPENWIRE_WILDCARD; @@ -104,7 +103,7 @@ public class AMQSession implements SessionCallback { } public OpenWireMessageConverter getConverter() { - return converter; + return protocolManager.getInternalConverter(); } public void initialize() { @@ -436,11 +435,11 @@ public class AMQSession implements SessionCallback { public ActiveMQServer getCoreServer() { return this.server; } - +/* public WireFormat getMarshaller() { return this.connection.getMarshaller(); } - +*/ public ConnectionInfo getConnectionInfo() { return this.connInfo; } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/BasicOpenWireTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/BasicOpenWireTest.java index e24b63237f..7e82764c0a 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/BasicOpenWireTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/BasicOpenWireTest.java @@ -45,7 +45,9 @@ public class BasicOpenWireTest extends OpenWireTestBase { public TestName name = new TestName(); protected static final String urlString = "tcp://" + OWHOST + ":" + OWPORT + "?wireFormat.cacheEnabled=true"; + protected static final String urlStringLoose = "tcp://" + OWHOST + ":" + OWPORT + "?wireFormat.tightEncodingEnabled=false"; protected ActiveMQConnectionFactory factory; + protected ActiveMQConnectionFactory looseFactory; protected ActiveMQXAConnectionFactory xaFactory; protected ActiveMQConnection connection; @@ -85,6 +87,7 @@ public class BasicOpenWireTest extends OpenWireTestBase { protected void createFactories() { factory = new ActiveMQConnectionFactory(getConnectionUrl()); + looseFactory = new ActiveMQConnectionFactory(urlStringLoose); xaFactory = new ActiveMQXAConnectionFactory(getConnectionUrl()); } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/SimpleOpenWireTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/SimpleOpenWireTest.java index 5521814520..9e3af509be 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/SimpleOpenWireTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/SimpleOpenWireTest.java @@ -367,6 +367,73 @@ public class SimpleOpenWireTest extends BasicOpenWireTest { session.close(); } + @Test + public void testSendReceiveDifferentEncoding() throws Exception { + connection.start(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + System.out.println("creating queue: " + queueName); + Destination dest = new ActiveMQQueue(queueName); + + System.out.println("creating producer..."); + MessageProducer producer = session.createProducer(dest); + + final int num = 10; + final String msgBase = "MfromAMQ-"; + for (int i = 0; i < num; i++) { + TextMessage msg = session.createTextMessage(msgBase + i); + producer.send(msg); + System.out.println("sent: "); + } + + //receive loose + ActiveMQConnection looseConn = (ActiveMQConnection) looseFactory.createConnection(); + try { + looseConn.start(); + Session looseSession = looseConn.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageConsumer looseConsumer = looseSession.createConsumer(dest); + + System.out.println("receiving messages..."); + for (int i = 0; i < num; i++) { + TextMessage msg = (TextMessage) looseConsumer.receive(5000); + System.out.println("received: " + msg); + String content = msg.getText(); + System.out.println("content: " + content); + assertEquals(msgBase + i, content); + } + + assertNull(looseConsumer.receive(1000)); + looseConsumer.close(); + + //now reverse + + MessageProducer looseProducer = looseSession.createProducer(dest); + for (int i = 0; i < num; i++) { + TextMessage msg = looseSession.createTextMessage(msgBase + i); + looseProducer.send(msg); + System.out.println("sent: "); + } + + MessageConsumer consumer = session.createConsumer(dest); + System.out.println("receiving messages..."); + for (int i = 0; i < num; i++) { + TextMessage msg = (TextMessage) consumer.receive(5000); + System.out.println("received: " + msg); + assertNotNull(msg); + String content = msg.getText(); + System.out.println("content: " + content); + assertEquals(msgBase + i, content); + } + + assertNull(consumer.receive(1000)); + + session.close(); + looseSession.close(); + } finally { + looseConn.close(); + } + } + // @Test -- ignored for now public void testKeepAlive() throws Exception { connection.start();