diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpNioSslTransport.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpNioSslTransport.java index d393aa7344..ee0dc78e00 100644 --- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpNioSslTransport.java +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpNioSslTransport.java @@ -30,7 +30,7 @@ import org.fusesource.hawtbuf.Buffer; public class AmqpNioSslTransport extends NIOSSLTransport { - private boolean magicRead; + private final ByteBuffer magic = ByteBuffer.allocate(8); public AmqpNioSslTransport(WireFormat wireFormat, SocketFactory socketFactory, URI remoteLocation, URI localLocation) throws UnknownHostException, IOException { super(wireFormat, socketFactory, remoteLocation, localLocation); @@ -56,17 +56,21 @@ public class AmqpNioSslTransport extends NIOSSLTransport { ByteBuffer payload = ByteBuffer.wrap(fill); - if (!magicRead) { - if (payload.remaining() >= 8) { - magicRead = true; - Buffer magic = new Buffer(8); - for (int i = 0; i < 8; i++) { - magic.data[i] = payload.get(); - } - doConsume(new AmqpHeader(magic)); + if (magic.position() != 8) { + + while (payload.hasRemaining() && magic.position() < 8) { + magic.put(payload.get()); + } + + if (!magic.hasRemaining()) { + magic.flip(); + doConsume(new AmqpHeader(new Buffer(magic))); + magic.position(8); } } - doConsume(AmqpSupport.toBuffer(payload)); + if (payload.hasRemaining()) { + doConsume(AmqpSupport.toBuffer(payload)); + } } } \ No newline at end of file diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/joram/ActiveMQNIOPlusSSLAdmin.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/joram/ActiveMQNIOPlusSSLAdmin.java new file mode 100644 index 0000000000..83535f698f --- /dev/null +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/joram/ActiveMQNIOPlusSSLAdmin.java @@ -0,0 +1,88 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.amqp.joram; + +import org.apache.activemq.broker.TransportConnector; +import org.apache.activemq.spring.SpringSslContext; +import org.apache.activemq.transport.amqp.DefaultTrustManager; +import org.apache.qpid.amqp_1_0.jms.impl.ConnectionFactoryImpl; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.jms.ConnectionFactory; +import javax.naming.NamingException; +import javax.net.ssl.KeyManager; +import javax.net.ssl.SSLContext; +import javax.net.ssl.TrustManager; +import java.io.File; +import java.security.SecureRandom; + +public class ActiveMQNIOPlusSSLAdmin extends ActiveMQAdmin { + + private static final String AMQP_NIO_PLUS_SSL_URL = "amqp+nio+ssl://localhost:0"; + protected static final Logger LOG = LoggerFactory.getLogger(ActiveMQNIOPlusSSLAdmin.class); + + @Override + public void startServer() throws Exception { + SSLContext ctx = SSLContext.getInstance("TLS"); + ctx.init(new KeyManager[0], new TrustManager[]{new DefaultTrustManager()}, new SecureRandom()); + SSLContext.setDefault(ctx); + + // Setup SSL context... + final File classesDir = new File(ActiveMQNIOPlusSSLAdmin.class.getProtectionDomain().getCodeSource().getLocation().getFile()); + File keystore = new File(classesDir, "../../src/test/resources/keystore"); + final SpringSslContext sslContext = new SpringSslContext(); + sslContext.setKeyStore(keystore.getCanonicalPath()); + sslContext.setKeyStorePassword("password"); + sslContext.setTrustStore(keystore.getCanonicalPath()); + sslContext.setTrustStorePassword("password"); + sslContext.afterPropertiesSet(); + + if (broker != null) { + stopServer(); + } + if (System.getProperty("basedir") == null) { + File file = new File("."); + System.setProperty("basedir", file.getAbsolutePath()); + } + broker = createBroker(); + broker.setSslContext(sslContext); + + String connectorURI = getConnectorURI(); + TransportConnector connector = broker.addConnector(connectorURI); + port = connector.getConnectUri().getPort(); + LOG.info("nio+ssl port is {}", port); + + broker.start(); + } + + @Override + protected String getConnectorURI() { + return AMQP_NIO_PLUS_SSL_URL; + } + + @Override + public void createConnectionFactory(String name) { + try { + LOG.debug("Creating a connection factory using port {}", port); + final ConnectionFactory factory = new ConnectionFactoryImpl("localhost", port, null, null, null, true); + context.bind(name, factory); + } catch (NamingException e) { + throw new RuntimeException(e); + } + } +} diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/joram/JoramJmsNioPlusSslTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/joram/JoramJmsNioPlusSslTest.java new file mode 100644 index 0000000000..f3d0c5eced --- /dev/null +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/joram/JoramJmsNioPlusSslTest.java @@ -0,0 +1,90 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.amqp.joram; + +import java.security.SecureRandom; + +import javax.net.ssl.KeyManager; +import javax.net.ssl.SSLContext; +import javax.net.ssl.TrustManager; + +import org.apache.activemq.transport.amqp.DefaultTrustManager; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.rules.Timeout; +import org.junit.runner.RunWith; +import org.junit.runners.Suite; +import org.objectweb.jtests.jms.conform.connection.ConnectionTest; +import org.objectweb.jtests.jms.conform.message.MessageBodyTest; +import org.objectweb.jtests.jms.conform.message.MessageDefaultTest; +import org.objectweb.jtests.jms.conform.message.MessageTypeTest; +import org.objectweb.jtests.jms.conform.message.headers.MessageHeaderTest; +import org.objectweb.jtests.jms.conform.message.properties.JMSXPropertyTest; +import org.objectweb.jtests.jms.conform.message.properties.MessagePropertyConversionTest; +import org.objectweb.jtests.jms.conform.message.properties.MessagePropertyTest; +import org.objectweb.jtests.jms.conform.queue.QueueBrowserTest; +import org.objectweb.jtests.jms.conform.queue.TemporaryQueueTest; +import org.objectweb.jtests.jms.conform.selector.SelectorSyntaxTest; +import org.objectweb.jtests.jms.conform.selector.SelectorTest; +import org.objectweb.jtests.jms.conform.session.QueueSessionTest; +import org.objectweb.jtests.jms.conform.session.SessionTest; +import org.objectweb.jtests.jms.conform.topic.TemporaryTopicTest; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@RunWith(Suite.class) +@Suite.SuiteClasses({ + // TopicSessionTest.class, // Hangs, see https://issues.apache.org/jira/browse/PROTON-154 + MessageHeaderTest.class, + QueueBrowserTest.class, + MessageTypeTest.class, + //,UnifiedSessionTest.class // https://issues.apache.org/jira/browse/AMQ-4375 + TemporaryTopicTest.class, + //,TopicConnectionTest.class // https://issues.apache.org/jira/browse/AMQ-4654 + SelectorSyntaxTest.class, + QueueSessionTest.class, + SelectorTest.class, + TemporaryQueueTest.class, + ConnectionTest.class, + SessionTest.class, + JMSXPropertyTest.class, + MessageBodyTest.class, + MessageDefaultTest.class, + MessagePropertyConversionTest.class, + MessagePropertyTest.class +}) + +public class JoramJmsNioPlusSslTest { + + protected static final Logger LOG = LoggerFactory.getLogger(JoramJmsNioPlusSslTest.class); + + @Rule + public Timeout to = new Timeout(10 * 1000); + + @BeforeClass + public static void beforeClass() throws Exception { + System.setProperty("joram.jms.test.file", getJmsTestFileName()); + + SSLContext ctx = SSLContext.getInstance("TLS"); + ctx.init(new KeyManager[0], new TrustManager[]{new DefaultTrustManager()}, new SecureRandom()); + SSLContext.setDefault(ctx); + } + + public static String getJmsTestFileName() { + return "providerNIOPlusSSL.properties"; + } +} diff --git a/activemq-amqp/src/test/resources/providerNIOPlusSSL.properties b/activemq-amqp/src/test/resources/providerNIOPlusSSL.properties new file mode 100644 index 0000000000..877525f9e8 --- /dev/null +++ b/activemq-amqp/src/test/resources/providerNIOPlusSSL.properties @@ -0,0 +1,20 @@ +## --------------------------------------------------------------------------- +## Licensed to the Apache Software Foundation (ASF) under one or more +## contributor license agreements. See the NOTICE file distributed with +## this work for additional information regarding copyright ownership. +## The ASF licenses this file to You under the Apache License, Version 2.0 +## (the "License"); you may not use this file except in compliance with +## the License. You may obtain a copy of the License at +## +## http://www.apache.org/licenses/LICENSE-2.0 +## +## Unless required by applicable law or agreed to in writing, software +## distributed under the License is distributed on an "AS IS" BASIS, +## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +## See the License for the specific language governing permissions and +## limitations under the License. +## --------------------------------------------------------------------------- + +# This config file is used by the amqp_nio version of the joram jms tests. +# +jms.provider.admin.class=org.apache.activemq.transport.amqp.joram.ActiveMQSSLAdmin \ No newline at end of file