diff --git a/activemq-amqp/pom.xml b/activemq-amqp/pom.xml
new file mode 100644
index 0000000000..d7d7354e69
--- /dev/null
+++ b/activemq-amqp/pom.xml
@@ -0,0 +1,130 @@
+
+
+
+
+ 4.0.0
+
+
+ org.apache.activemq
+ activemq-parent
+ 5.8-SNAPSHOT
+
+
+ activemq-amqp
+ jar
+
+ ActiveMQ :: AMQP
+ ActiveMQ implementaiton of AMQP messaging protocol
+
+
+
+
+ org.apache.activemq
+ activemq-core
+ provided
+
+
+
+ org.apache.qpid
+ qpid-proton
+ 1.0-SNAPSHOT
+
+
+
+ org.fusesource.hawtbuf
+ hawtbuf
+ ${hawtbuf-version}
+
+
+
+
+ org.apache.activemq
+ activemq-core
+ test-jar
+ test
+
+
+ org.apache.activemq
+ activemq-console
+ test
+
+
+
+ junit
+ junit
+ test
+
+
+ org.slf4j
+ slf4j-log4j12
+ test
+
+
+
+
+
+
+
+ swiftmq-client
+
+ swiftmq-client-home
+
+
+
+ com.swiftmq
+ swiftmq
+ 9.2.0
+ system
+ ${swiftmq-client-home}/jars/swiftmq.jar
+
+
+ com.swiftmq
+ swiftmq-amqp
+ 9.2.0
+ system
+ ${swiftmq-client-home}/jars/amqp.jar
+
+
+
+
+
+ org.codehaus.mojo
+ build-helper-maven-plugin
+ 1.7
+
+
+ add-test-source
+ generate-sources
+
+ add-test-source
+
+
+
+ ${basedir}/src/test-swiftmq/java
+
+
+
+
+
+
+
+
+
+
+
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AMQPNativeInboundTransformer.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AMQPNativeInboundTransformer.java
new file mode 100644
index 0000000000..994319b030
--- /dev/null
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AMQPNativeInboundTransformer.java
@@ -0,0 +1,24 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp;
+
+/**
+* @author Hiram Chirino
+*/
+public class AMQPNativeInboundTransformer extends InboundTransformer {
+
+}
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AMQPSslTransportFactory.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AMQPSslTransportFactory.java
new file mode 100644
index 0000000000..91b5dc952b
--- /dev/null
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AMQPSslTransportFactory.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp;
+
+import org.apache.activemq.broker.BrokerContext;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.BrokerServiceAware;
+import org.apache.activemq.transport.MutexTransport;
+import org.apache.activemq.transport.Transport;
+import org.apache.activemq.transport.tcp.SslTransportFactory;
+import org.apache.activemq.util.IntrospectionSupport;
+import org.apache.activemq.wireformat.WireFormat;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * A AMQP over SSL transport factory
+ */
+public class AMQPSslTransportFactory extends SslTransportFactory implements BrokerServiceAware {
+
+ private BrokerContext brokerContext = null;
+
+ protected String getDefaultWireFormatType() {
+ return "amqp";
+ }
+
+ @SuppressWarnings("rawtypes")
+
+ public Transport compositeConfigure(Transport transport, WireFormat format, Map options) {
+ transport = new AmqpTransportFilter(transport, format, brokerContext);
+ IntrospectionSupport.setProperties(transport, options);
+ return super.compositeConfigure(transport, format, options);
+ }
+
+ @SuppressWarnings("rawtypes")
+ @Override
+ public Transport serverConfigure(Transport transport, WireFormat format, HashMap options) throws Exception {
+ transport = super.serverConfigure(transport, format, options);
+
+ MutexTransport mutex = transport.narrow(MutexTransport.class);
+ if (mutex != null) {
+ mutex.setSyncOnCommand(true);
+ }
+
+ return transport;
+ }
+
+ public void setBrokerService(BrokerService brokerService) {
+ this.brokerContext = brokerService.getBrokerContext();
+ }
+
+// protected Transport createInactivityMonitor(Transport transport, WireFormat format) {
+// AmqpInactivityMonitor monitor = new AmqpInactivityMonitor(transport, format);
+//
+// AmqpTransportFilter filter = transport.narrow(AmqpTransportFilter.class);
+// filter.setInactivityMonitor(monitor);
+//
+// return monitor;
+// }
+
+
+ @Override
+ protected boolean isUseInactivityMonitor(Transport transport) {
+ return false;
+ }
+}
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpNioSslTransport.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpNioSslTransport.java
new file mode 100644
index 0000000000..398d75a097
--- /dev/null
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpNioSslTransport.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp;
+
+import org.apache.activemq.transport.nio.NIOSSLTransport;
+import org.apache.activemq.wireformat.WireFormat;
+import org.fusesource.hawtbuf.Buffer;
+import org.fusesource.hawtbuf.DataByteArrayInputStream;
+
+import javax.net.SocketFactory;
+import java.io.IOException;
+import java.net.Socket;
+import java.net.URI;
+import java.net.UnknownHostException;
+import java.nio.ByteBuffer;
+
+public class AmqpNioSslTransport extends NIOSSLTransport {
+
+ public AmqpNioSslTransport(WireFormat wireFormat, SocketFactory socketFactory, URI remoteLocation, URI localLocation) throws UnknownHostException, IOException {
+ super(wireFormat, socketFactory, remoteLocation, localLocation);
+ }
+
+ public AmqpNioSslTransport(WireFormat wireFormat, Socket socket) throws IOException {
+ super(wireFormat, socket);
+ }
+
+ @Override
+ protected void initializeStreams() throws IOException {
+ super.initializeStreams();
+ if (inputBuffer.position() != 0 && inputBuffer.hasRemaining()) {
+ serviceRead();
+ }
+ }
+
+ @Override
+ protected void processCommand(ByteBuffer plain) throws Exception {
+ doConsume(AmqpSupport.toBuffer(plain));
+ }
+
+}
\ No newline at end of file
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpNioSslTransportFactory.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpNioSslTransportFactory.java
new file mode 100644
index 0000000000..5130cb8daf
--- /dev/null
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpNioSslTransportFactory.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp;
+
+import org.apache.activemq.broker.SslContext;
+import org.apache.activemq.transport.Transport;
+import org.apache.activemq.transport.TransportServer;
+import org.apache.activemq.transport.tcp.TcpTransport;
+import org.apache.activemq.transport.tcp.TcpTransportServer;
+import org.apache.activemq.wireformat.WireFormat;
+
+import javax.net.ServerSocketFactory;
+import javax.net.SocketFactory;
+import javax.net.ssl.SSLContext;
+import java.io.IOException;
+import java.net.Socket;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.UnknownHostException;
+
+public class AmqpNioSslTransportFactory extends AmqpNioTransportFactory {
+
+ SSLContext context;
+
+ @Override
+ protected TcpTransportServer createTcpTransportServer(URI location, ServerSocketFactory serverSocketFactory) throws IOException, URISyntaxException {
+ return new TcpTransportServer(this, location, serverSocketFactory) {
+ protected Transport createTransport(Socket socket, WireFormat format) throws IOException {
+ AmqpNioSslTransport transport = new AmqpNioSslTransport(format, socket);
+ if (context != null) {
+ transport.setSslContext(context);
+ }
+ return transport;
+ }
+ };
+ }
+
+ @Override
+ protected TcpTransport createTcpTransport(WireFormat wf, SocketFactory socketFactory, URI location, URI localLocation) throws UnknownHostException, IOException {
+ return new AmqpNioSslTransport(wf, socketFactory, location, localLocation);
+ }
+
+ @Override
+ public TransportServer doBind(URI location) throws IOException {
+ if (SslContext.getCurrentSslContext() != null) {
+ try {
+ context = SslContext.getCurrentSslContext().getSSLContext();
+ } catch (Exception e) {
+ throw new IOException(e);
+ }
+ }
+ return super.doBind(location);
+ }
+
+}
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpNioTransport.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpNioTransport.java
new file mode 100644
index 0000000000..6f4550ff5f
--- /dev/null
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpNioTransport.java
@@ -0,0 +1,128 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp;
+
+import org.apache.activemq.transport.nio.NIOOutputStream;
+import org.apache.activemq.transport.nio.SelectorManager;
+import org.apache.activemq.transport.nio.SelectorSelection;
+import org.apache.activemq.transport.tcp.TcpTransport;
+import org.apache.activemq.util.IOExceptionSupport;
+import org.apache.activemq.util.ServiceStopper;
+import org.apache.activemq.wireformat.WireFormat;
+import org.fusesource.hawtbuf.DataByteArrayInputStream;
+
+import javax.net.SocketFactory;
+import java.io.DataOutputStream;
+import java.io.EOFException;
+import java.io.IOException;
+import java.net.Socket;
+import java.net.URI;
+import java.net.UnknownHostException;
+import java.nio.ByteBuffer;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.SocketChannel;
+
+/**
+ * An implementation of the {@link org.apache.activemq.transport.Transport} interface for using AMQP over NIO
+ */
+public class AmqpNioTransport extends TcpTransport {
+
+ private SocketChannel channel;
+ private SelectorSelection selection;
+
+ private ByteBuffer inputBuffer;
+
+ public AmqpNioTransport(WireFormat wireFormat, SocketFactory socketFactory, URI remoteLocation, URI localLocation) throws UnknownHostException, IOException {
+ super(wireFormat, socketFactory, remoteLocation, localLocation);
+ }
+
+ public AmqpNioTransport(WireFormat wireFormat, Socket socket) throws IOException {
+ super(wireFormat, socket);
+ }
+
+ protected void initializeStreams() throws IOException {
+ channel = socket.getChannel();
+ channel.configureBlocking(false);
+ // listen for events telling us when the socket is readable.
+ selection = SelectorManager.getInstance().register(channel, new SelectorManager.Listener() {
+ public void onSelect(SelectorSelection selection) {
+ if (!isStopped()) {
+ serviceRead();
+ }
+ }
+
+ public void onError(SelectorSelection selection, Throwable error) {
+ if (error instanceof IOException) {
+ onException((IOException) error);
+ } else {
+ onException(IOExceptionSupport.create(error));
+ }
+ }
+ });
+
+ inputBuffer = ByteBuffer.allocate(8 * 1024);
+ NIOOutputStream outPutStream = new NIOOutputStream(channel, 8 * 1024);
+ this.dataOut = new DataOutputStream(outPutStream);
+ this.buffOut = outPutStream;
+ }
+
+ private void serviceRead() {
+ try {
+
+ while (isStarted()) {
+ // read channel
+ int readSize = channel.read(inputBuffer);
+ // channel is closed, cleanup
+ if (readSize == -1) {
+ onException(new EOFException());
+ selection.close();
+ break;
+ }
+ // nothing more to read, break
+ if (readSize == 0) {
+ break;
+ }
+
+ inputBuffer.flip();
+ doConsume(AmqpSupport.toBuffer(inputBuffer));
+ // clear the buffer
+ inputBuffer.clear();
+
+ }
+ } catch (IOException e) {
+ onException(e);
+ } catch (Throwable e) {
+ onException(IOExceptionSupport.create(e));
+ }
+ }
+
+ protected void doStart() throws Exception {
+ connect();
+ selection.setInterestOps(SelectionKey.OP_READ);
+ selection.enable();
+ }
+
+ protected void doStop(ServiceStopper stopper) throws Exception {
+ try {
+ if (selection != null) {
+ selection.close();
+ }
+ } finally {
+ super.doStop(stopper);
+ }
+ }
+}
\ No newline at end of file
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpNioTransportFactory.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpNioTransportFactory.java
new file mode 100644
index 0000000000..e289dc1396
--- /dev/null
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpNioTransportFactory.java
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp;
+
+import org.apache.activemq.broker.BrokerContext;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.BrokerServiceAware;
+import org.apache.activemq.transport.MutexTransport;
+import org.apache.activemq.transport.Transport;
+import org.apache.activemq.transport.nio.NIOTransportFactory;
+import org.apache.activemq.transport.tcp.TcpTransport;
+import org.apache.activemq.transport.tcp.TcpTransportServer;
+import org.apache.activemq.util.IntrospectionSupport;
+import org.apache.activemq.wireformat.WireFormat;
+
+import javax.net.ServerSocketFactory;
+import javax.net.SocketFactory;
+import java.io.IOException;
+import java.net.Socket;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.UnknownHostException;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * A AMQP over NIO transport factory
+ */
+public class AmqpNioTransportFactory extends NIOTransportFactory implements BrokerServiceAware {
+
+ private BrokerContext brokerContext = null;
+
+ protected String getDefaultWireFormatType() {
+ return "amqp";
+ }
+
+ protected TcpTransportServer createTcpTransportServer(URI location, ServerSocketFactory serverSocketFactory) throws IOException, URISyntaxException {
+ return new TcpTransportServer(this, location, serverSocketFactory) {
+ protected Transport createTransport(Socket socket, WireFormat format) throws IOException {
+ return new AmqpNioTransport(format, socket);
+ }
+ };
+ }
+
+ protected TcpTransport createTcpTransport(WireFormat wf, SocketFactory socketFactory, URI location, URI localLocation) throws UnknownHostException, IOException {
+ return new AmqpNioTransport(wf, socketFactory, location, localLocation);
+ }
+
+ @SuppressWarnings("rawtypes")
+ @Override
+ public Transport serverConfigure(Transport transport, WireFormat format, HashMap options) throws Exception {
+ transport = super.serverConfigure(transport, format, options);
+
+ MutexTransport mutex = transport.narrow(MutexTransport.class);
+ if (mutex != null) {
+ mutex.setSyncOnCommand(true);
+ }
+
+ return transport;
+ }
+
+ @SuppressWarnings("rawtypes")
+ public Transport compositeConfigure(Transport transport, WireFormat format, Map options) {
+ transport = new AmqpTransportFilter(transport, format, brokerContext);
+ IntrospectionSupport.setProperties(transport, options);
+ return super.compositeConfigure(transport, format, options);
+ }
+
+ public void setBrokerService(BrokerService brokerService) {
+ this.brokerContext = brokerService.getBrokerContext();
+ }
+
+// protected Transport createInactivityMonitor(Transport transport, WireFormat format) {
+// AmqpInactivityMonitor monitor = new AmqpInactivityMonitor(transport, format);
+// AmqpTransportFilter filter = transport.narrow(AmqpTransportFilter.class);
+// filter.setInactivityMonitor(monitor);
+// return monitor;
+// }
+
+ @Override
+ protected boolean isUseInactivityMonitor(Transport transport) {
+ return false;
+ }
+}
+
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java
new file mode 100644
index 0000000000..82a3b96d13
--- /dev/null
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java
@@ -0,0 +1,750 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp;
+
+import org.apache.activemq.broker.BrokerContext;
+import org.apache.activemq.command.*;
+import org.apache.activemq.command.Message;
+import org.apache.activemq.util.*;
+import org.apache.qpid.proton.engine.*;
+import org.apache.qpid.proton.engine.Session;
+import org.apache.qpid.proton.engine.impl.ConnectionImpl;
+import org.apache.qpid.proton.engine.impl.DeliveryImpl;
+import org.apache.qpid.proton.engine.impl.TransportImpl;
+import org.fusesource.hawtbuf.Buffer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.jms.*;
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.util.EnumSet;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.zip.Inflater;
+
+class AmqpProtocolConverter {
+
+ public static final EnumSet UNINITIALIZED_SET = EnumSet.of(EndpointState.UNINITIALIZED);
+ public static final EnumSet INITIALIZED_SET = EnumSet.complementOf(UNINITIALIZED_SET);
+ public static final EnumSet ACTIVE_STATE = EnumSet.of(EndpointState.ACTIVE);
+ public static final EnumSet CLOSED_STATE = EnumSet.of(EndpointState.CLOSED);
+ private static final Logger LOG = LoggerFactory.getLogger(AmqpProtocolConverter.class);
+
+ private final AmqpTransport amqpTransport;
+
+ public AmqpProtocolConverter(AmqpTransport amqpTransport, BrokerContext brokerContext) {
+ this.amqpTransport = amqpTransport;
+ }
+
+//
+// private static final Buffer PING_RESP_FRAME = new PINGRESP().encode();
+//
+//
+// private final LongSequenceGenerator messageIdGenerator = new LongSequenceGenerator();
+// private final LongSequenceGenerator consumerIdGenerator = new LongSequenceGenerator();
+//
+// private final ConcurrentHashMap subscriptionsByConsumerId = new ConcurrentHashMap();
+// private final ConcurrentHashMap amqpSubscriptionByTopic = new ConcurrentHashMap();
+// private final Map activeMQTopicMap = new LRUCache();
+// private final Map amqpTopicMap = new LRUCache();
+// private final Map consumerAcks = new LRUCache();
+// private final Map publisherRecs = new LRUCache();
+//
+// private final AtomicBoolean connected = new AtomicBoolean(false);
+// private CONNECT connect;
+// private String clientId;
+// private final String QOS_PROPERTY_NAME = "QoSPropertyName";
+
+
+ TransportImpl protonTransport = new TransportImpl();
+ ConnectionImpl protonConnection = new ConnectionImpl();
+
+ {
+ this.protonTransport.bind(this.protonConnection);
+ }
+
+ void pumpOut() {
+ try {
+ int size = 1024 * 64;
+ byte data[] = new byte[size];
+ boolean done = false;
+ while (!done) {
+ int count = protonTransport.output(data, 0, size);
+ if (count > 0) {
+ final Buffer buffer = new Buffer(data, 0, count);
+ System.out.println("writing: " + buffer.toString().substring(5).replaceAll("(..)", "$1 "));
+ amqpTransport.sendToAmqp(buffer);
+ } else {
+ done = true;
+ }
+ }
+// System.out.println("write done");
+ } catch (IOException e) {
+ amqpTransport.onException(e);
+ }
+ }
+
+ static class AmqpSessionContext {
+ private final SessionId sessionId;
+ long nextProducerId = 0;
+ long nextConsumerId = 0;
+
+ public AmqpSessionContext(ConnectionId connectionId, long id) {
+ sessionId = new SessionId(connectionId, -1);
+
+ }
+ }
+
+ /**
+ * Convert a AMQP command
+ */
+ public void onAMQPData(Buffer frame) throws IOException, JMSException {
+
+
+ try {
+ System.out.println("reading: " + frame.toString().substring(5).replaceAll("(..)", "$1 "));
+ protonTransport.input(frame.data, frame.offset, frame.length);
+ } catch (Throwable e) {
+ handleException(new AmqpProtocolException("Could not decode AMQP frame: " + frame, true, e));
+ }
+
+ try {
+
+ // Handle the amqp open..
+ if (protonConnection.getLocalState() == EndpointState.UNINITIALIZED && protonConnection.getRemoteState() != EndpointState.UNINITIALIZED) {
+ onConnectionOpen();
+ }
+
+ // Lets map amqp sessions to openwire sessions..
+ Session session = protonConnection.sessionHead(UNINITIALIZED_SET, INITIALIZED_SET);
+ while (session != null) {
+
+ onSessionOpen(session);
+ session = protonConnection.sessionHead(UNINITIALIZED_SET, INITIALIZED_SET);
+ }
+
+
+ Link link = protonConnection.linkHead(UNINITIALIZED_SET, INITIALIZED_SET);
+ while (link != null) {
+ onLinkOpen(link);
+ link = protonConnection.linkHead(UNINITIALIZED_SET, INITIALIZED_SET);
+ }
+
+ Delivery delivery = protonConnection.getWorkHead();
+ while (delivery != null) {
+ AmqpDeliveryListener listener = (AmqpDeliveryListener) delivery.getLink().getContext();
+ if (listener != null) {
+ listener.onDelivery(delivery);
+ }
+ delivery = delivery.getWorkNext();
+ }
+
+ link = protonConnection.linkHead(ACTIVE_STATE, CLOSED_STATE);
+ while (link != null) {
+ if (link instanceof Receiver) {
+// listener.onReceiverClose((Receiver) link);
+ } else {
+// listener.onSenderClose((Sender) link);
+ }
+ link.close();
+ link = link.next(ACTIVE_STATE, CLOSED_STATE);
+ }
+
+ session = protonConnection.sessionHead(ACTIVE_STATE, CLOSED_STATE);
+ while (session != null) {
+ //TODO - close links?
+// listener.onSessionClose(session);
+ session.close();
+ session = session.next(ACTIVE_STATE, CLOSED_STATE);
+ }
+ if (protonConnection.getLocalState() == EndpointState.ACTIVE && protonConnection.getRemoteState() == EndpointState.CLOSED) {
+// listener.onConnectionClose(protonConnection);
+ protonConnection.close();
+ }
+
+ } catch (Throwable e) {
+ handleException(new AmqpProtocolException("Could not process AMQP commands", true, e));
+ }
+
+ pumpOut();
+ }
+
+ public void onActiveMQCommand(Command command) throws Exception {
+ if (command.isResponse()) {
+ Response response = (Response) command;
+ ResponseHandler rh = resposeHandlers.remove(Integer.valueOf(response.getCorrelationId()));
+ if (rh != null) {
+ rh.onResponse(this, response);
+ } else {
+ // Pass down any unexpected errors. Should this close the connection?
+ if (response.isException()) {
+ Throwable exception = ((ExceptionResponse) response).getException();
+ handleException(exception);
+ }
+ }
+ } else if (command.isMessageDispatch()) {
+ MessageDispatch md = (MessageDispatch) command;
+ ConsumerContext consumerContext = subscriptionsByConsumerId.get(md.getConsumerId());
+ if (consumerContext != null) {
+ consumerContext.onMessageDispatch(md);
+ }
+ } else if (command.getDataStructureType() == ConnectionError.DATA_STRUCTURE_TYPE) {
+ // Pass down any unexpected async errors. Should this close the connection?
+ Throwable exception = ((ConnectionError) command).getException();
+ handleException(exception);
+ } else if (command.isBrokerInfo()) {
+ //ignore
+ } else {
+ LOG.debug("Do not know how to process ActiveMQ Command " + command);
+ }
+ }
+
+ private static final IdGenerator CONNECTION_ID_GENERATOR = new IdGenerator();
+ private final ConnectionId connectionId = new ConnectionId(CONNECTION_ID_GENERATOR.generateId());
+ private ConnectionInfo connectionInfo = new ConnectionInfo();
+ private long nextSessionId = 0;
+
+ static abstract class AmqpDeliveryListener {
+ abstract public void onDelivery(Delivery delivery) throws Exception;
+ }
+
+ private void onConnectionOpen() throws AmqpProtocolException {
+ connectionInfo.setResponseRequired(true);
+ connectionInfo.setConnectionId(connectionId);
+// configureInactivityMonitor(connect.keepAlive());
+
+ String clientId = protonConnection.getRemoteContainer();
+ if (clientId != null && !clientId.isEmpty()) {
+ connectionInfo.setClientId(clientId);
+ } else {
+ connectionInfo.setClientId("" + connectionInfo.getConnectionId().toString());
+ }
+
+
+// String userName = "";
+// if (connect.userName() != null) {
+// userName = connect.userName().toString();
+// }
+// String passswd = "";
+// if (connect.password() != null) {
+// passswd = connect.password().toString();
+// }
+// connectionInfo.setUserName(userName);
+// connectionInfo.setPassword(passswd);
+
+ connectionInfo.setTransportContext(amqpTransport.getPeerCertificates());
+
+ sendToActiveMQ(connectionInfo, new ResponseHandler() {
+ public void onResponse(AmqpProtocolConverter converter, Response response) throws IOException {
+
+ protonConnection.open();
+ pumpOut();
+
+ if (response.isException()) {
+ Throwable exception = ((ExceptionResponse) response).getException();
+// TODO: figure out how to close /w an error.
+// protonConnection.setLocalError(new EndpointError(exception.getClass().getName(), exception.getMessage()));
+ protonConnection.close();
+ pumpOut();
+ amqpTransport.onException(IOExceptionSupport.create(exception));
+ return;
+ }
+
+ }
+ });
+ }
+
+ private void onSessionOpen(Session session) {
+ AmqpSessionContext sessionContext = new AmqpSessionContext(connectionId, nextSessionId++);
+ session.setContext(sessionContext);
+ sendToActiveMQ(new SessionInfo(sessionContext.sessionId), null);
+ session.open();
+ }
+
+ private void onLinkOpen(Link link) {
+ link.setLocalSourceAddress(link.getRemoteSourceAddress());
+ link.setLocalTargetAddress(link.getRemoteTargetAddress());
+
+ AmqpSessionContext sessionContext = (AmqpSessionContext) link.getSession().getContext();
+ if (link instanceof Receiver) {
+ onReceiverOpen((Receiver) link, sessionContext);
+ } else {
+ onSenderOpen((Sender) link, sessionContext);
+ }
+ }
+
+ class ProducerContext extends AmqpDeliveryListener {
+ private final ProducerId producerId;
+ private final LongSequenceGenerator messageIdGenerator = new LongSequenceGenerator();
+ private final ActiveMQDestination destination;
+
+ public ProducerContext(ProducerId producerId, ActiveMQDestination destination) {
+ this.producerId = producerId;
+ this.destination = destination;
+ }
+
+ @Override
+ public void onDelivery(Delivery delivery) throws JMSException {
+// delivery.
+ ActiveMQMessage message = convertMessage((DeliveryImpl) delivery);
+ message.setProducerId(producerId);
+ message.onSend();
+// sendToActiveMQ(message, createResponseHandler(command));
+ sendToActiveMQ(message, null);
+ }
+
+ ActiveMQMessage convertMessage(DeliveryImpl delivery) throws JMSException {
+ ActiveMQBytesMessage msg = nextMessage(delivery);
+ final Receiver receiver = (Receiver) delivery.getLink();
+ byte buff[] = new byte[1024 * 4];
+ int count = 0;
+ while ((count = receiver.recv(buff, 0, buff.length)) >= 0) {
+ msg.writeBytes(buff, 0, count);
+ }
+ return msg;
+ }
+
+ ActiveMQBytesMessage current;
+
+ private ActiveMQBytesMessage nextMessage(DeliveryImpl delivery) throws JMSException {
+ if (current == null) {
+ current = new ActiveMQBytesMessage();
+ current.setJMSDestination(destination);
+ current.setProducerId(producerId);
+ current.setMessageId(new MessageId(producerId, messageIdGenerator.getNextSequenceId()));
+ current.setTimestamp(System.currentTimeMillis());
+ current.setPriority((byte) javax.jms.Message.DEFAULT_PRIORITY);
+// msg.setPersistent(command.qos() != QoS.AT_MOST_ONCE);
+// msg.setIntProperty(QOS_PROPERTY_NAME, command.qos().ordinal());
+ System.out.println(delivery.getLocalState() + "/" + delivery.getRemoteState());
+ }
+ return current;
+ }
+
+ }
+
+
+ void onReceiverOpen(final Receiver receiver, AmqpSessionContext sessionContext) {
+ // Client is producing to this receiver object
+
+ ProducerId producerId = new ProducerId(sessionContext.sessionId, sessionContext.nextProducerId++);
+ ActiveMQDestination destination = ActiveMQDestination.createDestination(receiver.getRemoteSourceAddress(), ActiveMQDestination.QUEUE_TYPE);
+ ProducerContext producerContext = new ProducerContext(producerId, destination);
+
+ receiver.setContext(producerContext);
+ receiver.flow(1024 * 64);
+ ProducerInfo producerInfo = new ProducerInfo(producerId);
+ producerInfo.setDestination(destination);
+ sendToActiveMQ(producerInfo, new ResponseHandler() {
+ public void onResponse(AmqpProtocolConverter converter, Response response) throws IOException {
+ receiver.open();
+ if (response.isException()) {
+ // If the connection attempt fails we close the socket.
+ Throwable exception = ((ExceptionResponse) response).getException();
+ receiver.close();
+ }
+ pumpOut();
+ }
+ });
+
+ }
+
+
+ class ConsumerContext extends AmqpDeliveryListener {
+ private final ConsumerId consumerId;
+ private final Sender sender;
+
+ long nextTagId = 0;
+ HashSet tagCache = new HashSet();
+
+ byte[] nextTag() {
+ byte[] rc;
+ if (tagCache != null && !tagCache.isEmpty()) {
+ final Iterator iterator = tagCache.iterator();
+ rc = iterator.next();
+ iterator.remove();
+ } else {
+ try {
+ rc = Long.toHexString(nextTagId++).getBytes("UTF-8");
+ } catch (UnsupportedEncodingException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ return rc;
+ }
+
+ public ConsumerContext(ConsumerId consumerId, Sender sender) {
+ this.consumerId = consumerId;
+ this.sender = sender;
+ }
+
+ // called when the connection receives a JMS message from ActiveMQ
+ public void onMessageDispatch(MessageDispatch md) throws Exception {
+ final byte[] tag = nextTag();
+ final Delivery delivery = sender.delivery(tag, 0, tag.length);
+ delivery.setContext(md);
+
+ // Covert to an AMQP messages.
+ org.apache.qpid.proton.message.Message msg = convertMessage(md.getMessage());
+ byte buffer[] = new byte[1024*4];
+ int c=0;
+
+ // And send the AMQP message over the link.
+ while( (c=msg.encode(buffer, 0 , 0)) >= 0 ) {
+ sender.send(buffer, 0, c);
+ }
+ sender.advance();
+
+ }
+
+ public org.apache.qpid.proton.message.Message convertMessage(Message message) throws Exception {
+// result.setContentEncoding();
+// QoS qoS;
+// if (message.propertyExists(QOS_PROPERTY_NAME)) {
+// int ordinal = message.getIntProperty(QOS_PROPERTY_NAME);
+// qoS = QoS.values()[ordinal];
+//
+// } else {
+// qoS = message.isPersistent() ? QoS.AT_MOST_ONCE : QoS.AT_LEAST_ONCE;
+// }
+// result.qos(qoS);
+
+ Buffer content = null;
+ if (message.getDataStructureType() == ActiveMQTextMessage.DATA_STRUCTURE_TYPE) {
+ ActiveMQTextMessage msg = (ActiveMQTextMessage) message.copy();
+ msg.setReadOnlyBody(true);
+ String messageText = msg.getText();
+ content = new Buffer(messageText.getBytes("UTF-8"));
+ } else if (message.getDataStructureType() == ActiveMQBytesMessage.DATA_STRUCTURE_TYPE) {
+ ActiveMQBytesMessage msg = (ActiveMQBytesMessage) message.copy();
+ msg.setReadOnlyBody(true);
+ byte[] data = new byte[(int) msg.getBodyLength()];
+ msg.readBytes(data);
+ content = new Buffer(data);
+ } else if (message.getDataStructureType() == ActiveMQMapMessage.DATA_STRUCTURE_TYPE) {
+ ActiveMQMapMessage msg = (ActiveMQMapMessage) message.copy();
+ msg.setReadOnlyBody(true);
+ Map map = msg.getContentMap();
+ content = new Buffer(map.toString().getBytes("UTF-8"));
+ } else {
+ ByteSequence byteSequence = message.getContent();
+ if (byteSequence != null && byteSequence.getLength() > 0) {
+ if (message.isCompressed()) {
+ Inflater inflater = new Inflater();
+ inflater.setInput(byteSequence.data, byteSequence.offset, byteSequence.length);
+ byte[] data = new byte[4096];
+ int read;
+ ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
+ while ((read = inflater.inflate(data)) != 0) {
+ bytesOut.write(data, 0, read);
+ }
+ byteSequence = bytesOut.toByteSequence();
+ }
+ content = new Buffer(byteSequence.data, byteSequence.offset, byteSequence.length);
+ } else {
+ content = new Buffer(0);
+ }
+ }
+
+ org.apache.qpid.proton.message.Message result = new org.apache.qpid.proton.message.Message();
+ return result;
+ }
+
+
+ @Override
+ public void onDelivery(Delivery delivery) throws JMSException {
+ if( delivery.remotelySettled() ) {
+ MessageDispatch md = (MessageDispatch) delivery.getContext();
+ }
+ }
+
+ }
+
+ private final ConcurrentHashMap subscriptionsByConsumerId = new ConcurrentHashMap();
+
+ void onSenderOpen(final Sender sender, AmqpSessionContext sessionContext) {
+
+ ConsumerId id = new ConsumerId(sessionContext.sessionId, sessionContext.nextConsumerId++);
+ ConsumerContext consumerContext = new ConsumerContext(id, sender);
+
+ subscriptionsByConsumerId.put(id, consumerContext);
+
+ ActiveMQDestination destination = ActiveMQDestination.createDestination(sender.getRemoteSourceAddress(), ActiveMQDestination.QUEUE_TYPE);
+
+ sender.setContext(consumerContext);
+ ConsumerInfo consumerInfo = new ConsumerInfo(id);
+ consumerInfo.setDestination(destination);
+ consumerInfo.setPrefetchSize(100);
+ consumerInfo.setDispatchAsync(true);
+
+ sendToActiveMQ(consumerInfo, new ResponseHandler() {
+ public void onResponse(AmqpProtocolConverter converter, Response response) throws IOException {
+ sender.open();
+ if (response.isException()) {
+ Throwable exception = ((ExceptionResponse) response).getException();
+ sender.close();
+ }
+ pumpOut();
+ }
+ });
+
+ }
+
+//
+// QoS onSubscribe(SUBSCRIBE command, Topic topic) throws AmqpProtocolException {
+// ActiveMQDestination destination = new ActiveMQTopic(convertAMQPToActiveMQ(topic.name().toString()));
+// if (destination == null) {
+// throw new AmqpProtocolException("Invalid Destination.");
+// }
+//
+// ConsumerId id = new ConsumerId(sessionId, consumerIdGenerator.getNextSequenceId());
+// ConsumerInfo consumerInfo = new ConsumerInfo(id);
+// consumerInfo.setDestination(destination);
+// consumerInfo.setPrefetchSize(1000);
+// consumerInfo.setDispatchAsync(true);
+// if (!connect.cleanSession() && (connect.clientId() != null)) {
+// //by default subscribers are persistent
+// consumerInfo.setSubscriptionName(connect.clientId().toString());
+// }
+//
+// AmqpSubscription amqpSubscription = new AmqpSubscription(this, topic.qos(), consumerInfo);
+//
+//
+// amqpSubscriptionByTopic.put(topic.name(), amqpSubscription);
+//
+// sendToActiveMQ(consumerInfo, null);
+// return topic.qos();
+// }
+//
+// void onUnSubscribe(UNSUBSCRIBE command) {
+// UTF8Buffer[] topics = command.topics();
+// if (topics != null) {
+// for (int i = 0; i < topics.length; i++) {
+// onUnSubscribe(topics[i]);
+// }
+// }
+// UNSUBACK ack = new UNSUBACK();
+// ack.messageId(command.messageId());
+// pumpOut(ack.encode());
+//
+// }
+//
+// void onUnSubscribe(UTF8Buffer topicName) {
+// AmqpSubscription subs = amqpSubscriptionByTopic.remove(topicName);
+// if (subs != null) {
+// ConsumerInfo info = subs.getConsumerInfo();
+// if (info != null) {
+// subscriptionsByConsumerId.remove(info.getConsumerId());
+// }
+// RemoveInfo removeInfo = info.createRemoveCommand();
+// sendToActiveMQ(removeInfo, null);
+// }
+// }
+//
+//
+// /**
+// * Dispatch a ActiveMQ command
+// */
+//
+//
+//
+// void onAMQPPublish(PUBLISH command) throws IOException, JMSException {
+// checkConnected();
+// }
+//
+// void onAMQPPubAck(PUBACK command) {
+// short messageId = command.messageId();
+// MessageAck ack;
+// synchronized (consumerAcks) {
+// ack = consumerAcks.remove(messageId);
+// }
+// if (ack != null) {
+// amqpTransport.sendToActiveMQ(ack);
+// }
+// }
+//
+// void onAMQPPubRec(PUBREC commnand) {
+// //from a subscriber - send a PUBREL in response
+// PUBREL pubrel = new PUBREL();
+// pubrel.messageId(commnand.messageId());
+// pumpOut(pubrel.encode());
+// }
+//
+// void onAMQPPubRel(PUBREL command) {
+// PUBREC ack;
+// synchronized (publisherRecs) {
+// ack = publisherRecs.remove(command.messageId());
+// }
+// if (ack == null) {
+// LOG.warn("Unknown PUBREL: " + command.messageId() + " received");
+// }
+// PUBCOMP pubcomp = new PUBCOMP();
+// pubcomp.messageId(command.messageId());
+// pumpOut(pubcomp.encode());
+// }
+//
+// void onAMQPPubComp(PUBCOMP command) {
+// short messageId = command.messageId();
+// MessageAck ack;
+// synchronized (consumerAcks) {
+// ack = consumerAcks.remove(messageId);
+// }
+// if (ack != null) {
+// amqpTransport.sendToActiveMQ(ack);
+// }
+// }
+//
+//
+//
+//
+// public AmqpTransport amqpTransport {
+// return amqpTransport;
+// }
+//
+//
+//
+// void configureInactivityMonitor(short heartBeat) {
+// try {
+//
+// int heartBeatMS = heartBeat * 1000;
+// AmqpInactivityMonitor monitor = amqpTransport.getInactivityMonitor();
+// monitor.setProtocolConverter(this);
+// monitor.setReadCheckTime(heartBeatMS);
+// monitor.setInitialDelayTime(heartBeatMS);
+// monitor.startMonitorThread();
+//
+// } catch (Exception ex) {
+// LOG.warn("Failed to start AMQP InactivityMonitor ", ex);
+// }
+//
+// LOG.debug(getClientId() + " AMQP Connection using heart beat of " + heartBeat + " secs");
+// }
+//
+//
+//
+// void checkConnected() throws AmqpProtocolException {
+// if (!connected.get()) {
+// throw new AmqpProtocolException("Not connected.");
+// }
+// }
+//
+// private String getClientId() {
+// if (clientId == null) {
+// if (connect != null && connect.clientId() != null) {
+// clientId = connect.clientId().toString();
+// }
+// } else {
+// clientId = "";
+// }
+// return clientId;
+// }
+//
+// private void stopTransport() {
+// try {
+// amqpTransport.stop();
+// } catch (Throwable e) {
+// LOG.debug("Failed to stop AMQP transport ", e);
+// }
+// }
+//
+// ResponseHandler createResponseHandler(final PUBLISH command) {
+//
+// if (command != null) {
+// switch (command.qos()) {
+// case AT_LEAST_ONCE:
+// return new ResponseHandler() {
+// public void onResponse(AmqpProtocolConverter converter, Response response) throws IOException {
+// if (response.isException()) {
+// LOG.warn("Failed to send AMQP Publish: ", command, ((ExceptionResponse) response).getException());
+// } else {
+// PUBACK ack = new PUBACK();
+// ack.messageId(command.messageId());
+// converter.amqpTransport.sendToAmqp(ack.encode());
+// }
+// }
+// };
+// case EXACTLY_ONCE:
+// return new ResponseHandler() {
+// public void onResponse(AmqpProtocolConverter converter, Response response) throws IOException {
+// if (response.isException()) {
+// LOG.warn("Failed to send AMQP Publish: ", command, ((ExceptionResponse) response).getException());
+// } else {
+// PUBREC ack = new PUBREC();
+// ack.messageId(command.messageId());
+// synchronized (publisherRecs) {
+// publisherRecs.put(command.messageId(), ack);
+// }
+// converter.amqpTransport.sendToAmqp(ack.encode());
+// }
+// }
+// };
+// case AT_MOST_ONCE:
+// break;
+// }
+// }
+// return null;
+// }
+//
+// private String convertAMQPToActiveMQ(String name) {
+// String result = name.replace('#', '>');
+// result = result.replace('+', '*');
+// result = result.replace('/', '.');
+// return result;
+// }
+
+ ////////////////////////////////////////////////////////////////////////////
+ //
+ // Implementation methods
+ //
+ ////////////////////////////////////////////////////////////////////////////
+
+ private final Object commnadIdMutex = new Object();
+ private int lastCommandId;
+
+ int generateCommandId() {
+ synchronized (commnadIdMutex) {
+ return lastCommandId++;
+ }
+ }
+
+ private final ConcurrentHashMap resposeHandlers = new ConcurrentHashMap();
+
+ void sendToActiveMQ(Command command, ResponseHandler handler) {
+ command.setCommandId(generateCommandId());
+ if (handler != null) {
+ command.setResponseRequired(true);
+ resposeHandlers.put(Integer.valueOf(command.getCommandId()), handler);
+ }
+ amqpTransport.sendToActiveMQ(command);
+ }
+
+ void handleException(Throwable exception) {
+ exception.printStackTrace();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Exception detail", exception);
+ }
+ try {
+ amqpTransport.stop();
+ } catch (Throwable e) {
+ LOG.error("Failed to stop AMQP Transport ", e);
+ }
+ }
+
+}
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolException.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolException.java
new file mode 100644
index 0000000000..852d16b65d
--- /dev/null
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolException.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp;
+
+import java.io.IOException;
+
+
+public class AmqpProtocolException extends IOException {
+
+ private static final long serialVersionUID = -2869735532997332242L;
+
+ private final boolean fatal;
+
+ public AmqpProtocolException() {
+ this(null);
+ }
+
+ public AmqpProtocolException(String s) {
+ this(s, false);
+ }
+
+ public AmqpProtocolException(String s, boolean fatal) {
+ this(s, fatal, null);
+ }
+
+ public AmqpProtocolException(String s, boolean fatal, Throwable cause) {
+ super(s);
+ this.fatal = fatal;
+ initCause(cause);
+ }
+
+ public boolean isFatal() {
+ return fatal;
+ }
+
+}
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpSubscription.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpSubscription.java
new file mode 100644
index 0000000000..a68ee00e70
--- /dev/null
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpSubscription.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp;
+
+import org.apache.activemq.command.*;
+
+import javax.jms.JMSException;
+import java.io.IOException;
+import java.util.zip.DataFormatException;
+
+/**
+ * Keeps track of the AMQP client subscription so that acking is correctly done.
+ */
+class AmqpSubscription {
+// private final AmqpProtocolConverter protocolConverter;
+//
+// private final ConsumerInfo consumerInfo;
+// private ActiveMQDestination destination;
+// private final QoS qos;
+//
+// public AmqpSubscription(AmqpProtocolConverter protocolConverter, QoS qos, ConsumerInfo consumerInfo) {
+// this.protocolConverter = protocolConverter;
+// this.consumerInfo = consumerInfo;
+// this.qos = qos;
+// }
+//
+// MessageAck createMessageAck(MessageDispatch md) {
+// return new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, 1);
+// }
+//
+// PUBLISH createPublish(ActiveMQMessage message) throws DataFormatException, IOException, JMSException {
+// PUBLISH publish = protocolConverter.convertMessage(message);
+// if (publish.qos().ordinal() > this.qos.ordinal()) {
+// publish.qos(this.qos);
+// }
+// return publish;
+// }
+//
+// public boolean expectAck() {
+// return qos != QoS.AT_MOST_ONCE;
+// }
+//
+// public void setDestination(ActiveMQDestination destination) {
+// this.destination = destination;
+// }
+//
+// public ActiveMQDestination getDestination() {
+// return destination;
+// }
+//
+// public ConsumerInfo getConsumerInfo() {
+// return consumerInfo;
+// }
+}
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpSupport.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpSupport.java
new file mode 100644
index 0000000000..4e992b751c
--- /dev/null
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpSupport.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp;
+
+import org.fusesource.hawtbuf.Buffer;
+
+import java.nio.ByteBuffer;
+
+/**
+ * @author Hiram Chirino
+ */
+public class AmqpSupport {
+
+ static public Buffer toBuffer(ByteBuffer data) {
+ if( data == null ) {
+ return null;
+ }
+ Buffer rc;
+ if( data.isDirect() ) {
+ rc = new Buffer(data.remaining());
+ data.get(rc.data);
+ } else {
+ rc = new Buffer(data);
+ data.position(data.position()+data.remaining());
+ }
+ return rc;
+ }
+
+}
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransport.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransport.java
new file mode 100644
index 0000000000..a8ceaaa3a2
--- /dev/null
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransport.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp;
+
+import org.apache.activemq.command.Command;
+import org.fusesource.hawtbuf.Buffer;
+
+import java.io.IOException;
+import java.security.cert.X509Certificate;
+
+/**
+ * Basic interface that mediates between protocol converter and transport
+ */
+public interface AmqpTransport {
+
+ public void sendToActiveMQ(Command command);
+
+ public void sendToAmqp(Buffer command) throws IOException;
+
+ public X509Certificate[] getPeerCertificates();
+
+ public void onException(IOException error);
+
+// public AmqpInactivityMonitor getInactivityMonitor();
+
+ public AmqpWireFormat getWireFormat();
+
+ public void stop() throws Exception;
+}
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransportFactory.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransportFactory.java
new file mode 100644
index 0000000000..7fe4ae590b
--- /dev/null
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransportFactory.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp;
+
+import org.apache.activemq.broker.BrokerContext;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.BrokerServiceAware;
+import org.apache.activemq.transport.MutexTransport;
+import org.apache.activemq.transport.Transport;
+import org.apache.activemq.transport.tcp.TcpTransportFactory;
+import org.apache.activemq.util.IntrospectionSupport;
+import org.apache.activemq.wireformat.WireFormat;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * A AMQP transport factory
+ */
+public class AmqpTransportFactory extends TcpTransportFactory implements BrokerServiceAware {
+
+ private BrokerContext brokerContext = null;
+
+ protected String getDefaultWireFormatType() {
+ return "amqp";
+ }
+
+ @SuppressWarnings("rawtypes")
+ public Transport compositeConfigure(Transport transport, WireFormat format, Map options) {
+ transport = new AmqpTransportFilter(transport, format, brokerContext);
+ IntrospectionSupport.setProperties(transport, options);
+ return super.compositeConfigure(transport, format, options);
+ }
+
+ public void setBrokerService(BrokerService brokerService) {
+ this.brokerContext = brokerService.getBrokerContext();
+ }
+
+ @SuppressWarnings("rawtypes")
+ @Override
+ public Transport serverConfigure(Transport transport, WireFormat format, HashMap options) throws Exception {
+ transport = super.serverConfigure(transport, format, options);
+
+ MutexTransport mutex = transport.narrow(MutexTransport.class);
+ if (mutex != null) {
+ mutex.setSyncOnCommand(true);
+ }
+
+ return transport;
+ }
+
+// @Override
+// protected Transport createInactivityMonitor(Transport transport, WireFormat format) {
+// AmqpInactivityMonitor monitor = new AmqpInactivityMonitor(transport, format);
+//
+// AmqpTransportFilter filter = transport.narrow(AmqpTransportFilter.class);
+// filter.setInactivityMonitor(monitor);
+//
+// return monitor;
+// }
+
+ @Override
+ protected boolean isUseInactivityMonitor(Transport transport) {
+ return false;
+ }
+}
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransportFilter.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransportFilter.java
new file mode 100644
index 0000000000..1230bd3e9e
--- /dev/null
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransportFilter.java
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp;
+
+import org.apache.activemq.broker.BrokerContext;
+import org.apache.activemq.command.Command;
+import org.apache.activemq.transport.Transport;
+import org.apache.activemq.transport.TransportFilter;
+import org.apache.activemq.transport.TransportListener;
+import org.apache.activemq.transport.tcp.SslTransport;
+import org.apache.activemq.util.IOExceptionSupport;
+import org.apache.activemq.wireformat.WireFormat;
+import org.fusesource.hawtbuf.Buffer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.jms.JMSException;
+import java.io.IOException;
+import java.security.cert.X509Certificate;
+
+/**
+ * The AMQPTransportFilter normally sits on top of a TcpTransport that has been
+ * configured with the StompWireFormat and is used to convert AMQP commands to
+ * ActiveMQ commands. All of the conversion work is done by delegating to the
+ * AMQPProtocolConverter
+ */
+public class AmqpTransportFilter extends TransportFilter implements AmqpTransport {
+ private static final Logger LOG = LoggerFactory.getLogger(AmqpTransportFilter.class);
+ private static final Logger TRACE = LoggerFactory.getLogger(AmqpTransportFilter.class.getPackage().getName() + ".AMQPIO");
+ private final AmqpProtocolConverter protocolConverter;
+// private AmqpInactivityMonitor monitor;
+ private AmqpWireFormat wireFormat;
+
+ private boolean trace;
+
+ public AmqpTransportFilter(Transport next, WireFormat wireFormat, BrokerContext brokerContext) {
+ super(next);
+ this.protocolConverter = new AmqpProtocolConverter(this, brokerContext);
+
+ if (wireFormat instanceof AmqpWireFormat) {
+ this.wireFormat = (AmqpWireFormat) wireFormat;
+ }
+ }
+
+ public void oneway(Object o) throws IOException {
+ try {
+ final Command command = (Command) o;
+ protocolConverter.onActiveMQCommand(command);
+ } catch (Exception e) {
+ throw IOExceptionSupport.create(e);
+ }
+ }
+
+ public void onCommand(Object command) {
+ try {
+ if (trace) {
+ TRACE.trace("Received: \n" + command);
+ }
+
+ protocolConverter.onAMQPData((Buffer) command);
+ } catch (IOException e) {
+ handleException(e);
+ } catch (JMSException e) {
+ onException(IOExceptionSupport.create(e));
+ }
+ }
+
+ public void sendToActiveMQ(Command command) {
+ TransportListener l = transportListener;
+ if (l != null) {
+ l.onCommand(command);
+ }
+ }
+
+ public void sendToAmqp(Buffer command) throws IOException {
+ if (trace) {
+ TRACE.trace("Sending: \n" + command);
+ }
+ Transport n = next;
+ if (n != null) {
+ n.oneway(command);
+ }
+ }
+
+ public X509Certificate[] getPeerCertificates() {
+ if (next instanceof SslTransport) {
+ X509Certificate[] peerCerts = ((SslTransport) next).getPeerCertificates();
+ if (trace && peerCerts != null) {
+ LOG.debug("Peer Identity has been verified\n");
+ }
+ return peerCerts;
+ }
+ return null;
+ }
+
+ public boolean isTrace() {
+ return trace;
+ }
+
+ public void setTrace(boolean trace) {
+ this.trace = trace;
+ }
+
+// @Override
+// public AmqpInactivityMonitor getInactivityMonitor() {
+// return monitor;
+// }
+//
+// public void setInactivityMonitor(AmqpInactivityMonitor monitor) {
+// this.monitor = monitor;
+// }
+
+ @Override
+ public AmqpWireFormat getWireFormat() {
+ return this.wireFormat;
+ }
+
+ public void handleException(IOException e) {
+ super.onException(e);
+ }
+
+
+}
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpWireFormat.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpWireFormat.java
new file mode 100644
index 0000000000..c1456c1996
--- /dev/null
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpWireFormat.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp;
+
+import org.apache.activemq.util.ByteArrayInputStream;
+import org.apache.activemq.util.ByteArrayOutputStream;
+import org.apache.activemq.util.ByteSequence;
+import org.apache.activemq.wireformat.WireFormat;
+import org.fusesource.hawtbuf.Buffer;
+
+import java.io.*;
+
+/**
+ */
+public class AmqpWireFormat implements WireFormat {
+
+
+ private int version = 1;
+ private long maxFrameLength = 1024*1024*100;
+
+ public ByteSequence marshal(Object command) throws IOException {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ DataOutputStream dos = new DataOutputStream(baos);
+ marshal(command, dos);
+ dos.close();
+ return baos.toByteSequence();
+ }
+
+ public Object unmarshal(ByteSequence packet) throws IOException {
+ ByteArrayInputStream stream = new ByteArrayInputStream(packet);
+ DataInputStream dis = new DataInputStream(stream);
+ return unmarshal(dis);
+ }
+
+ public void marshal(Object command, DataOutput dataOut) throws IOException {
+ Buffer frame = (Buffer) command;
+ frame.writeTo(dataOut);
+ }
+
+ boolean magicRead = false;
+ public Object unmarshal(DataInput dataIn) throws IOException {
+ if( !magicRead ) {
+ Buffer magic = new Buffer(8);
+ magic.readFrom(dataIn);
+ magicRead = true;
+ return magic;
+ } else {
+ int size = dataIn.readInt();
+ if( size > maxFrameLength ) {
+ throw new AmqpProtocolException("Frame size exceeded max frame length.");
+ }
+ Buffer frame = new Buffer(size);
+ frame.bigEndianEditor().writeInt(size);
+ frame.readFrom(dataIn);
+ frame.clear();
+ return frame;
+ }
+ }
+
+ /**
+ */
+ public void setVersion(int version) {
+ this.version = version;
+ }
+
+ /**
+ * @return the version of the wire format
+ */
+ public int getVersion() {
+ return this.version;
+ }
+
+
+}
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpWireFormatFactory.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpWireFormatFactory.java
new file mode 100644
index 0000000000..11d40fcb9d
--- /dev/null
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpWireFormatFactory.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp;
+
+import org.apache.activemq.wireformat.WireFormat;
+import org.apache.activemq.wireformat.WireFormatFactory;
+
+/**
+ * Creates WireFormat objects that marshalls the Stomp protocol.
+ */
+public class AmqpWireFormatFactory implements WireFormatFactory {
+ public WireFormat createWireFormat() {
+ return new AmqpWireFormat();
+ }
+}
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/InboundTransformer.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/InboundTransformer.java
new file mode 100644
index 0000000000..10344894b8
--- /dev/null
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/InboundTransformer.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp;
+
+/**
+* @author Hiram Chirino
+*/
+public class InboundTransformer {
+
+ String prefixVendor = "JMS_AMQP_";
+ int defaultDeliveryMode = javax.jms.Message.DEFAULT_DELIVERY_MODE;
+ int defaultPriority = javax.jms.Message.DEFAULT_PRIORITY;
+ long defaultTtl = javax.jms.Message.DEFAULT_TIME_TO_LIVE;
+
+}
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/JMSMappingInboundTransformer.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/JMSMappingInboundTransformer.java
new file mode 100644
index 0000000000..8626c17ae6
--- /dev/null
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/JMSMappingInboundTransformer.java
@@ -0,0 +1,24 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp;
+
+/**
+* @author Hiram Chirino
+*/
+public class JMSMappingInboundTransformer extends InboundTransformer {
+
+}
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/ResponseHandler.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/ResponseHandler.java
new file mode 100644
index 0000000000..bf1bdae9bd
--- /dev/null
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/ResponseHandler.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp;
+
+import org.apache.activemq.command.Response;
+
+import java.io.IOException;
+
+
+/**
+ * Interface used by the AMQPProtocolConverter for callbacks.
+ */
+interface ResponseHandler {
+ void onResponse(AmqpProtocolConverter converter, Response response) throws IOException;
+}
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/package.html b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/package.html
new file mode 100644
index 0000000000..42f206ae6c
--- /dev/null
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/package.html
@@ -0,0 +1,25 @@
+
+
+
+
+
+
+A Broker side implementation of the AMQP 3.1 protocol - see http://amqp.org/
+
+
+
diff --git a/activemq-amqp/src/main/resources/META-INF/services/org/apache/activemq/transport/amqp b/activemq-amqp/src/main/resources/META-INF/services/org/apache/activemq/transport/amqp
new file mode 100644
index 0000000000..9e09d18969
--- /dev/null
+++ b/activemq-amqp/src/main/resources/META-INF/services/org/apache/activemq/transport/amqp
@@ -0,0 +1,17 @@
+## ---------------------------------------------------------------------------
+## Licensed to the Apache Software Foundation (ASF) under one or more
+## contributor license agreements. See the NOTICE file distributed with
+## this work for additional information regarding copyright ownership.
+## The ASF licenses this file to You under the Apache License, Version 2.0
+## (the "License"); you may not use this file except in compliance with
+## the License. You may obtain a copy of the License at
+##
+## http://www.apache.org/licenses/LICENSE-2.0
+##
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS,
+## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+## See the License for the specific language governing permissions and
+## limitations under the License.
+## ---------------------------------------------------------------------------
+class=org.apache.activemq.transport.amqp.AmqpTransportFactory
diff --git a/activemq-amqp/src/main/resources/META-INF/services/org/apache/activemq/transport/amqp+nio b/activemq-amqp/src/main/resources/META-INF/services/org/apache/activemq/transport/amqp+nio
new file mode 100644
index 0000000000..40bf30f95b
--- /dev/null
+++ b/activemq-amqp/src/main/resources/META-INF/services/org/apache/activemq/transport/amqp+nio
@@ -0,0 +1,17 @@
+## ---------------------------------------------------------------------------
+## Licensed to the Apache Software Foundation (ASF) under one or more
+## contributor license agreements. See the NOTICE file distributed with
+## this work for additional information regarding copyright ownership.
+## The ASF licenses this file to You under the Apache License, Version 2.0
+## (the "License"); you may not use this file except in compliance with
+## the License. You may obtain a copy of the License at
+##
+## http://www.apache.org/licenses/LICENSE-2.0
+##
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS,
+## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+## See the License for the specific language governing permissions and
+## limitations under the License.
+## ---------------------------------------------------------------------------
+class=org.apache.activemq.transport.amqp.AmqpNioTransportFactory
diff --git a/activemq-amqp/src/main/resources/META-INF/services/org/apache/activemq/transport/amqp+nio+ssl b/activemq-amqp/src/main/resources/META-INF/services/org/apache/activemq/transport/amqp+nio+ssl
new file mode 100644
index 0000000000..25af0bd66a
--- /dev/null
+++ b/activemq-amqp/src/main/resources/META-INF/services/org/apache/activemq/transport/amqp+nio+ssl
@@ -0,0 +1,17 @@
+## ---------------------------------------------------------------------------
+## Licensed to the Apache Software Foundation (ASF) under one or more
+## contributor license agreements. See the NOTICE file distributed with
+## this work for additional information regarding copyright ownership.
+## The ASF licenses this file to You under the Apache License, Version 2.0
+## (the "License"); you may not use this file except in compliance with
+## the License. You may obtain a copy of the License at
+##
+## http://www.apache.org/licenses/LICENSE-2.0
+##
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS,
+## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+## See the License for the specific language governing permissions and
+## limitations under the License.
+## ---------------------------------------------------------------------------
+class=org.apache.activemq.transport.amqp.AmqpNioSslTransportFactory
\ No newline at end of file
diff --git a/activemq-amqp/src/main/resources/META-INF/services/org/apache/activemq/transport/amqp+ssl b/activemq-amqp/src/main/resources/META-INF/services/org/apache/activemq/transport/amqp+ssl
new file mode 100644
index 0000000000..fbcc95d3d3
--- /dev/null
+++ b/activemq-amqp/src/main/resources/META-INF/services/org/apache/activemq/transport/amqp+ssl
@@ -0,0 +1,17 @@
+## ---------------------------------------------------------------------------
+## Licensed to the Apache Software Foundation (ASF) under one or more
+## contributor license agreements. See the NOTICE file distributed with
+## this work for additional information regarding copyright ownership.
+## The ASF licenses this file to You under the Apache License, Version 2.0
+## (the "License"); you may not use this file except in compliance with
+## the License. You may obtain a copy of the License at
+##
+## http://www.apache.org/licenses/LICENSE-2.0
+##
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS,
+## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+## See the License for the specific language governing permissions and
+## limitations under the License.
+## ---------------------------------------------------------------------------
+class=org.apache.activemq.transport.amqp.AmqpSslTransportFactory
diff --git a/activemq-amqp/src/main/resources/META-INF/services/org/apache/activemq/wireformat/amqp b/activemq-amqp/src/main/resources/META-INF/services/org/apache/activemq/wireformat/amqp
new file mode 100644
index 0000000000..b2990235cf
--- /dev/null
+++ b/activemq-amqp/src/main/resources/META-INF/services/org/apache/activemq/wireformat/amqp
@@ -0,0 +1,17 @@
+## ---------------------------------------------------------------------------
+## Licensed to the Apache Software Foundation (ASF) under one or more
+## contributor license agreements. See the NOTICE file distributed with
+## this work for additional information regarding copyright ownership.
+## The ASF licenses this file to You under the Apache License, Version 2.0
+## (the "License"); you may not use this file except in compliance with
+## the License. You may obtain a copy of the License at
+##
+## http://www.apache.org/licenses/LICENSE-2.0
+##
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS,
+## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+## See the License for the specific language governing permissions and
+## limitations under the License.
+## ---------------------------------------------------------------------------
+class=org.apache.activemq.transport.amqp.AmqpWireFormatFactory
\ No newline at end of file
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpNioTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpNioTest.java
new file mode 100644
index 0000000000..859062fdc6
--- /dev/null
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpNioTest.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp;
+
+import org.apache.activemq.broker.BrokerService;
+
+public class AmqpNioTest extends AmqpTest {
+ protected void addAMQPConnector(BrokerService brokerService) throws Exception {
+ brokerService.addConnector("amqp+nio://localhost:1883?maxInactivityDuration=-1");
+ }
+
+}
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpSslTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpSslTest.java
new file mode 100644
index 0000000000..443479e33a
--- /dev/null
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpSslTest.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp;
+
+import org.apache.activemq.broker.BrokerService;
+import org.junit.Ignore;
+
+import javax.net.ssl.KeyManager;
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.TrustManager;
+import javax.net.ssl.X509TrustManager;
+import java.security.SecureRandom;
+import java.security.cert.CertificateException;
+import java.security.cert.X509Certificate;
+
+@Ignore("hangs atm, needs investigation")
+public class AmqpSslTest extends AmqpTest {
+ public void startBroker() throws Exception {
+ System.setProperty("javax.net.ssl.trustStore", "src/test/resources/client.keystore");
+ System.setProperty("javax.net.ssl.trustStorePassword", "password");
+ System.setProperty("javax.net.ssl.trustStoreType", "jks");
+ System.setProperty("javax.net.ssl.keyStore", "src/test/resources/server.keystore");
+ System.setProperty("javax.net.ssl.keyStorePassword", "password");
+ System.setProperty("javax.net.ssl.keyStoreType", "jks");
+ super.startBroker();
+ }
+
+ protected void addAMQPConnector(BrokerService brokerService) throws Exception {
+ brokerService.addConnector("amqp+ssl://localhost:8883");
+ }
+
+// protected AMQP createAMQPConnection() throws Exception {
+// AMQP amqp = new AMQP();
+// amqp.setHost("ssl://localhost:8883");
+// SSLContext ctx = SSLContext.getInstance("TLS");
+// ctx.init(new KeyManager[0], new TrustManager[]{new DefaultTrustManager()}, new SecureRandom());
+// amqp.setSslContext(ctx);
+// return amqp;
+// }
+
+ static class DefaultTrustManager implements X509TrustManager {
+
+ public void checkClientTrusted(X509Certificate[] x509Certificates, String s) throws CertificateException {
+ }
+
+ public void checkServerTrusted(X509Certificate[] x509Certificates, String s) throws CertificateException {
+ }
+
+ public X509Certificate[] getAcceptedIssuers() {
+ return new X509Certificate[0];
+ }
+ }
+
+}
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTest.java
new file mode 100644
index 0000000000..17afc37e88
--- /dev/null
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTest.java
@@ -0,0 +1,286 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp;
+
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.AutoFailTestSupport;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.TransportConnector;
+import org.apache.activemq.command.ActiveMQMessage;
+import org.apache.activemq.util.ByteSequence;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import java.util.Vector;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import static org.fusesource.hawtbuf.UTF8Buffer.utf8;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+
+
+public class AmqpTest {
+ protected static final Logger LOG = LoggerFactory.getLogger(AmqpTest.class);
+ protected BrokerService brokerService;
+ protected Vector exceptions = new Vector();
+ protected int numberOfMessages;
+ AutoFailTestSupport autoFailTestSupport = new AutoFailTestSupport() {};
+ protected int port;
+
+ @Before
+ public void startBroker() throws Exception {
+ autoFailTestSupport.startAutoFailThread();
+ exceptions.clear();
+ brokerService = new BrokerService();
+ brokerService.setPersistent(false);
+ brokerService.setAdvisorySupport(false);
+ addAMQPConnector();
+ brokerService.start();
+ this.numberOfMessages = 2000;
+ }
+
+ protected void addAMQPConnector() throws Exception {
+ final TransportConnector connector = brokerService.addConnector("amqp://localhost:0");
+ port = connector.getConnectUri().getPort();
+ }
+
+
+ @After
+ public void stopBroker() throws Exception {
+ if (brokerService != null) {
+ brokerService.stop();
+ brokerService = null;
+ }
+ autoFailTestSupport.stopAutoFailThread();
+ }
+
+
+
+// @Test
+// public void testSendAndReceiveAMQP() throws Exception {
+// addAMQPConnector(brokerService);
+// brokerService.start();
+// AMQP amqp = new AMQP();
+// final BlockingConnection subscribeConnection = amqp.blockingConnection();
+// subscribeConnection.connect();
+// Topic topic = new Topic("foo/bah",QoS.AT_MOST_ONCE);
+// Topic[] topics = {topic};
+// subscribeConnection.subscribe(topics);
+// final CountDownLatch latch = new CountDownLatch(numberOfMessages);
+//
+// Thread thread = new Thread(new Runnable() {
+// public void run() {
+// for (int i = 0; i < numberOfMessages; i++){
+// try {
+// Message message = subscribeConnection.receive();
+// message.ack();
+// latch.countDown();
+// } catch (Exception e) {
+// e.printStackTrace();
+// break;
+// }
+//
+// }
+// }
+// });
+// thread.start();
+//
+// BlockingConnection publisherConnection = amqp.blockingConnection();
+// publisherConnection.connect();
+// for (int i = 0; i < numberOfMessages; i++){
+// String payload = "Message " + i;
+// publisherConnection.publish(topic.name().toString(),payload.getBytes(),QoS.AT_LEAST_ONCE,false);
+// }
+//
+// latch.await(10, TimeUnit.SECONDS);
+// assertEquals(0, latch.getCount());
+// }
+//
+// @Test
+// public void testSendAndReceiveAtMostOnce() throws Exception {
+// addAMQPConnector(brokerService);
+// brokerService.start();
+// AMQP amqp = createAMQPConnection();
+// amqp.setKeepAlive(Short.MAX_VALUE);
+// BlockingConnection connection = amqp.blockingConnection();
+//
+// connection.connect();
+//
+//
+// Topic[] topics = {new Topic(utf8("foo"), QoS.AT_MOST_ONCE)};
+// connection.subscribe(topics);
+// for (int i = 0; i < numberOfMessages; i++) {
+// String payload = "Test Message: " + i;
+// connection.publish("foo", payload.getBytes(), QoS.AT_MOST_ONCE, false);
+// Message message = connection.receive();
+// assertEquals(payload, new String(message.getPayload()));
+// }
+// connection.disconnect();
+// }
+//
+// @Test
+// public void testSendAndReceiveAtLeastOnce() throws Exception {
+// addAMQPConnector(brokerService);
+// brokerService.start();
+// AMQP amqp = createAMQPConnection();
+// amqp.setKeepAlive(Short.MAX_VALUE);
+// BlockingConnection connection = amqp.blockingConnection();
+//
+// connection.connect();
+//
+// Topic[] topics = {new Topic(utf8("foo"), QoS.AT_LEAST_ONCE)};
+// connection.subscribe(topics);
+// for (int i = 0; i < numberOfMessages; i++) {
+// String payload = "Test Message: " + i;
+// connection.publish("foo", payload.getBytes(), QoS.AT_LEAST_ONCE, false);
+// Message message = connection.receive();
+// message.ack();
+// assertEquals(payload, new String(message.getPayload()));
+// }
+// connection.disconnect();
+// }
+//
+// @Test
+// public void testSendAndReceiveExactlyOnce() throws Exception {
+// addAMQPConnector(brokerService);
+// brokerService.start();
+// AMQP publisher = createAMQPConnection();
+// BlockingConnection pubConnection = publisher.blockingConnection();
+//
+// pubConnection.connect();
+//
+// AMQP subscriber = createAMQPConnection();
+// BlockingConnection subConnection = subscriber.blockingConnection();
+//
+// subConnection.connect();
+//
+// Topic[] topics = {new Topic(utf8("foo"), QoS.EXACTLY_ONCE)};
+// subConnection.subscribe(topics);
+// for (int i = 0; i < numberOfMessages; i++) {
+// String payload = "Test Message: " + i;
+// pubConnection.publish("foo", payload.getBytes(), QoS.EXACTLY_ONCE, false);
+// Message message = subConnection.receive();
+// message.ack();
+// assertEquals(payload, new String(message.getPayload()));
+// }
+// subConnection.disconnect();
+// pubConnection.disconnect();
+// }
+//
+// @Test
+// public void testSendAndReceiveLargeMessages() throws Exception {
+// byte[] payload = new byte[1024 * 32];
+// for (int i = 0; i < payload.length; i++){
+// payload[i] = '2';
+// }
+// addAMQPConnector(brokerService);
+// brokerService.start();
+//
+// AMQP publisher = createAMQPConnection();
+// BlockingConnection pubConnection = publisher.blockingConnection();
+//
+// pubConnection.connect();
+//
+// AMQP subscriber = createAMQPConnection();
+// BlockingConnection subConnection = subscriber.blockingConnection();
+//
+// subConnection.connect();
+//
+// Topic[] topics = {new Topic(utf8("foo"), QoS.AT_LEAST_ONCE)};
+// subConnection.subscribe(topics);
+// for (int i = 0; i < 10; i++) {
+// pubConnection.publish("foo", payload, QoS.AT_LEAST_ONCE, false);
+// Message message = subConnection.receive();
+// message.ack();
+// assertArrayEquals(payload, message.getPayload());
+// }
+// subConnection.disconnect();
+// pubConnection.disconnect();
+// }
+//
+//
+// @Test
+// public void testSendAMQPReceiveJMS() throws Exception {
+// addAMQPConnector(brokerService);
+// brokerService.addConnector(ActiveMQConnectionFactory.DEFAULT_BROKER_BIND_URL);
+// brokerService.start();
+// AMQP amqp = createAMQPConnection();
+// BlockingConnection connection = amqp.blockingConnection();
+// final String DESTINATION_NAME = "foo.*";
+// connection.connect();
+//
+// ActiveMQConnection activeMQConnection = (ActiveMQConnection) new ActiveMQConnectionFactory().createConnection();
+// activeMQConnection.start();
+// Session s = activeMQConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+// javax.jms.Topic jmsTopic = s.createTopic(DESTINATION_NAME);
+// MessageConsumer consumer = s.createConsumer(jmsTopic);
+//
+// for (int i = 0; i < numberOfMessages; i++) {
+// String payload = "Test Message: " + i;
+// connection.publish("foo/bah", payload.getBytes(), QoS.AT_LEAST_ONCE, false);
+// ActiveMQMessage message = (ActiveMQMessage) consumer.receive();
+// ByteSequence bs = message.getContent();
+// assertEquals(payload, new String(bs.data, bs.offset, bs.length));
+// }
+//
+//
+// activeMQConnection.close();
+// connection.disconnect();
+// }
+//
+// @Test
+// public void testSendJMSReceiveAMQP() throws Exception {
+// addAMQPConnector(brokerService);
+// brokerService.addConnector(ActiveMQConnectionFactory.DEFAULT_BROKER_BIND_URL);
+// brokerService.start();
+// AMQP amqp = createAMQPConnection();
+// amqp.setKeepAlive(Short.MAX_VALUE);
+// BlockingConnection connection = amqp.blockingConnection();
+// connection.connect();
+//
+// ActiveMQConnection activeMQConnection = (ActiveMQConnection) new ActiveMQConnectionFactory().createConnection();
+// activeMQConnection.start();
+// Session s = activeMQConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+// javax.jms.Topic jmsTopic = s.createTopic("foo.far");
+// MessageProducer producer = s.createProducer(jmsTopic);
+//
+// Topic[] topics = {new Topic(utf8("foo/+"), QoS.AT_MOST_ONCE)};
+// connection.subscribe(topics);
+// for (int i = 0; i < numberOfMessages; i++) {
+// String payload = "This is Test Message: " + i;
+// TextMessage sendMessage = s.createTextMessage(payload);
+// producer.send(sendMessage);
+// Message message = connection.receive();
+// message.ack();
+// assertEquals(payload, new String(message.getPayload()));
+// }
+// connection.disconnect();
+// }
+//
+//
+
+
+}
\ No newline at end of file
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/SwiftMQClientTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/SwiftMQClientTest.java
new file mode 100644
index 0000000000..ce089d9294
--- /dev/null
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/SwiftMQClientTest.java
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp;
+
+import com.swiftmq.amqp.AMQPContext;
+import com.swiftmq.amqp.v100.client.*;
+import com.swiftmq.amqp.v100.generated.messaging.message_format.AmqpValue;
+import com.swiftmq.amqp.v100.messaging.AMQPMessage;
+import com.swiftmq.amqp.v100.types.AMQPString;
+import com.swiftmq.amqp.v100.types.AMQPType;
+import org.junit.Test;
+
+/**
+ * @author Hiram Chirino
+ */
+public class SwiftMQClientTest extends AmqpTest {
+
+ @Test
+ public void testSendReceive() throws Exception {
+
+ String queue = "testqueue";
+ int nMsgs = 1;
+ int qos = QoS.AT_MOST_ONCE;
+ AMQPContext ctx = new AMQPContext(AMQPContext.CLIENT);
+
+ try {
+
+ Connection connection = new Connection(ctx, "127.0.0.1", port, false);
+ connection.setContainerId("client");
+ connection.setIdleTimeout(-1);
+ connection.setMaxFrameSize(1024 * 4);
+ connection.setExceptionListener(new ExceptionListener() {
+ public void onException(Exception e) {
+ e.printStackTrace();
+ }
+ });
+ connection.connect();
+ {
+ String data = String.format("%010d", 0);
+
+ Session session = connection.createSession(10, 10);
+ Producer p = session.createProducer(queue, qos);
+ for (int i = 0; i < nMsgs; i++) {
+ AMQPMessage msg = new AMQPMessage();
+ String s = "Message #" + (i + 1);
+ System.out.println("Sending " + s);
+ msg.setAmqpValue(new AmqpValue(new AMQPString(s + ", data: " + data)));
+ p.send(msg);
+ }
+ p.close();
+ session.close();
+ }
+// {
+// Session session = connection.createSession(10, 10);
+// Consumer c = session.createConsumer(queue, 100, qos, true, null);
+//
+// // Receive messages non-transacted
+// for (int i = 0; i < nMsgs; i++) {
+// AMQPMessage msg = c.receive();
+// final AMQPType value = msg.getAmqpValue().getValue();
+// if (value instanceof AMQPString) {
+// AMQPString s = (AMQPString) value;
+// System.out.println("Received: " + s.getValue());
+// }
+// if (!msg.isSettled())
+// msg.accept();
+// }
+// c.close();
+// session.close();
+// }
+ connection.close();
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+
+ }
+
+}
diff --git a/activemq-amqp/src/test/resources/log4j.properties b/activemq-amqp/src/test/resources/log4j.properties
new file mode 100755
index 0000000000..fd5a31bd24
--- /dev/null
+++ b/activemq-amqp/src/test/resources/log4j.properties
@@ -0,0 +1,36 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+#
+# The logging properties used during tests..
+#
+log4j.rootLogger=WARN, console, file
+log4j.logger.org.apache.activemq=INFO
+log4j.logger.org.fusesource=INFO
+
+# Console will only display warnnings
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%-5p | %t | %m%n
+log4j.appender.console.threshold=TRACE
+
+# File appender will contain all info messages
+log4j.appender.file=org.apache.log4j.FileAppender
+log4j.appender.file.layout=org.apache.log4j.PatternLayout
+log4j.appender.file.layout.ConversionPattern=%d | %-5p | %m | %c | %t%n
+log4j.appender.file.file=target/test.log
+log4j.appender.file.append=true