From cdd5150340e69a27402fdf73d1726299b0b30854 Mon Sep 17 00:00:00 2001 From: "Hiram R. Chirino" Date: Wed, 3 Oct 2012 14:15:01 +0000 Subject: [PATCH] Initial rough cut of AMQP protocol support using the QPID proton project. git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1393500 13f79535-47bb-0310-9956-ffa450edef68 --- activemq-amqp/pom.xml | 130 +++ .../amqp/AMQPNativeInboundTransformer.java | 24 + .../amqp/AMQPSslTransportFactory.java | 81 ++ .../transport/amqp/AmqpNioSslTransport.java | 54 ++ .../amqp/AmqpNioSslTransportFactory.java | 69 ++ .../transport/amqp/AmqpNioTransport.java | 128 +++ .../amqp/AmqpNioTransportFactory.java | 99 +++ .../transport/amqp/AmqpProtocolConverter.java | 750 ++++++++++++++++++ .../transport/amqp/AmqpProtocolException.java | 50 ++ .../transport/amqp/AmqpSubscription.java | 68 ++ .../activemq/transport/amqp/AmqpSupport.java | 43 + .../transport/amqp/AmqpTransport.java | 43 + .../transport/amqp/AmqpTransportFactory.java | 80 ++ .../transport/amqp/AmqpTransportFilter.java | 137 ++++ .../transport/amqp/AmqpWireFormat.java | 88 ++ .../transport/amqp/AmqpWireFormatFactory.java | 29 + .../transport/amqp/InboundTransformer.java | 29 + .../amqp/JMSMappingInboundTransformer.java | 24 + .../transport/amqp/ResponseHandler.java | 29 + .../activemq/transport/amqp/package.html | 25 + .../org/apache/activemq/transport/amqp | 17 + .../org/apache/activemq/transport/amqp+nio | 17 + .../apache/activemq/transport/amqp+nio+ssl | 17 + .../org/apache/activemq/transport/amqp+ssl | 17 + .../org/apache/activemq/wireformat/amqp | 17 + .../activemq/transport/amqp/AmqpNioTest.java | 26 + .../activemq/transport/amqp/AmqpSslTest.java | 68 ++ .../activemq/transport/amqp/AmqpTest.java | 286 +++++++ .../transport/amqp/SwiftMQClientTest.java | 92 +++ .../src/test/resources/log4j.properties | 36 + 30 files changed, 2573 insertions(+) create mode 100644 activemq-amqp/pom.xml create mode 100644 activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AMQPNativeInboundTransformer.java create mode 100644 activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AMQPSslTransportFactory.java create mode 100644 activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpNioSslTransport.java create mode 100644 activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpNioSslTransportFactory.java create mode 100644 activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpNioTransport.java create mode 100644 activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpNioTransportFactory.java create mode 100644 activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java create mode 100644 activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolException.java create mode 100644 activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpSubscription.java create mode 100644 activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpSupport.java create mode 100644 activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransport.java create mode 100644 activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransportFactory.java create mode 100644 activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransportFilter.java create mode 100644 activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpWireFormat.java create mode 100644 activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpWireFormatFactory.java create mode 100644 activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/InboundTransformer.java create mode 100644 activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/JMSMappingInboundTransformer.java create mode 100644 activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/ResponseHandler.java create mode 100644 activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/package.html create mode 100644 activemq-amqp/src/main/resources/META-INF/services/org/apache/activemq/transport/amqp create mode 100644 activemq-amqp/src/main/resources/META-INF/services/org/apache/activemq/transport/amqp+nio create mode 100644 activemq-amqp/src/main/resources/META-INF/services/org/apache/activemq/transport/amqp+nio+ssl create mode 100644 activemq-amqp/src/main/resources/META-INF/services/org/apache/activemq/transport/amqp+ssl create mode 100644 activemq-amqp/src/main/resources/META-INF/services/org/apache/activemq/wireformat/amqp create mode 100644 activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpNioTest.java create mode 100644 activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpSslTest.java create mode 100644 activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTest.java create mode 100644 activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/SwiftMQClientTest.java create mode 100755 activemq-amqp/src/test/resources/log4j.properties diff --git a/activemq-amqp/pom.xml b/activemq-amqp/pom.xml new file mode 100644 index 0000000000..d7d7354e69 --- /dev/null +++ b/activemq-amqp/pom.xml @@ -0,0 +1,130 @@ + + + + + 4.0.0 + + + org.apache.activemq + activemq-parent + 5.8-SNAPSHOT + + + activemq-amqp + jar + + ActiveMQ :: AMQP + ActiveMQ implementaiton of AMQP messaging protocol + + + + + org.apache.activemq + activemq-core + provided + + + + org.apache.qpid + qpid-proton + 1.0-SNAPSHOT + + + + org.fusesource.hawtbuf + hawtbuf + ${hawtbuf-version} + + + + + org.apache.activemq + activemq-core + test-jar + test + + + org.apache.activemq + activemq-console + test + + + + junit + junit + test + + + org.slf4j + slf4j-log4j12 + test + + + + + + + + swiftmq-client + + swiftmq-client-home + + + + com.swiftmq + swiftmq + 9.2.0 + system + ${swiftmq-client-home}/jars/swiftmq.jar + + + com.swiftmq + swiftmq-amqp + 9.2.0 + system + ${swiftmq-client-home}/jars/amqp.jar + + + + + + org.codehaus.mojo + build-helper-maven-plugin + 1.7 + + + add-test-source + generate-sources + + add-test-source + + + + ${basedir}/src/test-swiftmq/java + + + + + + + + + + + diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AMQPNativeInboundTransformer.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AMQPNativeInboundTransformer.java new file mode 100644 index 0000000000..994319b030 --- /dev/null +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AMQPNativeInboundTransformer.java @@ -0,0 +1,24 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.amqp; + +/** +* @author Hiram Chirino +*/ +public class AMQPNativeInboundTransformer extends InboundTransformer { + +} diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AMQPSslTransportFactory.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AMQPSslTransportFactory.java new file mode 100644 index 0000000000..91b5dc952b --- /dev/null +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AMQPSslTransportFactory.java @@ -0,0 +1,81 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.amqp; + +import org.apache.activemq.broker.BrokerContext; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.BrokerServiceAware; +import org.apache.activemq.transport.MutexTransport; +import org.apache.activemq.transport.Transport; +import org.apache.activemq.transport.tcp.SslTransportFactory; +import org.apache.activemq.util.IntrospectionSupport; +import org.apache.activemq.wireformat.WireFormat; + +import java.util.HashMap; +import java.util.Map; + +/** + * A AMQP over SSL transport factory + */ +public class AMQPSslTransportFactory extends SslTransportFactory implements BrokerServiceAware { + + private BrokerContext brokerContext = null; + + protected String getDefaultWireFormatType() { + return "amqp"; + } + + @SuppressWarnings("rawtypes") + + public Transport compositeConfigure(Transport transport, WireFormat format, Map options) { + transport = new AmqpTransportFilter(transport, format, brokerContext); + IntrospectionSupport.setProperties(transport, options); + return super.compositeConfigure(transport, format, options); + } + + @SuppressWarnings("rawtypes") + @Override + public Transport serverConfigure(Transport transport, WireFormat format, HashMap options) throws Exception { + transport = super.serverConfigure(transport, format, options); + + MutexTransport mutex = transport.narrow(MutexTransport.class); + if (mutex != null) { + mutex.setSyncOnCommand(true); + } + + return transport; + } + + public void setBrokerService(BrokerService brokerService) { + this.brokerContext = brokerService.getBrokerContext(); + } + +// protected Transport createInactivityMonitor(Transport transport, WireFormat format) { +// AmqpInactivityMonitor monitor = new AmqpInactivityMonitor(transport, format); +// +// AmqpTransportFilter filter = transport.narrow(AmqpTransportFilter.class); +// filter.setInactivityMonitor(monitor); +// +// return monitor; +// } + + + @Override + protected boolean isUseInactivityMonitor(Transport transport) { + return false; + } +} diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpNioSslTransport.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpNioSslTransport.java new file mode 100644 index 0000000000..398d75a097 --- /dev/null +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpNioSslTransport.java @@ -0,0 +1,54 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.amqp; + +import org.apache.activemq.transport.nio.NIOSSLTransport; +import org.apache.activemq.wireformat.WireFormat; +import org.fusesource.hawtbuf.Buffer; +import org.fusesource.hawtbuf.DataByteArrayInputStream; + +import javax.net.SocketFactory; +import java.io.IOException; +import java.net.Socket; +import java.net.URI; +import java.net.UnknownHostException; +import java.nio.ByteBuffer; + +public class AmqpNioSslTransport extends NIOSSLTransport { + + public AmqpNioSslTransport(WireFormat wireFormat, SocketFactory socketFactory, URI remoteLocation, URI localLocation) throws UnknownHostException, IOException { + super(wireFormat, socketFactory, remoteLocation, localLocation); + } + + public AmqpNioSslTransport(WireFormat wireFormat, Socket socket) throws IOException { + super(wireFormat, socket); + } + + @Override + protected void initializeStreams() throws IOException { + super.initializeStreams(); + if (inputBuffer.position() != 0 && inputBuffer.hasRemaining()) { + serviceRead(); + } + } + + @Override + protected void processCommand(ByteBuffer plain) throws Exception { + doConsume(AmqpSupport.toBuffer(plain)); + } + +} \ No newline at end of file diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpNioSslTransportFactory.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpNioSslTransportFactory.java new file mode 100644 index 0000000000..5130cb8daf --- /dev/null +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpNioSslTransportFactory.java @@ -0,0 +1,69 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.amqp; + +import org.apache.activemq.broker.SslContext; +import org.apache.activemq.transport.Transport; +import org.apache.activemq.transport.TransportServer; +import org.apache.activemq.transport.tcp.TcpTransport; +import org.apache.activemq.transport.tcp.TcpTransportServer; +import org.apache.activemq.wireformat.WireFormat; + +import javax.net.ServerSocketFactory; +import javax.net.SocketFactory; +import javax.net.ssl.SSLContext; +import java.io.IOException; +import java.net.Socket; +import java.net.URI; +import java.net.URISyntaxException; +import java.net.UnknownHostException; + +public class AmqpNioSslTransportFactory extends AmqpNioTransportFactory { + + SSLContext context; + + @Override + protected TcpTransportServer createTcpTransportServer(URI location, ServerSocketFactory serverSocketFactory) throws IOException, URISyntaxException { + return new TcpTransportServer(this, location, serverSocketFactory) { + protected Transport createTransport(Socket socket, WireFormat format) throws IOException { + AmqpNioSslTransport transport = new AmqpNioSslTransport(format, socket); + if (context != null) { + transport.setSslContext(context); + } + return transport; + } + }; + } + + @Override + protected TcpTransport createTcpTransport(WireFormat wf, SocketFactory socketFactory, URI location, URI localLocation) throws UnknownHostException, IOException { + return new AmqpNioSslTransport(wf, socketFactory, location, localLocation); + } + + @Override + public TransportServer doBind(URI location) throws IOException { + if (SslContext.getCurrentSslContext() != null) { + try { + context = SslContext.getCurrentSslContext().getSSLContext(); + } catch (Exception e) { + throw new IOException(e); + } + } + return super.doBind(location); + } + +} diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpNioTransport.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpNioTransport.java new file mode 100644 index 0000000000..6f4550ff5f --- /dev/null +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpNioTransport.java @@ -0,0 +1,128 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.amqp; + +import org.apache.activemq.transport.nio.NIOOutputStream; +import org.apache.activemq.transport.nio.SelectorManager; +import org.apache.activemq.transport.nio.SelectorSelection; +import org.apache.activemq.transport.tcp.TcpTransport; +import org.apache.activemq.util.IOExceptionSupport; +import org.apache.activemq.util.ServiceStopper; +import org.apache.activemq.wireformat.WireFormat; +import org.fusesource.hawtbuf.DataByteArrayInputStream; + +import javax.net.SocketFactory; +import java.io.DataOutputStream; +import java.io.EOFException; +import java.io.IOException; +import java.net.Socket; +import java.net.URI; +import java.net.UnknownHostException; +import java.nio.ByteBuffer; +import java.nio.channels.SelectionKey; +import java.nio.channels.SocketChannel; + +/** + * An implementation of the {@link org.apache.activemq.transport.Transport} interface for using AMQP over NIO + */ +public class AmqpNioTransport extends TcpTransport { + + private SocketChannel channel; + private SelectorSelection selection; + + private ByteBuffer inputBuffer; + + public AmqpNioTransport(WireFormat wireFormat, SocketFactory socketFactory, URI remoteLocation, URI localLocation) throws UnknownHostException, IOException { + super(wireFormat, socketFactory, remoteLocation, localLocation); + } + + public AmqpNioTransport(WireFormat wireFormat, Socket socket) throws IOException { + super(wireFormat, socket); + } + + protected void initializeStreams() throws IOException { + channel = socket.getChannel(); + channel.configureBlocking(false); + // listen for events telling us when the socket is readable. + selection = SelectorManager.getInstance().register(channel, new SelectorManager.Listener() { + public void onSelect(SelectorSelection selection) { + if (!isStopped()) { + serviceRead(); + } + } + + public void onError(SelectorSelection selection, Throwable error) { + if (error instanceof IOException) { + onException((IOException) error); + } else { + onException(IOExceptionSupport.create(error)); + } + } + }); + + inputBuffer = ByteBuffer.allocate(8 * 1024); + NIOOutputStream outPutStream = new NIOOutputStream(channel, 8 * 1024); + this.dataOut = new DataOutputStream(outPutStream); + this.buffOut = outPutStream; + } + + private void serviceRead() { + try { + + while (isStarted()) { + // read channel + int readSize = channel.read(inputBuffer); + // channel is closed, cleanup + if (readSize == -1) { + onException(new EOFException()); + selection.close(); + break; + } + // nothing more to read, break + if (readSize == 0) { + break; + } + + inputBuffer.flip(); + doConsume(AmqpSupport.toBuffer(inputBuffer)); + // clear the buffer + inputBuffer.clear(); + + } + } catch (IOException e) { + onException(e); + } catch (Throwable e) { + onException(IOExceptionSupport.create(e)); + } + } + + protected void doStart() throws Exception { + connect(); + selection.setInterestOps(SelectionKey.OP_READ); + selection.enable(); + } + + protected void doStop(ServiceStopper stopper) throws Exception { + try { + if (selection != null) { + selection.close(); + } + } finally { + super.doStop(stopper); + } + } +} \ No newline at end of file diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpNioTransportFactory.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpNioTransportFactory.java new file mode 100644 index 0000000000..e289dc1396 --- /dev/null +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpNioTransportFactory.java @@ -0,0 +1,99 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.amqp; + +import org.apache.activemq.broker.BrokerContext; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.BrokerServiceAware; +import org.apache.activemq.transport.MutexTransport; +import org.apache.activemq.transport.Transport; +import org.apache.activemq.transport.nio.NIOTransportFactory; +import org.apache.activemq.transport.tcp.TcpTransport; +import org.apache.activemq.transport.tcp.TcpTransportServer; +import org.apache.activemq.util.IntrospectionSupport; +import org.apache.activemq.wireformat.WireFormat; + +import javax.net.ServerSocketFactory; +import javax.net.SocketFactory; +import java.io.IOException; +import java.net.Socket; +import java.net.URI; +import java.net.URISyntaxException; +import java.net.UnknownHostException; +import java.util.HashMap; +import java.util.Map; + +/** + * A AMQP over NIO transport factory + */ +public class AmqpNioTransportFactory extends NIOTransportFactory implements BrokerServiceAware { + + private BrokerContext brokerContext = null; + + protected String getDefaultWireFormatType() { + return "amqp"; + } + + protected TcpTransportServer createTcpTransportServer(URI location, ServerSocketFactory serverSocketFactory) throws IOException, URISyntaxException { + return new TcpTransportServer(this, location, serverSocketFactory) { + protected Transport createTransport(Socket socket, WireFormat format) throws IOException { + return new AmqpNioTransport(format, socket); + } + }; + } + + protected TcpTransport createTcpTransport(WireFormat wf, SocketFactory socketFactory, URI location, URI localLocation) throws UnknownHostException, IOException { + return new AmqpNioTransport(wf, socketFactory, location, localLocation); + } + + @SuppressWarnings("rawtypes") + @Override + public Transport serverConfigure(Transport transport, WireFormat format, HashMap options) throws Exception { + transport = super.serverConfigure(transport, format, options); + + MutexTransport mutex = transport.narrow(MutexTransport.class); + if (mutex != null) { + mutex.setSyncOnCommand(true); + } + + return transport; + } + + @SuppressWarnings("rawtypes") + public Transport compositeConfigure(Transport transport, WireFormat format, Map options) { + transport = new AmqpTransportFilter(transport, format, brokerContext); + IntrospectionSupport.setProperties(transport, options); + return super.compositeConfigure(transport, format, options); + } + + public void setBrokerService(BrokerService brokerService) { + this.brokerContext = brokerService.getBrokerContext(); + } + +// protected Transport createInactivityMonitor(Transport transport, WireFormat format) { +// AmqpInactivityMonitor monitor = new AmqpInactivityMonitor(transport, format); +// AmqpTransportFilter filter = transport.narrow(AmqpTransportFilter.class); +// filter.setInactivityMonitor(monitor); +// return monitor; +// } + + @Override + protected boolean isUseInactivityMonitor(Transport transport) { + return false; + } +} + diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java new file mode 100644 index 0000000000..82a3b96d13 --- /dev/null +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java @@ -0,0 +1,750 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.amqp; + +import org.apache.activemq.broker.BrokerContext; +import org.apache.activemq.command.*; +import org.apache.activemq.command.Message; +import org.apache.activemq.util.*; +import org.apache.qpid.proton.engine.*; +import org.apache.qpid.proton.engine.Session; +import org.apache.qpid.proton.engine.impl.ConnectionImpl; +import org.apache.qpid.proton.engine.impl.DeliveryImpl; +import org.apache.qpid.proton.engine.impl.TransportImpl; +import org.fusesource.hawtbuf.Buffer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.jms.*; +import java.io.IOException; +import java.io.UnsupportedEncodingException; +import java.util.EnumSet; +import java.util.HashSet; +import java.util.Iterator; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.zip.Inflater; + +class AmqpProtocolConverter { + + public static final EnumSet UNINITIALIZED_SET = EnumSet.of(EndpointState.UNINITIALIZED); + public static final EnumSet INITIALIZED_SET = EnumSet.complementOf(UNINITIALIZED_SET); + public static final EnumSet ACTIVE_STATE = EnumSet.of(EndpointState.ACTIVE); + public static final EnumSet CLOSED_STATE = EnumSet.of(EndpointState.CLOSED); + private static final Logger LOG = LoggerFactory.getLogger(AmqpProtocolConverter.class); + + private final AmqpTransport amqpTransport; + + public AmqpProtocolConverter(AmqpTransport amqpTransport, BrokerContext brokerContext) { + this.amqpTransport = amqpTransport; + } + +// +// private static final Buffer PING_RESP_FRAME = new PINGRESP().encode(); +// +// +// private final LongSequenceGenerator messageIdGenerator = new LongSequenceGenerator(); +// private final LongSequenceGenerator consumerIdGenerator = new LongSequenceGenerator(); +// +// private final ConcurrentHashMap subscriptionsByConsumerId = new ConcurrentHashMap(); +// private final ConcurrentHashMap amqpSubscriptionByTopic = new ConcurrentHashMap(); +// private final Map activeMQTopicMap = new LRUCache(); +// private final Map amqpTopicMap = new LRUCache(); +// private final Map consumerAcks = new LRUCache(); +// private final Map publisherRecs = new LRUCache(); +// +// private final AtomicBoolean connected = new AtomicBoolean(false); +// private CONNECT connect; +// private String clientId; +// private final String QOS_PROPERTY_NAME = "QoSPropertyName"; + + + TransportImpl protonTransport = new TransportImpl(); + ConnectionImpl protonConnection = new ConnectionImpl(); + + { + this.protonTransport.bind(this.protonConnection); + } + + void pumpOut() { + try { + int size = 1024 * 64; + byte data[] = new byte[size]; + boolean done = false; + while (!done) { + int count = protonTransport.output(data, 0, size); + if (count > 0) { + final Buffer buffer = new Buffer(data, 0, count); + System.out.println("writing: " + buffer.toString().substring(5).replaceAll("(..)", "$1 ")); + amqpTransport.sendToAmqp(buffer); + } else { + done = true; + } + } +// System.out.println("write done"); + } catch (IOException e) { + amqpTransport.onException(e); + } + } + + static class AmqpSessionContext { + private final SessionId sessionId; + long nextProducerId = 0; + long nextConsumerId = 0; + + public AmqpSessionContext(ConnectionId connectionId, long id) { + sessionId = new SessionId(connectionId, -1); + + } + } + + /** + * Convert a AMQP command + */ + public void onAMQPData(Buffer frame) throws IOException, JMSException { + + + try { + System.out.println("reading: " + frame.toString().substring(5).replaceAll("(..)", "$1 ")); + protonTransport.input(frame.data, frame.offset, frame.length); + } catch (Throwable e) { + handleException(new AmqpProtocolException("Could not decode AMQP frame: " + frame, true, e)); + } + + try { + + // Handle the amqp open.. + if (protonConnection.getLocalState() == EndpointState.UNINITIALIZED && protonConnection.getRemoteState() != EndpointState.UNINITIALIZED) { + onConnectionOpen(); + } + + // Lets map amqp sessions to openwire sessions.. + Session session = protonConnection.sessionHead(UNINITIALIZED_SET, INITIALIZED_SET); + while (session != null) { + + onSessionOpen(session); + session = protonConnection.sessionHead(UNINITIALIZED_SET, INITIALIZED_SET); + } + + + Link link = protonConnection.linkHead(UNINITIALIZED_SET, INITIALIZED_SET); + while (link != null) { + onLinkOpen(link); + link = protonConnection.linkHead(UNINITIALIZED_SET, INITIALIZED_SET); + } + + Delivery delivery = protonConnection.getWorkHead(); + while (delivery != null) { + AmqpDeliveryListener listener = (AmqpDeliveryListener) delivery.getLink().getContext(); + if (listener != null) { + listener.onDelivery(delivery); + } + delivery = delivery.getWorkNext(); + } + + link = protonConnection.linkHead(ACTIVE_STATE, CLOSED_STATE); + while (link != null) { + if (link instanceof Receiver) { +// listener.onReceiverClose((Receiver) link); + } else { +// listener.onSenderClose((Sender) link); + } + link.close(); + link = link.next(ACTIVE_STATE, CLOSED_STATE); + } + + session = protonConnection.sessionHead(ACTIVE_STATE, CLOSED_STATE); + while (session != null) { + //TODO - close links? +// listener.onSessionClose(session); + session.close(); + session = session.next(ACTIVE_STATE, CLOSED_STATE); + } + if (protonConnection.getLocalState() == EndpointState.ACTIVE && protonConnection.getRemoteState() == EndpointState.CLOSED) { +// listener.onConnectionClose(protonConnection); + protonConnection.close(); + } + + } catch (Throwable e) { + handleException(new AmqpProtocolException("Could not process AMQP commands", true, e)); + } + + pumpOut(); + } + + public void onActiveMQCommand(Command command) throws Exception { + if (command.isResponse()) { + Response response = (Response) command; + ResponseHandler rh = resposeHandlers.remove(Integer.valueOf(response.getCorrelationId())); + if (rh != null) { + rh.onResponse(this, response); + } else { + // Pass down any unexpected errors. Should this close the connection? + if (response.isException()) { + Throwable exception = ((ExceptionResponse) response).getException(); + handleException(exception); + } + } + } else if (command.isMessageDispatch()) { + MessageDispatch md = (MessageDispatch) command; + ConsumerContext consumerContext = subscriptionsByConsumerId.get(md.getConsumerId()); + if (consumerContext != null) { + consumerContext.onMessageDispatch(md); + } + } else if (command.getDataStructureType() == ConnectionError.DATA_STRUCTURE_TYPE) { + // Pass down any unexpected async errors. Should this close the connection? + Throwable exception = ((ConnectionError) command).getException(); + handleException(exception); + } else if (command.isBrokerInfo()) { + //ignore + } else { + LOG.debug("Do not know how to process ActiveMQ Command " + command); + } + } + + private static final IdGenerator CONNECTION_ID_GENERATOR = new IdGenerator(); + private final ConnectionId connectionId = new ConnectionId(CONNECTION_ID_GENERATOR.generateId()); + private ConnectionInfo connectionInfo = new ConnectionInfo(); + private long nextSessionId = 0; + + static abstract class AmqpDeliveryListener { + abstract public void onDelivery(Delivery delivery) throws Exception; + } + + private void onConnectionOpen() throws AmqpProtocolException { + connectionInfo.setResponseRequired(true); + connectionInfo.setConnectionId(connectionId); +// configureInactivityMonitor(connect.keepAlive()); + + String clientId = protonConnection.getRemoteContainer(); + if (clientId != null && !clientId.isEmpty()) { + connectionInfo.setClientId(clientId); + } else { + connectionInfo.setClientId("" + connectionInfo.getConnectionId().toString()); + } + + +// String userName = ""; +// if (connect.userName() != null) { +// userName = connect.userName().toString(); +// } +// String passswd = ""; +// if (connect.password() != null) { +// passswd = connect.password().toString(); +// } +// connectionInfo.setUserName(userName); +// connectionInfo.setPassword(passswd); + + connectionInfo.setTransportContext(amqpTransport.getPeerCertificates()); + + sendToActiveMQ(connectionInfo, new ResponseHandler() { + public void onResponse(AmqpProtocolConverter converter, Response response) throws IOException { + + protonConnection.open(); + pumpOut(); + + if (response.isException()) { + Throwable exception = ((ExceptionResponse) response).getException(); +// TODO: figure out how to close /w an error. +// protonConnection.setLocalError(new EndpointError(exception.getClass().getName(), exception.getMessage())); + protonConnection.close(); + pumpOut(); + amqpTransport.onException(IOExceptionSupport.create(exception)); + return; + } + + } + }); + } + + private void onSessionOpen(Session session) { + AmqpSessionContext sessionContext = new AmqpSessionContext(connectionId, nextSessionId++); + session.setContext(sessionContext); + sendToActiveMQ(new SessionInfo(sessionContext.sessionId), null); + session.open(); + } + + private void onLinkOpen(Link link) { + link.setLocalSourceAddress(link.getRemoteSourceAddress()); + link.setLocalTargetAddress(link.getRemoteTargetAddress()); + + AmqpSessionContext sessionContext = (AmqpSessionContext) link.getSession().getContext(); + if (link instanceof Receiver) { + onReceiverOpen((Receiver) link, sessionContext); + } else { + onSenderOpen((Sender) link, sessionContext); + } + } + + class ProducerContext extends AmqpDeliveryListener { + private final ProducerId producerId; + private final LongSequenceGenerator messageIdGenerator = new LongSequenceGenerator(); + private final ActiveMQDestination destination; + + public ProducerContext(ProducerId producerId, ActiveMQDestination destination) { + this.producerId = producerId; + this.destination = destination; + } + + @Override + public void onDelivery(Delivery delivery) throws JMSException { +// delivery. + ActiveMQMessage message = convertMessage((DeliveryImpl) delivery); + message.setProducerId(producerId); + message.onSend(); +// sendToActiveMQ(message, createResponseHandler(command)); + sendToActiveMQ(message, null); + } + + ActiveMQMessage convertMessage(DeliveryImpl delivery) throws JMSException { + ActiveMQBytesMessage msg = nextMessage(delivery); + final Receiver receiver = (Receiver) delivery.getLink(); + byte buff[] = new byte[1024 * 4]; + int count = 0; + while ((count = receiver.recv(buff, 0, buff.length)) >= 0) { + msg.writeBytes(buff, 0, count); + } + return msg; + } + + ActiveMQBytesMessage current; + + private ActiveMQBytesMessage nextMessage(DeliveryImpl delivery) throws JMSException { + if (current == null) { + current = new ActiveMQBytesMessage(); + current.setJMSDestination(destination); + current.setProducerId(producerId); + current.setMessageId(new MessageId(producerId, messageIdGenerator.getNextSequenceId())); + current.setTimestamp(System.currentTimeMillis()); + current.setPriority((byte) javax.jms.Message.DEFAULT_PRIORITY); +// msg.setPersistent(command.qos() != QoS.AT_MOST_ONCE); +// msg.setIntProperty(QOS_PROPERTY_NAME, command.qos().ordinal()); + System.out.println(delivery.getLocalState() + "/" + delivery.getRemoteState()); + } + return current; + } + + } + + + void onReceiverOpen(final Receiver receiver, AmqpSessionContext sessionContext) { + // Client is producing to this receiver object + + ProducerId producerId = new ProducerId(sessionContext.sessionId, sessionContext.nextProducerId++); + ActiveMQDestination destination = ActiveMQDestination.createDestination(receiver.getRemoteSourceAddress(), ActiveMQDestination.QUEUE_TYPE); + ProducerContext producerContext = new ProducerContext(producerId, destination); + + receiver.setContext(producerContext); + receiver.flow(1024 * 64); + ProducerInfo producerInfo = new ProducerInfo(producerId); + producerInfo.setDestination(destination); + sendToActiveMQ(producerInfo, new ResponseHandler() { + public void onResponse(AmqpProtocolConverter converter, Response response) throws IOException { + receiver.open(); + if (response.isException()) { + // If the connection attempt fails we close the socket. + Throwable exception = ((ExceptionResponse) response).getException(); + receiver.close(); + } + pumpOut(); + } + }); + + } + + + class ConsumerContext extends AmqpDeliveryListener { + private final ConsumerId consumerId; + private final Sender sender; + + long nextTagId = 0; + HashSet tagCache = new HashSet(); + + byte[] nextTag() { + byte[] rc; + if (tagCache != null && !tagCache.isEmpty()) { + final Iterator iterator = tagCache.iterator(); + rc = iterator.next(); + iterator.remove(); + } else { + try { + rc = Long.toHexString(nextTagId++).getBytes("UTF-8"); + } catch (UnsupportedEncodingException e) { + throw new RuntimeException(e); + } + } + return rc; + } + + public ConsumerContext(ConsumerId consumerId, Sender sender) { + this.consumerId = consumerId; + this.sender = sender; + } + + // called when the connection receives a JMS message from ActiveMQ + public void onMessageDispatch(MessageDispatch md) throws Exception { + final byte[] tag = nextTag(); + final Delivery delivery = sender.delivery(tag, 0, tag.length); + delivery.setContext(md); + + // Covert to an AMQP messages. + org.apache.qpid.proton.message.Message msg = convertMessage(md.getMessage()); + byte buffer[] = new byte[1024*4]; + int c=0; + + // And send the AMQP message over the link. + while( (c=msg.encode(buffer, 0 , 0)) >= 0 ) { + sender.send(buffer, 0, c); + } + sender.advance(); + + } + + public org.apache.qpid.proton.message.Message convertMessage(Message message) throws Exception { +// result.setContentEncoding(); +// QoS qoS; +// if (message.propertyExists(QOS_PROPERTY_NAME)) { +// int ordinal = message.getIntProperty(QOS_PROPERTY_NAME); +// qoS = QoS.values()[ordinal]; +// +// } else { +// qoS = message.isPersistent() ? QoS.AT_MOST_ONCE : QoS.AT_LEAST_ONCE; +// } +// result.qos(qoS); + + Buffer content = null; + if (message.getDataStructureType() == ActiveMQTextMessage.DATA_STRUCTURE_TYPE) { + ActiveMQTextMessage msg = (ActiveMQTextMessage) message.copy(); + msg.setReadOnlyBody(true); + String messageText = msg.getText(); + content = new Buffer(messageText.getBytes("UTF-8")); + } else if (message.getDataStructureType() == ActiveMQBytesMessage.DATA_STRUCTURE_TYPE) { + ActiveMQBytesMessage msg = (ActiveMQBytesMessage) message.copy(); + msg.setReadOnlyBody(true); + byte[] data = new byte[(int) msg.getBodyLength()]; + msg.readBytes(data); + content = new Buffer(data); + } else if (message.getDataStructureType() == ActiveMQMapMessage.DATA_STRUCTURE_TYPE) { + ActiveMQMapMessage msg = (ActiveMQMapMessage) message.copy(); + msg.setReadOnlyBody(true); + Map map = msg.getContentMap(); + content = new Buffer(map.toString().getBytes("UTF-8")); + } else { + ByteSequence byteSequence = message.getContent(); + if (byteSequence != null && byteSequence.getLength() > 0) { + if (message.isCompressed()) { + Inflater inflater = new Inflater(); + inflater.setInput(byteSequence.data, byteSequence.offset, byteSequence.length); + byte[] data = new byte[4096]; + int read; + ByteArrayOutputStream bytesOut = new ByteArrayOutputStream(); + while ((read = inflater.inflate(data)) != 0) { + bytesOut.write(data, 0, read); + } + byteSequence = bytesOut.toByteSequence(); + } + content = new Buffer(byteSequence.data, byteSequence.offset, byteSequence.length); + } else { + content = new Buffer(0); + } + } + + org.apache.qpid.proton.message.Message result = new org.apache.qpid.proton.message.Message(); + return result; + } + + + @Override + public void onDelivery(Delivery delivery) throws JMSException { + if( delivery.remotelySettled() ) { + MessageDispatch md = (MessageDispatch) delivery.getContext(); + } + } + + } + + private final ConcurrentHashMap subscriptionsByConsumerId = new ConcurrentHashMap(); + + void onSenderOpen(final Sender sender, AmqpSessionContext sessionContext) { + + ConsumerId id = new ConsumerId(sessionContext.sessionId, sessionContext.nextConsumerId++); + ConsumerContext consumerContext = new ConsumerContext(id, sender); + + subscriptionsByConsumerId.put(id, consumerContext); + + ActiveMQDestination destination = ActiveMQDestination.createDestination(sender.getRemoteSourceAddress(), ActiveMQDestination.QUEUE_TYPE); + + sender.setContext(consumerContext); + ConsumerInfo consumerInfo = new ConsumerInfo(id); + consumerInfo.setDestination(destination); + consumerInfo.setPrefetchSize(100); + consumerInfo.setDispatchAsync(true); + + sendToActiveMQ(consumerInfo, new ResponseHandler() { + public void onResponse(AmqpProtocolConverter converter, Response response) throws IOException { + sender.open(); + if (response.isException()) { + Throwable exception = ((ExceptionResponse) response).getException(); + sender.close(); + } + pumpOut(); + } + }); + + } + +// +// QoS onSubscribe(SUBSCRIBE command, Topic topic) throws AmqpProtocolException { +// ActiveMQDestination destination = new ActiveMQTopic(convertAMQPToActiveMQ(topic.name().toString())); +// if (destination == null) { +// throw new AmqpProtocolException("Invalid Destination."); +// } +// +// ConsumerId id = new ConsumerId(sessionId, consumerIdGenerator.getNextSequenceId()); +// ConsumerInfo consumerInfo = new ConsumerInfo(id); +// consumerInfo.setDestination(destination); +// consumerInfo.setPrefetchSize(1000); +// consumerInfo.setDispatchAsync(true); +// if (!connect.cleanSession() && (connect.clientId() != null)) { +// //by default subscribers are persistent +// consumerInfo.setSubscriptionName(connect.clientId().toString()); +// } +// +// AmqpSubscription amqpSubscription = new AmqpSubscription(this, topic.qos(), consumerInfo); +// +// +// amqpSubscriptionByTopic.put(topic.name(), amqpSubscription); +// +// sendToActiveMQ(consumerInfo, null); +// return topic.qos(); +// } +// +// void onUnSubscribe(UNSUBSCRIBE command) { +// UTF8Buffer[] topics = command.topics(); +// if (topics != null) { +// for (int i = 0; i < topics.length; i++) { +// onUnSubscribe(topics[i]); +// } +// } +// UNSUBACK ack = new UNSUBACK(); +// ack.messageId(command.messageId()); +// pumpOut(ack.encode()); +// +// } +// +// void onUnSubscribe(UTF8Buffer topicName) { +// AmqpSubscription subs = amqpSubscriptionByTopic.remove(topicName); +// if (subs != null) { +// ConsumerInfo info = subs.getConsumerInfo(); +// if (info != null) { +// subscriptionsByConsumerId.remove(info.getConsumerId()); +// } +// RemoveInfo removeInfo = info.createRemoveCommand(); +// sendToActiveMQ(removeInfo, null); +// } +// } +// +// +// /** +// * Dispatch a ActiveMQ command +// */ +// +// +// +// void onAMQPPublish(PUBLISH command) throws IOException, JMSException { +// checkConnected(); +// } +// +// void onAMQPPubAck(PUBACK command) { +// short messageId = command.messageId(); +// MessageAck ack; +// synchronized (consumerAcks) { +// ack = consumerAcks.remove(messageId); +// } +// if (ack != null) { +// amqpTransport.sendToActiveMQ(ack); +// } +// } +// +// void onAMQPPubRec(PUBREC commnand) { +// //from a subscriber - send a PUBREL in response +// PUBREL pubrel = new PUBREL(); +// pubrel.messageId(commnand.messageId()); +// pumpOut(pubrel.encode()); +// } +// +// void onAMQPPubRel(PUBREL command) { +// PUBREC ack; +// synchronized (publisherRecs) { +// ack = publisherRecs.remove(command.messageId()); +// } +// if (ack == null) { +// LOG.warn("Unknown PUBREL: " + command.messageId() + " received"); +// } +// PUBCOMP pubcomp = new PUBCOMP(); +// pubcomp.messageId(command.messageId()); +// pumpOut(pubcomp.encode()); +// } +// +// void onAMQPPubComp(PUBCOMP command) { +// short messageId = command.messageId(); +// MessageAck ack; +// synchronized (consumerAcks) { +// ack = consumerAcks.remove(messageId); +// } +// if (ack != null) { +// amqpTransport.sendToActiveMQ(ack); +// } +// } +// +// +// +// +// public AmqpTransport amqpTransport { +// return amqpTransport; +// } +// +// +// +// void configureInactivityMonitor(short heartBeat) { +// try { +// +// int heartBeatMS = heartBeat * 1000; +// AmqpInactivityMonitor monitor = amqpTransport.getInactivityMonitor(); +// monitor.setProtocolConverter(this); +// monitor.setReadCheckTime(heartBeatMS); +// monitor.setInitialDelayTime(heartBeatMS); +// monitor.startMonitorThread(); +// +// } catch (Exception ex) { +// LOG.warn("Failed to start AMQP InactivityMonitor ", ex); +// } +// +// LOG.debug(getClientId() + " AMQP Connection using heart beat of " + heartBeat + " secs"); +// } +// +// +// +// void checkConnected() throws AmqpProtocolException { +// if (!connected.get()) { +// throw new AmqpProtocolException("Not connected."); +// } +// } +// +// private String getClientId() { +// if (clientId == null) { +// if (connect != null && connect.clientId() != null) { +// clientId = connect.clientId().toString(); +// } +// } else { +// clientId = ""; +// } +// return clientId; +// } +// +// private void stopTransport() { +// try { +// amqpTransport.stop(); +// } catch (Throwable e) { +// LOG.debug("Failed to stop AMQP transport ", e); +// } +// } +// +// ResponseHandler createResponseHandler(final PUBLISH command) { +// +// if (command != null) { +// switch (command.qos()) { +// case AT_LEAST_ONCE: +// return new ResponseHandler() { +// public void onResponse(AmqpProtocolConverter converter, Response response) throws IOException { +// if (response.isException()) { +// LOG.warn("Failed to send AMQP Publish: ", command, ((ExceptionResponse) response).getException()); +// } else { +// PUBACK ack = new PUBACK(); +// ack.messageId(command.messageId()); +// converter.amqpTransport.sendToAmqp(ack.encode()); +// } +// } +// }; +// case EXACTLY_ONCE: +// return new ResponseHandler() { +// public void onResponse(AmqpProtocolConverter converter, Response response) throws IOException { +// if (response.isException()) { +// LOG.warn("Failed to send AMQP Publish: ", command, ((ExceptionResponse) response).getException()); +// } else { +// PUBREC ack = new PUBREC(); +// ack.messageId(command.messageId()); +// synchronized (publisherRecs) { +// publisherRecs.put(command.messageId(), ack); +// } +// converter.amqpTransport.sendToAmqp(ack.encode()); +// } +// } +// }; +// case AT_MOST_ONCE: +// break; +// } +// } +// return null; +// } +// +// private String convertAMQPToActiveMQ(String name) { +// String result = name.replace('#', '>'); +// result = result.replace('+', '*'); +// result = result.replace('/', '.'); +// return result; +// } + + //////////////////////////////////////////////////////////////////////////// + // + // Implementation methods + // + //////////////////////////////////////////////////////////////////////////// + + private final Object commnadIdMutex = new Object(); + private int lastCommandId; + + int generateCommandId() { + synchronized (commnadIdMutex) { + return lastCommandId++; + } + } + + private final ConcurrentHashMap resposeHandlers = new ConcurrentHashMap(); + + void sendToActiveMQ(Command command, ResponseHandler handler) { + command.setCommandId(generateCommandId()); + if (handler != null) { + command.setResponseRequired(true); + resposeHandlers.put(Integer.valueOf(command.getCommandId()), handler); + } + amqpTransport.sendToActiveMQ(command); + } + + void handleException(Throwable exception) { + exception.printStackTrace(); + if (LOG.isDebugEnabled()) { + LOG.debug("Exception detail", exception); + } + try { + amqpTransport.stop(); + } catch (Throwable e) { + LOG.error("Failed to stop AMQP Transport ", e); + } + } + +} diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolException.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolException.java new file mode 100644 index 0000000000..852d16b65d --- /dev/null +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolException.java @@ -0,0 +1,50 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.amqp; + +import java.io.IOException; + + +public class AmqpProtocolException extends IOException { + + private static final long serialVersionUID = -2869735532997332242L; + + private final boolean fatal; + + public AmqpProtocolException() { + this(null); + } + + public AmqpProtocolException(String s) { + this(s, false); + } + + public AmqpProtocolException(String s, boolean fatal) { + this(s, fatal, null); + } + + public AmqpProtocolException(String s, boolean fatal, Throwable cause) { + super(s); + this.fatal = fatal; + initCause(cause); + } + + public boolean isFatal() { + return fatal; + } + +} diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpSubscription.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpSubscription.java new file mode 100644 index 0000000000..a68ee00e70 --- /dev/null +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpSubscription.java @@ -0,0 +1,68 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.amqp; + +import org.apache.activemq.command.*; + +import javax.jms.JMSException; +import java.io.IOException; +import java.util.zip.DataFormatException; + +/** + * Keeps track of the AMQP client subscription so that acking is correctly done. + */ +class AmqpSubscription { +// private final AmqpProtocolConverter protocolConverter; +// +// private final ConsumerInfo consumerInfo; +// private ActiveMQDestination destination; +// private final QoS qos; +// +// public AmqpSubscription(AmqpProtocolConverter protocolConverter, QoS qos, ConsumerInfo consumerInfo) { +// this.protocolConverter = protocolConverter; +// this.consumerInfo = consumerInfo; +// this.qos = qos; +// } +// +// MessageAck createMessageAck(MessageDispatch md) { +// return new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, 1); +// } +// +// PUBLISH createPublish(ActiveMQMessage message) throws DataFormatException, IOException, JMSException { +// PUBLISH publish = protocolConverter.convertMessage(message); +// if (publish.qos().ordinal() > this.qos.ordinal()) { +// publish.qos(this.qos); +// } +// return publish; +// } +// +// public boolean expectAck() { +// return qos != QoS.AT_MOST_ONCE; +// } +// +// public void setDestination(ActiveMQDestination destination) { +// this.destination = destination; +// } +// +// public ActiveMQDestination getDestination() { +// return destination; +// } +// +// public ConsumerInfo getConsumerInfo() { +// return consumerInfo; +// } +} diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpSupport.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpSupport.java new file mode 100644 index 0000000000..4e992b751c --- /dev/null +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpSupport.java @@ -0,0 +1,43 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.amqp; + +import org.fusesource.hawtbuf.Buffer; + +import java.nio.ByteBuffer; + +/** + * @author Hiram Chirino + */ +public class AmqpSupport { + + static public Buffer toBuffer(ByteBuffer data) { + if( data == null ) { + return null; + } + Buffer rc; + if( data.isDirect() ) { + rc = new Buffer(data.remaining()); + data.get(rc.data); + } else { + rc = new Buffer(data); + data.position(data.position()+data.remaining()); + } + return rc; + } + +} diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransport.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransport.java new file mode 100644 index 0000000000..a8ceaaa3a2 --- /dev/null +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransport.java @@ -0,0 +1,43 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.amqp; + +import org.apache.activemq.command.Command; +import org.fusesource.hawtbuf.Buffer; + +import java.io.IOException; +import java.security.cert.X509Certificate; + +/** + * Basic interface that mediates between protocol converter and transport + */ +public interface AmqpTransport { + + public void sendToActiveMQ(Command command); + + public void sendToAmqp(Buffer command) throws IOException; + + public X509Certificate[] getPeerCertificates(); + + public void onException(IOException error); + +// public AmqpInactivityMonitor getInactivityMonitor(); + + public AmqpWireFormat getWireFormat(); + + public void stop() throws Exception; +} diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransportFactory.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransportFactory.java new file mode 100644 index 0000000000..7fe4ae590b --- /dev/null +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransportFactory.java @@ -0,0 +1,80 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.amqp; + +import org.apache.activemq.broker.BrokerContext; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.BrokerServiceAware; +import org.apache.activemq.transport.MutexTransport; +import org.apache.activemq.transport.Transport; +import org.apache.activemq.transport.tcp.TcpTransportFactory; +import org.apache.activemq.util.IntrospectionSupport; +import org.apache.activemq.wireformat.WireFormat; + +import java.util.HashMap; +import java.util.Map; + +/** + * A AMQP transport factory + */ +public class AmqpTransportFactory extends TcpTransportFactory implements BrokerServiceAware { + + private BrokerContext brokerContext = null; + + protected String getDefaultWireFormatType() { + return "amqp"; + } + + @SuppressWarnings("rawtypes") + public Transport compositeConfigure(Transport transport, WireFormat format, Map options) { + transport = new AmqpTransportFilter(transport, format, brokerContext); + IntrospectionSupport.setProperties(transport, options); + return super.compositeConfigure(transport, format, options); + } + + public void setBrokerService(BrokerService brokerService) { + this.brokerContext = brokerService.getBrokerContext(); + } + + @SuppressWarnings("rawtypes") + @Override + public Transport serverConfigure(Transport transport, WireFormat format, HashMap options) throws Exception { + transport = super.serverConfigure(transport, format, options); + + MutexTransport mutex = transport.narrow(MutexTransport.class); + if (mutex != null) { + mutex.setSyncOnCommand(true); + } + + return transport; + } + +// @Override +// protected Transport createInactivityMonitor(Transport transport, WireFormat format) { +// AmqpInactivityMonitor monitor = new AmqpInactivityMonitor(transport, format); +// +// AmqpTransportFilter filter = transport.narrow(AmqpTransportFilter.class); +// filter.setInactivityMonitor(monitor); +// +// return monitor; +// } + + @Override + protected boolean isUseInactivityMonitor(Transport transport) { + return false; + } +} diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransportFilter.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransportFilter.java new file mode 100644 index 0000000000..1230bd3e9e --- /dev/null +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransportFilter.java @@ -0,0 +1,137 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.amqp; + +import org.apache.activemq.broker.BrokerContext; +import org.apache.activemq.command.Command; +import org.apache.activemq.transport.Transport; +import org.apache.activemq.transport.TransportFilter; +import org.apache.activemq.transport.TransportListener; +import org.apache.activemq.transport.tcp.SslTransport; +import org.apache.activemq.util.IOExceptionSupport; +import org.apache.activemq.wireformat.WireFormat; +import org.fusesource.hawtbuf.Buffer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.jms.JMSException; +import java.io.IOException; +import java.security.cert.X509Certificate; + +/** + * The AMQPTransportFilter normally sits on top of a TcpTransport that has been + * configured with the StompWireFormat and is used to convert AMQP commands to + * ActiveMQ commands. All of the conversion work is done by delegating to the + * AMQPProtocolConverter + */ +public class AmqpTransportFilter extends TransportFilter implements AmqpTransport { + private static final Logger LOG = LoggerFactory.getLogger(AmqpTransportFilter.class); + private static final Logger TRACE = LoggerFactory.getLogger(AmqpTransportFilter.class.getPackage().getName() + ".AMQPIO"); + private final AmqpProtocolConverter protocolConverter; +// private AmqpInactivityMonitor monitor; + private AmqpWireFormat wireFormat; + + private boolean trace; + + public AmqpTransportFilter(Transport next, WireFormat wireFormat, BrokerContext brokerContext) { + super(next); + this.protocolConverter = new AmqpProtocolConverter(this, brokerContext); + + if (wireFormat instanceof AmqpWireFormat) { + this.wireFormat = (AmqpWireFormat) wireFormat; + } + } + + public void oneway(Object o) throws IOException { + try { + final Command command = (Command) o; + protocolConverter.onActiveMQCommand(command); + } catch (Exception e) { + throw IOExceptionSupport.create(e); + } + } + + public void onCommand(Object command) { + try { + if (trace) { + TRACE.trace("Received: \n" + command); + } + + protocolConverter.onAMQPData((Buffer) command); + } catch (IOException e) { + handleException(e); + } catch (JMSException e) { + onException(IOExceptionSupport.create(e)); + } + } + + public void sendToActiveMQ(Command command) { + TransportListener l = transportListener; + if (l != null) { + l.onCommand(command); + } + } + + public void sendToAmqp(Buffer command) throws IOException { + if (trace) { + TRACE.trace("Sending: \n" + command); + } + Transport n = next; + if (n != null) { + n.oneway(command); + } + } + + public X509Certificate[] getPeerCertificates() { + if (next instanceof SslTransport) { + X509Certificate[] peerCerts = ((SslTransport) next).getPeerCertificates(); + if (trace && peerCerts != null) { + LOG.debug("Peer Identity has been verified\n"); + } + return peerCerts; + } + return null; + } + + public boolean isTrace() { + return trace; + } + + public void setTrace(boolean trace) { + this.trace = trace; + } + +// @Override +// public AmqpInactivityMonitor getInactivityMonitor() { +// return monitor; +// } +// +// public void setInactivityMonitor(AmqpInactivityMonitor monitor) { +// this.monitor = monitor; +// } + + @Override + public AmqpWireFormat getWireFormat() { + return this.wireFormat; + } + + public void handleException(IOException e) { + super.onException(e); + } + + +} diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpWireFormat.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpWireFormat.java new file mode 100644 index 0000000000..c1456c1996 --- /dev/null +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpWireFormat.java @@ -0,0 +1,88 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.amqp; + +import org.apache.activemq.util.ByteArrayInputStream; +import org.apache.activemq.util.ByteArrayOutputStream; +import org.apache.activemq.util.ByteSequence; +import org.apache.activemq.wireformat.WireFormat; +import org.fusesource.hawtbuf.Buffer; + +import java.io.*; + +/** + */ +public class AmqpWireFormat implements WireFormat { + + + private int version = 1; + private long maxFrameLength = 1024*1024*100; + + public ByteSequence marshal(Object command) throws IOException { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + DataOutputStream dos = new DataOutputStream(baos); + marshal(command, dos); + dos.close(); + return baos.toByteSequence(); + } + + public Object unmarshal(ByteSequence packet) throws IOException { + ByteArrayInputStream stream = new ByteArrayInputStream(packet); + DataInputStream dis = new DataInputStream(stream); + return unmarshal(dis); + } + + public void marshal(Object command, DataOutput dataOut) throws IOException { + Buffer frame = (Buffer) command; + frame.writeTo(dataOut); + } + + boolean magicRead = false; + public Object unmarshal(DataInput dataIn) throws IOException { + if( !magicRead ) { + Buffer magic = new Buffer(8); + magic.readFrom(dataIn); + magicRead = true; + return magic; + } else { + int size = dataIn.readInt(); + if( size > maxFrameLength ) { + throw new AmqpProtocolException("Frame size exceeded max frame length."); + } + Buffer frame = new Buffer(size); + frame.bigEndianEditor().writeInt(size); + frame.readFrom(dataIn); + frame.clear(); + return frame; + } + } + + /** + */ + public void setVersion(int version) { + this.version = version; + } + + /** + * @return the version of the wire format + */ + public int getVersion() { + return this.version; + } + + +} diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpWireFormatFactory.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpWireFormatFactory.java new file mode 100644 index 0000000000..11d40fcb9d --- /dev/null +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpWireFormatFactory.java @@ -0,0 +1,29 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.amqp; + +import org.apache.activemq.wireformat.WireFormat; +import org.apache.activemq.wireformat.WireFormatFactory; + +/** + * Creates WireFormat objects that marshalls the Stomp protocol. + */ +public class AmqpWireFormatFactory implements WireFormatFactory { + public WireFormat createWireFormat() { + return new AmqpWireFormat(); + } +} diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/InboundTransformer.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/InboundTransformer.java new file mode 100644 index 0000000000..10344894b8 --- /dev/null +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/InboundTransformer.java @@ -0,0 +1,29 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.amqp; + +/** +* @author Hiram Chirino +*/ +public class InboundTransformer { + + String prefixVendor = "JMS_AMQP_"; + int defaultDeliveryMode = javax.jms.Message.DEFAULT_DELIVERY_MODE; + int defaultPriority = javax.jms.Message.DEFAULT_PRIORITY; + long defaultTtl = javax.jms.Message.DEFAULT_TIME_TO_LIVE; + +} diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/JMSMappingInboundTransformer.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/JMSMappingInboundTransformer.java new file mode 100644 index 0000000000..8626c17ae6 --- /dev/null +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/JMSMappingInboundTransformer.java @@ -0,0 +1,24 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.amqp; + +/** +* @author Hiram Chirino +*/ +public class JMSMappingInboundTransformer extends InboundTransformer { + +} diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/ResponseHandler.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/ResponseHandler.java new file mode 100644 index 0000000000..bf1bdae9bd --- /dev/null +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/ResponseHandler.java @@ -0,0 +1,29 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.amqp; + +import org.apache.activemq.command.Response; + +import java.io.IOException; + + +/** + * Interface used by the AMQPProtocolConverter for callbacks. + */ +interface ResponseHandler { + void onResponse(AmqpProtocolConverter converter, Response response) throws IOException; +} diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/package.html b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/package.html new file mode 100644 index 0000000000..42f206ae6c --- /dev/null +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/package.html @@ -0,0 +1,25 @@ + + + + + + +A Broker side implementation of the AMQP 3.1 protocol - see http://amqp.org/ + + + diff --git a/activemq-amqp/src/main/resources/META-INF/services/org/apache/activemq/transport/amqp b/activemq-amqp/src/main/resources/META-INF/services/org/apache/activemq/transport/amqp new file mode 100644 index 0000000000..9e09d18969 --- /dev/null +++ b/activemq-amqp/src/main/resources/META-INF/services/org/apache/activemq/transport/amqp @@ -0,0 +1,17 @@ +## --------------------------------------------------------------------------- +## Licensed to the Apache Software Foundation (ASF) under one or more +## contributor license agreements. See the NOTICE file distributed with +## this work for additional information regarding copyright ownership. +## The ASF licenses this file to You under the Apache License, Version 2.0 +## (the "License"); you may not use this file except in compliance with +## the License. You may obtain a copy of the License at +## +## http://www.apache.org/licenses/LICENSE-2.0 +## +## Unless required by applicable law or agreed to in writing, software +## distributed under the License is distributed on an "AS IS" BASIS, +## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +## See the License for the specific language governing permissions and +## limitations under the License. +## --------------------------------------------------------------------------- +class=org.apache.activemq.transport.amqp.AmqpTransportFactory diff --git a/activemq-amqp/src/main/resources/META-INF/services/org/apache/activemq/transport/amqp+nio b/activemq-amqp/src/main/resources/META-INF/services/org/apache/activemq/transport/amqp+nio new file mode 100644 index 0000000000..40bf30f95b --- /dev/null +++ b/activemq-amqp/src/main/resources/META-INF/services/org/apache/activemq/transport/amqp+nio @@ -0,0 +1,17 @@ +## --------------------------------------------------------------------------- +## Licensed to the Apache Software Foundation (ASF) under one or more +## contributor license agreements. See the NOTICE file distributed with +## this work for additional information regarding copyright ownership. +## The ASF licenses this file to You under the Apache License, Version 2.0 +## (the "License"); you may not use this file except in compliance with +## the License. You may obtain a copy of the License at +## +## http://www.apache.org/licenses/LICENSE-2.0 +## +## Unless required by applicable law or agreed to in writing, software +## distributed under the License is distributed on an "AS IS" BASIS, +## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +## See the License for the specific language governing permissions and +## limitations under the License. +## --------------------------------------------------------------------------- +class=org.apache.activemq.transport.amqp.AmqpNioTransportFactory diff --git a/activemq-amqp/src/main/resources/META-INF/services/org/apache/activemq/transport/amqp+nio+ssl b/activemq-amqp/src/main/resources/META-INF/services/org/apache/activemq/transport/amqp+nio+ssl new file mode 100644 index 0000000000..25af0bd66a --- /dev/null +++ b/activemq-amqp/src/main/resources/META-INF/services/org/apache/activemq/transport/amqp+nio+ssl @@ -0,0 +1,17 @@ +## --------------------------------------------------------------------------- +## Licensed to the Apache Software Foundation (ASF) under one or more +## contributor license agreements. See the NOTICE file distributed with +## this work for additional information regarding copyright ownership. +## The ASF licenses this file to You under the Apache License, Version 2.0 +## (the "License"); you may not use this file except in compliance with +## the License. You may obtain a copy of the License at +## +## http://www.apache.org/licenses/LICENSE-2.0 +## +## Unless required by applicable law or agreed to in writing, software +## distributed under the License is distributed on an "AS IS" BASIS, +## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +## See the License for the specific language governing permissions and +## limitations under the License. +## --------------------------------------------------------------------------- +class=org.apache.activemq.transport.amqp.AmqpNioSslTransportFactory \ No newline at end of file diff --git a/activemq-amqp/src/main/resources/META-INF/services/org/apache/activemq/transport/amqp+ssl b/activemq-amqp/src/main/resources/META-INF/services/org/apache/activemq/transport/amqp+ssl new file mode 100644 index 0000000000..fbcc95d3d3 --- /dev/null +++ b/activemq-amqp/src/main/resources/META-INF/services/org/apache/activemq/transport/amqp+ssl @@ -0,0 +1,17 @@ +## --------------------------------------------------------------------------- +## Licensed to the Apache Software Foundation (ASF) under one or more +## contributor license agreements. See the NOTICE file distributed with +## this work for additional information regarding copyright ownership. +## The ASF licenses this file to You under the Apache License, Version 2.0 +## (the "License"); you may not use this file except in compliance with +## the License. You may obtain a copy of the License at +## +## http://www.apache.org/licenses/LICENSE-2.0 +## +## Unless required by applicable law or agreed to in writing, software +## distributed under the License is distributed on an "AS IS" BASIS, +## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +## See the License for the specific language governing permissions and +## limitations under the License. +## --------------------------------------------------------------------------- +class=org.apache.activemq.transport.amqp.AmqpSslTransportFactory diff --git a/activemq-amqp/src/main/resources/META-INF/services/org/apache/activemq/wireformat/amqp b/activemq-amqp/src/main/resources/META-INF/services/org/apache/activemq/wireformat/amqp new file mode 100644 index 0000000000..b2990235cf --- /dev/null +++ b/activemq-amqp/src/main/resources/META-INF/services/org/apache/activemq/wireformat/amqp @@ -0,0 +1,17 @@ +## --------------------------------------------------------------------------- +## Licensed to the Apache Software Foundation (ASF) under one or more +## contributor license agreements. See the NOTICE file distributed with +## this work for additional information regarding copyright ownership. +## The ASF licenses this file to You under the Apache License, Version 2.0 +## (the "License"); you may not use this file except in compliance with +## the License. You may obtain a copy of the License at +## +## http://www.apache.org/licenses/LICENSE-2.0 +## +## Unless required by applicable law or agreed to in writing, software +## distributed under the License is distributed on an "AS IS" BASIS, +## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +## See the License for the specific language governing permissions and +## limitations under the License. +## --------------------------------------------------------------------------- +class=org.apache.activemq.transport.amqp.AmqpWireFormatFactory \ No newline at end of file diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpNioTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpNioTest.java new file mode 100644 index 0000000000..859062fdc6 --- /dev/null +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpNioTest.java @@ -0,0 +1,26 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.amqp; + +import org.apache.activemq.broker.BrokerService; + +public class AmqpNioTest extends AmqpTest { + protected void addAMQPConnector(BrokerService brokerService) throws Exception { + brokerService.addConnector("amqp+nio://localhost:1883?maxInactivityDuration=-1"); + } + +} diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpSslTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpSslTest.java new file mode 100644 index 0000000000..443479e33a --- /dev/null +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpSslTest.java @@ -0,0 +1,68 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.amqp; + +import org.apache.activemq.broker.BrokerService; +import org.junit.Ignore; + +import javax.net.ssl.KeyManager; +import javax.net.ssl.SSLContext; +import javax.net.ssl.TrustManager; +import javax.net.ssl.X509TrustManager; +import java.security.SecureRandom; +import java.security.cert.CertificateException; +import java.security.cert.X509Certificate; + +@Ignore("hangs atm, needs investigation") +public class AmqpSslTest extends AmqpTest { + public void startBroker() throws Exception { + System.setProperty("javax.net.ssl.trustStore", "src/test/resources/client.keystore"); + System.setProperty("javax.net.ssl.trustStorePassword", "password"); + System.setProperty("javax.net.ssl.trustStoreType", "jks"); + System.setProperty("javax.net.ssl.keyStore", "src/test/resources/server.keystore"); + System.setProperty("javax.net.ssl.keyStorePassword", "password"); + System.setProperty("javax.net.ssl.keyStoreType", "jks"); + super.startBroker(); + } + + protected void addAMQPConnector(BrokerService brokerService) throws Exception { + brokerService.addConnector("amqp+ssl://localhost:8883"); + } + +// protected AMQP createAMQPConnection() throws Exception { +// AMQP amqp = new AMQP(); +// amqp.setHost("ssl://localhost:8883"); +// SSLContext ctx = SSLContext.getInstance("TLS"); +// ctx.init(new KeyManager[0], new TrustManager[]{new DefaultTrustManager()}, new SecureRandom()); +// amqp.setSslContext(ctx); +// return amqp; +// } + + static class DefaultTrustManager implements X509TrustManager { + + public void checkClientTrusted(X509Certificate[] x509Certificates, String s) throws CertificateException { + } + + public void checkServerTrusted(X509Certificate[] x509Certificates, String s) throws CertificateException { + } + + public X509Certificate[] getAcceptedIssuers() { + return new X509Certificate[0]; + } + } + +} diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTest.java new file mode 100644 index 0000000000..17afc37e88 --- /dev/null +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTest.java @@ -0,0 +1,286 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.amqp; + +import org.apache.activemq.ActiveMQConnection; +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.AutoFailTestSupport; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.TransportConnector; +import org.apache.activemq.command.ActiveMQMessage; +import org.apache.activemq.util.ByteSequence; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TextMessage; +import java.util.Vector; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import static org.fusesource.hawtbuf.UTF8Buffer.utf8; +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; + + +public class AmqpTest { + protected static final Logger LOG = LoggerFactory.getLogger(AmqpTest.class); + protected BrokerService brokerService; + protected Vector exceptions = new Vector(); + protected int numberOfMessages; + AutoFailTestSupport autoFailTestSupport = new AutoFailTestSupport() {}; + protected int port; + + @Before + public void startBroker() throws Exception { + autoFailTestSupport.startAutoFailThread(); + exceptions.clear(); + brokerService = new BrokerService(); + brokerService.setPersistent(false); + brokerService.setAdvisorySupport(false); + addAMQPConnector(); + brokerService.start(); + this.numberOfMessages = 2000; + } + + protected void addAMQPConnector() throws Exception { + final TransportConnector connector = brokerService.addConnector("amqp://localhost:0"); + port = connector.getConnectUri().getPort(); + } + + + @After + public void stopBroker() throws Exception { + if (brokerService != null) { + brokerService.stop(); + brokerService = null; + } + autoFailTestSupport.stopAutoFailThread(); + } + + + +// @Test +// public void testSendAndReceiveAMQP() throws Exception { +// addAMQPConnector(brokerService); +// brokerService.start(); +// AMQP amqp = new AMQP(); +// final BlockingConnection subscribeConnection = amqp.blockingConnection(); +// subscribeConnection.connect(); +// Topic topic = new Topic("foo/bah",QoS.AT_MOST_ONCE); +// Topic[] topics = {topic}; +// subscribeConnection.subscribe(topics); +// final CountDownLatch latch = new CountDownLatch(numberOfMessages); +// +// Thread thread = new Thread(new Runnable() { +// public void run() { +// for (int i = 0; i < numberOfMessages; i++){ +// try { +// Message message = subscribeConnection.receive(); +// message.ack(); +// latch.countDown(); +// } catch (Exception e) { +// e.printStackTrace(); +// break; +// } +// +// } +// } +// }); +// thread.start(); +// +// BlockingConnection publisherConnection = amqp.blockingConnection(); +// publisherConnection.connect(); +// for (int i = 0; i < numberOfMessages; i++){ +// String payload = "Message " + i; +// publisherConnection.publish(topic.name().toString(),payload.getBytes(),QoS.AT_LEAST_ONCE,false); +// } +// +// latch.await(10, TimeUnit.SECONDS); +// assertEquals(0, latch.getCount()); +// } +// +// @Test +// public void testSendAndReceiveAtMostOnce() throws Exception { +// addAMQPConnector(brokerService); +// brokerService.start(); +// AMQP amqp = createAMQPConnection(); +// amqp.setKeepAlive(Short.MAX_VALUE); +// BlockingConnection connection = amqp.blockingConnection(); +// +// connection.connect(); +// +// +// Topic[] topics = {new Topic(utf8("foo"), QoS.AT_MOST_ONCE)}; +// connection.subscribe(topics); +// for (int i = 0; i < numberOfMessages; i++) { +// String payload = "Test Message: " + i; +// connection.publish("foo", payload.getBytes(), QoS.AT_MOST_ONCE, false); +// Message message = connection.receive(); +// assertEquals(payload, new String(message.getPayload())); +// } +// connection.disconnect(); +// } +// +// @Test +// public void testSendAndReceiveAtLeastOnce() throws Exception { +// addAMQPConnector(brokerService); +// brokerService.start(); +// AMQP amqp = createAMQPConnection(); +// amqp.setKeepAlive(Short.MAX_VALUE); +// BlockingConnection connection = amqp.blockingConnection(); +// +// connection.connect(); +// +// Topic[] topics = {new Topic(utf8("foo"), QoS.AT_LEAST_ONCE)}; +// connection.subscribe(topics); +// for (int i = 0; i < numberOfMessages; i++) { +// String payload = "Test Message: " + i; +// connection.publish("foo", payload.getBytes(), QoS.AT_LEAST_ONCE, false); +// Message message = connection.receive(); +// message.ack(); +// assertEquals(payload, new String(message.getPayload())); +// } +// connection.disconnect(); +// } +// +// @Test +// public void testSendAndReceiveExactlyOnce() throws Exception { +// addAMQPConnector(brokerService); +// brokerService.start(); +// AMQP publisher = createAMQPConnection(); +// BlockingConnection pubConnection = publisher.blockingConnection(); +// +// pubConnection.connect(); +// +// AMQP subscriber = createAMQPConnection(); +// BlockingConnection subConnection = subscriber.blockingConnection(); +// +// subConnection.connect(); +// +// Topic[] topics = {new Topic(utf8("foo"), QoS.EXACTLY_ONCE)}; +// subConnection.subscribe(topics); +// for (int i = 0; i < numberOfMessages; i++) { +// String payload = "Test Message: " + i; +// pubConnection.publish("foo", payload.getBytes(), QoS.EXACTLY_ONCE, false); +// Message message = subConnection.receive(); +// message.ack(); +// assertEquals(payload, new String(message.getPayload())); +// } +// subConnection.disconnect(); +// pubConnection.disconnect(); +// } +// +// @Test +// public void testSendAndReceiveLargeMessages() throws Exception { +// byte[] payload = new byte[1024 * 32]; +// for (int i = 0; i < payload.length; i++){ +// payload[i] = '2'; +// } +// addAMQPConnector(brokerService); +// brokerService.start(); +// +// AMQP publisher = createAMQPConnection(); +// BlockingConnection pubConnection = publisher.blockingConnection(); +// +// pubConnection.connect(); +// +// AMQP subscriber = createAMQPConnection(); +// BlockingConnection subConnection = subscriber.blockingConnection(); +// +// subConnection.connect(); +// +// Topic[] topics = {new Topic(utf8("foo"), QoS.AT_LEAST_ONCE)}; +// subConnection.subscribe(topics); +// for (int i = 0; i < 10; i++) { +// pubConnection.publish("foo", payload, QoS.AT_LEAST_ONCE, false); +// Message message = subConnection.receive(); +// message.ack(); +// assertArrayEquals(payload, message.getPayload()); +// } +// subConnection.disconnect(); +// pubConnection.disconnect(); +// } +// +// +// @Test +// public void testSendAMQPReceiveJMS() throws Exception { +// addAMQPConnector(brokerService); +// brokerService.addConnector(ActiveMQConnectionFactory.DEFAULT_BROKER_BIND_URL); +// brokerService.start(); +// AMQP amqp = createAMQPConnection(); +// BlockingConnection connection = amqp.blockingConnection(); +// final String DESTINATION_NAME = "foo.*"; +// connection.connect(); +// +// ActiveMQConnection activeMQConnection = (ActiveMQConnection) new ActiveMQConnectionFactory().createConnection(); +// activeMQConnection.start(); +// Session s = activeMQConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); +// javax.jms.Topic jmsTopic = s.createTopic(DESTINATION_NAME); +// MessageConsumer consumer = s.createConsumer(jmsTopic); +// +// for (int i = 0; i < numberOfMessages; i++) { +// String payload = "Test Message: " + i; +// connection.publish("foo/bah", payload.getBytes(), QoS.AT_LEAST_ONCE, false); +// ActiveMQMessage message = (ActiveMQMessage) consumer.receive(); +// ByteSequence bs = message.getContent(); +// assertEquals(payload, new String(bs.data, bs.offset, bs.length)); +// } +// +// +// activeMQConnection.close(); +// connection.disconnect(); +// } +// +// @Test +// public void testSendJMSReceiveAMQP() throws Exception { +// addAMQPConnector(brokerService); +// brokerService.addConnector(ActiveMQConnectionFactory.DEFAULT_BROKER_BIND_URL); +// brokerService.start(); +// AMQP amqp = createAMQPConnection(); +// amqp.setKeepAlive(Short.MAX_VALUE); +// BlockingConnection connection = amqp.blockingConnection(); +// connection.connect(); +// +// ActiveMQConnection activeMQConnection = (ActiveMQConnection) new ActiveMQConnectionFactory().createConnection(); +// activeMQConnection.start(); +// Session s = activeMQConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); +// javax.jms.Topic jmsTopic = s.createTopic("foo.far"); +// MessageProducer producer = s.createProducer(jmsTopic); +// +// Topic[] topics = {new Topic(utf8("foo/+"), QoS.AT_MOST_ONCE)}; +// connection.subscribe(topics); +// for (int i = 0; i < numberOfMessages; i++) { +// String payload = "This is Test Message: " + i; +// TextMessage sendMessage = s.createTextMessage(payload); +// producer.send(sendMessage); +// Message message = connection.receive(); +// message.ack(); +// assertEquals(payload, new String(message.getPayload())); +// } +// connection.disconnect(); +// } +// +// + + +} \ No newline at end of file diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/SwiftMQClientTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/SwiftMQClientTest.java new file mode 100644 index 0000000000..ce089d9294 --- /dev/null +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/SwiftMQClientTest.java @@ -0,0 +1,92 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.amqp; + +import com.swiftmq.amqp.AMQPContext; +import com.swiftmq.amqp.v100.client.*; +import com.swiftmq.amqp.v100.generated.messaging.message_format.AmqpValue; +import com.swiftmq.amqp.v100.messaging.AMQPMessage; +import com.swiftmq.amqp.v100.types.AMQPString; +import com.swiftmq.amqp.v100.types.AMQPType; +import org.junit.Test; + +/** + * @author Hiram Chirino + */ +public class SwiftMQClientTest extends AmqpTest { + + @Test + public void testSendReceive() throws Exception { + + String queue = "testqueue"; + int nMsgs = 1; + int qos = QoS.AT_MOST_ONCE; + AMQPContext ctx = new AMQPContext(AMQPContext.CLIENT); + + try { + + Connection connection = new Connection(ctx, "127.0.0.1", port, false); + connection.setContainerId("client"); + connection.setIdleTimeout(-1); + connection.setMaxFrameSize(1024 * 4); + connection.setExceptionListener(new ExceptionListener() { + public void onException(Exception e) { + e.printStackTrace(); + } + }); + connection.connect(); + { + String data = String.format("%010d", 0); + + Session session = connection.createSession(10, 10); + Producer p = session.createProducer(queue, qos); + for (int i = 0; i < nMsgs; i++) { + AMQPMessage msg = new AMQPMessage(); + String s = "Message #" + (i + 1); + System.out.println("Sending " + s); + msg.setAmqpValue(new AmqpValue(new AMQPString(s + ", data: " + data))); + p.send(msg); + } + p.close(); + session.close(); + } +// { +// Session session = connection.createSession(10, 10); +// Consumer c = session.createConsumer(queue, 100, qos, true, null); +// +// // Receive messages non-transacted +// for (int i = 0; i < nMsgs; i++) { +// AMQPMessage msg = c.receive(); +// final AMQPType value = msg.getAmqpValue().getValue(); +// if (value instanceof AMQPString) { +// AMQPString s = (AMQPString) value; +// System.out.println("Received: " + s.getValue()); +// } +// if (!msg.isSettled()) +// msg.accept(); +// } +// c.close(); +// session.close(); +// } + connection.close(); + } catch (Exception e) { + e.printStackTrace(); + } + + } + +} diff --git a/activemq-amqp/src/test/resources/log4j.properties b/activemq-amqp/src/test/resources/log4j.properties new file mode 100755 index 0000000000..fd5a31bd24 --- /dev/null +++ b/activemq-amqp/src/test/resources/log4j.properties @@ -0,0 +1,36 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# +# The logging properties used during tests.. +# +log4j.rootLogger=WARN, console, file +log4j.logger.org.apache.activemq=INFO +log4j.logger.org.fusesource=INFO + +# Console will only display warnnings +log4j.appender.console=org.apache.log4j.ConsoleAppender +log4j.appender.console.layout=org.apache.log4j.PatternLayout +log4j.appender.console.layout.ConversionPattern=%-5p | %t | %m%n +log4j.appender.console.threshold=TRACE + +# File appender will contain all info messages +log4j.appender.file=org.apache.log4j.FileAppender +log4j.appender.file.layout=org.apache.log4j.PatternLayout +log4j.appender.file.layout.ConversionPattern=%d | %-5p | %m | %c | %t%n +log4j.appender.file.file=target/test.log +log4j.appender.file.append=true