This commit is contained in:
Timothy Bish 2017-09-20 17:05:29 -04:00
commit e359f4bfd1
6 changed files with 83 additions and 20 deletions

View File

@ -423,10 +423,10 @@ public class OpenWireMessageConverter implements MessageConverter<OpenwireMessag
}
}
public static MessageDispatch createMessageDispatch(MessageReference reference,
public MessageDispatch createMessageDispatch(MessageReference reference,
ICoreMessage message,
AMQConsumer consumer) throws IOException, JMSException {
ActiveMQMessage amqMessage = toAMQMessage(reference, message, consumer.getMarshaller(), consumer.getOpenwireDestination());
ActiveMQMessage amqMessage = toAMQMessage(reference, message, consumer.getOpenwireDestination());
//we can use core message id for sequenceId
amqMessage.getMessageId().setBrokerSequenceId(message.getMessageID());
@ -441,9 +441,8 @@ public class OpenWireMessageConverter implements MessageConverter<OpenwireMessag
return md;
}
private static ActiveMQMessage toAMQMessage(MessageReference reference,
private ActiveMQMessage toAMQMessage(MessageReference reference,
ICoreMessage coreMessage,
WireFormat marshaller,
ActiveMQDestination actualDestination) throws IOException {
ActiveMQMessage amqMsg = null;
byte coreType = coreMessage.getType();

View File

@ -119,7 +119,7 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, Cl
private long maxInactivityDurationInitalDelay = 10 * 1000L;
private boolean useKeepAlive = true;
private final OpenWireMessageConverter messageConverter;
private final OpenWireMessageConverter internalConverter;
private final Map<SimpleString, RoutingType> prefixes = new HashMap<>();
@ -131,7 +131,7 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, Cl
wireFactory.setCacheEnabled(false);
advisoryProducerId.setConnectionId(ID_GENERATOR.generateId());
scheduledPool = server.getScheduledPool();
this.messageConverter = new OpenWireMessageConverter(wireFactory.createWireFormat());
this.internalConverter = new OpenWireMessageConverter(wireFactory.createWireFormat());
final ClusterManager clusterManager = this.server.getClusterManager();
@ -142,10 +142,6 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, Cl
}
}
public OpenWireFormat getNewWireFormat() {
return (OpenWireFormat) wireFactory.createWireFormat();
}
@Override
public void nodeUP(TopologyMember member, boolean last) {
if (topologyMap.put(member.getNodeId(), member) == null) {
@ -583,4 +579,8 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, Cl
}
return total;
}
public OpenWireMessageConverter getInternalConverter() {
return internalConverter;
}
}

View File

@ -51,7 +51,6 @@ import org.apache.activemq.command.MessageDispatch;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.MessagePull;
import org.apache.activemq.command.RemoveInfo;
import org.apache.activemq.wireformat.WireFormat;
public class AMQConsumer {
private AMQSession session;
@ -186,10 +185,6 @@ public class AMQConsumer {
return info.getConsumerId();
}
public WireFormat getMarshaller() {
return this.session.getMarshaller();
}
public void acquireCredit(int n) throws Exception {
if (messagePullHandler != null) {
//don't acquire any credits when the pull handler controls it!!
@ -217,7 +212,7 @@ public class AMQConsumer {
//so we need to remove this property too.
message.removeProperty(MessageUtil.CONNECTION_ID_PROPERTY_NAME);
}
dispatch = OpenWireMessageConverter.createMessageDispatch(reference, message, this);
dispatch = session.getConverter().createMessageDispatch(reference, message, this);
int size = dispatch.getMessage().getSize();
reference.setProtocolData(dispatch.getMessage().getMessageId());
session.deliverMessage(dispatch);

View File

@ -54,7 +54,6 @@ import org.apache.activemq.command.ProducerInfo;
import org.apache.activemq.command.Response;
import org.apache.activemq.command.SessionInfo;
import org.apache.activemq.openwire.OpenWireFormat;
import org.apache.activemq.wireformat.WireFormat;
import org.jboss.logging.Logger;
import static org.apache.activemq.artemis.core.protocol.openwire.util.OpenWireUtil.OPENWIRE_WILDCARD;
@ -104,7 +103,7 @@ public class AMQSession implements SessionCallback {
}
public OpenWireMessageConverter getConverter() {
return converter;
return protocolManager.getInternalConverter();
}
public void initialize() {
@ -436,11 +435,11 @@ public class AMQSession implements SessionCallback {
public ActiveMQServer getCoreServer() {
return this.server;
}
/*
public WireFormat getMarshaller() {
return this.connection.getMarshaller();
}
*/
public ConnectionInfo getConnectionInfo() {
return this.connInfo;
}

View File

@ -45,7 +45,9 @@ public class BasicOpenWireTest extends OpenWireTestBase {
public TestName name = new TestName();
protected static final String urlString = "tcp://" + OWHOST + ":" + OWPORT + "?wireFormat.cacheEnabled=true";
protected static final String urlStringLoose = "tcp://" + OWHOST + ":" + OWPORT + "?wireFormat.tightEncodingEnabled=false";
protected ActiveMQConnectionFactory factory;
protected ActiveMQConnectionFactory looseFactory;
protected ActiveMQXAConnectionFactory xaFactory;
protected ActiveMQConnection connection;
@ -85,6 +87,7 @@ public class BasicOpenWireTest extends OpenWireTestBase {
protected void createFactories() {
factory = new ActiveMQConnectionFactory(getConnectionUrl());
looseFactory = new ActiveMQConnectionFactory(urlStringLoose);
xaFactory = new ActiveMQXAConnectionFactory(getConnectionUrl());
}

View File

@ -367,6 +367,73 @@ public class SimpleOpenWireTest extends BasicOpenWireTest {
session.close();
}
@Test
public void testSendReceiveDifferentEncoding() throws Exception {
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
System.out.println("creating queue: " + queueName);
Destination dest = new ActiveMQQueue(queueName);
System.out.println("creating producer...");
MessageProducer producer = session.createProducer(dest);
final int num = 10;
final String msgBase = "MfromAMQ-";
for (int i = 0; i < num; i++) {
TextMessage msg = session.createTextMessage(msgBase + i);
producer.send(msg);
System.out.println("sent: ");
}
//receive loose
ActiveMQConnection looseConn = (ActiveMQConnection) looseFactory.createConnection();
try {
looseConn.start();
Session looseSession = looseConn.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer looseConsumer = looseSession.createConsumer(dest);
System.out.println("receiving messages...");
for (int i = 0; i < num; i++) {
TextMessage msg = (TextMessage) looseConsumer.receive(5000);
System.out.println("received: " + msg);
String content = msg.getText();
System.out.println("content: " + content);
assertEquals(msgBase + i, content);
}
assertNull(looseConsumer.receive(1000));
looseConsumer.close();
//now reverse
MessageProducer looseProducer = looseSession.createProducer(dest);
for (int i = 0; i < num; i++) {
TextMessage msg = looseSession.createTextMessage(msgBase + i);
looseProducer.send(msg);
System.out.println("sent: ");
}
MessageConsumer consumer = session.createConsumer(dest);
System.out.println("receiving messages...");
for (int i = 0; i < num; i++) {
TextMessage msg = (TextMessage) consumer.receive(5000);
System.out.println("received: " + msg);
assertNotNull(msg);
String content = msg.getText();
System.out.println("content: " + content);
assertEquals(msgBase + i, content);
}
assertNull(consumer.receive(1000));
session.close();
looseSession.close();
} finally {
looseConn.close();
}
}
// @Test -- ignored for now
public void testKeepAlive() throws Exception {
connection.start();