git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1135649 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Bosanac Dejan 2011-06-14 15:31:54 +00:00
parent 6718b5b8d9
commit ed3f61a7b7
6 changed files with 111 additions and 20 deletions

View File

@ -32,8 +32,11 @@ import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.Message; import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageDispatchNotification; import org.apache.activemq.command.MessageDispatchNotification;
import org.apache.activemq.command.ProducerInfo; import org.apache.activemq.command.ProducerInfo;
import org.apache.activemq.filter.NonCachedMessageEvaluationContext;
import org.apache.activemq.security.SecurityContext;
import org.apache.activemq.state.ProducerState; import org.apache.activemq.state.ProducerState;
import org.apache.activemq.store.MessageStore; import org.apache.activemq.store.MessageStore;
import org.apache.activemq.thread.Scheduler;
import org.apache.activemq.usage.MemoryUsage; import org.apache.activemq.usage.MemoryUsage;
import org.apache.activemq.usage.SystemUsage; import org.apache.activemq.usage.SystemUsage;
import org.apache.activemq.usage.Usage; import org.apache.activemq.usage.Usage;
@ -93,6 +96,7 @@ public abstract class BaseDestination implements Destination {
private boolean gcWithNetworkConsumers; private boolean gcWithNetworkConsumers;
private long lastActiveTime=0l; private long lastActiveTime=0l;
private boolean reduceMemoryFootprint = false; private boolean reduceMemoryFootprint = false;
protected final Scheduler scheduler;
/** /**
* @param brokerService * @param brokerService
@ -113,6 +117,7 @@ public abstract class BaseDestination implements Destination {
this.memoryUsage = this.systemUsage.getMemoryUsage(); this.memoryUsage = this.systemUsage.getMemoryUsage();
this.memoryUsage.setUsagePortion(1.0f); this.memoryUsage.setUsagePortion(1.0f);
this.regionBroker = brokerService.getRegionBroker(); this.regionBroker = brokerService.getRegionBroker();
this.scheduler = brokerService.getBroker().getScheduler();
} }
/** /**
@ -707,4 +712,12 @@ public abstract class BaseDestination implements Destination {
} }
return hasRegularConsumers; return hasRegularConsumers;
} }
protected ConnectionContext createConnectionContext() {
ConnectionContext answer = new ConnectionContext(new NonCachedMessageEvaluationContext());
answer.setBroker(this.broker);
answer.getMessageEvaluationContext().setDestination(getActiveMQDestination());
answer.setSecurityContext(SecurityContext.BROKER_SECURITY_CONTEXT);
return answer;
}
} }

View File

@ -110,7 +110,7 @@ public class DurableTopicSubscription extends PrefetchSubscription implements Us
try { try {
this.enqueueCounter+=store.getMessageCount(subscriptionKey.getClientId(),subscriptionKey.getSubscriptionName()); this.enqueueCounter+=store.getMessageCount(subscriptionKey.getClientId(),subscriptionKey.getSubscriptionName());
} catch (IOException e) { } catch (IOException e) {
JMSException jmsEx = new JMSException("Failed to retrieve eunqueueCount from store "+ e); JMSException jmsEx = new JMSException("Failed to retrieve enqueueCount from store "+ e);
jmsEx.setLinkedException(e); jmsEx.setLinkedException(e);
throw jmsEx; throw jmsEx;
} }
@ -229,6 +229,10 @@ public class DurableTopicSubscription extends PrefetchSubscription implements Us
} }
} }
public void removePending(MessageReference node) throws IOException {
pending.remove(node);
}
protected void doAddRecoveredMessage(MessageReference message) throws Exception { protected void doAddRecoveredMessage(MessageReference message) throws Exception {
synchronized(pending) { synchronized(pending) {
pending.addRecoveredMessage(message); pending.addRecoveredMessage(message);

View File

@ -126,7 +126,7 @@ public class Queue extends BaseDestination implements Task, UsageListener {
private final Object iteratingMutex = new Object() { private final Object iteratingMutex = new Object() {
}; };
private final Scheduler scheduler;
class TimeoutMessage implements Delayed { class TimeoutMessage implements Delayed {
@ -210,7 +210,6 @@ public class Queue extends BaseDestination implements Task, UsageListener {
super(brokerService, store, destination, parentStats); super(brokerService, store, destination, parentStats);
this.taskFactory = taskFactory; this.taskFactory = taskFactory;
this.dispatchSelector = new QueueDispatchSelector(destination); this.dispatchSelector = new QueueDispatchSelector(destination);
this.scheduler = brokerService.getBroker().getScheduler();
} }
public List<Subscription> getConsumers() { public List<Subscription> getConsumers() {
@ -1615,14 +1614,6 @@ public class Queue extends BaseDestination implements Task, UsageListener {
} }
} }
protected ConnectionContext createConnectionContext() {
ConnectionContext answer = new ConnectionContext(new NonCachedMessageEvaluationContext());
answer.setBroker(this.broker);
answer.getMessageEvaluationContext().setDestination(getActiveMQDestination());
answer.setSecurityContext(SecurityContext.BROKER_SECURITY_CONTEXT);
return answer;
}
final void sendMessage(final Message msg) throws Exception { final void sendMessage(final Message msg) throws Exception {
messagesLock.writeLock().lock(); messagesLock.writeLock().lock();
try{ try{

View File

@ -510,6 +510,10 @@ public class Topic extends BaseDestination implements Task {
memoryUsage.start(); memoryUsage.start();
} }
if (getExpireMessagesPeriod() > 0) {
scheduler.schedualPeriodically(expireMessagesTask, getExpireMessagesPeriod());
}
} }
public void stop() throws Exception { public void stop() throws Exception {
@ -523,14 +527,22 @@ public class Topic extends BaseDestination implements Task {
if (this.topicStore != null) { if (this.topicStore != null) {
this.topicStore.stop(); this.topicStore.stop();
} }
scheduler.cancel(expireMessagesTask);
} }
public Message[] browse() { public Message[] browse() {
final ConnectionContext connectionContext = createConnectionContext();
final Set<Message> result = new CopyOnWriteArraySet<Message>(); final Set<Message> result = new CopyOnWriteArraySet<Message>();
try { try {
if (topicStore != null) { if (topicStore != null) {
topicStore.recover(new MessageRecoveryListener() { topicStore.recover(new MessageRecoveryListener() {
public boolean recoverMessage(Message message) throws Exception { public boolean recoverMessage(Message message) throws Exception {
if (message.isExpired()) {
for (Subscription sub : durableSubcribers.values()) {
messageExpired(connectionContext, sub, message);
}
}
result.add(message); result.add(message);
return true; return true;
} }
@ -640,6 +652,12 @@ public class Topic extends BaseDestination implements Task {
} }
} }
private final Runnable expireMessagesTask = new Runnable() {
public void run() {
browse();
}
};
public void messageExpired(ConnectionContext context, Subscription subs, MessageReference reference) { public void messageExpired(ConnectionContext context, Subscription subs, MessageReference reference) {
broker.messageExpired(context, reference, subs); broker.messageExpired(context, reference, subs);
// AMQ-2586: Better to leave this stat at zero than to give the user // AMQ-2586: Better to leave this stat at zero than to give the user
@ -652,8 +670,11 @@ public class Topic extends BaseDestination implements Task {
ack.setDestination(destination); ack.setDestination(destination);
ack.setMessageID(reference.getMessageId()); ack.setMessageID(reference.getMessageId());
try { try {
if (subs instanceof DurableTopicSubscription) {
((DurableTopicSubscription)subs).removePending(reference);
}
acknowledge(context, subs, ack, reference); acknowledge(context, subs, ack, reference);
} catch (IOException e) { } catch (Exception e) {
LOG.error("Failed to remove expired Message from the store ", e); LOG.error("Failed to remove expired Message from the store ", e);
} }
} }
@ -663,4 +684,5 @@ public class Topic extends BaseDestination implements Task {
return LOG; return LOG;
} }
} }

View File

@ -276,8 +276,8 @@ public class StoreDurableSubscriberCursor extends AbstractPendingMessageCursor {
@Override @Override
public synchronized void remove(MessageReference node) { public synchronized void remove(MessageReference node) {
if (currentCursor != null) { for (PendingMessageCursor tsp : storePrefetches) {
currentCursor.remove(node); tsp.remove(node);
} }
} }

View File

@ -20,12 +20,7 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import javax.jms.Connection; import javax.jms.*;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.management.ObjectName; import javax.management.ObjectName;
import junit.framework.Test; import junit.framework.Test;
@ -41,6 +36,7 @@ import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.broker.region.policy.VMPendingQueueMessageStoragePolicy; import org.apache.activemq.broker.region.policy.VMPendingQueueMessageStoragePolicy;
import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.util.Wait; import org.apache.activemq.util.Wait;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -401,6 +397,71 @@ public class ExpiredMessagesWithNoConsumerTest extends CombinationTestSupport {
} }
public void testExpireMessagesForDurableSubscriber() throws Exception {
createBroker();
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
connection = factory.createConnection();
connection.setClientID("myConnection");
session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
connection.start();
Topic destination = session.createTopic("test");
producer = session.createProducer(destination);
final int ttl = 300;
producer.setTimeToLive(ttl);
final long sendCount = 10;
TopicSubscriber sub = session.createDurableSubscriber(destination, "mySub");
sub.close();
for (int i=0; i < sendCount; i++) {
producer.send(session.createTextMessage("test"));
}
DestinationViewMBean view = createView((ActiveMQTopic)destination);
LOG.info("messages sent");
LOG.info("expired=" + view.getExpiredCount() + " " + view.getEnqueueCount());
assertEquals(0, view.getExpiredCount());
assertEquals(10, view.getEnqueueCount());
Thread.sleep(4000);
LOG.info("expired=" + view.getExpiredCount() + " " + view.getEnqueueCount());
assertEquals(10, view.getExpiredCount());
assertEquals(0, view.getEnqueueCount());
final AtomicLong received = new AtomicLong();
sub = session.createDurableSubscriber(destination, "mySub");
sub.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
received.incrementAndGet();
}
});
LOG.info("Waiting for messages to arrive");
Wait.waitFor(new Wait.Condition() {
public boolean isSatisified() throws Exception {
return received.get() >= sendCount;
}
}, 1000);
LOG.info("received=" + received.get());
LOG.info("expired=" + view.getExpiredCount() + " " + view.getEnqueueCount());
assertEquals(0, received.get());
assertEquals(10, view.getExpiredCount());
assertEquals(0, view.getEnqueueCount());
}
protected DestinationViewMBean createView(ActiveMQDestination destination) throws Exception { protected DestinationViewMBean createView(ActiveMQDestination destination) throws Exception {
String domain = "org.apache.activemq"; String domain = "org.apache.activemq";