diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java index df8509d62a..186d79091b 100644 --- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java @@ -58,7 +58,7 @@ import org.apache.activemq.selector.SelectorParser; import org.apache.activemq.util.IOExceptionSupport; import org.apache.activemq.util.IdGenerator; import org.apache.activemq.util.LongSequenceGenerator; -import org.apache.qpid.proton.ProtonFactoryLoader; +import org.apache.qpid.proton.Proton; import org.apache.qpid.proton.amqp.Binary; import org.apache.qpid.proton.amqp.DescribedType; import org.apache.qpid.proton.amqp.Symbol; @@ -82,8 +82,8 @@ import org.apache.qpid.proton.engine.Collector; import org.apache.qpid.proton.engine.Connection; import org.apache.qpid.proton.engine.Delivery; import org.apache.qpid.proton.engine.EndpointState; -import org.apache.qpid.proton.engine.EngineFactory; import org.apache.qpid.proton.engine.Event; +import org.apache.qpid.proton.engine.Event.Type.*; import org.apache.qpid.proton.engine.Link; import org.apache.qpid.proton.engine.Receiver; import org.apache.qpid.proton.engine.Sasl; @@ -91,7 +91,6 @@ import org.apache.qpid.proton.engine.Sender; import org.apache.qpid.proton.engine.Session; import org.apache.qpid.proton.engine.Transport; import org.apache.qpid.proton.engine.impl.CollectorImpl; -import org.apache.qpid.proton.engine.impl.EngineFactoryImpl; import org.apache.qpid.proton.engine.impl.ProtocolTracer; import org.apache.qpid.proton.engine.impl.TransportImpl; import org.apache.qpid.proton.framing.TransportFrame; @@ -103,7 +102,6 @@ import org.apache.qpid.proton.jms.InboundTransformer; import org.apache.qpid.proton.jms.JMSMappingInboundTransformer; import org.apache.qpid.proton.jms.OutboundTransformer; import org.apache.qpid.proton.message.Message; -import org.apache.qpid.proton.message.MessageFactory; import org.fusesource.hawtbuf.Buffer; import org.fusesource.hawtbuf.ByteArrayOutputStream; import org.slf4j.Logger; @@ -120,13 +118,9 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter { private static final Symbol NO_LOCAL = Symbol.valueOf("no-local"); private static final Symbol DURABLE_SUBSCRIPTION_ENDED = Symbol.getSymbol("DURABLE_SUBSCRIPTION_ENDED"); - private static final ProtonFactoryLoader messageFactoryLoader = new ProtonFactoryLoader(MessageFactory.class); - protected int prefetch = 100; - protected EngineFactory engineFactory = new EngineFactoryImpl(); - protected Transport protonTransport = engineFactory.createTransport(); - protected Connection protonConnection = engineFactory.createConnection(); - protected MessageFactory messageFactory = messageFactoryLoader.loadFactory(); + protected Transport protonTransport = Proton.transport(); + protected Connection protonConnection = Proton.connection(); protected Collector eventCollector = new CollectorImpl(); public AmqpProtocolConverter(AmqpTransport transport) { @@ -266,13 +260,16 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter { Event event = null; while ((event = eventCollector.peek()) != null) { switch (event.getType()) { - case CONNECTION_REMOTE_STATE: + case CONNECTION_REMOTE_OPEN: + case CONNECTION_REMOTE_CLOSE: processConnectionEvent(event.getConnection()); break; - case SESSION_REMOTE_STATE: + case SESSION_REMOTE_OPEN: + case SESSION_REMOTE_CLOSE: processSessionEvent(event.getSession()); break; - case LINK_REMOTE_STATE: + case LINK_REMOTE_OPEN: + case LINK_REMOTE_CLOSE: processLinkEvent(event.getLink()); break; case LINK_FLOW: @@ -697,7 +694,7 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter { @Override protected void onMessage(Receiver receiver, final Delivery delivery, Buffer buffer) throws Exception { - Message msg = messageFactory.createMessage(); + Message msg = Proton.message(); int offset = buffer.offset; int len = buffer.length; while (len > 0) { diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JmsClientRequestResponseTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JmsClientRequestResponseTest.java index 191ae07e0e..2f68fa33fe 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JmsClientRequestResponseTest.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JmsClientRequestResponseTest.java @@ -46,7 +46,6 @@ import org.junit.rules.TestName; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -@Ignore("Until https://issues.apache.org/jira/browse/PROTON-588 and https://issues.apache.org/jira/browse/QPID-5792 are fixed") public class JmsClientRequestResponseTest extends AmqpTestSupport implements MessageListener { private static final Logger LOG = LoggerFactory.getLogger(JmsClientRequestResponseTest.class); diff --git a/pom.xml b/pom.xml index e685aee8d4..203d56810b 100755 --- a/pom.xml +++ b/pom.xml @@ -100,7 +100,7 @@ 0.4.0 1.4.0 3.4.5 - 0.7 + 1.0-SNAPSHOT 0.26 1.3 1.0