diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java index fe810ff551..71a26e0b4b 100644 --- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java @@ -58,6 +58,7 @@ import org.apache.activemq.selector.SelectorParser; import org.apache.activemq.util.IOExceptionSupport; import org.apache.activemq.util.IdGenerator; import org.apache.activemq.util.LongSequenceGenerator; +import org.apache.qpid.proton.ProtonFactoryLoader; import org.apache.qpid.proton.amqp.Binary; import org.apache.qpid.proton.amqp.DescribedType; import org.apache.qpid.proton.amqp.Symbol; @@ -98,7 +99,8 @@ import org.apache.qpid.proton.jms.EncodedMessage; import org.apache.qpid.proton.jms.InboundTransformer; import org.apache.qpid.proton.jms.JMSMappingInboundTransformer; import org.apache.qpid.proton.jms.OutboundTransformer; -import org.apache.qpid.proton.message.impl.MessageImpl; +import org.apache.qpid.proton.message.Message; +import org.apache.qpid.proton.message.MessageFactory; import org.fusesource.hawtbuf.Buffer; import org.fusesource.hawtbuf.ByteArrayOutputStream; import org.slf4j.Logger; @@ -120,11 +122,15 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter { private static final Symbol NO_LOCAL = Symbol.valueOf("no-local"); private static final Symbol DURABLE_SUBSCRIPTION_ENDED = Symbol.getSymbol("DURABLE_SUBSCRIPTION_ENDED"); + private static final ProtonFactoryLoader messageFactoryLoader = + new ProtonFactoryLoader(MessageFactory.class); + int prefetch = 100; EngineFactory engineFactory = new EngineFactoryImpl(); Transport protonTransport = engineFactory.createTransport(); Connection protonConnection = engineFactory.createConnection(); + MessageFactory messageFactory = messageFactoryLoader.loadFactory(); public AmqpProtocolConverter(AmqpTransport transport) { this.amqpTransport = transport; @@ -630,7 +636,7 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter { @Override protected void onMessage(Receiver receiver, final Delivery delivery, Buffer buffer) throws Exception { - MessageImpl msg = new MessageImpl(); + Message msg = messageFactory.createMessage(); int offset = buffer.offset; int len = buffer.length; while (len > 0) {