Robert Davies 2008-05-22 18:56:03 +00:00
parent 0446404c47
commit cdc2fd3d6d
25 changed files with 1182 additions and 377 deletions

View File

@ -16,12 +16,10 @@
*/
package org.apache.activemq.advisory;
import java.util.Date;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.BrokerFilter;
import org.apache.activemq.broker.ConnectionContext;
@ -38,10 +36,12 @@ import org.apache.activemq.command.ConnectionInfo;
import org.apache.activemq.command.ConsumerId;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.DestinationInfo;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.ProducerId;
import org.apache.activemq.command.ProducerInfo;
import org.apache.activemq.state.ProducerState;
import org.apache.activemq.usage.Usage;
import org.apache.activemq.util.IdGenerator;
import org.apache.activemq.util.LongSequenceGenerator;
import org.apache.commons.logging.Log;
@ -72,7 +72,7 @@ public class AdvisoryBroker extends BrokerFilter {
}
public void addConnection(ConnectionContext context, ConnectionInfo info) throws Exception {
next.addConnection(context, info);
super.addConnection(context, info);
ActiveMQTopic topic = AdvisorySupport.getConnectionAdvisoryTopic();
fireAdvisory(context, topic, info);
@ -80,7 +80,7 @@ public class AdvisoryBroker extends BrokerFilter {
}
public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
Subscription answer = next.addConsumer(context, info);
Subscription answer = super.addConsumer(context, info);
// Don't advise advisory topics.
if (!AdvisorySupport.isAdvisoryTopic(info.getDestination())) {
@ -133,7 +133,7 @@ public class AdvisoryBroker extends BrokerFilter {
}
public void addProducer(ConnectionContext context, ProducerInfo info) throws Exception {
next.addProducer(context, info);
super.addProducer(context, info);
// Don't advise advisory topics.
if (info.getDestination() != null && !AdvisorySupport.isAdvisoryTopic(info.getDestination())) {
@ -144,7 +144,7 @@ public class AdvisoryBroker extends BrokerFilter {
}
public Destination addDestination(ConnectionContext context, ActiveMQDestination destination) throws Exception {
Destination answer = next.addDestination(context, destination);
Destination answer = super.addDestination(context, destination);
if (!AdvisorySupport.isAdvisoryTopic(destination)) {
DestinationInfo info = new DestinationInfo(context.getConnectionId(), DestinationInfo.ADD_OPERATION_TYPE, destination);
DestinationInfo previous = destinations.putIfAbsent(destination, info);
@ -170,7 +170,7 @@ public class AdvisoryBroker extends BrokerFilter {
}
public void removeDestination(ConnectionContext context, ActiveMQDestination destination, long timeout) throws Exception {
next.removeDestination(context, destination, timeout);
super.removeDestination(context, destination, timeout);
DestinationInfo info = destinations.remove(destination);
if (info != null) {
info.setDestination(destination);
@ -190,7 +190,7 @@ public class AdvisoryBroker extends BrokerFilter {
}
public void removeDestinationInfo(ConnectionContext context, DestinationInfo destInfo) throws Exception {
next.removeDestinationInfo(context, destInfo);
super.removeDestinationInfo(context, destInfo);
DestinationInfo info = destinations.remove(destInfo.getDestination());
if (info != null) {
info.setDestination(destInfo.getDestination());
@ -211,7 +211,7 @@ public class AdvisoryBroker extends BrokerFilter {
}
public void removeConnection(ConnectionContext context, ConnectionInfo info, Throwable error) throws Exception {
next.removeConnection(context, info, error);
super.removeConnection(context, info, error);
ActiveMQTopic topic = AdvisorySupport.getConnectionAdvisoryTopic();
fireAdvisory(context, topic, info.createRemoveCommand());
@ -219,7 +219,7 @@ public class AdvisoryBroker extends BrokerFilter {
}
public void removeConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
next.removeConsumer(context, info);
super.removeConsumer(context, info);
// Don't advise advisory topics.
ActiveMQDestination dest = info.getDestination();
@ -233,7 +233,7 @@ public class AdvisoryBroker extends BrokerFilter {
}
public void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception {
next.removeProducer(context, info);
super.removeProducer(context, info);
// Don't advise advisory topics.
ActiveMQDestination dest = info.getDestination();
@ -247,15 +247,93 @@ public class AdvisoryBroker extends BrokerFilter {
}
public void messageExpired(ConnectionContext context, MessageReference messageReference) {
next.messageExpired(context, messageReference);
super.messageExpired(context, messageReference);
try {
ActiveMQTopic topic = AdvisorySupport.getExpiredMessageTopic(messageReference.getMessage().getDestination());
fireAdvisory(context, topic, messageReference.getMessage());
if(!messageReference.isAdvisory()) {
ActiveMQTopic topic = AdvisorySupport.getExpiredMessageTopic(messageReference.getMessage().getDestination());
Message payload = messageReference.getMessage().copy();
payload.clearBody();
fireAdvisory(context, topic,payload);
}
} catch (Exception e) {
LOG.warn("Failed to fire message expired advisory");
}
}
public void messageConsumed(ConnectionContext context, MessageReference messageReference) {
super.messageConsumed(context, messageReference);
try {
if(!messageReference.isAdvisory()) {
ActiveMQTopic topic = AdvisorySupport.getMessageConsumedAdvisoryTopic(messageReference.getMessage().getDestination());
Message payload = messageReference.getMessage().copy();
payload.clearBody();
fireAdvisory(context, topic,payload);
}
} catch (Exception e) {
LOG.warn("Failed to fire message consumed advisory");
}
}
public void messageDelivered(ConnectionContext context, MessageReference messageReference) {
super.messageDelivered(context, messageReference);
try {
if (!messageReference.isAdvisory()) {
ActiveMQTopic topic = AdvisorySupport.getMessageDeliveredAdvisoryTopic(messageReference.getMessage().getDestination());
Message payload = messageReference.getMessage().copy();
payload.clearBody();
fireAdvisory(context, topic,payload);
}
} catch (Exception e) {
LOG.warn("Failed to fire message delivered advisory");
}
}
public void messageDiscarded(ConnectionContext context, MessageReference messageReference) {
super.messageDiscarded(context, messageReference);
try {
if (!messageReference.isAdvisory()) {
ActiveMQTopic topic = AdvisorySupport.getMessageDiscardedAdvisoryTopic(messageReference.getMessage().getDestination());
Message payload = messageReference.getMessage().copy();
payload.clearBody();
fireAdvisory(context, topic,payload);
}
} catch (Exception e) {
LOG.warn("Failed to fire message discarded advisory");
}
}
public void slowConsumer(ConnectionContext context, Destination destination,Subscription subs) {
super.slowConsumer(context, destination,subs);
try {
ActiveMQTopic topic = AdvisorySupport.getSlowConsumerAdvisoryTopic(destination.getActiveMQDestination());
fireAdvisory(context, topic,subs.getConsumerInfo());
} catch (Exception e) {
LOG.warn("Failed to fire message slow consumer advisory");
}
}
public void fastProducer(ConnectionContext context,ProducerInfo producerInfo) {
super.fastProducer(context, producerInfo);
try {
ActiveMQTopic topic = AdvisorySupport.getFastProducerAdvisoryTopic(producerInfo.getDestination());
fireAdvisory(context, topic,producerInfo);
} catch (Exception e) {
LOG.warn("Failed to fire message fast producer advisory");
}
}
public void isFull(ConnectionContext context,Destination destination,Usage usage) {
super.isFull(context,destination, usage);
try {
ActiveMQTopic topic = AdvisorySupport.getFullAdvisoryTopic(destination.getActiveMQDestination());
ActiveMQMessage advisoryMessage = new ActiveMQMessage();
advisoryMessage.setStringProperty("usageName", usage.getName());
fireAdvisory(context, topic,advisoryMessage);
} catch (Exception e) {
LOG.warn("Failed to fire message is full advisory");
}
}
protected void fireAdvisory(ConnectionContext context, ActiveMQTopic topic, Command command) throws Exception {
fireAdvisory(context, topic, command, null);
}

View File

@ -39,6 +39,12 @@ public final class AdvisorySupport {
public static final String EXPIRED_QUEUE_MESSAGES_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "Expired.Queue.";
public static final String NO_TOPIC_CONSUMERS_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "NoConsumer.Topic.";
public static final String NO_QUEUE_CONSUMERS_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "NoConsumer.Queue.";
public static final String SLOW_CONSUMER_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "SlowConsumer.";
public static final String FAST_PRODUCER_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "FastConsumer.";
public static final String MESSAGE_DISCAREDED_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "MessageDiscarded.";
public static final String FULL_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "FULL.";
public static final String MESSAGE_DELIVERED_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "MessageDelivered.";
public static final String MESSAGE_CONSUMED_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "MessageConsumed.";
public static final String AGENT_TOPIC = "ActiveMQ.Agent";
public static final String ADIVSORY_MESSAGE_TYPE = "Advisory";
public static final String MSG_PROPERTY_ORIGIN_BROKER_ID="originBrokerId";
@ -96,6 +102,48 @@ public final class AdvisorySupport {
return new ActiveMQTopic(name);
}
public static ActiveMQTopic getSlowConsumerAdvisoryTopic(ActiveMQDestination destination) {
String name = SLOW_CONSUMER_TOPIC_PREFIX
+ destination.getDestinationTypeAsString() + "."
+ destination.getPhysicalName();
return new ActiveMQTopic(name);
}
public static ActiveMQTopic getFastProducerAdvisoryTopic(ActiveMQDestination destination) {
String name = FAST_PRODUCER_TOPIC_PREFIX
+ destination.getDestinationTypeAsString() + "."
+ destination.getPhysicalName();
return new ActiveMQTopic(name);
}
public static ActiveMQTopic getMessageDiscardedAdvisoryTopic(ActiveMQDestination destination) {
String name = MESSAGE_DISCAREDED_TOPIC_PREFIX
+ destination.getDestinationTypeAsString() + "."
+ destination.getPhysicalName();
return new ActiveMQTopic(name);
}
public static ActiveMQTopic getMessageDeliveredAdvisoryTopic(ActiveMQDestination destination) {
String name = MESSAGE_DELIVERED_TOPIC_PREFIX
+ destination.getDestinationTypeAsString() + "."
+ destination.getPhysicalName();
return new ActiveMQTopic(name);
}
public static ActiveMQTopic getMessageConsumedAdvisoryTopic(ActiveMQDestination destination) {
String name = MESSAGE_CONSUMED_TOPIC_PREFIX
+ destination.getDestinationTypeAsString() + "."
+ destination.getPhysicalName();
return new ActiveMQTopic(name);
}
public static ActiveMQTopic getFullAdvisoryTopic(ActiveMQDestination destination) {
String name = FULL_TOPIC_PREFIX
+ destination.getDestinationTypeAsString() + "."
+ destination.getPhysicalName();
return new ActiveMQTopic(name);
}
public static ActiveMQTopic getDestinationAdvisoryTopic(ActiveMQDestination destination) {
switch (destination.getDestinationType()) {
case ActiveMQDestination.QUEUE_TYPE:
@ -182,6 +230,90 @@ public final class AdvisorySupport {
}
}
public static boolean isSlowConsumerAdvisoryTopic(ActiveMQDestination destination) {
if (destination.isComposite()) {
ActiveMQDestination[] compositeDestinations = destination.getCompositeDestinations();
for (int i = 0; i < compositeDestinations.length; i++) {
if (isSlowConsumerAdvisoryTopic(compositeDestinations[i])) {
return true;
}
}
return false;
} else {
return destination.isTopic() && destination.getPhysicalName().startsWith(SLOW_CONSUMER_TOPIC_PREFIX);
}
}
public static boolean isFastProducerAdvisoryTopic(ActiveMQDestination destination) {
if (destination.isComposite()) {
ActiveMQDestination[] compositeDestinations = destination.getCompositeDestinations();
for (int i = 0; i < compositeDestinations.length; i++) {
if (isFastProducerAdvisoryTopic(compositeDestinations[i])) {
return true;
}
}
return false;
} else {
return destination.isTopic() && destination.getPhysicalName().startsWith(FAST_PRODUCER_TOPIC_PREFIX);
}
}
public static boolean isMessageConsumedAdvisoryTopic(ActiveMQDestination destination) {
if (destination.isComposite()) {
ActiveMQDestination[] compositeDestinations = destination.getCompositeDestinations();
for (int i = 0; i < compositeDestinations.length; i++) {
if (isMessageConsumedAdvisoryTopic(compositeDestinations[i])) {
return true;
}
}
return false;
} else {
return destination.isTopic() && destination.getPhysicalName().startsWith(MESSAGE_CONSUMED_TOPIC_PREFIX);
}
}
public static boolean isMessageDeliveredAdvisoryTopic(ActiveMQDestination destination) {
if (destination.isComposite()) {
ActiveMQDestination[] compositeDestinations = destination.getCompositeDestinations();
for (int i = 0; i < compositeDestinations.length; i++) {
if (isMessageDeliveredAdvisoryTopic(compositeDestinations[i])) {
return true;
}
}
return false;
} else {
return destination.isTopic() && destination.getPhysicalName().startsWith(MESSAGE_DELIVERED_TOPIC_PREFIX);
}
}
public static boolean isMessageDiscardedAdvisoryTopic(ActiveMQDestination destination) {
if (destination.isComposite()) {
ActiveMQDestination[] compositeDestinations = destination.getCompositeDestinations();
for (int i = 0; i < compositeDestinations.length; i++) {
if (isMessageDiscardedAdvisoryTopic(compositeDestinations[i])) {
return true;
}
}
return false;
} else {
return destination.isTopic() && destination.getPhysicalName().startsWith(MESSAGE_DISCAREDED_TOPIC_PREFIX);
}
}
public static boolean isFullAdvisoryTopic(ActiveMQDestination destination) {
if (destination.isComposite()) {
ActiveMQDestination[] compositeDestinations = destination.getCompositeDestinations();
for (int i = 0; i < compositeDestinations.length; i++) {
if (isFullAdvisoryTopic(compositeDestinations[i])) {
return true;
}
}
return false;
} else {
return destination.isTopic() && destination.getPhysicalName().startsWith(FULL_TOPIC_PREFIX);
}
}
/**
* Returns the agent topic which is used to send commands to the broker
*/

View File

@ -19,8 +19,10 @@ package org.apache.activemq.broker;
import java.net.URI;
import java.util.Set;
import org.apache.activemq.Service;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.MessageReference;
import org.apache.activemq.broker.region.Region;
import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.BrokerId;
import org.apache.activemq.command.BrokerInfo;
@ -31,6 +33,7 @@ import org.apache.activemq.command.ProducerInfo;
import org.apache.activemq.command.SessionInfo;
import org.apache.activemq.command.TransactionId;
import org.apache.activemq.kaha.Store;
import org.apache.activemq.usage.Usage;
/**
* The Message Broker which routes messages, maintains subscriptions and
@ -318,4 +321,50 @@ public interface Broker extends Region, Service {
*/
long getBrokerSequenceId();
/**
* called when message is consumed
* @param context
* @param messageReference
*/
void messageConsumed(ConnectionContext context, MessageReference messageReference);
/**
* Called when message is delivered to the broker
* @param context
* @param messageReference
*/
void messageDelivered(ConnectionContext context, MessageReference messageReference);
/**
* Called when a message is discarded - e.g. running low on memory
* This will happen only if the policy is enabled - e.g. non durable topics
* @param context
* @param messageReference
*/
void messageDiscarded(ConnectionContext context, MessageReference messageReference);
/**
* Called when there is a slow consumer
* @param context
* @param destination
* @param subs
*/
void slowConsumer(ConnectionContext context,Destination destination, Subscription subs);
/**
* Called to notify a producer is too fast
* @param context
* @param producerInfo
*/
void fastProducer(ConnectionContext context,ProducerInfo producerInfo);
/**
* Called when a Usage reaches a limit
* @param context
* @param destination
* @param usage
*/
void isFull(ConnectionContext context,Destination destination,Usage usage);
}

View File

@ -39,6 +39,7 @@ import org.apache.activemq.command.Response;
import org.apache.activemq.command.SessionInfo;
import org.apache.activemq.command.TransactionId;
import org.apache.activemq.kaha.Store;
import org.apache.activemq.usage.Usage;
/**
* Allows you to intercept broker operation so that features such as security
@ -264,4 +265,29 @@ public class BrokerFilter implements Broker {
public long getBrokerSequenceId() {
return next.getBrokerSequenceId();
}
public void fastProducer(ConnectionContext context,ProducerInfo producerInfo) {
next.fastProducer(context, producerInfo);
}
public void isFull(ConnectionContext context,Destination destination, Usage usage) {
next.isFull(context,destination, usage);
}
public void messageConsumed(ConnectionContext context,MessageReference messageReference) {
next.messageConsumed(context, messageReference);
}
public void messageDelivered(ConnectionContext context,MessageReference messageReference) {
next.messageDelivered(context, messageReference);
}
public void messageDiscarded(ConnectionContext context,MessageReference messageReference) {
next.messageDiscarded(context, messageReference);
}
public void slowConsumer(ConnectionContext context, Destination destination,Subscription subs) {
next.slowConsumer(context, destination,subs);
}
}

View File

@ -40,6 +40,7 @@ import org.apache.activemq.command.Response;
import org.apache.activemq.command.SessionInfo;
import org.apache.activemq.command.TransactionId;
import org.apache.activemq.kaha.Store;
import org.apache.activemq.usage.Usage;
/**
* Dumb implementation - used to be overriden by listeners
@ -256,4 +257,22 @@ public class EmptyBroker implements Broker {
public long getBrokerSequenceId() {
return -1l;
}
public void fastProducer(ConnectionContext context,ProducerInfo producerInfo) {
}
public void isFull(ConnectionContext context, Destination destination,Usage usage) {
}
public void messageConsumed(ConnectionContext context,MessageReference messageReference) {
}
public void messageDelivered(ConnectionContext context,MessageReference messageReference) {
}
public void messageDiscarded(ConnectionContext context,MessageReference messageReference) {
}
public void slowConsumer(ConnectionContext context,Destination destination, Subscription subs) {
}
}

View File

@ -40,6 +40,7 @@ import org.apache.activemq.command.Response;
import org.apache.activemq.command.SessionInfo;
import org.apache.activemq.command.TransactionId;
import org.apache.activemq.kaha.Store;
import org.apache.activemq.usage.Usage;
/**
* Implementation of the broker where all it's methods throw an
@ -267,4 +268,28 @@ public class ErrorBroker implements Broker {
public long getBrokerSequenceId() {
throw new BrokerStoppedException(this.message);
}
public void fastProducer(ConnectionContext context,ProducerInfo producerInfo) {
throw new BrokerStoppedException(this.message);
}
public void isFull(ConnectionContext context,Destination destination, Usage usage) {
throw new BrokerStoppedException(this.message);
}
public void messageConsumed(ConnectionContext context,MessageReference messageReference) {
throw new BrokerStoppedException(this.message);
}
public void messageDelivered(ConnectionContext context,MessageReference messageReference) {
throw new BrokerStoppedException(this.message);
}
public void messageDiscarded(ConnectionContext context,MessageReference messageReference) {
throw new BrokerStoppedException(this.message);
}
public void slowConsumer(ConnectionContext context, Destination destination,Subscription subs) {
throw new BrokerStoppedException(this.message);
}
}

View File

@ -41,6 +41,7 @@ import org.apache.activemq.command.Response;
import org.apache.activemq.command.SessionInfo;
import org.apache.activemq.command.TransactionId;
import org.apache.activemq.kaha.Store;
import org.apache.activemq.usage.Usage;
/**
* Like a BrokerFilter but it allows you to switch the getNext().broker. This
@ -278,4 +279,28 @@ public class MutableBrokerFilter implements Broker {
return getNext().getBrokerSequenceId();
}
public void fastProducer(ConnectionContext context,ProducerInfo producerInfo) {
getNext().fastProducer(context, producerInfo);
}
public void isFull(ConnectionContext context,Destination destination, Usage usage) {
getNext().isFull(context,destination, usage);
}
public void messageConsumed(ConnectionContext context,MessageReference messageReference) {
getNext().messageConsumed(context, messageReference);
}
public void messageDelivered(ConnectionContext context,MessageReference messageReference) {
getNext().messageDelivered(context, messageReference);
}
public void messageDiscarded(ConnectionContext context,MessageReference messageReference) {
getNext().messageDiscarded(context, messageReference);
}
public void slowConsumer(ConnectionContext context, Destination dest, Subscription subs) {
getNext().slowConsumer(context, dest,subs);
}
}

View File

@ -191,6 +191,23 @@ public abstract class AbstractSubscription implements Subscription {
return Integer.MAX_VALUE;
}
/**
* Add a destination
* @param destination
*/
public void addDestination(Destination destination) {
}
/**
* Remove a destination
* @param destination
*/
public void removeDestination(Destination destination) {
}
protected void doAddRecoveredMessage(MessageReference message) throws Exception {
add(message);
}

View File

@ -24,12 +24,17 @@ import org.apache.activemq.command.ProducerInfo;
import org.apache.activemq.store.MessageStore;
import org.apache.activemq.usage.MemoryUsage;
import org.apache.activemq.usage.SystemUsage;
import org.apache.activemq.usage.Usage;
/**
* @version $Revision: 1.12 $
*/
public abstract class BaseDestination implements Destination {
/**
* The default number of messages to page in to the destination
* from persistent storage
*/
public static final int DEFAULT_PAGE_SIZE=100;
protected final ActiveMQDestination destination;
protected final Broker broker;
@ -44,6 +49,12 @@ public abstract class BaseDestination implements Destination {
private boolean useCache=true;
private int minimumMessageSize=1024;
private boolean lazyDispatch=false;
private boolean advisoryForSlowConsumers;
private boolean advisdoryForFastProducers;
private boolean advisoryForDiscardingMessages;
private boolean advisoryWhenFull;
private boolean advisoryForDelivery;
private boolean advisoryForConsumed;
protected final DestinationStatistics destinationStatistics = new DestinationStatistics();
protected final BrokerService brokerService;
protected final Broker regionBroker;
@ -200,4 +211,157 @@ public abstract class BaseDestination implements Destination {
protected long getDestinationSequenceId() {
return regionBroker.getBrokerSequenceId();
}
/**
* @return the advisoryForSlowConsumers
*/
public boolean isAdvisoryForSlowConsumers() {
return advisoryForSlowConsumers;
}
/**
* @param advisoryForSlowConsumers the advisoryForSlowConsumers to set
*/
public void setAdvisoryForSlowConsumers(boolean advisoryForSlowConsumers) {
this.advisoryForSlowConsumers = advisoryForSlowConsumers;
}
/**
* @return the advisoryForDiscardingMessages
*/
public boolean isAdvisoryForDiscardingMessages() {
return advisoryForDiscardingMessages;
}
/**
* @param advisoryForDiscardingMessages the advisoryForDiscardingMessages to set
*/
public void setAdvisoryForDiscardingMessages(
boolean advisoryForDiscardingMessages) {
this.advisoryForDiscardingMessages = advisoryForDiscardingMessages;
}
/**
* @return the advisoryWhenFull
*/
public boolean isAdvisoryWhenFull() {
return advisoryWhenFull;
}
/**
* @param advisoryWhenFull the advisoryWhenFull to set
*/
public void setAdvisoryWhenFull(boolean advisoryWhenFull) {
this.advisoryWhenFull = advisoryWhenFull;
}
/**
* @return the advisoryForDelivery
*/
public boolean isAdvisoryForDelivery() {
return advisoryForDelivery;
}
/**
* @param advisoryForDelivery the advisoryForDelivery to set
*/
public void setAdvisoryForDelivery(boolean advisoryForDelivery) {
this.advisoryForDelivery = advisoryForDelivery;
}
/**
* @return the advisoryForConsumed
*/
public boolean isAdvisoryForConsumed() {
return advisoryForConsumed;
}
/**
* @param advisoryForConsumed the advisoryForConsumed to set
*/
public void setAdvisoryForConsumed(boolean advisoryForConsumed) {
this.advisoryForConsumed = advisoryForConsumed;
}
/**
* @return the advisdoryForFastProducers
*/
public boolean isAdvisdoryForFastProducers() {
return advisdoryForFastProducers;
}
/**
* @param advisdoryForFastProducers the advisdoryForFastProducers to set
*/
public void setAdvisdoryForFastProducers(boolean advisdoryForFastProducers) {
this.advisdoryForFastProducers = advisdoryForFastProducers;
}
/**
* called when message is consumed
* @param context
* @param messageReference
*/
public void messageConsumed(ConnectionContext context, MessageReference messageReference) {
if (advisoryForConsumed) {
broker.messageConsumed(context, messageReference);
}
}
/**
* Called when message is delivered to the broker
* @param context
* @param messageReference
*/
public void messageDelivered(ConnectionContext context, MessageReference messageReference) {
if(advisoryForDelivery) {
broker.messageDelivered(context, messageReference);
}
}
/**
* Called when a message is discarded - e.g. running low on memory
* This will happen only if the policy is enabled - e.g. non durable topics
* @param context
* @param messageReference
*/
public void messageDiscarded(ConnectionContext context, MessageReference messageReference) {
if (advisoryForDiscardingMessages) {
broker.messageDiscarded(context, messageReference);
}
}
/**
* Called when there is a slow consumer
* @param context
* @param subs
*/
public void slowConsumer(ConnectionContext context, Subscription subs) {
if(advisoryForSlowConsumers) {
broker.slowConsumer(context, this, subs);
}
}
/**
* Called to notify a producer is too fast
* @param context
* @param producerInfo
*/
public void fastProducer(ConnectionContext context,ProducerInfo producerInfo) {
if(advisdoryForFastProducers) {
broker.fastProducer(context, producerInfo);
}
}
/**
* Called when a Usage reaches a limit
* @param context
* @param usage
*/
public void isFull(ConnectionContext context,Usage usage) {
if(advisoryWhenFull) {
broker.isFull(context,this, usage);
}
}
}

View File

@ -29,6 +29,7 @@ import org.apache.activemq.command.ProducerInfo;
import org.apache.activemq.store.MessageStore;
import org.apache.activemq.thread.Task;
import org.apache.activemq.usage.MemoryUsage;
import org.apache.activemq.usage.Usage;
/**
* @version $Revision: 1.12 $
@ -114,4 +115,48 @@ public interface Destination extends Service, Task {
public void setLazyDispatch(boolean value);
void messageExpired(ConnectionContext context, PrefetchSubscription prefetchSubscription, MessageReference node);
/**
* called when message is consumed
* @param context
* @param messageReference
*/
void messageConsumed(ConnectionContext context, MessageReference messageReference);
/**
* Called when message is delivered to the broker
* @param context
* @param messageReference
*/
void messageDelivered(ConnectionContext context, MessageReference messageReference);
/**
* Called when a message is discarded - e.g. running low on memory
* This will happen only if the policy is enabled - e.g. non durable topics
* @param context
* @param messageReference
*/
void messageDiscarded(ConnectionContext context, MessageReference messageReference);
/**
* Called when there is a slow consumer
* @param context
* @param subs
*/
void slowConsumer(ConnectionContext context, Subscription subs);
/**
* Called to notify a producer is too fast
* @param context
* @param producerInfo
*/
void fastProducer(ConnectionContext context,ProducerInfo producerInfo);
/**
* Called when a Usage reaches a limit
* @param context
* @param usage
*/
void isFull(ConnectionContext context,Usage usage);
}

View File

@ -29,6 +29,7 @@ import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.ProducerInfo;
import org.apache.activemq.store.MessageStore;
import org.apache.activemq.usage.MemoryUsage;
import org.apache.activemq.usage.Usage;
/**
*
@ -208,4 +209,33 @@ public class DestinationFilter implements Destination {
public boolean iterate() {
return next.iterate();
}
public void fastProducer(ConnectionContext context,ProducerInfo producerInfo) {
next.fastProducer(context, producerInfo);
}
public void isFull(ConnectionContext context, Usage usage) {
next.isFull(context, usage);
}
public void messageConsumed(ConnectionContext context,MessageReference messageReference) {
next.messageConsumed(context, messageReference);
}
public void messageDelivered(ConnectionContext context,MessageReference messageReference) {
next.messageDelivered(context, messageReference);
}
public void messageDiscarded(ConnectionContext context,MessageReference messageReference) {
next.messageDiscarded(context, messageReference);
}
public void slowConsumer(ConnectionContext context, Subscription subs) {
next.slowConsumer(context, subs);
}
}

View File

@ -157,4 +157,8 @@ public class IndirectMessageReference implements QueueMessageReference {
public synchronized int getSize() {
return message.getSize();
}
public boolean isAdvisory() {
return message.isAdvisory();
}
}

View File

@ -61,4 +61,9 @@ public interface MessageReference {
*/
boolean isDropped();
/**
* @return true if the message is an advisory
*/
boolean isAdvisory();
}

View File

@ -123,4 +123,8 @@ final class NullMessageReference implements QueueMessageReference {
throw new RuntimeException("not implemented");
}
public boolean isAdvisory() {
return false;
}
}

View File

@ -65,6 +65,7 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
private final Object pendingLock = new Object();
private final Object dispatchLock = new Object();
protected ActiveMQMessageAudit audit = new ActiveMQMessageAudit();
private boolean slowConsumer;
public PrefetchSubscription(Broker broker, SystemUsage usageManager, ConnectionContext context, ConsumerInfo info, PendingMessageCursor cursor) throws InvalidSelectorException {
super(broker,context, info);
@ -499,6 +500,7 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
try {
int numberToDispatch = countBeforeFull();
if (numberToDispatch > 0) {
slowConsumer=false;
pending.setMaxBatchSize(numberToDispatch);
int count = 0;
pending.reset();
@ -525,6 +527,16 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
count++;
}
}
}else {
if (!slowConsumer) {
slowConsumer=true;
ConnectionContext c = new ConnectionContext();
c.setBroker(context.getBroker());
for (Destination dest :destinations) {
dest.slowConsumer(c,this);
}
}
}
} finally {
pending.release();

View File

@ -337,79 +337,83 @@ public class Queue extends BaseDestination implements Task {
}
return;
}
if (isProducerFlowControl() && context.isProducerFlowControl() && memoryUsage.isFull()) {
if (systemUsage.isSendFailIfNoSpace()) {
throw new javax.jms.ResourceAllocationException("SystemUsage memory limit reached");
}
if(memoryUsage.isFull()) {
isFull(context, memoryUsage);
fastProducer(context, producerInfo);
if (isProducerFlowControl() && context.isProducerFlowControl()) {
if (systemUsage.isSendFailIfNoSpace()) {
throw new javax.jms.ResourceAllocationException("SystemUsage memory limit reached");
}
// We can avoid blocking due to low usage if the producer is sending
// a sync message or
// if it is using a producer window
if (producerInfo.getWindowSize() > 0 || message.isResponseRequired()) {
synchronized (messagesWaitingForSpace) {
messagesWaitingForSpace.add(new Runnable() {
public void run() {
// We can avoid blocking due to low usage if the producer is sending
// a sync message or
// if it is using a producer window
if (producerInfo.getWindowSize() > 0 || message.isResponseRequired()) {
synchronized (messagesWaitingForSpace) {
messagesWaitingForSpace.add(new Runnable() {
public void run() {
try {
try {
// While waiting for space to free up... the
// message may have expired.
if (broker.isExpired(message)) {
broker.messageExpired(context, message);
//message not added to stats yet
//destinationStatistics.getMessages().decrement();
} else {
doMessageSend(producerExchange, message);
}
// While waiting for space to free up... the
// message may have expired.
if (broker.isExpired(message)) {
broker.messageExpired(context, message);
//message not added to stats yet
//destinationStatistics.getMessages().decrement();
} else {
doMessageSend(producerExchange, message);
}
if (sendProducerAck) {
ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), message.getSize());
context.getConnection().dispatchAsync(ack);
} else {
Response response = new Response();
response.setCorrelationId(message.getCommandId());
context.getConnection().dispatchAsync(response);
}
if (sendProducerAck) {
ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), message.getSize());
context.getConnection().dispatchAsync(ack);
} else {
Response response = new Response();
response.setCorrelationId(message.getCommandId());
context.getConnection().dispatchAsync(response);
}
} catch (Exception e) {
if (!sendProducerAck && !context.isInRecoveryMode()) {
ExceptionResponse response = new ExceptionResponse(e);
response.setCorrelationId(message.getCommandId());
context.getConnection().dispatchAsync(response);
} catch (Exception e) {
if (!sendProducerAck && !context.isInRecoveryMode()) {
ExceptionResponse response = new ExceptionResponse(e);
response.setCorrelationId(message.getCommandId());
context.getConnection().dispatchAsync(response);
}
}
}
});
// If the user manager is not full, then the task will not
// get called..
if (!memoryUsage.notifyCallbackWhenNotFull(sendMessagesWaitingForSpaceTask)) {
// so call it directly here.
sendMessagesWaitingForSpaceTask.run();
}
});
// If the user manager is not full, then the task will not
// get called..
if (!memoryUsage.notifyCallbackWhenNotFull(sendMessagesWaitingForSpaceTask)) {
// so call it directly here.
sendMessagesWaitingForSpaceTask.run();
context.setDontSendReponse(true);
return;
}
context.setDontSendReponse(true);
return;
}
} else {
} else {
// Producer flow control cannot be used, so we have do the flow
// control at the broker
// by blocking this thread until there is space available.
while (!memoryUsage.waitForSpace(1000)) {
if (context.getStopping().get()) {
throw new IOException("Connection closed, send aborted.");
// Producer flow control cannot be used, so we have do the flow
// control at the broker
// by blocking this thread until there is space available.
while (!memoryUsage.waitForSpace(1000)) {
if (context.getStopping().get()) {
throw new IOException("Connection closed, send aborted.");
}
}
}
// The usage manager could have delayed us by the time
// we unblock the message could have expired..
if (message.isExpired()) {
if (LOG.isDebugEnabled()) {
LOG.debug("Expired message: " + message);
// The usage manager could have delayed us by the time
// we unblock the message could have expired..
if (message.isExpired()) {
if (LOG.isDebugEnabled()) {
LOG.debug("Expired message: " + message);
}
broker.getRoot().messageExpired(context, message);
return;
}
broker.getRoot().messageExpired(context, message);
return;
}
}
}
@ -485,6 +489,7 @@ public class Queue extends BaseDestination implements Task {
}
public void acknowledge(ConnectionContext context, Subscription sub, MessageAck ack, MessageReference node) throws IOException {
messageConsumed(context, node);
if (store != null && node.isPersistent()) {
// the original ack may be a ranged ack, but we are trying to delete
// a specific
@ -1062,6 +1067,7 @@ public class Queue extends BaseDestination implements Task {
}
destinationStatistics.getEnqueues().increment();
destinationStatistics.getMessages().increment();
messageDelivered(context, msg);
wakeup();
}

View File

@ -58,6 +58,7 @@ import org.apache.activemq.kaha.Store;
import org.apache.activemq.state.ConnectionState;
import org.apache.activemq.thread.TaskRunnerFactory;
import org.apache.activemq.usage.SystemUsage;
import org.apache.activemq.usage.Usage;
import org.apache.activemq.util.BrokerSupport;
import org.apache.activemq.util.IdGenerator;
import org.apache.activemq.util.LongSequenceGenerator;
@ -679,6 +680,24 @@ public class RegionBroker implements Broker {
getRoot().sendToDeadLetterQueue(context, node);
}
public void fastProducer(ConnectionContext context,ProducerInfo producerInfo) {
}
public void isFull(ConnectionContext context,Destination destination, Usage usage) {
}
public void messageConsumed(ConnectionContext context,MessageReference messageReference) {
}
public void messageDelivered(ConnectionContext context,MessageReference messageReference) {
}
public void messageDiscarded(ConnectionContext context,MessageReference messageReference) {
}
public void slowConsumer(ConnectionContext context, Destination dest, Subscription subs) {
}
public void sendToDeadLetterQueue(ConnectionContext context,
MessageReference node){
try{

View File

@ -57,6 +57,7 @@ import org.apache.activemq.transaction.Synchronization;
import org.apache.activemq.util.SubscriptionKey;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.tools.ant.taskdefs.condition.IsFalse;
/**
* The Topic is a destination that sends a copy of a message to every active
@ -277,90 +278,95 @@ public class Topic extends BaseDestination implements Task{
return;
}
if (isProducerFlowControl() && context.isProducerFlowControl() && memoryUsage.isFull()) {
if (systemUsage.isSendFailIfNoSpace()) {
throw new javax.jms.ResourceAllocationException("Usage Manager memory limit reached");
}
if(memoryUsage.isFull()) {
isFull(context, memoryUsage);
fastProducer(context, producerInfo);
if (isProducerFlowControl() && context.isProducerFlowControl()) {
if (systemUsage.isSendFailIfNoSpace()) {
throw new javax.jms.ResourceAllocationException("Usage Manager memory limit reached");
}
// We can avoid blocking due to low usage if the producer is sending
// a sync message or
// if it is using a producer window
if (producerInfo.getWindowSize() > 0 || message.isResponseRequired()) {
synchronized (messagesWaitingForSpace) {
messagesWaitingForSpace.add(new Runnable() {
public void run() {
// We can avoid blocking due to low usage if the producer is sending
// a sync message or
// if it is using a producer window
if (producerInfo.getWindowSize() > 0 || message.isResponseRequired()) {
synchronized (messagesWaitingForSpace) {
messagesWaitingForSpace.add(new Runnable() {
public void run() {
try {
try {
// While waiting for space to free up... the
// message may have expired.
if (broker.isExpired(message)) {
broker.messageExpired(context, message);
//destinationStatistics.getEnqueues().increment();
//destinationStatistics.getMessages().decrement();
} else {
doMessageSend(producerExchange, message);
// While waiting for space to free up... the
// message may have expired.
if (broker.isExpired(message)) {
broker.messageExpired(context, message);
//destinationStatistics.getEnqueues().increment();
//destinationStatistics.getMessages().decrement();
} else {
doMessageSend(producerExchange, message);
}
if (sendProducerAck) {
ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), message.getSize());
context.getConnection().dispatchAsync(ack);
} else {
Response response = new Response();
response.setCorrelationId(message.getCommandId());
context.getConnection().dispatchAsync(response);
}
} catch (Exception e) {
if (!sendProducerAck && !context.isInRecoveryMode()) {
ExceptionResponse response = new ExceptionResponse(e);
response.setCorrelationId(message.getCommandId());
context.getConnection().dispatchAsync(response);
}
}
if (sendProducerAck) {
ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), message.getSize());
context.getConnection().dispatchAsync(ack);
} else {
Response response = new Response();
response.setCorrelationId(message.getCommandId());
context.getConnection().dispatchAsync(response);
}
} catch (Exception e) {
if (!sendProducerAck && !context.isInRecoveryMode()) {
ExceptionResponse response = new ExceptionResponse(e);
response.setCorrelationId(message.getCommandId());
context.getConnection().dispatchAsync(response);
}
}
});
// If the user manager is not full, then the task will not
// get called..
if (!memoryUsage.notifyCallbackWhenNotFull(sendMessagesWaitingForSpaceTask)) {
// so call it directly here.
sendMessagesWaitingForSpaceTask.run();
}
});
// If the user manager is not full, then the task will not
// get called..
if (!memoryUsage.notifyCallbackWhenNotFull(sendMessagesWaitingForSpaceTask)) {
// so call it directly here.
sendMessagesWaitingForSpaceTask.run();
context.setDontSendReponse(true);
return;
}
context.setDontSendReponse(true);
return;
}
} else {
} else {
// Producer flow control cannot be used, so we have do the flow
// control at the broker
// by blocking this thread until there is space available.
int count = 0;
while (!memoryUsage.waitForSpace(1000)) {
if (context.getStopping().get()) {
throw new IOException("Connection closed, send aborted.");
// Producer flow control cannot be used, so we have do the flow
// control at the broker
// by blocking this thread until there is space available.
int count = 0;
while (!memoryUsage.waitForSpace(1000)) {
if (context.getStopping().get()) {
throw new IOException("Connection closed, send aborted.");
}
if (count > 2 && context.isInTransaction()) {
count =0;
int size = context.getTransaction().size();
LOG.warn("Waiting for space to send transacted message - transaction elements = " + size + " need more space to commit. Message = " + message);
}
}
if (count > 2 && context.isInTransaction()) {
count =0;
int size = context.getTransaction().size();
LOG.warn("Waiting for space to send transacted message - transaction elements = " + size + " need more space to commit. Message = " + message);
}
}
// The usage manager could have delayed us by the time
// we unblock the message could have expired..
if (message.isExpired()) {
if (LOG.isDebugEnabled()) {
LOG.debug("Expired message: " + message);
// The usage manager could have delayed us by the time
// we unblock the message could have expired..
if (message.isExpired()) {
if (LOG.isDebugEnabled()) {
LOG.debug("Expired message: " + message);
}
return;
}
return;
}
}
}
doMessageSend(producerExchange, message);
messageDelivered(context, message);
if (sendProducerAck) {
ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), message.getSize());
context.getConnection().dispatchAsync(ack);
@ -445,6 +451,7 @@ public class Topic extends BaseDestination implements Task{
SubscriptionKey key = dsub.getSubscriptionKey();
topicStore.acknowledge(context, key.getClientId(), key.getSubscriptionName(), node.getMessageId());
}
messageConsumed(context, node);
}
public void dispose(ConnectionContext context) throws IOException {

View File

@ -61,6 +61,7 @@ public class TopicSubscription extends AbstractSubscription {
private final AtomicLong enqueueCounter = new AtomicLong(0);
private final AtomicLong dequeueCounter = new AtomicLong(0);
private int memoryUsageHighWaterMark = 95;
private boolean slowConsumer;
public TopicSubscription(Broker broker,ConnectionContext context, ConsumerInfo info, SystemUsage usageManager) throws Exception {
super(broker, context, info);
@ -87,7 +88,15 @@ public class TopicSubscription extends AbstractSubscription {
// have not been dispatched (i.e. we allow the prefetch buffer to be
// filled)
dispatch(node);
slowConsumer=false;
} else {
//we are slow
if(!slowConsumer) {
slowConsumer=true;
for (Destination dest: destinations) {
dest.slowConsumer(getContext(), this);
}
}
if (maximumPendingMessages != 0) {
synchronized (matchedListMutex) {
matched.addMessageLast(node);
@ -432,6 +441,10 @@ public class TopicSubscription extends AbstractSubscription {
if (LOG.isDebugEnabled()) {
LOG.debug("Discarding message " + message);
}
Destination dest = message.getRegionDestination();
if (dest != null) {
dest.messageDiscarded(getContext(), message);
}
broker.getRoot().sendToDeadLetterQueue(getContext(), message);
}

View File

@ -17,6 +17,7 @@
package org.apache.activemq.broker.region.policy;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.region.BaseDestination;
import org.apache.activemq.broker.region.DurableTopicSubscription;
import org.apache.activemq.broker.region.Queue;
import org.apache.activemq.broker.region.Topic;
@ -62,8 +63,15 @@ public class PolicyEntry extends DestinationMapEntry {
private boolean useConsumerPriority=true;
private boolean strictOrderDispatch=false;
private boolean lazyDispatch=false;
private boolean advisoryForSlowConsumers;
private boolean advisdoryForFastProducers;
private boolean advisoryForDiscardingMessages;
private boolean advisoryWhenFull;
private boolean advisoryForDelivery;
private boolean advisoryForConsumed;
public void configure(Broker broker,Queue queue) {
baseConfiguration(queue);
if (dispatchPolicy != null) {
queue.setDispatchPolicy(dispatchPolicy);
}
@ -78,20 +86,16 @@ public class PolicyEntry extends DestinationMapEntry {
PendingMessageCursor messages = pendingQueuePolicy.getQueuePendingMessageCursor(broker,queue);
queue.setMessages(messages);
}
queue.setProducerFlowControl(isProducerFlowControl());
queue.setEnableAudit(isEnableAudit());
queue.setMaxAuditDepth(getMaxQueueAuditDepth());
queue.setMaxProducersToAudit(getMaxProducersToAudit());
queue.setMaxPageSize(getMaxPageSize());
queue.setUseCache(isUseCache());
queue.setMinimumMessageSize((int) getMinimumMessageSize());
queue.setUseConsumerPriority(isUseConsumerPriority());
queue.setStrictOrderDispatch(isStrictOrderDispatch());
queue.setOptimizedDispatch(isOptimizedDispatch());
queue.setLazyDispatch(isLazyDispatch());
}
public void configure(Topic topic) {
baseConfiguration(topic);
if (dispatchPolicy != null) {
topic.setDispatchPolicy(dispatchPolicy);
}
@ -105,16 +109,25 @@ public class PolicyEntry extends DestinationMapEntry {
if (memoryLimit > 0) {
topic.getMemoryUsage().setLimit(memoryLimit);
}
topic.setProducerFlowControl(isProducerFlowControl());
topic.setEnableAudit(isEnableAudit());
topic.setMaxAuditDepth(getMaxAuditDepth());
topic.setMaxProducersToAudit(getMaxProducersToAudit());
topic.setMaxPageSize(getMaxPageSize());
topic.setUseCache(isUseCache());
topic.setMinimumMessageSize((int) getMinimumMessageSize());
topic.setLazyDispatch(isLazyDispatch());
}
public void baseConfiguration(BaseDestination destination) {
destination.setProducerFlowControl(isProducerFlowControl());
destination.setEnableAudit(isEnableAudit());
destination.setMaxAuditDepth(getMaxQueueAuditDepth());
destination.setMaxProducersToAudit(getMaxProducersToAudit());
destination.setMaxPageSize(getMaxPageSize());
destination.setUseCache(isUseCache());
destination.setMinimumMessageSize((int) getMinimumMessageSize());
destination.setAdvisoryForConsumed(isAdvisoryForConsumed());
destination.setAdvisoryForDelivery(isAdvisoryForDelivery());
destination.setAdvisoryForDiscardingMessages(isAdvisoryForDiscardingMessages());
destination.setAdvisoryForSlowConsumers(isAdvisoryForSlowConsumers());
destination.setAdvisdoryForFastProducers(isAdvisdoryForFastProducers());
destination.setAdvisoryWhenFull(isAdvisoryWhenFull());
}
public void configure(Broker broker, SystemUsage memoryManager, TopicSubscription subscription) {
if (pendingMessageLimitStrategy != null) {
int value = pendingMessageLimitStrategy.getMaximumPendingMessageLimit(subscription);
@ -415,4 +428,89 @@ public class PolicyEntry extends DestinationMapEntry {
this.lazyDispatch = lazyDispatch;
}
/**
* @return the advisoryForSlowConsumers
*/
public boolean isAdvisoryForSlowConsumers() {
return advisoryForSlowConsumers;
}
/**
* @param advisoryForSlowConsumers the advisoryForSlowConsumers to set
*/
public void setAdvisoryForSlowConsumers(boolean advisoryForSlowConsumers) {
this.advisoryForSlowConsumers = advisoryForSlowConsumers;
}
/**
* @return the advisoryForDiscardingMessages
*/
public boolean isAdvisoryForDiscardingMessages() {
return advisoryForDiscardingMessages;
}
/**
* @param advisoryForDiscardingMessages the advisoryForDiscardingMessages to set
*/
public void setAdvisoryForDiscardingMessages(
boolean advisoryForDiscardingMessages) {
this.advisoryForDiscardingMessages = advisoryForDiscardingMessages;
}
/**
* @return the advisoryWhenFull
*/
public boolean isAdvisoryWhenFull() {
return advisoryWhenFull;
}
/**
* @param advisoryWhenFull the advisoryWhenFull to set
*/
public void setAdvisoryWhenFull(boolean advisoryWhenFull) {
this.advisoryWhenFull = advisoryWhenFull;
}
/**
* @return the advisoryForDelivery
*/
public boolean isAdvisoryForDelivery() {
return advisoryForDelivery;
}
/**
* @param advisoryForDelivery the advisoryForDelivery to set
*/
public void setAdvisoryForDelivery(boolean advisoryForDelivery) {
this.advisoryForDelivery = advisoryForDelivery;
}
/**
* @return the advisoryForConsumed
*/
public boolean isAdvisoryForConsumed() {
return advisoryForConsumed;
}
/**
* @param advisoryForConsumed the advisoryForConsumed to set
*/
public void setAdvisoryForConsumed(boolean advisoryForConsumed) {
this.advisoryForConsumed = advisoryForConsumed;
}
/**
* @return the advisdoryForFastProducers
*/
public boolean isAdvisdoryForFastProducers() {
return advisdoryForFastProducers;
}
/**
* @param advisdoryForFastProducers the advisdoryForFastProducers to set
*/
public void setAdvisdoryForFastProducers(boolean advisdoryForFastProducers) {
this.advisdoryForFastProducers = advisdoryForFastProducers;
}
}

View File

@ -22,6 +22,7 @@ import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import javax.jms.JMSException;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.advisory.AdvisorySupport;
@ -92,6 +93,7 @@ public abstract class Message extends BaseCommand implements MarshallAware, Mess
private BrokerId[] cluster;
public abstract Message copy();
public abstract void clearBody() throws JMSException;
protected void copy(Message copy) {
super.copy(copy);

View File

@ -0,0 +1,223 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.advisory;
import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.Topic;
import junit.framework.TestCase;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQPrefetchPolicy;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.policy.ConstantPendingMessageLimitStrategy;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.command.ActiveMQDestination;
/**
* @version $Revision: 1.3 $
*/
public class AdvisoryTests extends TestCase {
protected static final int MESSAGE_COUNT = 2000;
protected BrokerService broker;
protected Connection connection;
protected String bindAddress = ActiveMQConnectionFactory.DEFAULT_BROKER_BIND_URL;
protected int topicCount;
public void testNoSlowConsumerAdvisory() throws Exception {
Session s = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = s.createQueue(getClass().getName());
MessageConsumer consumer = s.createConsumer(queue);
consumer.setMessageListener(new MessageListener() {
public void onMessage(Message message) {
}
});
Topic advisoryTopic = AdvisorySupport
.getSlowConsumerAdvisoryTopic((ActiveMQDestination) queue);
s = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer advisoryConsumer = s.createConsumer(advisoryTopic);
// start throwing messages at the consumer
MessageProducer producer = s.createProducer(queue);
for (int i = 0; i < MESSAGE_COUNT; i++) {
BytesMessage m = s.createBytesMessage();
m.writeBytes(new byte[1024]);
producer.send(m);
}
Message msg = advisoryConsumer.receive(1000);
assertNull(msg);
}
public void testSlowConsumerAdvisory() throws Exception {
Session s = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = s.createQueue(getClass().getName());
MessageConsumer consumer = s.createConsumer(queue);
Topic advisoryTopic = AdvisorySupport
.getSlowConsumerAdvisoryTopic((ActiveMQDestination) queue);
s = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer advisoryConsumer = s.createConsumer(advisoryTopic);
// start throwing messages at the consumer
MessageProducer producer = s.createProducer(queue);
for (int i = 0; i < MESSAGE_COUNT; i++) {
BytesMessage m = s.createBytesMessage();
m.writeBytes(new byte[1024]);
producer.send(m);
}
Message msg = advisoryConsumer.receive(1000);
assertNotNull(msg);
}
public void testMessageDeliveryAdvisory() throws Exception {
Session s = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = s.createQueue(getClass().getName());
MessageConsumer consumer = s.createConsumer(queue);
Topic advisoryTopic = AdvisorySupport.getMessageDeliveredAdvisoryTopic((ActiveMQDestination) queue);
MessageConsumer advisoryConsumer = s.createConsumer(advisoryTopic);
//start throwing messages at the consumer
MessageProducer producer = s.createProducer(queue);
BytesMessage m = s.createBytesMessage();
m.writeBytes(new byte[1024]);
producer.send(m);
Message msg = advisoryConsumer.receive(1000);
assertNotNull(msg);
}
public void testMessageConsumedAdvisory() throws Exception {
Session s = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = s.createQueue(getClass().getName());
MessageConsumer consumer = s.createConsumer(queue);
Topic advisoryTopic = AdvisorySupport.getMessageConsumedAdvisoryTopic((ActiveMQDestination) queue);
MessageConsumer advisoryConsumer = s.createConsumer(advisoryTopic);
//start throwing messages at the consumer
MessageProducer producer = s.createProducer(queue);
BytesMessage m = s.createBytesMessage();
m.writeBytes(new byte[1024]);
producer.send(m);
Message msg = consumer.receive(1000);
assertNotNull(msg);
msg = advisoryConsumer.receive(1000);
assertNotNull(msg);
}
public void testMessageExpiredAdvisory() throws Exception {
Session s = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = s.createQueue(getClass().getName());
MessageConsumer consumer = s.createConsumer(queue);
Topic advisoryTopic = AdvisorySupport.getExpiredMessageTopic((ActiveMQDestination) queue);
MessageConsumer advisoryConsumer = s.createConsumer(advisoryTopic);
//start throwing messages at the consumer
MessageProducer producer = s.createProducer(queue);
producer.setTimeToLive(1);
for (int i = 0; i < MESSAGE_COUNT; i++) {
BytesMessage m = s.createBytesMessage();
m.writeBytes(new byte[1024]);
producer.send(m);
}
Message msg = advisoryConsumer.receive(2000);
assertNotNull(msg);
}
public void xtestMessageDiscardedAdvisory() throws Exception {
Session s = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Topic topic = s.createTopic(getClass().getName());
MessageConsumer consumer = s.createConsumer(topic);
Topic advisoryTopic = AdvisorySupport.getMessageDiscardedAdvisoryTopic((ActiveMQDestination) topic);
MessageConsumer advisoryConsumer = s.createConsumer(advisoryTopic);
//start throwing messages at the consumer
MessageProducer producer = s.createProducer(topic);
int count = (new ActiveMQPrefetchPolicy().getTopicPrefetch() * 2);
for (int i = 0; i < count; i++) {
BytesMessage m = s.createBytesMessage();
producer.send(m);
}
Message msg = advisoryConsumer.receive(1000);
assertNotNull(msg);
}
protected void setUp() throws Exception {
if (broker == null) {
broker = createBroker();
}
ConnectionFactory factory = createConnectionFactory();
connection = factory.createConnection();
connection.start();
super.setUp();
}
protected void tearDown() throws Exception {
super.tearDown();
connection.close();
if (broker != null) {
broker.stop();
}
}
protected ActiveMQConnectionFactory createConnectionFactory()
throws Exception {
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(
ActiveMQConnection.DEFAULT_BROKER_URL);
return cf;
}
protected BrokerService createBroker() throws Exception {
BrokerService answer = new BrokerService();
configureBroker(answer);
answer.start();
return answer;
}
protected void configureBroker(BrokerService answer) throws Exception {
answer.setPersistent(false);
PolicyEntry policy = new PolicyEntry();
policy.setAdvisdoryForFastProducers(true);
policy.setAdvisoryForConsumed(true);
policy.setAdvisoryForDelivery(true);
policy.setAdvisoryForDiscardingMessages(true);
policy.setAdvisoryForSlowConsumers(true);
policy.setAdvisoryWhenFull(true);
policy.setProducerFlowControl(false);
ConstantPendingMessageLimitStrategy strategy = new ConstantPendingMessageLimitStrategy();
strategy.setLimit(10);
policy.setPendingMessageLimitStrategy(strategy);
PolicyMap pMap = new PolicyMap();
pMap.setDefaultEntry(policy);
answer.setDestinationPolicy(pMap);
answer.addConnector(bindAddress);
answer.setDeleteAllMessagesOnStartup(true);
}
}

View File

@ -17,34 +17,10 @@
package org.apache.activemq.broker;
import java.net.URI;
import java.util.LinkedList;
import java.util.Map;
import java.util.Set;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.MessageReference;
import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.broker.region.policy.PendingDurableSubscriberMessageStoragePolicy;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.BrokerId;
import org.apache.activemq.command.BrokerInfo;
import org.apache.activemq.command.ConnectionInfo;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.DestinationInfo;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageDispatch;
import org.apache.activemq.command.MessageDispatchNotification;
import org.apache.activemq.command.MessagePull;
import org.apache.activemq.command.ProducerInfo;
import org.apache.activemq.command.RemoveSubscriptionInfo;
import org.apache.activemq.command.Response;
import org.apache.activemq.command.SessionInfo;
import org.apache.activemq.command.TransactionId;
import org.apache.activemq.kaha.Store;
public class StubBroker implements Broker {
public class StubBroker extends EmptyBroker {
public LinkedList<AddConnectionData> addConnectionData = new LinkedList<AddConnectionData>();
public LinkedList<RemoveConnectionData> removeConnectionData = new LinkedList<RemoveConnectionData>();
@ -78,188 +54,4 @@ public class StubBroker implements Broker {
removeConnectionData.add(new RemoveConnectionData(context, info, error));
}
// --- Blank Methods, fill in as needed ---
public void addBroker(Connection connection, BrokerInfo info) {
}
public void addDestinationInfo(ConnectionContext context, DestinationInfo info) throws Exception {
}
public void addProducer(ConnectionContext context, ProducerInfo info) throws Exception {
}
public void addSession(ConnectionContext context, SessionInfo info) throws Exception {
}
public void beginTransaction(ConnectionContext context, TransactionId xid) throws Exception {
}
public void commitTransaction(ConnectionContext context, TransactionId xid, boolean onePhase) throws Exception {
}
public void forgetTransaction(ConnectionContext context, TransactionId transactionId) throws Exception {
}
public Broker getAdaptor(Class type) {
return null;
}
public ConnectionContext getAdminConnectionContext() {
return null;
}
public BrokerId getBrokerId() {
return null;
}
public String getBrokerName() {
return null;
}
public Connection[] getClients() throws Exception {
return null;
}
public ActiveMQDestination[] getDestinations() throws Exception {
return null;
}
public Set<ActiveMQDestination> getDurableDestinations() {
return null;
}
public BrokerInfo[] getPeerBrokerInfos() {
return null;
}
public TransactionId[] getPreparedTransactions(ConnectionContext context) throws Exception {
return null;
}
public boolean isFaultTolerantConfiguration() {
return false;
}
public boolean isSlaveBroker() {
return false;
}
public boolean isStopped() {
return false;
}
public int prepareTransaction(ConnectionContext context, TransactionId xid) throws Exception {
return 0;
}
public void preProcessDispatch(MessageDispatch messageDispatch) {
}
public void postProcessDispatch(MessageDispatch messageDispatch) {
}
public void removeBroker(Connection connection, BrokerInfo info) {
}
public void removeDestinationInfo(ConnectionContext context, DestinationInfo info) throws Exception {
}
public void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception {
}
public void removeSession(ConnectionContext context, SessionInfo info) throws Exception {
}
public void rollbackTransaction(ConnectionContext context, TransactionId xid) throws Exception {
}
public void setAdminConnectionContext(ConnectionContext adminConnectionContext) {
}
public void acknowledge(ConsumerBrokerExchange consumerExchange, MessageAck ack) throws Exception {
}
public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
return null;
}
public Destination addDestination(ConnectionContext context, ActiveMQDestination destination) throws Exception {
return null;
}
public void gc() {
}
public Map<ActiveMQDestination, Destination> getDestinationMap() {
return null;
}
public Set getDestinations(ActiveMQDestination destination) {
return null;
}
public Response messagePull(ConnectionContext context, MessagePull pull) throws Exception {
return null;
}
public void processDispatchNotification(MessageDispatchNotification messageDispatchNotification) throws Exception {
}
public void removeConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
}
public void removeDestination(ConnectionContext context, ActiveMQDestination destination, long timeout) throws Exception {
}
public void removeSubscription(ConnectionContext context, RemoveSubscriptionInfo info) throws Exception {
}
public void send(ProducerBrokerExchange producerExchange, Message message) throws Exception {
}
public void start() throws Exception {
}
public void stop() throws Exception {
}
public PendingDurableSubscriberMessageStoragePolicy getPendingDurableSubscriberPolicy() {
return null;
}
public void setPendingDurableSubscriberPolicy(PendingDurableSubscriberMessageStoragePolicy pendingDurableSubscriberPolicy) {
}
public Store getTempDataStore() {
return null;
}
public URI getVmConnectorURI() {
return null;
}
public void brokerServiceStarted() {
}
public BrokerService getBrokerService() {
return null;
}
public boolean isExpired(MessageReference messageReference) {
return false;
}
public void messageExpired(ConnectionContext context, MessageReference messageReference) {
}
public void sendToDeadLetterQueue(ConnectionContext context, MessageReference messageReference) {
}
public Broker getRoot() {
return this;
}
public long getBrokerSequenceId() {
return -1l;
}
}

View File

@ -307,5 +307,11 @@ public class SubscriptionAddRemoveQueueTest extends TestCase {
return false;
}
public void addDestination(Destination destination) {
}
public void removeDestination(Destination destination) {
}
}
}

View File

@ -402,6 +402,10 @@ public class ActiveMQMessageTest extends TestCase {
public Response visit(CommandVisitor visitor) throws Exception {
return null;
}
@Override
public void clearBody() throws JMSException {
}
};
msg.setProperty("stringProperty", "string");