ARTEMIS-1052 Proper Expiry over AMQP

This commit is contained in:
Clebert Suconic 2017-03-20 12:24:42 -04:00
parent 7ac27df7a0
commit 65ac7f700b
17 changed files with 359 additions and 150 deletions

View File

@ -364,23 +364,30 @@ public interface Message {
} }
setBuffer(null); setBuffer(null);
} }
default void reencode() {
// only valid probably on AMQP
}
default void referenceOriginalMessage(final Message original, String originalQueue) { default void referenceOriginalMessage(final Message original, String originalQueue) {
String queueOnMessage = original.getStringProperty(Message.HDR_ORIGINAL_QUEUE.toString()); String queueOnMessage = original.getAnnotationString(Message.HDR_ORIGINAL_QUEUE);
if (queueOnMessage != null) { if (queueOnMessage != null) {
putStringProperty(Message.HDR_ORIGINAL_QUEUE.toString(), queueOnMessage); setAnnotation(Message.HDR_ORIGINAL_QUEUE, queueOnMessage);
} else if (originalQueue != null) { } else if (originalQueue != null) {
putStringProperty(Message.HDR_ORIGINAL_QUEUE.toString(), originalQueue); setAnnotation(Message.HDR_ORIGINAL_QUEUE, originalQueue);
} }
if (original.containsProperty(Message.HDR_ORIG_MESSAGE_ID.toString())) { Object originalID = original.getAnnotation(Message.HDR_ORIG_MESSAGE_ID);
putStringProperty(Message.HDR_ORIGINAL_ADDRESS.toString(), original.getStringProperty(Message.HDR_ORIGINAL_ADDRESS.toString()));
putLongProperty(Message.HDR_ORIG_MESSAGE_ID.toString(), original.getLongProperty(Message.HDR_ORIG_MESSAGE_ID.toString())); if (originalID != null) {
setAnnotation(Message.HDR_ORIGINAL_ADDRESS, original.getAnnotationString(Message.HDR_ORIGINAL_ADDRESS));
setAnnotation(Message.HDR_ORIG_MESSAGE_ID, originalID);
} else { } else {
putStringProperty(Message.HDR_ORIGINAL_ADDRESS.toString(), original.getAddress()); setAnnotation(Message.HDR_ORIGINAL_ADDRESS, original.getAddress());
putLongProperty(Message.HDR_ORIG_MESSAGE_ID.toString(), original.getMessageID()); setAnnotation(Message.HDR_ORIG_MESSAGE_ID, original.getMessageID());
} }
// reset expiry // reset expiry
@ -503,9 +510,26 @@ public interface Message {
Object getObjectProperty(SimpleString key); Object getObjectProperty(SimpleString key);
Object removeDeliveryAnnotationProperty(SimpleString key); default Object removeAnnotation(SimpleString key) {
return removeProperty(key);
}
Object getDeliveryAnnotationProperty(SimpleString key); default String getAnnotationString(SimpleString key) {
Object value = getAnnotation(key);
if (value != null) {
return value.toString();
} else {
return null;
}
}
Object getAnnotation(SimpleString key);
/** Callers must call {@link #reencode()} in order to be sent to clients */
default Message setAnnotation(SimpleString key, Object value) {
putObjectProperty(key, value);
return this;
}
Short getShortProperty(SimpleString key) throws ActiveMQPropertyConversionException; Short getShortProperty(SimpleString key) throws ActiveMQPropertyConversionException;

View File

@ -98,13 +98,13 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
/** On core there's no delivery annotation */ /** On core there's no delivery annotation */
@Override @Override
public Object getDeliveryAnnotationProperty(SimpleString key) { public Object getAnnotation(SimpleString key) {
return getObjectProperty(key); return getObjectProperty(key);
} }
/** On core there's no delivery annotation */ /** On core there's no delivery annotation */
@Override @Override
public Object removeDeliveryAnnotationProperty(SimpleString key) { public Object removeAnnotation(SimpleString key) {
return removeProperty(key); return removeProperty(key);
} }

View File

@ -18,6 +18,7 @@ package org.apache.activemq.artemis.protocol.amqp.broker;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.Collections; import java.util.Collections;
import java.util.Date;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
import java.util.Map; import java.util.Map;
@ -62,14 +63,14 @@ public class AMQPMessage extends RefCountMessage {
final long messageFormat; final long messageFormat;
ByteBuf data; ByteBuf data;
boolean bufferValid; boolean bufferValid;
byte type; boolean durable;
long messageID; long messageID;
String address; String address;
MessageImpl protonMessage; MessageImpl protonMessage;
private volatile int memoryEstimate = -1; private volatile int memoryEstimate = -1;
private long expiration = 0; private long expiration = 0;
// this is to store where to start sending bytes, ignoring header and delivery annotations. // this is to store where to start sending bytes, ignoring header and delivery annotations.
private int sendFrom = -1; private int sendFrom = 0;
private boolean parsedHeaders = false; private boolean parsedHeaders = false;
private Header _header; private Header _header;
private DeliveryAnnotations _deliveryAnnotations; private DeliveryAnnotations _deliveryAnnotations;
@ -123,7 +124,7 @@ public class AMQPMessage extends RefCountMessage {
private void initalizeObjects() { private void initalizeObjects() {
if (protonMessage == null) { if (protonMessage == null) {
if (data == null) { if (data == null) {
this.sendFrom = -1; this.sendFrom = 0;
_header = new Header(); _header = new Header();
_deliveryAnnotations = new DeliveryAnnotations(new HashMap<>()); _deliveryAnnotations = new DeliveryAnnotations(new HashMap<>());
_properties = new Properties(); _properties = new Properties();
@ -220,12 +221,27 @@ public class AMQPMessage extends RefCountMessage {
return null; return null;
} }
private Object removeSymbol(Symbol symbol) {
MessageAnnotations annotations = getMessageAnnotations();
Map mapAnnotations = annotations != null ? annotations.getValue() : null;
if (mapAnnotations != null) {
return mapAnnotations.remove(symbol);
}
return null;
}
private void setSymbol(String symbol, Object value) { private void setSymbol(String symbol, Object value) {
setSymbol(Symbol.getSymbol(symbol), value); setSymbol(Symbol.getSymbol(symbol), value);
} }
private void setSymbol(Symbol symbol, Object value) { private void setSymbol(Symbol symbol, Object value) {
MessageAnnotations annotations = getMessageAnnotations(); MessageAnnotations annotations = getMessageAnnotations();
if (annotations == null) {
_messageAnnotations = new MessageAnnotations(new HashMap<>());
annotations = _messageAnnotations;
}
Map mapAnnotations = annotations != null ? annotations.getValue() : null; Map mapAnnotations = annotations != null ? annotations.getValue() : null;
if (mapAnnotations != null) { if (mapAnnotations != null) {
mapAnnotations.put(symbol, value); mapAnnotations.put(symbol, value);
@ -408,7 +424,14 @@ public class AMQPMessage extends RefCountMessage {
@Override @Override
public org.apache.activemq.artemis.api.core.Message copy() { public org.apache.activemq.artemis.api.core.Message copy() {
checkBuffer(); checkBuffer();
AMQPMessage newEncode = new AMQPMessage(this.messageFormat, data.array());
byte[] origin = data.array();
byte[] newData = new byte[data.array().length - sendFrom];
for (int i = 0; i < newData.length; i++) {
newData[i] = origin[i + sendFrom];
}
AMQPMessage newEncode = new AMQPMessage(this.messageFormat, newData);
newEncode.setDurable(isDurable());
return newEncode; return newEncode;
} }
@ -436,6 +459,16 @@ public class AMQPMessage extends RefCountMessage {
@Override @Override
public AMQPMessage setExpiration(long expiration) { public AMQPMessage setExpiration(long expiration) {
Properties properties = getProperties();
if (properties != null) {
if (expiration <= 0) {
properties.setAbsoluteExpiryTime(null);
} else {
properties.setAbsoluteExpiryTime(new Date(expiration));
}
}
this.expiration = expiration; this.expiration = expiration;
return this; return this;
} }
@ -460,7 +493,7 @@ public class AMQPMessage extends RefCountMessage {
if (getHeader() != null && getHeader().getDurable() != null) { if (getHeader() != null && getHeader().getDurable() != null) {
return getHeader().getDurable().booleanValue(); return getHeader().getDurable().booleanValue();
} else { } else {
return false; return durable;
} }
} }
@ -471,7 +504,8 @@ public class AMQPMessage extends RefCountMessage {
@Override @Override
public org.apache.activemq.artemis.api.core.Message setDurable(boolean durable) { public org.apache.activemq.artemis.api.core.Message setDurable(boolean durable) {
return null; this.durable = durable;
return this;
} }
@Override @Override
@ -543,12 +577,20 @@ public class AMQPMessage extends RefCountMessage {
} }
} }
@Override
public int getEncodeSize() {
checkBuffer();
// + 20checkBuffer is an estimate for the Header with the deliveryCount
return data.array().length - sendFrom + 20;
}
@Override @Override
public void sendBuffer(ByteBuf buffer, int deliveryCount) { public void sendBuffer(ByteBuf buffer, int deliveryCount) {
checkBuffer(); checkBuffer();
Header header = getHeader(); Header header = getHeader();
if (header == null && deliveryCount > 0) { if (header == null && deliveryCount > 0) {
header = new Header(); header = new Header();
header.setDurable(durable);
} }
if (header != null) { if (header != null) {
synchronized (header) { synchronized (header) {
@ -756,19 +798,36 @@ public class AMQPMessage extends RefCountMessage {
} }
@Override @Override
public Object removeDeliveryAnnotationProperty(SimpleString key) { public Object removeAnnotation(SimpleString key) {
parseHeaders(); return removeSymbol(Symbol.getSymbol(key.toString()));
if (_deliveryAnnotations == null || _deliveryAnnotations.getValue() == null) {
return null;
}
return _deliveryAnnotations.getValue().remove(key.toString());
} }
@Override @Override
public Object getDeliveryAnnotationProperty(SimpleString key) { public Object getAnnotation(SimpleString key) {
return null; return getSymbol(key.toString());
} }
@Override
public AMQPMessage setAnnotation(SimpleString key, Object value) {
setSymbol(key.toString(), value);
return this;
}
@Override
public void reencode() {
parseHeaders();
ApplicationProperties properties = getApplicationProperties();
getProtonMessage().setDeliveryAnnotations(_deliveryAnnotations);
getProtonMessage().setMessageAnnotations(_messageAnnotations);
getProtonMessage().setApplicationProperties(properties);
getProtonMessage().setProperties(this._properties);
bufferValid = false;
checkBuffer();
}
@Override @Override
public SimpleString getSimpleStringProperty(String key) throws ActiveMQPropertyConversionException { public SimpleString getSimpleStringProperty(String key) throws ActiveMQPropertyConversionException {
return SimpleString.toSimpleString((String) getApplicationPropertiesMap().get(key)); return SimpleString.toSimpleString((String) getApplicationPropertiesMap().get(key));
@ -849,11 +908,6 @@ public class AMQPMessage extends RefCountMessage {
return putStringProperty(key.toString(), value.toString()); return putStringProperty(key.toString(), value.toString());
} }
@Override
public int getEncodeSize() {
return 0;
}
@Override @Override
public Set<SimpleString> getPropertyNames() { public Set<SimpleString> getPropertyNames() {
HashSet<SimpleString> values = new HashSet<>(); HashSet<SimpleString> values = new HashSet<>();
@ -901,15 +955,18 @@ public class AMQPMessage extends RefCountMessage {
@Override @Override
public int getPersistSize() { public int getPersistSize() {
checkBuffer(); return DataConstants.SIZE_INT + internalPersistSize();
return data.array().length + DataConstants.SIZE_INT; }
private int internalPersistSize() {
return data.array().length - sendFrom;
} }
@Override @Override
public void persist(ActiveMQBuffer targetRecord) { public void persist(ActiveMQBuffer targetRecord) {
checkBuffer(); checkBuffer();
targetRecord.writeInt(data.array().length); targetRecord.writeInt(internalPersistSize());
targetRecord.writeBytes(data.array()); targetRecord.writeBytes(data.array(), sendFrom, data.array().length - sendFrom);
} }
@Override @Override
@ -917,8 +974,10 @@ public class AMQPMessage extends RefCountMessage {
int size = record.readInt(); int size = record.readInt();
byte[] recordArray = new byte[size]; byte[] recordArray = new byte[size];
record.readBytes(recordArray); record.readBytes(recordArray);
this.sendFrom = 0; // whatever was persisted will be sent
this.data = Unpooled.wrappedBuffer(recordArray); this.data = Unpooled.wrappedBuffer(recordArray);
this.bufferValid = true; this.bufferValid = true;
this.durable = true; // it's coming from the journal, so it's durable
parseHeaders(); parseHeaders();
} }
} }

View File

@ -594,7 +594,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
// we only need a tag if we are going to settle later // we only need a tag if we are going to settle later
byte[] tag = preSettle ? new byte[0] : protonSession.getTag(); byte[] tag = preSettle ? new byte[0] : protonSession.getTag();
ByteBuf nettyBuffer = PooledByteBufAllocator.DEFAULT.heapBuffer(1024); ByteBuf nettyBuffer = PooledByteBufAllocator.DEFAULT.heapBuffer(message.getEncodeSize());
try { try {
message.sendBuffer(nettyBuffer, deliveryCount); message.sendBuffer(nettyBuffer, deliveryCount);

View File

@ -58,12 +58,12 @@ public class OpenwireMessage implements Message {
} }
@Override @Override
public Object removeDeliveryAnnotationProperty(SimpleString key) { public Object removeAnnotation(SimpleString key) {
return null; return null;
} }
@Override @Override
public Object getDeliveryAnnotationProperty(SimpleString key) { public Object getAnnotation(SimpleString key) {
return null; return null;
} }

View File

@ -242,7 +242,7 @@ public final class LargeServerMessageImpl extends CoreMessage implements LargeSe
LargeServerMessageImpl otherLM = (LargeServerMessageImpl) original; LargeServerMessageImpl otherLM = (LargeServerMessageImpl) original;
this.paged = otherLM.paged; this.paged = otherLM.paged;
if (this.paged) { if (this.paged) {
this.removeProperty(Message.HDR_ORIG_MESSAGE_ID); this.removeAnnotation(Message.HDR_ORIG_MESSAGE_ID);
} }
} }
} }

View File

@ -238,7 +238,7 @@ public final class BindingsImpl implements Bindings {
/* This is a special treatment for scaled-down messages involving SnF queues. /* This is a special treatment for scaled-down messages involving SnF queues.
* See org.apache.activemq.artemis.core.server.impl.ScaleDownHandler.scaleDownMessages() for the logic that sends messages with this property * See org.apache.activemq.artemis.core.server.impl.ScaleDownHandler.scaleDownMessages() for the logic that sends messages with this property
*/ */
byte[] ids = (byte[]) message.removeDeliveryAnnotationProperty(Message.HDR_SCALEDOWN_TO_IDS); byte[] ids = (byte[]) message.removeAnnotation(Message.HDR_SCALEDOWN_TO_IDS);
if (ids != null) { if (ids != null) {
ByteBuffer buffer = ByteBuffer.wrap(ids); ByteBuffer buffer = ByteBuffer.wrap(ids);
@ -268,7 +268,7 @@ public final class BindingsImpl implements Bindings {
if (!routed) { if (!routed) {
// Remove the ids now, in order to avoid double check // Remove the ids now, in order to avoid double check
ids = (byte[]) message.removeDeliveryAnnotationProperty(Message.HDR_ROUTE_TO_IDS); ids = (byte[]) message.removeAnnotation(Message.HDR_ROUTE_TO_IDS);
// Fetch the groupId now, in order to avoid double checking // Fetch the groupId now, in order to avoid double checking
SimpleString groupId = message.getGroupID(); SimpleString groupId = message.getGroupID();

View File

@ -765,6 +765,8 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
message.setAddress(dlaAddress); message.setAddress(dlaAddress);
message.reencode();
route(message, context.getTransaction(), false); route(message, context.getTransaction(), false);
result = RoutingStatus.NO_BINDINGS_DLA; result = RoutingStatus.NO_BINDINGS_DLA;
} }
@ -1221,7 +1223,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
AtomicBoolean startedTX) throws Exception { AtomicBoolean startedTX) throws Exception {
// Check the DuplicateCache for the Bridge first // Check the DuplicateCache for the Bridge first
Object bridgeDup = message.removeDeliveryAnnotationProperty(Message.HDR_BRIDGE_DUPLICATE_ID); Object bridgeDup = message.removeAnnotation(Message.HDR_BRIDGE_DUPLICATE_ID);
if (bridgeDup != null) { if (bridgeDup != null) {
// if the message is being sent from the bridge, we just ignore the duplicate id, and use the internal one // if the message is being sent from the bridge, we just ignore the duplicate id, and use the internal one
byte[] bridgeDupBytes = (byte[]) bridgeDup; byte[] bridgeDupBytes = (byte[]) bridgeDup;

View File

@ -104,6 +104,8 @@ public class DivertImpl implements Divert {
copy.setExpiration(message.getExpiration()); copy.setExpiration(message.getExpiration());
copy.reencode();
switch (routingType) { switch (routingType) {
case ANYCAST: case ANYCAST:
copy.putByteProperty(Message.HDR_ROUTING_TYPE, RoutingType.ANYCAST.getType()); copy.putByteProperty(Message.HDR_ROUTING_TYPE, RoutingType.ANYCAST.getType());

View File

@ -1718,14 +1718,14 @@ public class QueueImpl implements Queue {
@Override @Override
public int retryMessages(Filter filter) throws Exception { public int retryMessages(Filter filter) throws Exception {
final HashMap<SimpleString, Long> queues = new HashMap<>(); final HashMap<String, Long> queues = new HashMap<>();
return iterQueue(DEFAULT_FLUSH_LIMIT, filter, new QueueIterateAction() { return iterQueue(DEFAULT_FLUSH_LIMIT, filter, new QueueIterateAction() {
@Override @Override
public void actMessage(Transaction tx, MessageReference ref) throws Exception { public void actMessage(Transaction tx, MessageReference ref) throws Exception {
SimpleString originalMessageAddress = ref.getMessage().getSimpleStringProperty(Message.HDR_ORIGINAL_ADDRESS); String originalMessageAddress = ref.getMessage().getAnnotationString(Message.HDR_ORIGINAL_ADDRESS);
SimpleString originalMessageQueue = ref.getMessage().getSimpleStringProperty(Message.HDR_ORIGINAL_QUEUE); String originalMessageQueue = ref.getMessage().getAnnotationString(Message.HDR_ORIGINAL_QUEUE);
if (originalMessageAddress != null) { if (originalMessageAddress != null) {
@ -1735,7 +1735,7 @@ public class QueueImpl implements Queue {
if (originalMessageQueue != null && !originalMessageQueue.equals(originalMessageAddress)) { if (originalMessageQueue != null && !originalMessageQueue.equals(originalMessageAddress)) {
targetQueue = queues.get(originalMessageQueue); targetQueue = queues.get(originalMessageQueue);
if (targetQueue == null) { if (targetQueue == null) {
Binding binding = postOffice.getBinding(originalMessageQueue); Binding binding = postOffice.getBinding(SimpleString.toSimpleString(originalMessageQueue));
if (binding != null && binding instanceof LocalQueueBinding) { if (binding != null && binding instanceof LocalQueueBinding) {
targetQueue = ((LocalQueueBinding) binding).getID(); targetQueue = ((LocalQueueBinding) binding).getID();
@ -1745,9 +1745,9 @@ public class QueueImpl implements Queue {
} }
if (targetQueue != null) { if (targetQueue != null) {
move(originalMessageAddress, tx, ref, false, false, targetQueue.longValue()); move(SimpleString.toSimpleString(originalMessageAddress), tx, ref, false, false, targetQueue.longValue());
} else { } else {
move(originalMessageAddress, tx, ref, false, false); move(SimpleString.toSimpleString(originalMessageAddress), tx, ref, false, false);
} }
@ -2495,10 +2495,14 @@ public class QueueImpl implements Queue {
copy.referenceOriginalMessage(message, ref != null ? ref.getQueue().getName().toString() : null); copy.referenceOriginalMessage(message, ref != null ? ref.getQueue().getName().toString() : null);
} }
copy.setExpiration(0);
if (expiry) { if (expiry) {
copy.putLongProperty(Message.HDR_ACTUAL_EXPIRY_TIME.toString(), System.currentTimeMillis()); copy.setAnnotation(Message.HDR_ACTUAL_EXPIRY_TIME, System.currentTimeMillis());
} }
copy.reencode();
return copy; return copy;
} }

View File

@ -299,12 +299,12 @@ public class ScheduledDeliveryHandlerTest extends Assert {
} }
@Override @Override
public Object removeDeliveryAnnotationProperty(SimpleString key) { public Object removeAnnotation(SimpleString key) {
return null; return null;
} }
@Override @Override
public Object getDeliveryAnnotationProperty(SimpleString key) { public Object getAnnotation(SimpleString key) {
return null; return null;
} }

View File

@ -471,7 +471,7 @@ public class AmqpMessage {
/** /**
* Sets the creation time property on the message. * Sets the creation time property on the message.
* *
* @param absoluteExpiryTime the expiration time value to set. * @param creationTime the time value to set.
*/ */
public void setCreationTime(long creationTime) { public void setCreationTime(long creationTime) {
checkReadOnly(); checkReadOnly();

View File

@ -17,8 +17,6 @@
package org.apache.activemq.artemis.tests.integration.amqp; package org.apache.activemq.artemis.tests.integration.amqp;
import java.net.URI;
import java.util.LinkedList;
import java.util.Set; import java.util.Set;
import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.SimpleString;
@ -32,7 +30,6 @@ import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.jms.server.JMSServerManager; import org.apache.activemq.artemis.jms.server.JMSServerManager;
import org.apache.activemq.artemis.jms.server.impl.JMSServerManagerImpl; import org.apache.activemq.artemis.jms.server.impl.JMSServerManagerImpl;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.transport.amqp.client.AmqpClient; import org.apache.activemq.transport.amqp.client.AmqpClient;
import org.apache.activemq.transport.amqp.client.AmqpConnection; import org.apache.activemq.transport.amqp.client.AmqpConnection;
import org.apache.activemq.transport.amqp.client.AmqpMessage; import org.apache.activemq.transport.amqp.client.AmqpMessage;
@ -46,23 +43,14 @@ import org.junit.Before;
* Test support class for tests that will be using the AMQP Proton wrapper client. * Test support class for tests that will be using the AMQP Proton wrapper client.
* This is to make it easier to migrate tests from ActiveMQ5 * This is to make it easier to migrate tests from ActiveMQ5
*/ */
public class AmqpClientTestSupport extends ActiveMQTestBase { public class AmqpClientTestSupport extends AmqpTestSupport {
protected static Symbol SHARED = Symbol.getSymbol("shared"); protected static Symbol SHARED = Symbol.getSymbol("shared");
protected static Symbol GLOBAL = Symbol.getSymbol("global"); protected static Symbol GLOBAL = Symbol.getSymbol("global");
private boolean useSSL;
protected JMSServerManager serverManager; protected JMSServerManager serverManager;
protected ActiveMQServer server; protected ActiveMQServer server;
protected LinkedList<AmqpConnection> connections = new LinkedList<>();
protected AmqpConnection addConnection(AmqpConnection connection) {
connections.add(connection);
return connection;
}
@Before @Before
@Override @Override
public void setUp() throws Exception { public void setUp() throws Exception {
@ -80,6 +68,7 @@ public class AmqpClientTestSupport extends ActiveMQTestBase {
ignored.printStackTrace(); ignored.printStackTrace();
} }
} }
connections.clear();
if (serverManager != null) { if (serverManager != null) {
try { try {
@ -149,79 +138,6 @@ public class AmqpClientTestSupport extends ActiveMQTestBase {
this.useSSL = useSSL; this.useSSL = useSSL;
} }
public boolean isUseSSL() {
return useSSL;
}
public String getAmqpConnectionURIOptions() {
return "";
}
public URI getBrokerAmqpConnectionURI() {
boolean webSocket = false;
try {
int port = 61616;
String uri = null;
if (isUseSSL()) {
if (webSocket) {
uri = "wss://127.0.0.1:" + port;
} else {
uri = "ssl://127.0.0.1:" + port;
}
} else {
if (webSocket) {
uri = "ws://127.0.0.1:" + port;
} else {
uri = "tcp://127.0.0.1:" + port;
}
}
if (!getAmqpConnectionURIOptions().isEmpty()) {
uri = uri + "?" + getAmqpConnectionURIOptions();
}
return new URI(uri);
} catch (Exception e) {
throw new RuntimeException();
}
}
public AmqpConnection createAmqpConnection() throws Exception {
return createAmqpConnection(getBrokerAmqpConnectionURI());
}
public AmqpConnection createAmqpConnection(String username, String password) throws Exception {
return createAmqpConnection(getBrokerAmqpConnectionURI(), username, password);
}
public AmqpConnection createAmqpConnection(URI brokerURI) throws Exception {
return createAmqpConnection(brokerURI, null, null);
}
public AmqpConnection createAmqpConnection(URI brokerURI, String username, String password) throws Exception {
return createAmqpClient(brokerURI, username, password).connect();
}
public AmqpClient createAmqpClient() throws Exception {
return createAmqpClient(getBrokerAmqpConnectionURI(), null, null);
}
public AmqpClient createAmqpClient(URI brokerURI) throws Exception {
return createAmqpClient(brokerURI, null, null);
}
public AmqpClient createAmqpClient(String username, String password) throws Exception {
return createAmqpClient(getBrokerAmqpConnectionURI(), username, password);
}
public AmqpClient createAmqpClient(URI brokerURI, String username, String password) throws Exception {
return new AmqpClient(brokerURI, username, password);
}
protected void sendMessages(int numMessages, String address) throws Exception { protected void sendMessages(int numMessages, String address) throws Exception {
AmqpClient client = createAmqpClient(); AmqpClient client = createAmqpClient();
AmqpConnection connection = addConnection(client.connect()); AmqpConnection connection = addConnection(client.connect());

View File

@ -18,7 +18,6 @@ package org.apache.activemq.artemis.tests.integration.amqp;
import org.apache.activemq.artemis.api.core.TransportConfiguration; import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants; import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
import org.apache.activemq.artemis.tests.integration.cluster.failover.FailoverTest;
import org.apache.activemq.artemis.tests.integration.cluster.failover.FailoverTestBase; import org.apache.activemq.artemis.tests.integration.cluster.failover.FailoverTestBase;
import org.apache.qpid.jms.JmsConnectionFactory; import org.apache.qpid.jms.JmsConnectionFactory;
import org.junit.Assert; import org.junit.Assert;

View File

@ -0,0 +1,127 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.amqp;
import java.net.URI;
import java.util.LinkedList;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.transport.amqp.client.AmqpClient;
import org.apache.activemq.transport.amqp.client.AmqpConnection;
import org.junit.After;
/** This will only add methods to support AMQP Testing without creating servers or anything */
public class AmqpTestSupport extends ActiveMQTestBase {
protected LinkedList<AmqpConnection> connections = new LinkedList<>();
protected boolean useSSL;
protected AmqpConnection addConnection(AmqpConnection connection) {
connections.add(connection);
return connection;
}
@After
@Override
public void tearDown() throws Exception {
for (AmqpConnection conn : connections) {
try {
conn.close();
} catch (Throwable ignored) {
ignored.printStackTrace();
}
}
super.tearDown();
}
public boolean isUseSSL() {
return useSSL;
}
public String getAmqpConnectionURIOptions() {
return "";
}
public URI getBrokerAmqpConnectionURI() {
boolean webSocket = false;
try {
int port = 61616;
String uri = null;
if (isUseSSL()) {
if (webSocket) {
uri = "wss://127.0.0.1:" + port;
} else {
uri = "ssl://127.0.0.1:" + port;
}
} else {
if (webSocket) {
uri = "ws://127.0.0.1:" + port;
} else {
uri = "tcp://127.0.0.1:" + port;
}
}
if (!getAmqpConnectionURIOptions().isEmpty()) {
uri = uri + "?" + getAmqpConnectionURIOptions();
}
return new URI(uri);
} catch (Exception e) {
throw new RuntimeException();
}
}
public AmqpConnection createAmqpConnection() throws Exception {
return createAmqpConnection(getBrokerAmqpConnectionURI());
}
public AmqpConnection createAmqpConnection(String username, String password) throws Exception {
return createAmqpConnection(getBrokerAmqpConnectionURI(), username, password);
}
public AmqpConnection createAmqpConnection(URI brokerURI) throws Exception {
return createAmqpConnection(brokerURI, null, null);
}
public AmqpConnection createAmqpConnection(URI brokerURI, String username, String password) throws Exception {
return createAmqpClient(brokerURI, username, password).connect();
}
public AmqpClient createAmqpClient() throws Exception {
return createAmqpClient(getBrokerAmqpConnectionURI(), null, null);
}
public AmqpClient createAmqpClient(URI brokerURI) throws Exception {
return createAmqpClient(brokerURI, null, null);
}
public AmqpClient createAmqpClient(String username, String password) throws Exception {
return createAmqpClient(getBrokerAmqpConnectionURI(), username, password);
}
public AmqpClient createAmqpClient(URI brokerURI, String username, String password) throws Exception {
return new AmqpClient(brokerURI, username, password);
}
}

View File

@ -26,20 +26,28 @@ import javax.jms.Session;
import javax.jms.TextMessage; import javax.jms.TextMessage;
import java.util.Random; import java.util.Random;
import java.util.Set; import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.TransportConfiguration; import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.settings.HierarchicalRepository; import org.apache.activemq.artemis.core.settings.HierarchicalRepository;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; import org.apache.activemq.artemis.tests.util.Wait;
import org.apache.activemq.transport.amqp.client.AmqpClient;
import org.apache.activemq.transport.amqp.client.AmqpConnection;
import org.apache.activemq.transport.amqp.client.AmqpMessage;
import org.apache.activemq.transport.amqp.client.AmqpReceiver;
import org.apache.activemq.transport.amqp.client.AmqpSender;
import org.apache.activemq.transport.amqp.client.AmqpSession;
import org.apache.qpid.jms.JmsConnectionFactory; import org.apache.qpid.jms.JmsConnectionFactory;
import org.junit.After; import org.junit.After;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
public class SendingAndReceivingTest extends ActiveMQTestBase { public class SendingAndReceivingTest extends AmqpTestSupport {
private ActiveMQServer server; private ActiveMQServer server;
@ -55,7 +63,15 @@ public class SendingAndReceivingTest extends ActiveMQTestBase {
tc.getExtraParams().put("multicastPrefix", "multicast://"); tc.getExtraParams().put("multicastPrefix", "multicast://");
} }
} }
server.getConfiguration().setMessageExpiryScanPeriod(1);
server.start(); server.start();
server.createQueue(SimpleString.toSimpleString("exampleQueue"), RoutingType.ANYCAST, SimpleString.toSimpleString("exampleQueue"), null, true, false, -1, false, true);
server.createQueue(SimpleString.toSimpleString("DLQ"), RoutingType.ANYCAST, SimpleString.toSimpleString("DLQ"), null, true, false, -1, false, true);
AddressSettings as = new AddressSettings();
as.setExpiryAddress(SimpleString.toSimpleString("DLQ"));
HierarchicalRepository<AddressSettings> repos = server.getAddressSettingsRepository();
repos.addMatch("#", as);
} }
@After @After
@ -112,21 +128,24 @@ public class SendingAndReceivingTest extends ActiveMQTestBase {
Queue queue = session.createQueue(address); Queue queue = session.createQueue(address);
MessageProducer sender = session.createProducer(queue); MessageProducer sender = session.createProducer(queue);
sender.setTimeToLive(10); sender.setTimeToLive(1);
Message message = session.createMessage(); Message message = session.createMessage();
sender.send(message); sender.send(message);
connection.start(); connection.start();
MessageConsumer consumer = session.createConsumer(queue); MessageConsumer consumer = session.createConsumer(session.createQueue("DLQ"));
Message m = consumer.receive(5000); Message m = consumer.receive(10000);
Assert.assertNotNull(m);
consumer.close();
consumer = session.createConsumer(queue);
m = consumer.receiveNoWait();
Assert.assertNull(m); Assert.assertNull(m);
consumer.close(); consumer.close();
consumer = session.createConsumer(session.createQueue("DLQ"));
m = consumer.receive(5000);
Assert.assertNotNull(m);
consumer.close();
} finally { } finally {
if (connection != null) { if (connection != null) {
connection.close(); connection.close();
@ -134,6 +153,63 @@ public class SendingAndReceivingTest extends ActiveMQTestBase {
} }
} }
@Test(timeout = 60000)
public void testSendExpiry() throws Throwable {
internalSendExpiry(false);
}
@Test(timeout = 60000)
public void testSendExpiryRestartServer() throws Throwable {
internalSendExpiry(true);
}
public void internalSendExpiry(boolean restartServer) throws Throwable {
AmqpClient client = createAmqpClient();
AmqpConnection connection = client.connect();
try {
// Normal Session which won't create an TXN itself
AmqpSession session = connection.createSession();
AmqpSender sender = session.createSender("exampleQueue");
AmqpMessage message = new AmqpMessage();
message.setDurable(true);
message.setText("Test-Message");
message.setDeliveryAnnotation("shouldDisappear", 1);
message.setAbsoluteExpiryTime(System.currentTimeMillis() + 1000);
sender.send(message);
org.apache.activemq.artemis.core.server.Queue dlq = server.locateQueue(SimpleString.toSimpleString("DLQ"));
Wait.waitFor(() -> dlq.getMessageCount() > 0, 5000, 500);
connection.close();
if (restartServer) {
server.stop();
server.start();
}
connection = client.connect();
session = connection.createSession();
// Read all messages from the Queue, do not accept them yet.
AmqpReceiver receiver = session.createReceiver("DLQ");
receiver.flow(20);
message = receiver.receive(5, TimeUnit.SECONDS);
Assert.assertNotNull(message);
Assert.assertEquals("exampleQueue", message.getMessageAnnotation(org.apache.activemq.artemis.api.core.Message.HDR_ORIGINAL_ADDRESS.toString()));
Assert.assertNull(message.getDeliveryAnnotation("shouldDisappear"));
Assert.assertNull(receiver.receiveNoWait());
} finally {
connection.close();
}
}
private static String createMessage(int messageSize) { private static String createMessage(int messageSize) {
final String AB = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ"; final String AB = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ";
Random rnd = new Random(); Random rnd = new Random();

View File

@ -357,12 +357,12 @@ public class AcknowledgeTest extends ActiveMQTestBase {
} }
@Override @Override
public Object removeDeliveryAnnotationProperty(SimpleString key) { public Object removeAnnotation(SimpleString key) {
return null; return null;
} }
@Override @Override
public Object getDeliveryAnnotationProperty(SimpleString key) { public Object getAnnotation(SimpleString key) {
return null; return null;
} }