Adds some additional logging to the connection validation code, adds
some additional tests as well.
This commit is contained in:
Timothy Bish 2017-04-05 16:20:28 -04:00
parent 548403ad99
commit 0752d840b9
4 changed files with 360 additions and 11 deletions

View File

@ -32,9 +32,13 @@ import org.apache.activemq.util.ByteArrayOutputStream;
import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.wireformat.WireFormat;
import org.fusesource.hawtbuf.Buffer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class AmqpWireFormat implements WireFormat {
private static final Logger LOG = LoggerFactory.getLogger(AmqpWireFormat.class);
public static final long DEFAULT_MAX_FRAME_SIZE = Long.MAX_VALUE;
public static final int NO_AMQP_MAX_FRAME_SIZE = -1;
public static final int DEFAULT_CONNECTION_TIMEOUT = 30000;
@ -137,18 +141,22 @@ public class AmqpWireFormat implements WireFormat {
*/
public boolean isHeaderValid(AmqpHeader header, boolean authenticated) {
if (!header.hasValidPrefix()) {
LOG.trace("AMQP Header arrived with invalid prefix: {}", header);
return false;
}
if (!(header.getProtocolId() == 0 || header.getProtocolId() == SASL_PROTOCOL)) {
LOG.trace("AMQP Header arrived with invalid protocol ID: {}", header);
return false;
}
if (!authenticated && !isAllowNonSaslConnections() && header.getProtocolId() != SASL_PROTOCOL) {
LOG.trace("AMQP Header arrived without SASL and server requires SASL: {}", header);
return false;
}
if (header.getMajor() != 1 || header.getMinor() != 0 || header.getRevision() != 0) {
LOG.trace("AMQP Header arrived invalid version: {}", header);
return false;
}

View File

@ -72,7 +72,7 @@ public class AmqpTestSupport {
protected ExecutorService testService = Executors.newSingleThreadExecutor();
protected BrokerService brokerService;
protected Vector<Throwable> exceptions = new Vector<Throwable>();
protected Vector<Throwable> exceptions = new Vector<>();
protected int numberOfMessages;
protected URI amqpURI;
@ -150,7 +150,7 @@ public class AmqpTestSupport {
System.setProperty("javax.net.ssl.keyStorePassword", "password");
System.setProperty("javax.net.ssl.keyStoreType", "jks");
ArrayList<BrokerPlugin> plugins = new ArrayList<BrokerPlugin>();
ArrayList<BrokerPlugin> plugins = new ArrayList<>();
addAdditionalPlugins(plugins);
@ -182,28 +182,28 @@ public class AmqpTestSupport {
}
if (isUseTcpConnector()) {
connector = brokerService.addConnector(
"amqp://0.0.0.0:" + amqpPort + "?transport.transformer=" + getAmqpTransformer() + getAdditionalConfig());
"amqp://0.0.0.0:" + amqpPort + "?transport.tcpNoDelay=true&transport.transformer=" + getAmqpTransformer() + getAdditionalConfig());
amqpPort = connector.getConnectUri().getPort();
amqpURI = connector.getPublishableConnectURI();
LOG.debug("Using amqp port " + amqpPort);
}
if (isUseSslConnector()) {
connector = brokerService.addConnector(
"amqp+ssl://0.0.0.0:" + amqpSslPort + "?transport.transformer=" + getAmqpTransformer() + getAdditionalConfig());
"amqp+ssl://0.0.0.0:" + amqpSslPort + "?transport.tcpNoDelay=true&transport.transformer=" + getAmqpTransformer() + getAdditionalConfig());
amqpSslPort = connector.getConnectUri().getPort();
amqpSslURI = connector.getPublishableConnectURI();
LOG.debug("Using amqp+ssl port " + amqpSslPort);
}
if (isUseNioConnector()) {
connector = brokerService.addConnector(
"amqp+nio://0.0.0.0:" + amqpNioPort + "?transport.transformer=" + getAmqpTransformer() + getAdditionalConfig());
"amqp+nio://0.0.0.0:" + amqpNioPort + "?transport.tcpNoDelay=true&transport.transformer=" + getAmqpTransformer() + getAdditionalConfig());
amqpNioPort = connector.getConnectUri().getPort();
amqpNioURI = connector.getPublishableConnectURI();
LOG.debug("Using amqp+nio port " + amqpNioPort);
}
if (isUseNioPlusSslConnector()) {
connector = brokerService.addConnector(
"amqp+nio+ssl://0.0.0.0:" + amqpNioPlusSslPort + "?transport.transformer=" + getAmqpTransformer() + getAdditionalConfig());
"amqp+nio+ssl://0.0.0.0:" + amqpNioPlusSslPort + "?transport.tcpNoDelay=true&transport.transformer=" + getAmqpTransformer() + getAdditionalConfig());
amqpNioPlusSslPort = connector.getConnectUri().getPort();
amqpNioPlusSslURI = connector.getPublishableConnectURI();
LOG.debug("Using amqp+nio+ssl port " + amqpNioPlusSslPort);
@ -238,14 +238,14 @@ public class AmqpTestSupport {
}
if (isUseWsConnector()) {
connector = brokerService.addConnector(
"ws://0.0.0.0:" + getProxyPort(amqpWsPort) + "?transport.transformer=" + getAmqpTransformer() + getAdditionalConfig());
"ws://0.0.0.0:" + getProxyPort(amqpWsPort) + "?transport.tcpNoDelay=true&transport.transformer=" + getAmqpTransformer() + getAdditionalConfig());
amqpWsPort = connector.getConnectUri().getPort();
amqpWsURI = connector.getPublishableConnectURI();
LOG.debug("Using amqp+ws port " + amqpWsPort);
}
if (isUseWssConnector()) {
connector = brokerService.addConnector(
"wss://0.0.0.0:" + getProxyPort(amqpWssPort) + "?transport.transformer=" + getAmqpTransformer() + getAdditionalConfig());
"wss://0.0.0.0:" + getProxyPort(amqpWssPort) + "?transport.tcpNoDelay=true&transport.transformer=" + getAmqpTransformer() + getAdditionalConfig());
amqpWssPort = connector.getConnectUri().getPort();
amqpWssURI = connector.getPublishableConnectURI();
LOG.debug("Using amqp+wss port " + amqpWssPort);

View File

@ -432,6 +432,78 @@ public class AmqpMessage {
return message.getHeader().getDurable();
}
/**
* Sets the priority header on the outgoing message.
*
* @param priority the priority value to set.
*/
public void setPriority(short priority) {
checkReadOnly();
lazyCreateHeader();
getWrappedMessage().setPriority(priority);
}
/**
* Gets the priority header on the message.
*/
public short getPriority() {
return getWrappedMessage().getPriority();
}
/**
* Sets the ttl header on the outgoing message.
*
* @param timeToLive the ttl value to set.
*/
public void setTimeToLive(long timeToLive) {
checkReadOnly();
lazyCreateHeader();
getWrappedMessage().setTtl(timeToLive);
}
/**
* Sets the ttl header on the outgoing message.
*/
public long getTimeToLive() {
return getWrappedMessage().getTtl();
}
/**
* Sets the absolute expiration time property on the message.
*
* @param absoluteExpiryTime the expiration time value to set.
*/
public void setAbsoluteExpiryTime(long absoluteExpiryTime) {
checkReadOnly();
lazyCreateProperties();
getWrappedMessage().setExpiryTime(absoluteExpiryTime);
}
/**
* Gets the absolute expiration time property on the message.
*/
public long getAbsoluteExpiryTime() {
return getWrappedMessage().getExpiryTime();
}
/**
* Sets the creation time property on the message.
*
* @param creationTime the time value to set.
*/
public void setCreationTime(long creationTime) {
checkReadOnly();
lazyCreateProperties();
getWrappedMessage().setCreationTime(creationTime);
}
/**
* Gets the absolute expiration time property on the message.
*/
public long getCreationTime() {
return getWrappedMessage().getCreationTime();
}
/**
* Sets a given application property on an outbound message.
*
@ -615,21 +687,21 @@ public class AmqpMessage {
private void lazyCreateMessageAnnotations() {
if (messageAnnotationsMap == null) {
messageAnnotationsMap = new HashMap<Symbol,Object>();
messageAnnotationsMap = new HashMap<>();
message.setMessageAnnotations(new MessageAnnotations(messageAnnotationsMap));
}
}
private void lazyCreateDeliveryAnnotations() {
if (deliveryAnnotationsMap == null) {
deliveryAnnotationsMap = new HashMap<Symbol,Object>();
deliveryAnnotationsMap = new HashMap<>();
message.setDeliveryAnnotations(new DeliveryAnnotations(deliveryAnnotationsMap));
}
}
private void lazyCreateApplicationProperties() {
if (applicationPropertiesMap == null) {
applicationPropertiesMap = new HashMap<String, Object>();
applicationPropertiesMap = new HashMap<>();
message.setApplicationProperties(new ApplicationProperties(applicationPropertiesMap));
}
}

View File

@ -0,0 +1,269 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.amqp.interop;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.broker.jmx.QueueViewMBean;
import org.apache.activemq.transport.amqp.client.AmqpClient;
import org.apache.activemq.transport.amqp.client.AmqpClientTestSupport;
import org.apache.activemq.transport.amqp.client.AmqpConnection;
import org.apache.activemq.transport.amqp.client.AmqpMessage;
import org.apache.activemq.transport.amqp.client.AmqpReceiver;
import org.apache.activemq.transport.amqp.client.AmqpSender;
import org.apache.activemq.transport.amqp.client.AmqpSession;
import org.junit.Test;
public class AmqpExpiredMessageTest extends AmqpClientTestSupport {
@Test(timeout = 60000)
public void testSendMessageThatIsAlreadyExpiredUsingAbsoluteTime() throws Exception {
AmqpClient client = createAmqpClient();
AmqpConnection connection = trackConnection(client.connect());
AmqpSession session = connection.createSession();
AmqpSender sender = session.createSender(getTestName());
// Get the Queue View early to avoid racing the delivery.
final QueueViewMBean queueView = getProxyToQueue(getTestName());
assertNotNull(queueView);
AmqpMessage message = new AmqpMessage();
message.setAbsoluteExpiryTime(System.currentTimeMillis() - 5000);
message.setText("Test-Message");
sender.send(message);
sender.close();
// Broker doesn't track messages that arrived already expired.
assertEquals(0, queueView.getQueueSize());
// Now try and get the message
AmqpReceiver receiver = session.createReceiver(getTestName());
receiver.flow(1);
AmqpMessage received = receiver.receive(1, TimeUnit.SECONDS);
assertNull(received);
// Broker doesn't track messages that arrived already expired.
assertEquals(0, queueView.getExpiredCount());
connection.close();
}
@Test(timeout = 60000)
public void testSendMessageThatIsNotExpiredUsingAbsoluteTime() throws Exception {
AmqpClient client = createAmqpClient();
AmqpConnection connection = trackConnection(client.connect());
AmqpSession session = connection.createSession();
AmqpSender sender = session.createSender(getTestName());
// Get the Queue View early to avoid racing the delivery.
final QueueViewMBean queueView = getProxyToQueue(getTestName());
assertNotNull(queueView);
AmqpMessage message = new AmqpMessage();
message.setAbsoluteExpiryTime(System.currentTimeMillis() + 5000);
message.setText("Test-Message");
sender.send(message);
sender.close();
assertEquals(1, queueView.getQueueSize());
// Now try and get the message
AmqpReceiver receiver = session.createReceiver(getTestName());
receiver.flow(1);
AmqpMessage received = receiver.receive(5, TimeUnit.SECONDS);
assertNotNull(received);
assertEquals(0, queueView.getExpiredCount());
connection.close();
}
@Test(timeout = 60000)
public void testSendMessageThatIsExiredUsingAbsoluteTimeWithLongTTL() throws Exception {
AmqpClient client = createAmqpClient();
AmqpConnection connection = trackConnection(client.connect());
AmqpSession session = connection.createSession();
AmqpSender sender = session.createSender(getTestName());
// Get the Queue View early to avoid racing the delivery.
final QueueViewMBean queueView = getProxyToQueue(getTestName());
assertNotNull(queueView);
AmqpMessage message = new AmqpMessage();
message.setAbsoluteExpiryTime(System.currentTimeMillis() - 5000);
// AET should override any TTL set
message.setTimeToLive(60000);
message.setText("Test-Message");
sender.send(message);
sender.close();
// Broker doesn't track messages that arrived already expired.
assertEquals(0, queueView.getQueueSize());
// Now try and get the message
AmqpReceiver receiver = session.createReceiver(getTestName());
receiver.flow(1);
AmqpMessage received = receiver.receive(1, TimeUnit.SECONDS);
assertNull(received);
// Broker doesn't track messages that arrived already expired.
assertEquals(0, queueView.getExpiredCount());
connection.close();
}
@Test(timeout = 60000)
public void testSendMessageThatIsExpiredUsingTTLWhenAbsoluteIsZero() throws Exception {
AmqpClient client = createAmqpClient();
AmqpConnection connection = trackConnection(client.connect());
AmqpSession session = connection.createSession();
AmqpSender sender = session.createSender(getTestName());
// Get the Queue View early to avoid racing the delivery.
final QueueViewMBean queueView = getProxyToQueue(getTestName());
assertNotNull(queueView);
AmqpMessage message = new AmqpMessage();
message.setAbsoluteExpiryTime(0);
// AET should override any TTL set unless it is zero
message.setTimeToLive(1000);
message.setText("Test-Message");
sender.send(message);
sender.close();
assertEquals(1, queueView.getQueueSize());
Thread.sleep(1000);
// Now try and get the message
AmqpReceiver receiver = session.createReceiver(getTestName());
receiver.flow(1);
AmqpMessage received = receiver.receive(1, TimeUnit.SECONDS);
assertNull(received);
assertEquals(1, queueView.getExpiredCount());
connection.close();
}
@Test(timeout = 60000)
public void testSendMessageThatIsNotExpiredUsingAbsoluteTimeWithElspsedTTL() throws Exception {
AmqpClient client = createAmqpClient();
AmqpConnection connection = trackConnection(client.connect());
AmqpSession session = connection.createSession();
AmqpSender sender = session.createSender(getTestName());
// Get the Queue View early to avoid racing the delivery.
final QueueViewMBean queueView = getProxyToQueue(getTestName());
assertNotNull(queueView);
AmqpMessage message = new AmqpMessage();
message.setAbsoluteExpiryTime(System.currentTimeMillis() + 5000);
// AET should override any TTL set
message.setTimeToLive(10);
message.setText("Test-Message");
sender.send(message);
sender.close();
Thread.sleep(50);
assertEquals(1, queueView.getQueueSize());
// Now try and get the message
AmqpReceiver receiver = session.createReceiver(getTestName());
receiver.flow(1);
AmqpMessage received = receiver.receive(5, TimeUnit.SECONDS);
assertNotNull(received);
assertEquals(0, queueView.getExpiredCount());
connection.close();
}
@Test(timeout = 60000)
public void testSendMessageThatIsNotExpiredUsingTimeToLive() throws Exception {
AmqpClient client = createAmqpClient();
AmqpConnection connection = trackConnection(client.connect());
AmqpSession session = connection.createSession();
AmqpSender sender = session.createSender(getTestName());
// Get the Queue View early to avoid racing the delivery.
final QueueViewMBean queueView = getProxyToQueue(getTestName());
assertNotNull(queueView);
AmqpMessage message = new AmqpMessage();
message.setTimeToLive(5000);
message.setText("Test-Message");
sender.send(message);
sender.close();
assertEquals(1, queueView.getQueueSize());
// Now try and get the message
AmqpReceiver receiver = session.createReceiver(getTestName());
receiver.flow(1);
AmqpMessage received = receiver.receive(5, TimeUnit.SECONDS);
assertNotNull(received);
assertEquals(0, queueView.getExpiredCount());
connection.close();
}
@Test(timeout = 60000)
public void testSendMessageThenAllowToExpiredUsingTimeToLive() throws Exception {
AmqpClient client = createAmqpClient();
AmqpConnection connection = trackConnection(client.connect());
AmqpSession session = connection.createSession();
AmqpSender sender = session.createSender(getTestName());
// Get the Queue View early to avoid racing the delivery.
final QueueViewMBean queueView = getProxyToQueue(getTestName());
assertNotNull(queueView);
AmqpMessage message = new AmqpMessage();
message.setTimeToLive(10);
message.setText("Test-Message");
sender.send(message);
sender.close();
Thread.sleep(50);
assertEquals(1, queueView.getQueueSize());
// Now try and get the message
AmqpReceiver receiver = session.createReceiver(getTestName());
receiver.flow(1);
AmqpMessage received = receiver.receive(1, TimeUnit.SECONDS);
assertNull(received);
assertEquals(1, queueView.getExpiredCount());
connection.close();
}
}