From 84eb9f8b6975ccdf1fe7aa7c666fcc59ceb91ac7 Mon Sep 17 00:00:00 2001 From: Robert Davies Date: Sat, 3 Mar 2007 11:30:22 +0000 Subject: [PATCH] performance tuning git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@514131 13f79535-47bb-0310-9956-ffa450edef68 --- .../org/apache/activemq/ActiveMQSession.java | 3 + .../activemq/advisory/AdvisoryBroker.java | 7 +- .../activemq/broker/BrokerBroadcaster.java | 12 +- .../apache/activemq/broker/BrokerFilter.java | 8 +- .../broker/CompositeDestinationBroker.java | 6 +- .../broker/ConsumerBrokerExchange.java | 88 +++++++++++++++ .../apache/activemq/broker/EmptyBroker.java | 4 +- .../apache/activemq/broker/ErrorBroker.java | 4 +- .../activemq/broker/MutableBrokerFilter.java | 8 +- .../broker/ProducerBrokerExchange.java | 103 ++++++++++++++++++ .../activemq/broker/TransactionBroker.java | 20 ++-- .../activemq/broker/TransportConnection.java | 85 +++++++++++---- .../apache/activemq/broker/UserIDBroker.java | 5 +- .../activemq/broker/ft/MasterBroker.java | 10 +- .../broker/region/AbstractRegion.java | 29 +++-- .../apache/activemq/broker/region/Region.java | 11 +- .../activemq/broker/region/RegionBroker.java | 80 ++++++++------ .../apache/activemq/broker/region/Topic.java | 9 +- .../broker/util/LoggingBrokerPlugin.java | 10 +- .../broker/util/TimeStampingBrokerPlugin.java | 5 +- .../broker/util/UDPTraceBrokerPlugin.java | 10 +- .../view/ConnectionDotFileInterceptor.java | 5 +- .../security/AuthorizationBroker.java | 7 +- .../activemq/transport/vm/VMTransport.java | 7 -- .../apache/activemq/util/BrokerSupport.java | 6 +- .../apache/activemq/broker/StubBroker.java | 4 +- 26 files changed, 416 insertions(+), 130 deletions(-) create mode 100755 activemq-core/src/main/java/org/apache/activemq/broker/ConsumerBrokerExchange.java create mode 100755 activemq-core/src/main/java/org/apache/activemq/broker/ProducerBrokerExchange.java diff --git a/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java b/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java index 008dc29267..d9e2a8bb7b 100755 --- a/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java +++ b/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java @@ -1585,6 +1585,9 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta message.setJMSMessageID(msg.getMessageId().toString()); } msg.setTransactionId(txid); + if(connection.isCopyMessageOnSend()){ + msg=(ActiveMQMessage)msg.copy(); + } msg.setConnection(connection); msg.onSend(); msg.setProducerId(msg.getMessageId().getProducerId()); diff --git a/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java b/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java index a248779265..dbbd2bd857 100755 --- a/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java +++ b/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java @@ -22,6 +22,7 @@ import java.util.Iterator; import org.apache.activemq.broker.Broker; import org.apache.activemq.broker.BrokerFilter; import org.apache.activemq.broker.ConnectionContext; +import org.apache.activemq.broker.ProducerBrokerExchange; import org.apache.activemq.broker.region.Destination; import org.apache.activemq.broker.region.Subscription; import org.apache.activemq.command.ActiveMQDestination; @@ -265,11 +266,13 @@ public class AdvisoryBroker extends BrokerFilter { advisoryMessage.setDestination(topic); advisoryMessage.setResponseRequired(false); advisoryMessage.setProducerId(advisoryProducerId); - boolean originalFlowControl = context.isProducerFlowControl(); + final ProducerBrokerExchange producerExchange = new ProducerBrokerExchange(); + producerExchange.setConnectionContext(context); + producerExchange.setMutable(true); try { context.setProducerFlowControl(false); - next.send(context, advisoryMessage); + next.send(producerExchange, advisoryMessage); } finally { context.setProducerFlowControl(originalFlowControl); } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/BrokerBroadcaster.java b/activemq-core/src/main/java/org/apache/activemq/broker/BrokerBroadcaster.java index fdf22cceb0..3b8075952b 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/BrokerBroadcaster.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/BrokerBroadcaster.java @@ -43,11 +43,11 @@ public class BrokerBroadcaster extends BrokerFilter{ super(next); } - public void acknowledge(ConnectionContext context,MessageAck ack) throws Exception{ - next.acknowledge(context,ack); + public void acknowledge(ConsumerBrokerExchange consumerExchange,MessageAck ack) throws Exception{ + next.acknowledge(consumerExchange,ack); Broker brokers[]=getListeners(); for(int i=0;iproducerExchanges = new HashMap(); + private final MapconsumerExchanges = new HashMap(); static class ConnectionState extends org.apache.activemq.state.ConnectionState{ @@ -427,33 +430,24 @@ public class TransportConnection implements Service,Connection,Task,CommandVisit public Response processMessage(Message messageSend) throws Exception{ ProducerId producerId=messageSend.getProducerId(); - ConnectionState state=lookupConnectionState(producerId); - ConnectionContext context=state.getContext(); - // If the message originates from this client connection, - // then, finde the associated producer state so we can do some dup detection. - ProducerState producerState=null; - if(messageSend.getMessageId().getProducerId().equals(messageSend.getProducerId())){ - SessionState ss=state.getSessionState(producerId.getParentId()); - if(ss==null) - throw new IllegalStateException("Cannot send from a session that had not been registered: " - +producerId.getParentId()); - producerState=ss.getProducerState(producerId); - } - if(producerState==null){ - broker.send(context,messageSend); - }else{ - // Avoid Dups. + ProducerBrokerExchange producerExchange=getProducerBrokerExchange(producerId); + ProducerState producerState=producerExchange.getProducerState(); + if(producerState!=null){ long seq=messageSend.getMessageId().getProducerSequenceId(); if(seq>producerState.getLastSequenceId()){ producerState.setLastSequenceId(seq); - broker.send(context,messageSend); + broker.send(producerExchange,messageSend); } + }else{ + // producer not local to this broker + broker.send(producerExchange,messageSend); } return null; } public Response processMessageAck(MessageAck ack) throws Exception{ - broker.acknowledge(lookupConnectionState(ack.getConsumerId()).getContext(),ack); + ConsumerBrokerExchange consumerExchange = getConsumerBrokerExchange(ack.getConsumerId()); + broker.acknowledge(consumerExchange,ack); return null; } @@ -515,6 +509,7 @@ public class TransportConnection implements Service,Connection,Task,CommandVisit ProducerState ps=ss.removeProducer(id); if(ps==null) throw new IllegalStateException("Cannot remove a producer that had not been registered: "+id); + removeProducerBrokerExchange(id); broker.removeProducer(cs.getContext(),ps.getInfo()); return null; } @@ -551,6 +546,7 @@ public class TransportConnection implements Service,Connection,Task,CommandVisit if(consumerState==null) throw new IllegalStateException("Cannot remove a consumer that had not been registered: "+id); broker.removeConsumer(cs.getContext(),consumerState.getInfo()); + removeConsumerBrokerExchange(id); return null; } @@ -981,4 +977,53 @@ public class TransportConnection implements Service,Connection,Task,CommandVisit public String getRemoteAddress(){ return transport.getRemoteAddress(); } + + private ProducerBrokerExchange getProducerBrokerExchange(ProducerId id){ + ProducerBrokerExchange result=producerExchanges.get(id); + if(result==null){ + synchronized(producerExchanges){ + result=new ProducerBrokerExchange(); + ConnectionState state=lookupConnectionState(id); + ConnectionContext context=state.getContext(); + result.setConnectionContext(context); + SessionState ss=state.getSessionState(id.getParentId()); + if(ss!=null){ + result.setProducerState(ss.getProducerState(id)); + ProducerState producerState=ss.getProducerState(id); + if(producerState!=null&&producerState.getInfo()!=null){ + ProducerInfo info=producerState.getInfo(); + result.setMutable(info.getDestination()==null); + } + } + producerExchanges.put(id,result); + } + } + return result; + } + + private void removeProducerBrokerExchange(ProducerId id) { + synchronized(producerExchanges) { + producerExchanges.remove(id); + } + } + + private ConsumerBrokerExchange getConsumerBrokerExchange(ConsumerId id) { + ConsumerBrokerExchange result = consumerExchanges.get(id); + if (result == null) { + synchronized(consumerExchanges) { + result = new ConsumerBrokerExchange(); + ConnectionState state = lookupConnectionState(id); + ConnectionContext context = state.getContext(); + result.setConnectionContext(context); + consumerExchanges.put(id,result); + } + } + return result; + } + + private void removeConsumerBrokerExchange(ConsumerId id) { + synchronized(consumerExchanges) { + consumerExchanges.remove(id); + } + } } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/UserIDBroker.java b/activemq-core/src/main/java/org/apache/activemq/broker/UserIDBroker.java index ddf815b735..98177219ac 100644 --- a/activemq-core/src/main/java/org/apache/activemq/broker/UserIDBroker.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/UserIDBroker.java @@ -33,9 +33,10 @@ public class UserIDBroker extends BrokerFilter { super(next); } - public void send(ConnectionContext context, Message messageSend) throws Exception { + public void send(ProducerBrokerExchange producerExchange, Message messageSend) throws Exception { + final ConnectionContext context = producerExchange.getConnectionContext(); String userID = context.getUserName(); messageSend.setUserID(userID); - super.send(context, messageSend); + super.send(producerExchange, messageSend); } } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/ft/MasterBroker.java b/activemq-core/src/main/java/org/apache/activemq/broker/ft/MasterBroker.java index 129f0575d2..6be3a7c415 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/ft/MasterBroker.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/ft/MasterBroker.java @@ -17,8 +17,10 @@ package org.apache.activemq.broker.ft; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.activemq.broker.Connection; import org.apache.activemq.broker.ConnectionContext; +import org.apache.activemq.broker.ConsumerBrokerExchange; import org.apache.activemq.broker.InsertableMutableBrokerFilter; import org.apache.activemq.broker.MutableBrokerFilter; +import org.apache.activemq.broker.ProducerBrokerExchange; import org.apache.activemq.broker.region.Subscription; import org.apache.activemq.command.Command; import org.apache.activemq.command.ConnectionControl; @@ -298,13 +300,13 @@ public class MasterBroker extends InsertableMutableBrokerFilter{ * @throws Exception * */ - public void send(ConnectionContext context,Message message) throws Exception{ + public void send(ProducerBrokerExchange producerExchange,Message message) throws Exception{ /** * A message can be dispatched before the super.send() method returns so - here the order is switched to avoid * problems on the slave with receiving acks for messages not received yey */ sendToSlave(message); - super.send(context,message); + super.send(producerExchange,message); } /** @@ -313,9 +315,9 @@ public class MasterBroker extends InsertableMutableBrokerFilter{ * @throws Exception * */ - public void acknowledge(ConnectionContext context,MessageAck ack) throws Exception{ + public void acknowledge(ConsumerBrokerExchange consumerExchange,MessageAck ack) throws Exception{ sendToSlave(ack); - super.acknowledge(context,ack); + super.acknowledge(consumerExchange,ack); } public boolean isFaultTolerantConfiguration(){ diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java index 8ff56a2de2..8241a3348a 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java @@ -25,7 +25,9 @@ import java.util.Set; import javax.jms.JMSException; import org.apache.activemq.broker.ConnectionContext; +import org.apache.activemq.broker.ConsumerBrokerExchange; import org.apache.activemq.broker.DestinationAlreadyExistsException; +import org.apache.activemq.broker.ProducerBrokerExchange; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ConsumerInfo; import org.apache.activemq.command.Message; @@ -300,17 +302,28 @@ abstract public class AbstractRegion implements Region { throw new JMSException("Invalid operation."); } - public void send(ConnectionContext context, Message messageSend) + public void send(final ProducerBrokerExchange producerExchange, Message messageSend) throws Exception { - Destination dest = lookup(context, messageSend.getDestination()); - dest.send(context, messageSend); + final ConnectionContext context = producerExchange.getConnectionContext(); + + if (producerExchange.isMutable() || producerExchange.getRegionDestination()==null) { + final Destination regionDestination = lookup(context,messageSend.getDestination()); + producerExchange.setRegionDestination(regionDestination); + } + + producerExchange.getRegionDestination().send(context, messageSend); } - public void acknowledge(ConnectionContext context, MessageAck ack) throws Exception { - Subscription sub = (Subscription) subscriptions.get(ack.getConsumerId()); - if( sub==null ) - throw new IllegalArgumentException("The subscription does not exist: "+ack.getConsumerId()); - sub.acknowledge(context, ack); + public void acknowledge(ConsumerBrokerExchange consumerExchange,MessageAck ack) throws Exception{ + Subscription sub=consumerExchange.getSubscription(); + if(sub==null){ + sub=(Subscription)subscriptions.get(ack.getConsumerId()); + if(sub==null){ + throw new IllegalArgumentException("The subscription does not exist: "+ack.getConsumerId()); + } + consumerExchange.setSubscription(sub); + } + sub.acknowledge(consumerExchange.getConnectionContext(),ack); } public Response messagePull(ConnectionContext context, MessagePull pull) throws Exception { diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/Region.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/Region.java index 4e4a9365a2..1e7e76c795 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/Region.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/Region.java @@ -19,6 +19,8 @@ package org.apache.activemq.broker.region; import org.apache.activemq.Service; import org.apache.activemq.broker.ConnectionContext; +import org.apache.activemq.broker.ConsumerBrokerExchange; +import org.apache.activemq.broker.ProducerBrokerExchange; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ConsumerInfo; import org.apache.activemq.command.Message; @@ -98,17 +100,18 @@ public interface Region extends Service { * Send a message to the broker to using the specified destination. The destination specified * in the message does not need to match the destination the message is sent to. This is * handy in case the message is being sent to a dead letter destination. - * @param context the environment the operation is being executed under. + * @param producerExchange the environment the operation is being executed under. + * @param message * @throws Exception TODO */ - public void send(ConnectionContext context, Message message) throws Exception; + public void send(ProducerBrokerExchange producerExchange, Message message) throws Exception; /** * Used to acknowledge the receipt of a message by a client. - * @param context the environment the operation is being executed under. + * @param consumerExchange the environment the operation is being executed under. * @throws Exception TODO */ - public void acknowledge(ConnectionContext context, MessageAck ack) throws Exception; + public void acknowledge(ConsumerBrokerExchange consumerExchange, MessageAck ack) throws Exception; /** * Allows a consumer to pull a message from a queue diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java index 32b3b1e2d9..c50cec7285 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java @@ -31,7 +31,9 @@ import org.apache.activemq.broker.Broker; import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.Connection; import org.apache.activemq.broker.ConnectionContext; +import org.apache.activemq.broker.ConsumerBrokerExchange; import org.apache.activemq.broker.DestinationAlreadyExistsException; +import org.apache.activemq.broker.ProducerBrokerExchange; import org.apache.activemq.broker.region.policy.PendingDurableSubscriberMessageStoragePolicy; import org.apache.activemq.broker.region.policy.PolicyMap; import org.apache.activemq.broker.region.policy.VMPendingDurableSubscriberMessageStoragePolicy; @@ -366,46 +368,56 @@ public class RegionBroker implements Broker { topicRegion.removeSubscription(context, info); } - public void send(ConnectionContext context, Message message) throws Exception { + public void send(ProducerBrokerExchange producerExchange, Message message) throws Exception { long si = sequenceGenerator.getNextSequenceId(); message.getMessageId().setBrokerSequenceId(si); - ActiveMQDestination destination = message.getDestination(); - switch(destination.getDestinationType()) { - case ActiveMQDestination.QUEUE_TYPE: - queueRegion.send(context, message); - break; - case ActiveMQDestination.TOPIC_TYPE: - topicRegion.send(context, message); - break; - case ActiveMQDestination.TEMP_QUEUE_TYPE: - tempQueueRegion.send(context, message); - break; - case ActiveMQDestination.TEMP_TOPIC_TYPE: - tempTopicRegion.send(context, message); - break; - default: - throw createUnknownDestinationTypeException(destination); + if (producerExchange.isMutable() || producerExchange.getRegion()==null) { + ActiveMQDestination destination = message.getDestination(); + Region region = null; + switch(destination.getDestinationType()) { + case ActiveMQDestination.QUEUE_TYPE: + region = queueRegion; + break; + case ActiveMQDestination.TOPIC_TYPE: + region = topicRegion; + break; + case ActiveMQDestination.TEMP_QUEUE_TYPE: + region = tempQueueRegion; + break; + case ActiveMQDestination.TEMP_TOPIC_TYPE: + region = tempTopicRegion; + break; + default: + throw createUnknownDestinationTypeException(destination); + } + producerExchange.setRegion(region); } + producerExchange.getRegion().send(producerExchange,message); } - public void acknowledge(ConnectionContext context, MessageAck ack) throws Exception { - ActiveMQDestination destination = ack.getDestination(); - switch(destination.getDestinationType()) { - case ActiveMQDestination.QUEUE_TYPE: - queueRegion.acknowledge(context, ack); - break; - case ActiveMQDestination.TOPIC_TYPE: - topicRegion.acknowledge(context, ack); - break; - case ActiveMQDestination.TEMP_QUEUE_TYPE: - tempQueueRegion.acknowledge(context, ack); - break; - case ActiveMQDestination.TEMP_TOPIC_TYPE: - tempTopicRegion.acknowledge(context, ack); - break; - default: - throw createUnknownDestinationTypeException(destination); + public void acknowledge(ConsumerBrokerExchange consumerExchange,MessageAck ack) throws Exception{ + if(consumerExchange.getRegion()==null){ + ActiveMQDestination destination=ack.getDestination(); + Region region=null; + switch(destination.getDestinationType()){ + case ActiveMQDestination.QUEUE_TYPE: + region=queueRegion; + break; + case ActiveMQDestination.TOPIC_TYPE: + region=topicRegion; + break; + case ActiveMQDestination.TEMP_QUEUE_TYPE: + region=tempQueueRegion; + break; + case ActiveMQDestination.TEMP_TOPIC_TYPE: + region=tempTopicRegion; + break; + default: + throw createUnknownDestinationTypeException(destination); + } + consumerExchange.setRegion(region); } + consumerExchange.getRegion().acknowledge(consumerExchange,ack); } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java index 2041206ffd..c49077d394 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java @@ -24,6 +24,7 @@ import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CopyOnWriteArraySet; import org.apache.activemq.advisory.AdvisorySupport; import org.apache.activemq.broker.ConnectionContext; +import org.apache.activemq.broker.ProducerBrokerExchange; import org.apache.activemq.broker.region.policy.DeadLetterStrategy; import org.apache.activemq.broker.region.policy.DispatchPolicy; import org.apache.activemq.broker.region.policy.FixedSizedSubscriptionRecoveryPolicy; @@ -242,7 +243,6 @@ public class Topic implements Destination { if( message.isExpired() ) { return; } - if (context.isProducerFlowControl()) { if (usageManager.isSendFailIfNoSpace() && usageManager.isFull()) { throw new javax.jms.ResourceAllocationException("Usage Manager memory limit reached"); @@ -426,7 +426,7 @@ public class Topic implements Destination { // Implementation methods // ------------------------------------------------------------------------- - protected void dispatch(ConnectionContext context, Message message) throws Exception { + protected void dispatch(final ConnectionContext context, Message message) throws Exception { destinationStatistics.getEnqueues().increment(); dispatchValve.increment(); MessageEvaluationContext msgContext = context.getMessageEvaluationContext(); @@ -481,7 +481,10 @@ public class Topic implements Destination { boolean originalFlowControl = context.isProducerFlowControl(); try { context.setProducerFlowControl(false); - context.getBroker().send(context, message); + ProducerBrokerExchange producerExchange = new ProducerBrokerExchange(); + producerExchange.setMutable(false); + producerExchange.setConnectionContext(context); + context.getBroker().send(producerExchange, message); } finally { context.setProducerFlowControl(originalFlowControl); } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/util/LoggingBrokerPlugin.java b/activemq-core/src/main/java/org/apache/activemq/broker/util/LoggingBrokerPlugin.java index 7f2f85063d..fc124b631f 100644 --- a/activemq-core/src/main/java/org/apache/activemq/broker/util/LoggingBrokerPlugin.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/util/LoggingBrokerPlugin.java @@ -19,6 +19,8 @@ package org.apache.activemq.broker.util; import org.apache.activemq.broker.BrokerPluginSupport; import org.apache.activemq.broker.ConnectionContext; +import org.apache.activemq.broker.ConsumerBrokerExchange; +import org.apache.activemq.broker.ProducerBrokerExchange; import org.apache.activemq.command.Message; import org.apache.activemq.command.MessageAck; import org.apache.commons.logging.Log; @@ -37,18 +39,18 @@ public class LoggingBrokerPlugin extends BrokerPluginSupport { private Log sendLog = LogFactory.getLog(LoggingBrokerPlugin.class.getName()+".Send"); private Log ackLog = LogFactory.getLog(LoggingBrokerPlugin.class.getName()+".Ack"); - public void send(ConnectionContext context, Message messageSend) throws Exception { + public void send(ProducerBrokerExchange producerExchange, Message messageSend) throws Exception { if (sendLog.isInfoEnabled()) { sendLog.info("Sending: " + messageSend); } - super.send(context, messageSend); + super.send(producerExchange, messageSend); } - public void acknowledge(ConnectionContext context, MessageAck ack) throws Exception { + public void acknowledge(ConsumerBrokerExchange consumerExchange, MessageAck ack) throws Exception { if (ackLog.isInfoEnabled()) { ackLog.info("Acknowledge: " + ack); } - super.acknowledge(context, ack); + super.acknowledge(consumerExchange, ack); } // Properties diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/util/TimeStampingBrokerPlugin.java b/activemq-core/src/main/java/org/apache/activemq/broker/util/TimeStampingBrokerPlugin.java index 005f72062f..e8a84d97b3 100644 --- a/activemq-core/src/main/java/org/apache/activemq/broker/util/TimeStampingBrokerPlugin.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/util/TimeStampingBrokerPlugin.java @@ -19,6 +19,7 @@ package org.apache.activemq.broker.util; import org.apache.activemq.broker.BrokerPluginSupport; import org.apache.activemq.broker.ConnectionContext; +import org.apache.activemq.broker.ProducerBrokerExchange; import org.apache.activemq.command.Message; @@ -37,11 +38,11 @@ import org.apache.activemq.command.Message; * @version $Revision$ */ public class TimeStampingBrokerPlugin extends BrokerPluginSupport { - public void send(ConnectionContext context, Message message) throws Exception { + public void send(ProducerBrokerExchange producerExchange, Message message) throws Exception { if (message.getTimestamp() > 0 && (message.getBrokerPath() == null || message.getBrokerPath().length == 0)) { //timestamp not been disabled and has not passed through a network message.setTimestamp(System.currentTimeMillis()); } - super.send(context, message); + super.send(producerExchange, message); } } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/util/UDPTraceBrokerPlugin.java b/activemq-core/src/main/java/org/apache/activemq/broker/util/UDPTraceBrokerPlugin.java index 7718f17350..d147d2a9d5 100644 --- a/activemq-core/src/main/java/org/apache/activemq/broker/util/UDPTraceBrokerPlugin.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/util/UDPTraceBrokerPlugin.java @@ -30,6 +30,8 @@ import java.net.UnknownHostException; import org.apache.activemq.broker.BrokerPluginSupport; import org.apache.activemq.broker.ConnectionContext; +import org.apache.activemq.broker.ConsumerBrokerExchange; +import org.apache.activemq.broker.ProducerBrokerExchange; import org.apache.activemq.broker.region.Subscription; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.BrokerId; @@ -127,14 +129,14 @@ public class UDPTraceBrokerPlugin extends BrokerPluginSupport { } } - public void send(ConnectionContext context, Message messageSend) throws Exception { + public void send(ProducerBrokerExchange producerExchange, Message messageSend) throws Exception { trace(messageSend); - super.send(context, messageSend); + super.send(producerExchange, messageSend); } - public void acknowledge(ConnectionContext context, MessageAck ack) throws Exception { + public void acknowledge(ConsumerBrokerExchange consumerExchange, MessageAck ack) throws Exception { trace(ack); - super.acknowledge(context, ack); + super.acknowledge(consumerExchange, ack); } public void addConnection(ConnectionContext context, ConnectionInfo info) throws Exception { diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/view/ConnectionDotFileInterceptor.java b/activemq-core/src/main/java/org/apache/activemq/broker/view/ConnectionDotFileInterceptor.java index 5c4fb4001e..ed5381dec0 100644 --- a/activemq-core/src/main/java/org/apache/activemq/broker/view/ConnectionDotFileInterceptor.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/view/ConnectionDotFileInterceptor.java @@ -31,6 +31,7 @@ import javax.management.ObjectName; import org.apache.activemq.broker.Broker; import org.apache.activemq.broker.ConnectionContext; +import org.apache.activemq.broker.ProducerBrokerExchange; import org.apache.activemq.broker.jmx.BrokerViewMBean; import org.apache.activemq.broker.jmx.ManagementContext; import org.apache.activemq.broker.jmx.SubscriptionViewMBean; @@ -106,8 +107,8 @@ public class ConnectionDotFileInterceptor extends DotFileInterceptorSupport { } } - public void send(ConnectionContext context, Message messageSend) throws Exception { - super.send(context, messageSend); + public void send(ProducerBrokerExchange producerExchange, Message messageSend) throws Exception { + super.send(producerExchange, messageSend); ProducerId producerId = messageSend.getProducerId(); ActiveMQDestination destination = messageSend.getDestination(); synchronized (lock) { diff --git a/activemq-core/src/main/java/org/apache/activemq/security/AuthorizationBroker.java b/activemq-core/src/main/java/org/apache/activemq/security/AuthorizationBroker.java index 77135f1ec0..6642fc8ec1 100644 --- a/activemq-core/src/main/java/org/apache/activemq/security/AuthorizationBroker.java +++ b/activemq-core/src/main/java/org/apache/activemq/security/AuthorizationBroker.java @@ -20,6 +20,7 @@ package org.apache.activemq.security; import org.apache.activemq.broker.Broker; import org.apache.activemq.broker.BrokerFilter; import org.apache.activemq.broker.ConnectionContext; +import org.apache.activemq.broker.ProducerBrokerExchange; import org.apache.activemq.broker.region.Destination; import org.apache.activemq.broker.region.Subscription; import org.apache.activemq.command.ActiveMQDestination; @@ -165,8 +166,8 @@ public class AuthorizationBroker extends BrokerFilter implements SecurityAdminMB super.addProducer(context, info); } - public void send(ConnectionContext context, Message messageSend) throws Exception { - SecurityContext subject = (SecurityContext) context.getSecurityContext(); + public void send(ProducerBrokerExchange producerExchange, Message messageSend) throws Exception { + SecurityContext subject = (SecurityContext) producerExchange.getConnectionContext().getSecurityContext(); if( subject == null ) throw new SecurityException("User is not authenticated."); @@ -185,7 +186,7 @@ public class AuthorizationBroker extends BrokerFilter implements SecurityAdminMB subject.getAuthorizedWriteDests().put(messageSend.getDestination(), messageSend.getDestination()); } - super.send(context, messageSend); + super.send(producerExchange, messageSend); } // SecurityAdminMBean interface diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java b/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java index 6d205f6d07..f4bbf03ae8 100755 --- a/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java @@ -23,7 +23,6 @@ import java.util.List; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.atomic.AtomicLong; import org.apache.activemq.command.Command; -import org.apache.activemq.command.Message; import org.apache.activemq.thread.Task; import org.apache.activemq.thread.TaskRunner; import org.apache.activemq.thread.TaskRunnerFactory; @@ -74,9 +73,6 @@ public class VMTransport implements Transport,Task{ } public void oneway(Object command) throws IOException{ - if (command instanceof Message) { - command = ((Message)command).copy(); - } if(disposed){ throw new TransportDisposedIOException("Transport disposed."); } @@ -94,9 +90,6 @@ public class VMTransport implements Transport,Task{ } protected void syncOneWay(Object command){ - if (command instanceof Message) { - command = ((Message)command).copy(); - } final TransportListener tl=peer.transportListener; prePeerSetQueue=peer.prePeerSetQueue; if(tl==null){ diff --git a/activemq-core/src/main/java/org/apache/activemq/util/BrokerSupport.java b/activemq-core/src/main/java/org/apache/activemq/util/BrokerSupport.java index f71c09c068..607003985e 100644 --- a/activemq-core/src/main/java/org/apache/activemq/util/BrokerSupport.java +++ b/activemq-core/src/main/java/org/apache/activemq/util/BrokerSupport.java @@ -17,6 +17,7 @@ package org.apache.activemq.util; import org.apache.activemq.broker.ConnectionContext; +import org.apache.activemq.broker.ProducerBrokerExchange; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.Message; @@ -39,7 +40,10 @@ public class BrokerSupport { boolean originalFlowControl=context.isProducerFlowControl(); try{ context.setProducerFlowControl(false); - context.getBroker().send(context,message); + ProducerBrokerExchange producerExchange = new ProducerBrokerExchange(); + producerExchange.setMutable(true); + producerExchange.setConnectionContext(context); + context.getBroker().send(producerExchange,message); }finally{ context.setProducerFlowControl(originalFlowControl); } diff --git a/activemq-core/src/test/java/org/apache/activemq/broker/StubBroker.java b/activemq-core/src/test/java/org/apache/activemq/broker/StubBroker.java index 1122d968c2..f66f03e884 100644 --- a/activemq-core/src/test/java/org/apache/activemq/broker/StubBroker.java +++ b/activemq-core/src/test/java/org/apache/activemq/broker/StubBroker.java @@ -173,7 +173,7 @@ public class StubBroker implements Broker { public void setAdminConnectionContext(ConnectionContext adminConnectionContext) { } - public void acknowledge(ConnectionContext context, MessageAck ack) throws Exception { + public void acknowledge(ConsumerBrokerExchange consumerExchange, MessageAck ack) throws Exception { } public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception { @@ -212,7 +212,7 @@ public class StubBroker implements Broker { public void removeSubscription(ConnectionContext context, RemoveSubscriptionInfo info) throws Exception { } - public void send(ConnectionContext context, Message message) throws Exception { + public void send(ProducerBrokerExchange producerExchange, Message message) throws Exception { } public void start() throws Exception {