mirror of https://github.com/apache/activemq.git
Allow the transport to reassemble OpenWire commands from multiple chunks of varying size which can occur depending on the cipher suite. git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1402317 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
556ee7b48e
commit
838206ff92
|
@ -118,7 +118,6 @@ public class NIOSSLTransport extends NIOTransport {
|
|||
|
||||
inputBuffer = ByteBuffer.allocate(sslSession.getPacketBufferSize());
|
||||
inputBuffer.clear();
|
||||
currentBuffer = ByteBuffer.allocate(sslSession.getApplicationBufferSize());
|
||||
|
||||
NIOOutputStream outputStream = new NIOOutputStream(channel);
|
||||
outputStream.setEngine(sslEngine);
|
||||
|
@ -171,11 +170,6 @@ public class NIOSSLTransport extends NIOTransport {
|
|||
while (true) {
|
||||
if (!plain.hasRemaining()) {
|
||||
|
||||
if (status == SSLEngineResult.Status.OK && handshakeStatus != SSLEngineResult.HandshakeStatus.NEED_UNWRAP) {
|
||||
plain.clear();
|
||||
} else {
|
||||
plain.compact();
|
||||
}
|
||||
int readCount = secureRead(plain);
|
||||
|
||||
if (readCount == 0) {
|
||||
|
@ -204,17 +198,66 @@ public class NIOSSLTransport extends NIOTransport {
|
|||
}
|
||||
|
||||
protected void processCommand(ByteBuffer plain) throws Exception {
|
||||
nextFrameSize = plain.getInt();
|
||||
if (wireFormat instanceof OpenWireFormat) {
|
||||
long maxFrameSize = ((OpenWireFormat) wireFormat).getMaxFrameSize();
|
||||
if (nextFrameSize > maxFrameSize) {
|
||||
throw new IOException("Frame size of " + (nextFrameSize / (1024 * 1024)) +
|
||||
" MB larger than max allowed " + (maxFrameSize / (1024 * 1024)) + " MB");
|
||||
|
||||
// Are we waiting for the next Command or are we building on the current one
|
||||
if (nextFrameSize == -1) {
|
||||
|
||||
// We can get small packets that don't give us enough for the frame size
|
||||
// so allocate enough for the initial size value and
|
||||
if (plain.remaining() < Integer.SIZE) {
|
||||
if (currentBuffer == null) {
|
||||
currentBuffer = ByteBuffer.allocate(4);
|
||||
}
|
||||
|
||||
// Go until we fill the integer sized current buffer.
|
||||
while (currentBuffer.hasRemaining() && plain.hasRemaining()) {
|
||||
currentBuffer.put(plain.get());
|
||||
}
|
||||
|
||||
// Didn't we get enough yet to figure out next frame size.
|
||||
if (currentBuffer.hasRemaining()) {
|
||||
return;
|
||||
} else {
|
||||
currentBuffer.flip();
|
||||
nextFrameSize = currentBuffer.getInt();
|
||||
}
|
||||
|
||||
} else {
|
||||
|
||||
// Either we are completing a previous read of the next frame size or its
|
||||
// fully contained in plain already.
|
||||
if (currentBuffer != null) {
|
||||
|
||||
// Finish the frame size integer read and get from the current buffer.
|
||||
while (currentBuffer.hasRemaining()) {
|
||||
currentBuffer.put(plain.get());
|
||||
}
|
||||
|
||||
currentBuffer.flip();
|
||||
nextFrameSize = currentBuffer.getInt();
|
||||
|
||||
} else {
|
||||
nextFrameSize = plain.getInt();
|
||||
}
|
||||
}
|
||||
}
|
||||
currentBuffer = ByteBuffer.allocate(nextFrameSize + 4);
|
||||
currentBuffer.putInt(nextFrameSize);
|
||||
if (currentBuffer.hasRemaining()) {
|
||||
|
||||
if (wireFormat instanceof OpenWireFormat) {
|
||||
long maxFrameSize = ((OpenWireFormat) wireFormat).getMaxFrameSize();
|
||||
if (nextFrameSize > maxFrameSize) {
|
||||
throw new IOException("Frame size of " + (nextFrameSize / (1024 * 1024)) +
|
||||
" MB larger than max allowed " + (maxFrameSize / (1024 * 1024)) + " MB");
|
||||
}
|
||||
}
|
||||
|
||||
// now we got the data, lets reallocate and store the size for the marshaler.
|
||||
// if there's more data in plain, then the next call will start processing it.
|
||||
currentBuffer = ByteBuffer.allocate(nextFrameSize + 4);
|
||||
currentBuffer.putInt(nextFrameSize);
|
||||
|
||||
} else {
|
||||
|
||||
// If its all in one read then we can just take it all, otherwise take only
|
||||
// the current frame size and the next iteration starts a new command.
|
||||
if (currentBuffer.remaining() >= plain.remaining()) {
|
||||
currentBuffer.put(plain);
|
||||
} else {
|
||||
|
@ -222,15 +265,17 @@ public class NIOSSLTransport extends NIOTransport {
|
|||
plain.get(fill);
|
||||
currentBuffer.put(fill);
|
||||
}
|
||||
}
|
||||
|
||||
if (currentBuffer.hasRemaining()) {
|
||||
return;
|
||||
} else {
|
||||
currentBuffer.flip();
|
||||
Object command = wireFormat.unmarshal(new DataInputStream(new NIOInputStream(currentBuffer)));
|
||||
doConsume((Command) command);
|
||||
nextFrameSize = -1;
|
||||
// Either we have enough data for a new command or we have to wait for some more.
|
||||
if (currentBuffer.hasRemaining()) {
|
||||
return;
|
||||
} else {
|
||||
currentBuffer.flip();
|
||||
Object command = wireFormat.unmarshal(new DataInputStream(new NIOInputStream(currentBuffer)));
|
||||
doConsume((Command) command);
|
||||
nextFrameSize = -1;
|
||||
currentBuffer = null;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -239,6 +284,10 @@ public class NIOSSLTransport extends NIOTransport {
|
|||
if (!(inputBuffer.position() != 0 && inputBuffer.hasRemaining()) || status == SSLEngineResult.Status.BUFFER_UNDERFLOW) {
|
||||
int bytesRead = channel.read(inputBuffer);
|
||||
|
||||
if (bytesRead == 0) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
if (bytesRead == -1) {
|
||||
sslEngine.closeInbound();
|
||||
if (inputBuffer.position() == 0 || status == SSLEngineResult.Status.BUFFER_UNDERFLOW) {
|
||||
|
|
|
@ -0,0 +1,108 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.transport.nio;
|
||||
|
||||
import javax.jms.Connection;
|
||||
import javax.jms.Message;
|
||||
import javax.jms.MessageConsumer;
|
||||
import javax.jms.MessageProducer;
|
||||
import javax.jms.Queue;
|
||||
import javax.jms.Session;
|
||||
import javax.jms.TextMessage;
|
||||
|
||||
import junit.framework.TestCase;
|
||||
|
||||
import org.apache.activemq.ActiveMQConnectionFactory;
|
||||
import org.apache.activemq.broker.BrokerService;
|
||||
import org.apache.activemq.broker.TransportConnector;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
public class NIOSSLBasicTest {
|
||||
|
||||
public static final String KEYSTORE_TYPE = "jks";
|
||||
public static final String PASSWORD = "password";
|
||||
public static final String SERVER_KEYSTORE = "src/test/resources/org/apache/activemq/security/broker1.ks";
|
||||
public static final String TRUST_KEYSTORE = "src/test/resources/org/apache/activemq/security/broker1.ks";
|
||||
|
||||
public static final int MESSAGE_COUNT = 1000;
|
||||
|
||||
@Before
|
||||
public void before() throws Exception {
|
||||
System.setProperty("javax.net.ssl.trustStore", TRUST_KEYSTORE);
|
||||
System.setProperty("javax.net.ssl.trustStorePassword", PASSWORD);
|
||||
System.setProperty("javax.net.ssl.trustStoreType", KEYSTORE_TYPE);
|
||||
System.setProperty("javax.net.ssl.keyStore", SERVER_KEYSTORE);
|
||||
System.setProperty("javax.net.ssl.keyStoreType", KEYSTORE_TYPE);
|
||||
System.setProperty("javax.net.ssl.keyStorePassword", PASSWORD);
|
||||
// Choose a value that's informative: ssl,handshake,data,trustmanager or all
|
||||
//System.setProperty("javax.net.debug", "handshake");
|
||||
}
|
||||
|
||||
@After
|
||||
public void after() throws Exception {
|
||||
}
|
||||
|
||||
public BrokerService createBroker(String connectorName, String connectorString) throws Exception {
|
||||
BrokerService broker = new BrokerService();
|
||||
broker.setPersistent(false);
|
||||
broker.setUseJmx(false);
|
||||
TransportConnector connector = broker.addConnector(connectorString);
|
||||
connector.setName(connectorName);
|
||||
broker.start();
|
||||
broker.waitUntilStarted();
|
||||
return broker;
|
||||
}
|
||||
|
||||
public void stopBroker(BrokerService broker) throws Exception {
|
||||
if (broker != null) {
|
||||
broker.stop();
|
||||
broker.waitUntilStopped();
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void basicConnector() throws Exception {
|
||||
BrokerService broker = createBroker("nio+ssl", "nio+ssl://localhost:0?transport.needClientAuth=true");
|
||||
basicSendReceive("ssl://localhost:" + broker.getConnectorByName("nio+ssl").getConnectUri().getPort());
|
||||
stopBroker(broker);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void enabledCipherSuites() throws Exception {
|
||||
BrokerService broker = createBroker("nio+ssl", "nio+ssl://localhost:0?transport.needClientAuth=true&transport.enabledCipherSuites=SSL_RSA_WITH_RC4_128_SHA,SSL_DH_anon_WITH_3DES_EDE_CBC_SHA");
|
||||
basicSendReceive("ssl://localhost:" + broker.getConnectorByName("nio+ssl").getConnectUri().getPort());
|
||||
stopBroker(broker);
|
||||
}
|
||||
|
||||
public void basicSendReceive(String uri) throws Exception {
|
||||
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(uri);
|
||||
Connection connection = factory.createConnection();
|
||||
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
connection.start();
|
||||
|
||||
String body = "hello world!";
|
||||
Queue destination = session.createQueue("TEST");
|
||||
MessageProducer producer = session.createProducer(destination);
|
||||
producer.send(session.createTextMessage(body));
|
||||
|
||||
MessageConsumer consumer = session.createConsumer(destination);
|
||||
Message received = consumer.receive(2000);
|
||||
TestCase.assertEquals(body, ((TextMessage)received).getText());
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue