We now expire messages on the broker.

git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@475848 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Hiram R. Chirino 2006-11-16 18:24:14 +00:00
parent 93ee7d9f5f
commit a58d36b9b3
6 changed files with 358 additions and 12 deletions

View File

@ -377,6 +377,13 @@ abstract public class PrefetchSubscription extends AbstractSubscription{
while(pending.hasNext()&&!isFull()){
MessageReference node=pending.next();
pending.remove();
// Message may have been sitting in the pending list a while
// waiting for the consumer to ak the message.
if( node.isExpired() ) {
continue; // just drop it.
}
dispatch(node);
}
}finally{

View File

@ -17,7 +17,14 @@
*/
package org.apache.activemq.broker.region;
import edu.emory.mathcs.backport.java.util.concurrent.CopyOnWriteArrayList;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import javax.jms.InvalidSelectorException;
import javax.jms.JMSException;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.cursors.PendingMessageCursor;
@ -28,7 +35,6 @@ import org.apache.activemq.broker.region.group.MessageGroupMapFactory;
import org.apache.activemq.broker.region.group.MessageGroupSet;
import org.apache.activemq.broker.region.policy.DeadLetterStrategy;
import org.apache.activemq.broker.region.policy.DispatchPolicy;
import org.apache.activemq.broker.region.policy.PendingQueueMessageStoragePolicy;
import org.apache.activemq.broker.region.policy.RoundRobinDispatchPolicy;
import org.apache.activemq.broker.region.policy.SharedDeadLetterStrategy;
import org.apache.activemq.command.ActiveMQDestination;
@ -51,14 +57,7 @@ import org.apache.activemq.util.BrokerSupport;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import javax.jms.InvalidSelectorException;
import javax.jms.JMSException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import edu.emory.mathcs.backport.java.util.concurrent.CopyOnWriteArrayList;
/**
* The Queue is a List of MessageEntry objects that are dispatched to matching
@ -122,7 +121,13 @@ public class Queue implements Destination, Task {
store.recover(new MessageRecoveryListener(){
public void recoverMessage(Message message){
message.setRegionDestination(Queue.this);
// Message could have expired while it was being loaded..
if( message.isExpired() ) {
// TODO: remove message from store.
return;
}
message.setRegionDestination(Queue.this);
synchronized(messages){
try{
messages.addMessageLast(message);
@ -295,11 +300,23 @@ public class Queue implements Destination, Task {
}
public void send(final ConnectionContext context,final Message message) throws Exception{
// There is delay between the client sending it and it arriving at the
// destination.. it may have expired.
if( message.isExpired() ) {
return;
}
if(context.isProducerFlowControl()){
if(usageManager.isSendFailIfNoSpace()&&usageManager.isFull()){
throw new javax.jms.ResourceAllocationException("Usage Manager memory limit reached");
}else{
usageManager.waitForSpace();
// The usage manager could have delayed us by the time
// we unblock the message could have expired..
if( message.isExpired() ) {
return;
}
}
}
message.setRegionDestination(this);
@ -310,6 +327,14 @@ public class Queue implements Destination, Task {
context.getTransaction().addSynchronization(new Synchronization(){
public void afterCommit() throws Exception{
// It could take while before we receive the commit
// operration.. by that time the message could have expired..
if( message.isExpired() ) {
// TODO: remove message from store.
return;
}
sendMessage(context,message);
}
});

View File

@ -232,11 +232,23 @@ public class Topic implements Destination {
public void send(final ConnectionContext context, final Message message) throws Exception {
// There is delay between the client sending it and it arriving at the
// destination.. it may have expired.
if( message.isExpired() ) {
return;
}
if (context.isProducerFlowControl()) {
if (usageManager.isSendFailIfNoSpace() && usageManager.isFull()) {
throw new javax.jms.ResourceAllocationException("Usage Manager memory limit reached");
} else {
usageManager.waitForSpace();
// The usage manager could have delayed us by the time
// we unblock the message could have expired..
if( message.isExpired() ) {
return;
}
}
}
@ -251,6 +263,12 @@ public class Topic implements Destination {
if (context.isInTransaction()) {
context.getTransaction().addSynchronization(new Synchronization() {
public void afterCommit() throws Exception {
// It could take while before we receive the commit
// operration.. by that time the message could have expired..
if( message.isExpired() ) {
// TODO: remove message from store.
return;
}
dispatch(context, message);
}
});

View File

@ -325,6 +325,14 @@ public class TopicSubscription extends AbstractSubscription{
for(Iterator iter=matched.iterator();iter.hasNext()&&!isFull();){
MessageReference message=(MessageReference) iter.next();
iter.remove();
// Message may have been sitting in the matched list a while
// waiting for the consumer to ak the message.
if( message.isExpired() ) {
message.decrementReferenceCount();
continue; // just drop it.
}
dispatch(message);
}
}

View File

@ -409,7 +409,10 @@ abstract public class Message extends BaseCommand implements MarshallAware, Mess
}
public boolean isExpired() {
// TODO: need to be implemented.
long expireTime = getExpiration();
if (expireTime > 0 && System.currentTimeMillis() > expireTime) {
return true;
}
return false;
}

View File

@ -0,0 +1,285 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.broker;
import javax.jms.DeliveryMode;
import junit.framework.Test;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ConnectionInfo;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.LocalTransactionId;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.ProducerInfo;
import org.apache.activemq.command.SessionInfo;
public class MessageExpirationTest extends BrokerTestSupport {
public ActiveMQDestination destination;
public int deliveryMode;
public int prefetch;
public byte destinationType;
public boolean durableConsumer;
protected Message createMessage(ProducerInfo producerInfo, ActiveMQDestination destination, int deliveryMode, int timeToLive) {
Message message = createMessage(producerInfo, destination, deliveryMode);
long now = System.currentTimeMillis();
message.setTimestamp(now);
message.setExpiration(now+timeToLive);
return message;
}
public void initCombosForTestMessagesWaitingForUssageDecreaseExpire() {
addCombinationValues( "deliveryMode", new Object[]{
new Integer(DeliveryMode.NON_PERSISTENT),
new Integer(DeliveryMode.PERSISTENT)} );
addCombinationValues( "destinationType", new Object[]{
new Byte(ActiveMQDestination.TEMP_QUEUE_TYPE),
new Byte(ActiveMQDestination.TEMP_TOPIC_TYPE),
new Byte(ActiveMQDestination.QUEUE_TYPE),
new Byte(ActiveMQDestination.TOPIC_TYPE),
} );
}
public void testMessagesWaitingForUssageDecreaseExpire() throws Exception {
// Start a producer
final StubConnection connection = createConnection();
ConnectionInfo connectionInfo = createConnectionInfo();
SessionInfo sessionInfo = createSessionInfo(connectionInfo);
final ProducerInfo producerInfo = createProducerInfo(sessionInfo);
connection.send(connectionInfo);
connection.send(sessionInfo);
connection.send(producerInfo);
// Start a consumer..
final StubConnection connection2 = createConnection();
ConnectionInfo connectionInfo2 = createConnectionInfo();
SessionInfo sessionInfo2 = createSessionInfo(connectionInfo2);
connection2.send(connectionInfo2);
connection2.send(sessionInfo2);
destination = createDestinationInfo(connection2, connectionInfo2, destinationType);
ConsumerInfo consumerInfo2 = createConsumerInfo(sessionInfo2, destination);
consumerInfo2.setPrefetchSize(1);
connection2.send(consumerInfo2);
// Reduce the limit so that only 1 message can flow through the broker at a time.
broker.getMemoryManager().setLimit(1);
final Message m1 = createMessage(producerInfo, destination, deliveryMode);
final Message m2 = createMessage(producerInfo, destination, deliveryMode, 1000);
final Message m3 = createMessage(producerInfo, destination, deliveryMode);
final Message m4 = createMessage(producerInfo, destination, deliveryMode, 1000);
// Produce in an async thread since the producer will be getting blocked by the usage manager..
new Thread() {
public void run() {
// m1 and m3 should not expire.. but the others should.
try {
connection.send(m1);
connection.send(m2);
connection.send(m3);
connection.send(m4);
} catch (Exception e) {
e.printStackTrace();
}
}
}.start();
// Make sure only 1 message was delivered due to prefetch == 1
Message m = receiveMessage(connection2);
assertNotNull(m);
assertEquals(m1.getMessageId(), m.getMessageId());
assertNoMessagesLeft(connection);
// Sleep before we ack so that the messages expire on the usage manager
Thread.sleep(1500);
connection2.send(createAck(consumerInfo2, m, 1, MessageAck.STANDARD_ACK_TYPE));
// 2nd message received should be m3.. it should have expired 2nd message sent.
m = receiveMessage(connection2);
assertNotNull(m);
assertEquals(m3.getMessageId(), m.getMessageId());
// Sleep before we ack so that the messages expire on the usage manager
Thread.sleep(1500);
connection2.send(createAck(consumerInfo2, m, 1, MessageAck.STANDARD_ACK_TYPE));
// And there should be no messages left now..
assertNoMessagesLeft(connection2);
connection.send(closeConnectionInfo(connectionInfo));
connection.send(closeConnectionInfo(connectionInfo2));
}
public void initCombosForTestMessagesInLongTransactionExpire() {
addCombinationValues( "deliveryMode", new Object[]{
new Integer(DeliveryMode.NON_PERSISTENT),
new Integer(DeliveryMode.PERSISTENT)} );
addCombinationValues( "destinationType", new Object[]{
new Byte(ActiveMQDestination.QUEUE_TYPE),
new Byte(ActiveMQDestination.TOPIC_TYPE),
new Byte(ActiveMQDestination.TEMP_QUEUE_TYPE),
new Byte(ActiveMQDestination.TEMP_TOPIC_TYPE)
} );
}
public void testMessagesInLongTransactionExpire() throws Exception {
// Start a producer and consumer
StubConnection connection = createConnection();
ConnectionInfo connectionInfo = createConnectionInfo();
SessionInfo sessionInfo = createSessionInfo(connectionInfo);
ProducerInfo producerInfo = createProducerInfo(sessionInfo);
connection.send(connectionInfo);
connection.send(sessionInfo);
connection.send(producerInfo);
destination = createDestinationInfo(connection, connectionInfo, destinationType);
ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo, destination);
consumerInfo.setPrefetchSize(1000);
connection.send(consumerInfo);
// Start the tx..
LocalTransactionId txid = createLocalTransaction(sessionInfo);
connection.send(createBeginTransaction(connectionInfo, txid));
// m1 and m3 should not expire.. but the others should.
Message m1 = createMessage(producerInfo, destination, deliveryMode);
m1.setTransactionId(txid);
connection.send(m1);
Message m = createMessage(producerInfo, destination, deliveryMode, 1000);
m.setTransactionId(txid);
connection.send(m);
Message m3 = createMessage(producerInfo, destination, deliveryMode);
m3.setTransactionId(txid);
connection.send(m3);
m = createMessage(producerInfo, destination, deliveryMode, 1000);
m.setTransactionId(txid);
connection.send(m);
// Sleep before we commit so that the messages expire on the commit list..
Thread.sleep(1500);
connection.send(createCommitTransaction1Phase(connectionInfo, txid));
m = receiveMessage(connection);
assertNotNull(m);
assertEquals(m1.getMessageId(), m.getMessageId());
connection.send(createAck(consumerInfo, m, 1, MessageAck.STANDARD_ACK_TYPE));
// 2nd message received should be m3.. it should have expired 2nd message sent.
m = receiveMessage(connection);
assertNotNull(m);
assertEquals(m3.getMessageId(), m.getMessageId());
connection.send(createAck(consumerInfo, m, 1, MessageAck.STANDARD_ACK_TYPE));
// And there should be no messages left now..
assertNoMessagesLeft(connection);
connection.send(closeConnectionInfo(connectionInfo));
}
public void TestMessagesInSubscriptionPendingListExpire() {
addCombinationValues( "deliveryMode", new Object[]{
new Integer(DeliveryMode.NON_PERSISTENT),
new Integer(DeliveryMode.PERSISTENT)} );
addCombinationValues( "destinationType", new Object[]{
new Byte(ActiveMQDestination.QUEUE_TYPE),
new Byte(ActiveMQDestination.TOPIC_TYPE),
new Byte(ActiveMQDestination.TEMP_QUEUE_TYPE),
new Byte(ActiveMQDestination.TEMP_TOPIC_TYPE)
} );
}
public void initCombosForTestMessagesInSubscriptionPendingListExpire() {
addCombinationValues( "deliveryMode", new Object[]{
new Integer(DeliveryMode.NON_PERSISTENT),
new Integer(DeliveryMode.PERSISTENT)} );
addCombinationValues( "destinationType", new Object[]{
new Byte(ActiveMQDestination.QUEUE_TYPE),
new Byte(ActiveMQDestination.TOPIC_TYPE),
new Byte(ActiveMQDestination.TEMP_QUEUE_TYPE),
new Byte(ActiveMQDestination.TEMP_TOPIC_TYPE)
} );
}
public void testMessagesInSubscriptionPendingListExpire() throws Exception {
// Start a producer and consumer
StubConnection connection = createConnection();
ConnectionInfo connectionInfo = createConnectionInfo();
SessionInfo sessionInfo = createSessionInfo(connectionInfo);
ProducerInfo producerInfo = createProducerInfo(sessionInfo);
connection.send(connectionInfo);
connection.send(sessionInfo);
connection.send(producerInfo);
destination = createDestinationInfo(connection, connectionInfo, destinationType);
ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo, destination);
consumerInfo.setPrefetchSize(1);
connection.send(consumerInfo);
// m1 and m3 should not expire.. but the others should.
Message m1 = createMessage(producerInfo, destination, deliveryMode);
connection.send(m1);
connection.send(createMessage(producerInfo, destination, deliveryMode, 1000));
Message m3 = createMessage(producerInfo, destination, deliveryMode);
connection.send(m3);
connection.send(createMessage(producerInfo, destination, deliveryMode, 1000));
// Make sure only 1 message was delivered due to prefetch == 1
Message m = receiveMessage(connection);
assertNotNull(m);
assertEquals(m1.getMessageId(), m.getMessageId());
assertNoMessagesLeft(connection);
// Sleep before we ack so that the messages expire on the pending list..
Thread.sleep(1500);
connection.send(createAck(consumerInfo, m, 1, MessageAck.STANDARD_ACK_TYPE));
// 2nd message received should be m3.. it should have expired 2nd message sent.
m = receiveMessage(connection);
assertNotNull(m);
assertEquals(m3.getMessageId(), m.getMessageId());
connection.send(createAck(consumerInfo, m, 1, MessageAck.STANDARD_ACK_TYPE));
// And there should be no messages left now..
assertNoMessagesLeft(connection);
connection.send(closeConnectionInfo(connectionInfo));
}
public static Test suite() {
return suite(MessageExpirationTest.class);
}
public static void main(String[] args) {
junit.textui.TestRunner.run(suite());
}
}