ARTEMIS-1169 Fixing protocol conversion
this will fix a few multiple protocol tests on ConsumerTests. And a few other AMQP tests dealing with conversions. You would get a classCastException without this commit.
This commit is contained in:
parent
a05e36ece1
commit
830c3bf179
|
@ -567,8 +567,7 @@ public class AMQPSessionCallback implements SessionCallback {
|
|||
ProtonServerSenderContext plugSender = (ProtonServerSenderContext) consumer.getProtocolContext();
|
||||
|
||||
try {
|
||||
invokeOutgoing((AMQPMessage) message, (ActiveMQProtonRemotingConnection) transportConnection.getProtocolConnection());
|
||||
return plugSender.deliverMessage(ref, deliveryCount);
|
||||
return plugSender.deliverMessage(ref, deliveryCount, transportConnection);
|
||||
} catch (Exception e) {
|
||||
connection.lock();
|
||||
try {
|
||||
|
|
|
@ -36,6 +36,7 @@ import org.apache.activemq.artemis.core.server.MessageReference;
|
|||
import org.apache.activemq.artemis.core.server.QueueQueryResult;
|
||||
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage;
|
||||
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPSessionCallback;
|
||||
import org.apache.activemq.artemis.protocol.amqp.broker.ActiveMQProtonRemotingConnection;
|
||||
import org.apache.activemq.artemis.protocol.amqp.converter.CoreAmqpConverter;
|
||||
import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPException;
|
||||
import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPIllegalStateException;
|
||||
|
@ -47,6 +48,7 @@ import org.apache.activemq.artemis.protocol.amqp.proton.transaction.ProtonTransa
|
|||
import org.apache.activemq.artemis.reader.MessageUtil;
|
||||
import org.apache.activemq.artemis.selector.filter.FilterException;
|
||||
import org.apache.activemq.artemis.selector.impl.SelectorParser;
|
||||
import org.apache.activemq.artemis.spi.core.remoting.Connection;
|
||||
import org.apache.activemq.artemis.utils.CompositeAddress;
|
||||
import org.apache.qpid.proton.amqp.DescribedType;
|
||||
import org.apache.qpid.proton.amqp.Symbol;
|
||||
|
@ -657,13 +659,15 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
|
|||
/**
|
||||
* handle an out going message from ActiveMQ Artemis, send via the Proton Sender
|
||||
*/
|
||||
public int deliverMessage(MessageReference messageReference, int deliveryCount) throws Exception {
|
||||
AMQPMessage message = CoreAmqpConverter.checkAMQP(messageReference.getMessage());
|
||||
public int deliverMessage(MessageReference messageReference, int deliveryCount, Connection transportConnection) throws Exception {
|
||||
|
||||
if (closed) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
AMQPMessage message = CoreAmqpConverter.checkAMQP(messageReference.getMessage());
|
||||
sessionSPI.invokeOutgoing(message, (ActiveMQProtonRemotingConnection) transportConnection.getProtocolConnection());
|
||||
|
||||
// presettle means we can settle the message on the dealer side before we send it, i.e.
|
||||
// for browsers
|
||||
boolean preSettle = sender.getRemoteSenderSettleMode() == SenderSettleMode.SETTLED;
|
||||
|
|
Loading…
Reference in New Issue