Fixes and some testing around maxFrameSize handling on the AMQP
Transport.
This commit is contained in:
Timothy Bish 2015-05-18 14:51:56 -04:00
parent 76b60ce44b
commit 7c41ebc912
9 changed files with 232 additions and 79 deletions

View File

@ -242,4 +242,12 @@ public class AmqpTransportFilter extends TransportFilter implements AmqpTranspor
public void setConnectAttemptTimeout(int connectAttemptTimeout) {
wireFormat.setConnectAttemptTimeout(connectAttemptTimeout);
}
public long getMaxFrameSize() {
return wireFormat.getMaxFrameSize();
}
public void setMaxFrameSize(long maxFrameSize) {
wireFormat.setMaxFrameSize(maxFrameSize);
}
}

View File

@ -16,6 +16,7 @@
*/
package org.apache.activemq.transport.amqp.protocol;
import org.apache.activemq.transport.amqp.AmqpProtocolException;
import org.apache.qpid.proton.engine.Delivery;
import org.apache.qpid.proton.engine.Receiver;
import org.fusesource.hawtbuf.Buffer;
@ -99,6 +100,10 @@ public abstract class AmqpAbstractReceiver extends AmqpAbstractLink<Receiver> {
int count;
while ((count = getEndpoint().recv(recvBuffer, 0, recvBuffer.length)) > 0) {
current.write(recvBuffer, 0, count);
if (current.size() > session.getMaxFrameSize()) {
throw new AmqpProtocolException("Frame size of " + current.size() + " larger than max allowed " + session.getMaxFrameSize());
}
}
// Expecting more deliveries..

View File

@ -256,6 +256,13 @@ public class AmqpConnection implements AmqpProtocolConverter {
return connectionInfo.getClientId();
}
/**
* @return the configured max frame size allowed for incoming messages.
*/
public long getMaxFrameSize() {
return amqpWireFormat.getMaxFrameSize();
}
//----- Proton Event handling and IO support -----------------------------//
void pumpProtonToSocket() {
@ -713,14 +720,17 @@ public class AmqpConnection implements AmqpProtocolConverter {
}
void handleException(Throwable exception) {
exception.printStackTrace();
LOG.debug("Exception detail", exception);
try {
// Must ensure that the broker removes Connection resources.
sendToActiveMQ(new ShutdownInfo());
amqpTransport.stop();
} catch (Throwable e) {
LOG.error("Failed to stop AMQP Transport ", e);
if (exception instanceof AmqpProtocolException) {
onAMQPException((IOException) exception);
} else {
try {
// Must ensure that the broker removes Connection resources.
sendToActiveMQ(new ShutdownInfo());
amqpTransport.stop();
} catch (Throwable e) {
LOG.error("Failed to stop AMQP Transport ", e);
}
}
}

View File

@ -369,6 +369,10 @@ public class AmqpSession implements AmqpResource {
return protonSession;
}
public long getMaxFrameSize() {
return connection.getMaxFrameSize();
}
//----- Internal Implementation ------------------------------------------//
private ConsumerId getNextConsumerId() {

View File

@ -16,6 +16,7 @@
*/
package org.apache.activemq.transport.amqp.client;
import java.io.IOException;
import java.net.URI;
import org.apache.activemq.transport.amqp.AmqpTestSupport;
@ -25,13 +26,76 @@ import org.apache.activemq.transport.amqp.AmqpTestSupport;
*/
public class AmqpClientTestSupport extends AmqpTestSupport {
private String connectorScheme = "amqp";
private boolean useSSL;
public AmqpClientTestSupport() {
}
public AmqpClientTestSupport(String connectorScheme, boolean useSSL) {
this.connectorScheme = connectorScheme;
this.useSSL = useSSL;
}
public String getConnectorScheme() {
return connectorScheme;
}
public boolean isUseSSL() {
return useSSL;
}
public String getAmqpConnectionURIOptions() {
return "";
}
@Override
protected boolean isUseTcpConnector() {
return !isUseSSL() && !connectorScheme.contains("nio");
}
@Override
protected boolean isUseSslConnector() {
return isUseSSL() && !connectorScheme.contains("nio");
}
@Override
protected boolean isUseNioConnector() {
return !isUseSSL() && connectorScheme.contains("nio");
}
@Override
protected boolean isUseNioPlusSslConnector() {
return isUseSSL() && connectorScheme.contains("nio");
}
public URI getBrokerAmqpConnectionURI() {
try {
String uri = "tcp://127.0.0.1:" + amqpPort;
int port = 0;
switch (connectorScheme) {
case "amqp":
port = this.amqpPort;
break;
case "amqp+ssl":
port = this.amqpSslPort;
break;
case "amqp+nio":
port = this.amqpNioPort;
break;
case "amqp+nio+ssl":
port = this.amqpNioPlusSslPort;
break;
default:
throw new IOException("Invalid AMQP connector scheme passed to test.");
}
String uri = null;
if (isUseSSL()) {
uri = "ssl://127.0.0.1:" + port;
} else {
uri = "tcp://127.0.0.1:" + port;
}
if (!getAmqpConnectionURIOptions().isEmpty()) {
uri = uri + "?" + getAmqpConnectionURIOptions();

View File

@ -21,9 +21,11 @@ import java.util.Map;
import org.apache.activemq.transport.amqp.client.util.UnmodifiableDelivery;
import org.apache.qpid.proton.Proton;
import org.apache.qpid.proton.amqp.Binary;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.messaging.AmqpValue;
import org.apache.qpid.proton.amqp.messaging.ApplicationProperties;
import org.apache.qpid.proton.amqp.messaging.Data;
import org.apache.qpid.proton.amqp.messaging.MessageAnnotations;
import org.apache.qpid.proton.amqp.messaging.Properties;
import org.apache.qpid.proton.engine.Delivery;
@ -315,6 +317,21 @@ public class AmqpMessage {
getWrappedMessage().setBody(body);
}
/**
* Sets a byte array value into the body of an outgoing Message, throws
* an exception if this is an incoming message instance.
*
* @param value
* the byte array value to store in the Message body.
*
* @throws IllegalStateException if the message is read only.
*/
public void setBytes(byte[] bytes) throws IllegalStateException {
checkReadOnly();
Data body = new Data(new Binary(bytes));
getWrappedMessage().setBody(body);
}
//----- Internal implementation ------------------------------------------//
private void checkReadOnly() throws IllegalStateException {

View File

@ -20,8 +20,6 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.fail;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@ -43,9 +41,6 @@ public class AmqpConfiguredMaxConnectionsTest extends AmqpClientTestSupport {
private static final int MAX_CONNECTIONS = 10;
protected boolean useSSL;
protected String connectorScheme;
@Parameters(name="{0}")
public static Collection<Object[]> data() {
return Arrays.asList(new Object[][] {
@ -55,8 +50,7 @@ public class AmqpConfiguredMaxConnectionsTest extends AmqpClientTestSupport {
}
public AmqpConfiguredMaxConnectionsTest(String connectorScheme, boolean useSSL) {
this.connectorScheme = connectorScheme;
this.useSSL = useSSL;
super(connectorScheme, useSSL);
}
@Test(timeout = 60000)
@ -92,69 +86,6 @@ public class AmqpConfiguredMaxConnectionsTest extends AmqpClientTestSupport {
assertEquals(0, getProxyToBroker().getCurrentConnectionsCount());
}
protected String getConnectorScheme() {
return connectorScheme;
}
protected boolean isUseSSL() {
return useSSL;
}
@Override
protected boolean isUseSslConnector() {
return isUseSSL();
}
@Override
protected boolean isUseNioConnector() {
return true;
}
@Override
protected boolean isUseNioPlusSslConnector() {
return isUseSSL();
}
@Override
public URI getBrokerAmqpConnectionURI() {
try {
int port = 0;
switch (connectorScheme) {
case "amqp":
port = this.amqpPort;
break;
case "amqp+ssl":
port = this.amqpSslPort;
break;
case "amqp+nio":
port = this.amqpNioPort;
break;
case "amqp+nio+ssl":
port = this.amqpNioPlusSslPort;
break;
default:
throw new IOException("Invalid AMQP connector scheme passed to test.");
}
String uri = null;
if (isUseSSL()) {
uri = "ssl://127.0.0.1:" + port;
} else {
uri = "tcp://127.0.0.1:" + port;
}
if (!getAmqpConnectionURIOptions().isEmpty()) {
uri = uri + "?" + getAmqpConnectionURIOptions();
}
return new URI(uri);
} catch (Exception e) {
throw new RuntimeException();
}
}
@Override
protected String getAdditionalConfig() {
return "&maximumConnections=" + MAX_CONNECTIONS;

View File

@ -35,7 +35,7 @@ public class AmqpCorruptedFrameHandlingTest extends AmqpClientTestSupport {
@Override
protected String getAdditionalConfig() {
return "?transport.wireFormat.maxFrameSize=65535";
return "?transport.maxFrameSize=65535&transport.wireFormat.idleTimeout=5000";
}
@Test(timeout = 60000)

View File

@ -0,0 +1,114 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.amqp.interop;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import java.util.Arrays;
import java.util.Collection;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.transport.amqp.client.AmqpClient;
import org.apache.activemq.transport.amqp.client.AmqpClientTestSupport;
import org.apache.activemq.transport.amqp.client.AmqpConnection;
import org.apache.activemq.transport.amqp.client.AmqpConnectionListener;
import org.apache.activemq.transport.amqp.client.AmqpMessage;
import org.apache.activemq.transport.amqp.client.AmqpSender;
import org.apache.activemq.transport.amqp.client.AmqpSession;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
/**
* Test that the maxFrameSize setting prevents large frames from being processed.
*/
@RunWith(Parameterized.class)
public class AmqpMaxFrameSizeTest extends AmqpClientTestSupport {
private final String testName;
private final int maxFrameSize;
private final int maxAmqpFrameSize;
@Parameters(name="{0}")
public static Collection<Object[]> data() {
return Arrays.asList(new Object[][] {
{ "amqp-> MFS > MAFS", "amqp", false, 1024, 2048 },
{ "amqp-> MFS < MAFS", "amqp", false, 2048, 1024 },
{ "amqp+nio-> MFS > MAFS", "amqp+nio", false, 1024, 2048 },
{ "amqp+nio-> MFS < MAFS", "amqp+nio", false, 2048, 1024 },
});
}
public AmqpMaxFrameSizeTest(String testName, String connectorScheme, boolean useSSL, int maxFrameSize, int maxAmqpFrameSize) {
super(connectorScheme, useSSL);
this.testName = testName;
this.maxFrameSize = maxFrameSize;
this.maxAmqpFrameSize = maxAmqpFrameSize;
}
@Override
protected String getAdditionalConfig() {
return "&transport.wireFormat.maxAmqpFrameSize=" + maxAmqpFrameSize +
"&transport.maxFrameSize=" + maxFrameSize;
}
@Test(timeout = 600000)
public void testMaxFrameSizeApplied() throws Exception {
LOG.info("Test starting {} for transport {} with MFS:{} and MAFS:{}",
new Object[]{ testName, getConnectorScheme(), maxFrameSize, maxAmqpFrameSize });
final CountDownLatch failed = new CountDownLatch(1);
AmqpClient client = createAmqpClient();
AmqpConnection connection = client.createConnection();
connection.setListener(new AmqpConnectionListener() {
@Override
public void onException(Throwable ex) {
failed.countDown();
}
});
connection.connect();
AmqpSession session = connection.createSession();
AmqpSender sender = session.createSender("queue://" + getTestName(), true);
byte[] payload = new byte[maxFrameSize];
for (int i = 0; i < payload.length; ++i) {
payload[i] = 42;
}
AmqpMessage message = new AmqpMessage();
message.setBytes(payload);
sender.send(message);
assertTrue("Connection should have failed", failed.await(10, TimeUnit.SECONDS));
assertNotNull(getProxyToQueue(getTestName()));
assertEquals(0, getProxyToQueue(getTestName()).getQueueSize());
connection.close();
}
}