Refactor so that the ProducerBrokerExchange is passed all the way down to the Topic and Queue implementations.

This is laying the ground work to implement window based producer flow control.



git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@516444 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Hiram R. Chirino 2007-03-09 15:59:14 +00:00
parent 24984276fc
commit 4569d7d28d
8 changed files with 29 additions and 22 deletions

View File

@ -303,7 +303,7 @@ abstract public class AbstractRegion implements Region {
producerExchange.setRegionDestination(regionDestination);
}
producerExchange.getRegionDestination().send(context, messageSend);
producerExchange.getRegionDestination().send(producerExchange, messageSend);
}
public void acknowledge(ConsumerBrokerExchange consumerExchange,MessageAck ack) throws Exception{

View File

@ -21,6 +21,7 @@ import java.io.IOException;
import org.apache.activemq.Service;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.ProducerBrokerExchange;
import org.apache.activemq.broker.region.policy.DeadLetterStrategy;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.Message;
@ -36,7 +37,7 @@ public interface Destination extends Service {
void addSubscription(ConnectionContext context, Subscription sub) throws Exception;
void removeSubscription(ConnectionContext context, Subscription sub) throws Exception;
void send(ConnectionContext context, Message messageSend) throws Exception;
void send(ProducerBrokerExchange producerExchange, Message messageSend) throws Exception;
boolean lock(MessageReference node, LockOwner lockOwner);
void acknowledge(ConnectionContext context, Subscription sub, final MessageAck ack, final MessageReference node) throws IOException;

View File

@ -19,6 +19,7 @@ package org.apache.activemq.broker.region;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.ProducerBrokerExchange;
import org.apache.activemq.broker.region.policy.DeadLetterStrategy;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.Message;
@ -89,7 +90,7 @@ public class DestinationFilter implements Destination {
next.removeSubscription(context, sub);
}
public void send(ConnectionContext context, Message messageSend) throws Exception {
public void send(ProducerBrokerExchange context, Message messageSend) throws Exception {
next.send(context, messageSend);
}
@ -104,8 +105,8 @@ public class DestinationFilter implements Destination {
/**
* Sends a message to the given destination which may be a wildcard
*/
protected void send(ConnectionContext context, Message message, ActiveMQDestination destination) throws Exception {
Broker broker = context.getBroker();
protected void send(ProducerBrokerExchange context, Message message, ActiveMQDestination destination) throws Exception {
Broker broker = context.getConnectionContext().getBroker();
Set destinations = broker.getDestinations(destination);
for (Iterator iter = destinations.iterator(); iter.hasNext();) {

View File

@ -23,9 +23,12 @@ import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import javax.jms.InvalidSelectorException;
import javax.jms.JMSException;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.ProducerBrokerExchange;
import org.apache.activemq.broker.region.cursors.PendingMessageCursor;
import org.apache.activemq.broker.region.cursors.StoreQueueCursor;
import org.apache.activemq.broker.region.cursors.VMPendingMessageCursor;
@ -316,7 +319,8 @@ public class Queue implements Destination, Task {
}
public void send(final ConnectionContext context,final Message message) throws Exception{
public void send(final ProducerBrokerExchange producerExchange,final Message message) throws Exception {
final ConnectionContext context = producerExchange.getConnectionContext();
// There is delay between the client sending it and it arriving at the
// destination.. it may have expired.
if(message.isExpired()){

View File

@ -236,8 +236,9 @@ public class Topic implements Destination {
public void send(final ConnectionContext context, final Message message) throws Exception {
public void send(final ProducerBrokerExchange producerExchange, final Message message) throws Exception {
final ConnectionContext context = producerExchange.getConnectionContext();
// There is delay between the client sending it and it arriving at the
// destination.. it may have expired.
if( message.isExpired() ) {

View File

@ -17,16 +17,16 @@
*/
package org.apache.activemq.broker.region.virtual;
import org.apache.activemq.broker.ConnectionContext;
import java.util.Collection;
import java.util.Iterator;
import org.apache.activemq.broker.ProducerBrokerExchange;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.DestinationFilter;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.Message;
import org.apache.activemq.filter.MessageEvaluationContext;
import java.util.Collection;
import java.util.Iterator;
/**
* Represents a composite {@link Destination} where send()s are replicated to
* each Destination instance.
@ -46,7 +46,7 @@ public class CompositeDestinationInterceptor extends DestinationFilter {
this.copyMessage = copyMessage;
}
public void send(ConnectionContext context, Message message) throws Exception {
public void send(ProducerBrokerExchange context, Message message) throws Exception {
MessageEvaluationContext messageContext = null;
for (Iterator iter = forwardDestinations.iterator(); iter.hasNext();) {

View File

@ -16,18 +16,18 @@
*/
package org.apache.activemq.broker.region.virtual;
import org.apache.activemq.broker.ConnectionContext;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import org.apache.activemq.broker.ProducerBrokerExchange;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.DestinationFilter;
import org.apache.activemq.broker.region.DestinationInterceptor;
import org.apache.activemq.command.Message;
import org.apache.activemq.filter.DestinationMap;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
/**
* Implements <a
* href="http://activemq.apache.org/virtual-destinations.html">Virtual
@ -77,7 +77,7 @@ public class VirtualDestinationInterceptor implements DestinationInterceptor {
protected Destination createCompositeDestination(Destination destination, final List destinations) {
return new DestinationFilter(destination) {
public void send(ConnectionContext context, Message messageSend) throws Exception {
public void send(ProducerBrokerExchange context, Message messageSend) throws Exception {
for (Iterator iter = destinations.iterator(); iter.hasNext();) {
Destination destination = (Destination) iter.next();
destination.send(context, messageSend);

View File

@ -17,7 +17,7 @@
*/
package org.apache.activemq.broker.region.virtual;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.ProducerBrokerExchange;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.DestinationFilter;
import org.apache.activemq.command.ActiveMQDestination;
@ -41,7 +41,7 @@ public class VirtualTopicInterceptor extends DestinationFilter {
this.postfix = postfix;
}
public void send(ConnectionContext context, Message message) throws Exception {
public void send(ProducerBrokerExchange context, Message message) throws Exception {
ActiveMQDestination queueConsumers = getQueueConsumersWildcard(message.getDestination());
send(context, message, queueConsumers);
}