performance tuning

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@514131 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Robert Davies 2007-03-03 11:30:22 +00:00
parent 31d327726a
commit 84eb9f8b69
26 changed files with 416 additions and 130 deletions

View File

@ -1585,6 +1585,9 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
message.setJMSMessageID(msg.getMessageId().toString());
}
msg.setTransactionId(txid);
if(connection.isCopyMessageOnSend()){
msg=(ActiveMQMessage)msg.copy();
}
msg.setConnection(connection);
msg.onSend();
msg.setProducerId(msg.getMessageId().getProducerId());

View File

@ -22,6 +22,7 @@ import java.util.Iterator;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.BrokerFilter;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.ProducerBrokerExchange;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.command.ActiveMQDestination;
@ -265,11 +266,13 @@ public class AdvisoryBroker extends BrokerFilter {
advisoryMessage.setDestination(topic);
advisoryMessage.setResponseRequired(false);
advisoryMessage.setProducerId(advisoryProducerId);
boolean originalFlowControl = context.isProducerFlowControl();
final ProducerBrokerExchange producerExchange = new ProducerBrokerExchange();
producerExchange.setConnectionContext(context);
producerExchange.setMutable(true);
try {
context.setProducerFlowControl(false);
next.send(context, advisoryMessage);
next.send(producerExchange, advisoryMessage);
} finally {
context.setProducerFlowControl(originalFlowControl);
}

View File

@ -43,11 +43,11 @@ public class BrokerBroadcaster extends BrokerFilter{
super(next);
}
public void acknowledge(ConnectionContext context,MessageAck ack) throws Exception{
next.acknowledge(context,ack);
public void acknowledge(ConsumerBrokerExchange consumerExchange,MessageAck ack) throws Exception{
next.acknowledge(consumerExchange,ack);
Broker brokers[]=getListeners();
for(int i=0;i<brokers.length;i++){
brokers[i].acknowledge(context,ack);
brokers[i].acknowledge(consumerExchange,ack);
}
}
@ -134,11 +134,11 @@ public class BrokerBroadcaster extends BrokerFilter{
}
}
public void send(ConnectionContext context,Message messageSend) throws Exception{
next.send(context,messageSend);
public void send(ProducerBrokerExchange producerExchange,Message messageSend) throws Exception{
next.send(producerExchange,messageSend);
Broker brokers[]=getListeners();
for(int i=0;i<brokers.length;i++){
brokers[i].send(context,messageSend);
brokers[i].send(producerExchange,messageSend);
}
}

View File

@ -70,8 +70,8 @@ public class BrokerFilter implements Broker {
return next.getDestinations(destination);
}
public void acknowledge(ConnectionContext context, MessageAck ack) throws Exception {
next.acknowledge(context, ack);
public void acknowledge(ConsumerBrokerExchange consumerExchange, MessageAck ack) throws Exception {
next.acknowledge(consumerExchange, ack);
}
public Response messagePull(ConnectionContext context, MessagePull pull) throws Exception {
@ -122,8 +122,8 @@ public class BrokerFilter implements Broker {
next.rollbackTransaction(context, xid);
}
public void send(ConnectionContext context, Message messageSend) throws Exception {
next.send(context, messageSend);
public void send(ProducerBrokerExchange producerExchange, Message messageSend) throws Exception {
next.send(producerExchange, messageSend);
}
public void beginTransaction(ConnectionContext context, TransactionId xid) throws Exception {

View File

@ -81,7 +81,7 @@ public class CompositeDestinationBroker extends BrokerFilter {
/**
*
*/
public void send(ConnectionContext context, Message message) throws Exception {
public void send(ProducerBrokerExchange producerExchange, Message message) throws Exception {
ActiveMQDestination destination = message.getDestination();
if( destination.isComposite() ) {
ActiveMQDestination[] destinations = destination.getCompositeDestinations();
@ -92,10 +92,10 @@ public class CompositeDestinationBroker extends BrokerFilter {
message.setOriginalDestination(destination);
message.setDestination(destinations[i]);
message.evictMarshlledForm();
next.send(context, message);
next.send(producerExchange, message);
}
} else {
next.send(context, message);
next.send(producerExchange, message);
}
}

View File

@ -0,0 +1,88 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
* file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
* to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package org.apache.activemq.broker;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.Region;
import org.apache.activemq.broker.region.Subscription;
/**
* Holds internal state in the broker for a essageConsumer
*
* @version $Revision: 1.8 $
*/
public class ConsumerBrokerExchange{
private ConnectionContext connectionContext;
private Destination regionDestination;
private Region region;
private Subscription subscription;
/**
* @return the connectionContext
*/
public ConnectionContext getConnectionContext(){
return this.connectionContext;
}
/**
* @param connectionContext the connectionContext to set
*/
public void setConnectionContext(ConnectionContext connectionContext){
this.connectionContext=connectionContext;
}
/**
* @return the region
*/
public Region getRegion(){
return this.region;
}
/**
* @param region the region to set
*/
public void setRegion(Region region){
this.region=region;
}
/**
* @return the regionDestination
*/
public Destination getRegionDestination(){
return this.regionDestination;
}
/**
* @param regionDestination the regionDestination to set
*/
public void setRegionDestination(Destination regionDestination){
this.regionDestination=regionDestination;
}
/**
* @return the subscription
*/
public Subscription getSubscription(){
return this.subscription;
}
/**
* @param subscription the subscription to set
*/
public void setSubscription(Subscription subscription){
this.subscription=subscription;
}
}

View File

@ -153,11 +153,11 @@ public class EmptyBroker implements Broker {
}
public void send(ConnectionContext context, Message message) throws Exception {
public void send(ProducerBrokerExchange producerExchange, Message message) throws Exception {
}
public void acknowledge(ConnectionContext context, MessageAck ack) throws Exception {
public void acknowledge(ConsumerBrokerExchange consumerExchange, MessageAck ack) throws Exception {
}

View File

@ -155,11 +155,11 @@ public class ErrorBroker implements Broker {
throw new BrokerStoppedException(this.message);
}
public void send(ConnectionContext context, Message message) throws Exception {
public void send(ProducerBrokerExchange producerExchange, Message message) throws Exception {
throw new BrokerStoppedException(this.message);
}
public void acknowledge(ConnectionContext context, MessageAck ack) throws Exception {
public void acknowledge(ConsumerBrokerExchange consumerExchange, MessageAck ack) throws Exception {
throw new BrokerStoppedException(this.message);
}

View File

@ -84,8 +84,8 @@ public class MutableBrokerFilter implements Broker {
return getNext().getDestinations(destination);
}
public void acknowledge(ConnectionContext context, MessageAck ack) throws Exception {
getNext().acknowledge(context, ack);
public void acknowledge(ConsumerBrokerExchange consumerExchange, MessageAck ack) throws Exception {
getNext().acknowledge(consumerExchange, ack);
}
public void addConnection(ConnectionContext context, ConnectionInfo info) throws Exception {
@ -132,8 +132,8 @@ public class MutableBrokerFilter implements Broker {
getNext().rollbackTransaction(context, xid);
}
public void send(ConnectionContext context, Message messageSend) throws Exception {
getNext().send(context, messageSend);
public void send(ProducerBrokerExchange producerExchange, Message messageSend) throws Exception {
getNext().send(producerExchange, messageSend);
}
public void beginTransaction(ConnectionContext context, TransactionId xid) throws Exception {

View File

@ -0,0 +1,103 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
* file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
* to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package org.apache.activemq.broker;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.Region;
import org.apache.activemq.state.ProducerState;
/**
* Holds internal state in the broker for a MessageProducer
*
* @version $Revision: 1.8 $
*/
public class ProducerBrokerExchange{
private ConnectionContext connectionContext;
private Destination regionDestination;
private Region region;
private ProducerState producerState;
private boolean mutable=true;
/**
* @return the connectionContext
*/
public ConnectionContext getConnectionContext(){
return this.connectionContext;
}
/**
* @param connectionContext the connectionContext to set
*/
public void setConnectionContext(ConnectionContext connectionContext){
this.connectionContext=connectionContext;
}
/**
* @return the mutable
*/
public boolean isMutable(){
return this.mutable;
}
/**
* @param mutable the mutable to set
*/
public void setMutable(boolean mutable){
this.mutable=mutable;
}
/**
* @return the regionDestination
*/
public Destination getRegionDestination(){
return this.regionDestination;
}
/**
* @param regionDestination the regionDestination to set
*/
public void setRegionDestination(Destination regionDestination){
this.regionDestination=regionDestination;
}
/**
* @return the region
*/
public Region getRegion(){
return this.region;
}
/**
* @param region the region to set
*/
public void setRegion(Region region){
this.region=region;
}
/**
* @return the producerState
*/
public ProducerState getProducerState(){
return this.producerState;
}
/**
* @param producerState the producerState to set
*/
public void setProducerState(ProducerState producerState){
this.producerState=producerState;
}
}

View File

@ -78,16 +78,20 @@ public class TransactionBroker extends BrokerFilter {
context.setInRecoveryMode(true);
context.setTransactions(new ConcurrentHashMap());
context.setProducerFlowControl(false);
final ProducerBrokerExchange producerExchange = new ProducerBrokerExchange();
producerExchange.setMutable(true);
producerExchange.setConnectionContext(context);
final ConsumerBrokerExchange consumerExchange = new ConsumerBrokerExchange();
consumerExchange.setConnectionContext(context);
transactionStore.recover(new TransactionRecoveryListener() {
public void recover(XATransactionId xid, Message[] addedMessages, MessageAck[] aks) {
try {
beginTransaction(context, xid);
for (int i = 0; i < addedMessages.length; i++) {
send(context, addedMessages[i]);
send(producerExchange, addedMessages[i]);
}
for (int i = 0; i < aks.length; i++) {
acknowledge(context, aks[i]);
acknowledge(consumerExchange, aks[i]);
}
prepareTransaction(context, xid);
} catch (Throwable e) {
@ -168,9 +172,10 @@ public class TransactionBroker extends BrokerFilter {
transaction.rollback();
}
public void acknowledge(ConnectionContext context, MessageAck ack) throws Exception {
public void acknowledge(ConsumerBrokerExchange consumerExchange, MessageAck ack) throws Exception {
// This method may be invoked recursively.
// Track original tx so that it can be restored.
final ConnectionContext context = consumerExchange.getConnectionContext();
Transaction originalTx = context.getTransaction();
Transaction transaction=null;
if( ack.isInTransaction() ) {
@ -178,15 +183,16 @@ public class TransactionBroker extends BrokerFilter {
}
context.setTransaction(transaction);
try {
next.acknowledge(context, ack);
next.acknowledge(consumerExchange, ack);
} finally {
context.setTransaction(originalTx);
}
}
public void send(ConnectionContext context, Message message) throws Exception {
public void send(ProducerBrokerExchange producerExchange, Message message) throws Exception {
// This method may be invoked recursively.
// Track original tx so that it can be restored.
final ConnectionContext context = producerExchange.getConnectionContext();
Transaction originalTx = context.getTransaction();
Transaction transaction=null;
if( message.getTransactionId()!=null ) {
@ -194,7 +200,7 @@ public class TransactionBroker extends BrokerFilter {
}
context.setTransaction(transaction);
try {
next.send(context, message);
next.send(producerExchange, message);
} finally {
context.setTransaction(originalTx);
}

View File

@ -17,6 +17,7 @@ package org.apache.activemq.broker;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
@ -110,9 +111,11 @@ public class TransportConnection implements Service,Connection,Task,CommandVisit
private boolean pendingStop;
private long timeStamp=0;
private AtomicBoolean stopped=new AtomicBoolean(false);
protected final AtomicBoolean disposed=new AtomicBoolean(false);
private final AtomicBoolean disposed=new AtomicBoolean(false);
private CountDownLatch stopLatch=new CountDownLatch(1);
protected final AtomicBoolean asyncException=new AtomicBoolean(false);
private final AtomicBoolean asyncException=new AtomicBoolean(false);
private final Map<ProducerId,ProducerBrokerExchange>producerExchanges = new HashMap<ProducerId,ProducerBrokerExchange>();
private final Map<ConsumerId,ConsumerBrokerExchange>consumerExchanges = new HashMap<ConsumerId,ConsumerBrokerExchange>();
static class ConnectionState extends org.apache.activemq.state.ConnectionState{
@ -427,33 +430,24 @@ public class TransportConnection implements Service,Connection,Task,CommandVisit
public Response processMessage(Message messageSend) throws Exception{
ProducerId producerId=messageSend.getProducerId();
ConnectionState state=lookupConnectionState(producerId);
ConnectionContext context=state.getContext();
// If the message originates from this client connection,
// then, finde the associated producer state so we can do some dup detection.
ProducerState producerState=null;
if(messageSend.getMessageId().getProducerId().equals(messageSend.getProducerId())){
SessionState ss=state.getSessionState(producerId.getParentId());
if(ss==null)
throw new IllegalStateException("Cannot send from a session that had not been registered: "
+producerId.getParentId());
producerState=ss.getProducerState(producerId);
}
if(producerState==null){
broker.send(context,messageSend);
}else{
// Avoid Dups.
ProducerBrokerExchange producerExchange=getProducerBrokerExchange(producerId);
ProducerState producerState=producerExchange.getProducerState();
if(producerState!=null){
long seq=messageSend.getMessageId().getProducerSequenceId();
if(seq>producerState.getLastSequenceId()){
producerState.setLastSequenceId(seq);
broker.send(context,messageSend);
broker.send(producerExchange,messageSend);
}
}else{
// producer not local to this broker
broker.send(producerExchange,messageSend);
}
return null;
}
public Response processMessageAck(MessageAck ack) throws Exception{
broker.acknowledge(lookupConnectionState(ack.getConsumerId()).getContext(),ack);
ConsumerBrokerExchange consumerExchange = getConsumerBrokerExchange(ack.getConsumerId());
broker.acknowledge(consumerExchange,ack);
return null;
}
@ -515,6 +509,7 @@ public class TransportConnection implements Service,Connection,Task,CommandVisit
ProducerState ps=ss.removeProducer(id);
if(ps==null)
throw new IllegalStateException("Cannot remove a producer that had not been registered: "+id);
removeProducerBrokerExchange(id);
broker.removeProducer(cs.getContext(),ps.getInfo());
return null;
}
@ -551,6 +546,7 @@ public class TransportConnection implements Service,Connection,Task,CommandVisit
if(consumerState==null)
throw new IllegalStateException("Cannot remove a consumer that had not been registered: "+id);
broker.removeConsumer(cs.getContext(),consumerState.getInfo());
removeConsumerBrokerExchange(id);
return null;
}
@ -981,4 +977,53 @@ public class TransportConnection implements Service,Connection,Task,CommandVisit
public String getRemoteAddress(){
return transport.getRemoteAddress();
}
private ProducerBrokerExchange getProducerBrokerExchange(ProducerId id){
ProducerBrokerExchange result=producerExchanges.get(id);
if(result==null){
synchronized(producerExchanges){
result=new ProducerBrokerExchange();
ConnectionState state=lookupConnectionState(id);
ConnectionContext context=state.getContext();
result.setConnectionContext(context);
SessionState ss=state.getSessionState(id.getParentId());
if(ss!=null){
result.setProducerState(ss.getProducerState(id));
ProducerState producerState=ss.getProducerState(id);
if(producerState!=null&&producerState.getInfo()!=null){
ProducerInfo info=producerState.getInfo();
result.setMutable(info.getDestination()==null);
}
}
producerExchanges.put(id,result);
}
}
return result;
}
private void removeProducerBrokerExchange(ProducerId id) {
synchronized(producerExchanges) {
producerExchanges.remove(id);
}
}
private ConsumerBrokerExchange getConsumerBrokerExchange(ConsumerId id) {
ConsumerBrokerExchange result = consumerExchanges.get(id);
if (result == null) {
synchronized(consumerExchanges) {
result = new ConsumerBrokerExchange();
ConnectionState state = lookupConnectionState(id);
ConnectionContext context = state.getContext();
result.setConnectionContext(context);
consumerExchanges.put(id,result);
}
}
return result;
}
private void removeConsumerBrokerExchange(ConsumerId id) {
synchronized(consumerExchanges) {
consumerExchanges.remove(id);
}
}
}

View File

@ -33,9 +33,10 @@ public class UserIDBroker extends BrokerFilter {
super(next);
}
public void send(ConnectionContext context, Message messageSend) throws Exception {
public void send(ProducerBrokerExchange producerExchange, Message messageSend) throws Exception {
final ConnectionContext context = producerExchange.getConnectionContext();
String userID = context.getUserName();
messageSend.setUserID(userID);
super.send(context, messageSend);
super.send(producerExchange, messageSend);
}
}

View File

@ -17,8 +17,10 @@ package org.apache.activemq.broker.ft;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.activemq.broker.Connection;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.ConsumerBrokerExchange;
import org.apache.activemq.broker.InsertableMutableBrokerFilter;
import org.apache.activemq.broker.MutableBrokerFilter;
import org.apache.activemq.broker.ProducerBrokerExchange;
import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.command.Command;
import org.apache.activemq.command.ConnectionControl;
@ -298,13 +300,13 @@ public class MasterBroker extends InsertableMutableBrokerFilter{
* @throws Exception
*
*/
public void send(ConnectionContext context,Message message) throws Exception{
public void send(ProducerBrokerExchange producerExchange,Message message) throws Exception{
/**
* A message can be dispatched before the super.send() method returns so - here the order is switched to avoid
* problems on the slave with receiving acks for messages not received yey
*/
sendToSlave(message);
super.send(context,message);
super.send(producerExchange,message);
}
/**
@ -313,9 +315,9 @@ public class MasterBroker extends InsertableMutableBrokerFilter{
* @throws Exception
*
*/
public void acknowledge(ConnectionContext context,MessageAck ack) throws Exception{
public void acknowledge(ConsumerBrokerExchange consumerExchange,MessageAck ack) throws Exception{
sendToSlave(ack);
super.acknowledge(context,ack);
super.acknowledge(consumerExchange,ack);
}
public boolean isFaultTolerantConfiguration(){

View File

@ -25,7 +25,9 @@ import java.util.Set;
import javax.jms.JMSException;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.ConsumerBrokerExchange;
import org.apache.activemq.broker.DestinationAlreadyExistsException;
import org.apache.activemq.broker.ProducerBrokerExchange;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.Message;
@ -300,17 +302,28 @@ abstract public class AbstractRegion implements Region {
throw new JMSException("Invalid operation.");
}
public void send(ConnectionContext context, Message messageSend)
public void send(final ProducerBrokerExchange producerExchange, Message messageSend)
throws Exception {
Destination dest = lookup(context, messageSend.getDestination());
dest.send(context, messageSend);
final ConnectionContext context = producerExchange.getConnectionContext();
if (producerExchange.isMutable() || producerExchange.getRegionDestination()==null) {
final Destination regionDestination = lookup(context,messageSend.getDestination());
producerExchange.setRegionDestination(regionDestination);
}
producerExchange.getRegionDestination().send(context, messageSend);
}
public void acknowledge(ConnectionContext context, MessageAck ack) throws Exception {
Subscription sub = (Subscription) subscriptions.get(ack.getConsumerId());
if( sub==null )
throw new IllegalArgumentException("The subscription does not exist: "+ack.getConsumerId());
sub.acknowledge(context, ack);
public void acknowledge(ConsumerBrokerExchange consumerExchange,MessageAck ack) throws Exception{
Subscription sub=consumerExchange.getSubscription();
if(sub==null){
sub=(Subscription)subscriptions.get(ack.getConsumerId());
if(sub==null){
throw new IllegalArgumentException("The subscription does not exist: "+ack.getConsumerId());
}
consumerExchange.setSubscription(sub);
}
sub.acknowledge(consumerExchange.getConnectionContext(),ack);
}
public Response messagePull(ConnectionContext context, MessagePull pull) throws Exception {

View File

@ -19,6 +19,8 @@ package org.apache.activemq.broker.region;
import org.apache.activemq.Service;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.ConsumerBrokerExchange;
import org.apache.activemq.broker.ProducerBrokerExchange;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.Message;
@ -98,17 +100,18 @@ public interface Region extends Service {
* Send a message to the broker to using the specified destination. The destination specified
* in the message does not need to match the destination the message is sent to. This is
* handy in case the message is being sent to a dead letter destination.
* @param context the environment the operation is being executed under.
* @param producerExchange the environment the operation is being executed under.
* @param message
* @throws Exception TODO
*/
public void send(ConnectionContext context, Message message) throws Exception;
public void send(ProducerBrokerExchange producerExchange, Message message) throws Exception;
/**
* Used to acknowledge the receipt of a message by a client.
* @param context the environment the operation is being executed under.
* @param consumerExchange the environment the operation is being executed under.
* @throws Exception TODO
*/
public void acknowledge(ConnectionContext context, MessageAck ack) throws Exception;
public void acknowledge(ConsumerBrokerExchange consumerExchange, MessageAck ack) throws Exception;
/**
* Allows a consumer to pull a message from a queue

View File

@ -31,7 +31,9 @@ import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.Connection;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.ConsumerBrokerExchange;
import org.apache.activemq.broker.DestinationAlreadyExistsException;
import org.apache.activemq.broker.ProducerBrokerExchange;
import org.apache.activemq.broker.region.policy.PendingDurableSubscriberMessageStoragePolicy;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.broker.region.policy.VMPendingDurableSubscriberMessageStoragePolicy;
@ -366,46 +368,56 @@ public class RegionBroker implements Broker {
topicRegion.removeSubscription(context, info);
}
public void send(ConnectionContext context, Message message) throws Exception {
public void send(ProducerBrokerExchange producerExchange, Message message) throws Exception {
long si = sequenceGenerator.getNextSequenceId();
message.getMessageId().setBrokerSequenceId(si);
ActiveMQDestination destination = message.getDestination();
switch(destination.getDestinationType()) {
case ActiveMQDestination.QUEUE_TYPE:
queueRegion.send(context, message);
break;
case ActiveMQDestination.TOPIC_TYPE:
topicRegion.send(context, message);
break;
case ActiveMQDestination.TEMP_QUEUE_TYPE:
tempQueueRegion.send(context, message);
break;
case ActiveMQDestination.TEMP_TOPIC_TYPE:
tempTopicRegion.send(context, message);
break;
default:
throw createUnknownDestinationTypeException(destination);
if (producerExchange.isMutable() || producerExchange.getRegion()==null) {
ActiveMQDestination destination = message.getDestination();
Region region = null;
switch(destination.getDestinationType()) {
case ActiveMQDestination.QUEUE_TYPE:
region = queueRegion;
break;
case ActiveMQDestination.TOPIC_TYPE:
region = topicRegion;
break;
case ActiveMQDestination.TEMP_QUEUE_TYPE:
region = tempQueueRegion;
break;
case ActiveMQDestination.TEMP_TOPIC_TYPE:
region = tempTopicRegion;
break;
default:
throw createUnknownDestinationTypeException(destination);
}
producerExchange.setRegion(region);
}
producerExchange.getRegion().send(producerExchange,message);
}
public void acknowledge(ConnectionContext context, MessageAck ack) throws Exception {
ActiveMQDestination destination = ack.getDestination();
switch(destination.getDestinationType()) {
case ActiveMQDestination.QUEUE_TYPE:
queueRegion.acknowledge(context, ack);
break;
case ActiveMQDestination.TOPIC_TYPE:
topicRegion.acknowledge(context, ack);
break;
case ActiveMQDestination.TEMP_QUEUE_TYPE:
tempQueueRegion.acknowledge(context, ack);
break;
case ActiveMQDestination.TEMP_TOPIC_TYPE:
tempTopicRegion.acknowledge(context, ack);
break;
default:
throw createUnknownDestinationTypeException(destination);
public void acknowledge(ConsumerBrokerExchange consumerExchange,MessageAck ack) throws Exception{
if(consumerExchange.getRegion()==null){
ActiveMQDestination destination=ack.getDestination();
Region region=null;
switch(destination.getDestinationType()){
case ActiveMQDestination.QUEUE_TYPE:
region=queueRegion;
break;
case ActiveMQDestination.TOPIC_TYPE:
region=topicRegion;
break;
case ActiveMQDestination.TEMP_QUEUE_TYPE:
region=tempQueueRegion;
break;
case ActiveMQDestination.TEMP_TOPIC_TYPE:
region=tempTopicRegion;
break;
default:
throw createUnknownDestinationTypeException(destination);
}
consumerExchange.setRegion(region);
}
consumerExchange.getRegion().acknowledge(consumerExchange,ack);
}

View File

@ -24,6 +24,7 @@ import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CopyOnWriteArraySet;
import org.apache.activemq.advisory.AdvisorySupport;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.ProducerBrokerExchange;
import org.apache.activemq.broker.region.policy.DeadLetterStrategy;
import org.apache.activemq.broker.region.policy.DispatchPolicy;
import org.apache.activemq.broker.region.policy.FixedSizedSubscriptionRecoveryPolicy;
@ -242,7 +243,6 @@ public class Topic implements Destination {
if( message.isExpired() ) {
return;
}
if (context.isProducerFlowControl()) {
if (usageManager.isSendFailIfNoSpace() && usageManager.isFull()) {
throw new javax.jms.ResourceAllocationException("Usage Manager memory limit reached");
@ -426,7 +426,7 @@ public class Topic implements Destination {
// Implementation methods
// -------------------------------------------------------------------------
protected void dispatch(ConnectionContext context, Message message) throws Exception {
protected void dispatch(final ConnectionContext context, Message message) throws Exception {
destinationStatistics.getEnqueues().increment();
dispatchValve.increment();
MessageEvaluationContext msgContext = context.getMessageEvaluationContext();
@ -481,7 +481,10 @@ public class Topic implements Destination {
boolean originalFlowControl = context.isProducerFlowControl();
try {
context.setProducerFlowControl(false);
context.getBroker().send(context, message);
ProducerBrokerExchange producerExchange = new ProducerBrokerExchange();
producerExchange.setMutable(false);
producerExchange.setConnectionContext(context);
context.getBroker().send(producerExchange, message);
} finally {
context.setProducerFlowControl(originalFlowControl);
}

View File

@ -19,6 +19,8 @@ package org.apache.activemq.broker.util;
import org.apache.activemq.broker.BrokerPluginSupport;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.ConsumerBrokerExchange;
import org.apache.activemq.broker.ProducerBrokerExchange;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.commons.logging.Log;
@ -37,18 +39,18 @@ public class LoggingBrokerPlugin extends BrokerPluginSupport {
private Log sendLog = LogFactory.getLog(LoggingBrokerPlugin.class.getName()+".Send");
private Log ackLog = LogFactory.getLog(LoggingBrokerPlugin.class.getName()+".Ack");
public void send(ConnectionContext context, Message messageSend) throws Exception {
public void send(ProducerBrokerExchange producerExchange, Message messageSend) throws Exception {
if (sendLog.isInfoEnabled()) {
sendLog.info("Sending: " + messageSend);
}
super.send(context, messageSend);
super.send(producerExchange, messageSend);
}
public void acknowledge(ConnectionContext context, MessageAck ack) throws Exception {
public void acknowledge(ConsumerBrokerExchange consumerExchange, MessageAck ack) throws Exception {
if (ackLog.isInfoEnabled()) {
ackLog.info("Acknowledge: " + ack);
}
super.acknowledge(context, ack);
super.acknowledge(consumerExchange, ack);
}
// Properties

View File

@ -19,6 +19,7 @@ package org.apache.activemq.broker.util;
import org.apache.activemq.broker.BrokerPluginSupport;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.ProducerBrokerExchange;
import org.apache.activemq.command.Message;
@ -37,11 +38,11 @@ import org.apache.activemq.command.Message;
* @version $Revision$
*/
public class TimeStampingBrokerPlugin extends BrokerPluginSupport {
public void send(ConnectionContext context, Message message) throws Exception {
public void send(ProducerBrokerExchange producerExchange, Message message) throws Exception {
if (message.getTimestamp() > 0 && (message.getBrokerPath() == null || message.getBrokerPath().length == 0)) {
//timestamp not been disabled and has not passed through a network
message.setTimestamp(System.currentTimeMillis());
}
super.send(context, message);
super.send(producerExchange, message);
}
}

View File

@ -30,6 +30,8 @@ import java.net.UnknownHostException;
import org.apache.activemq.broker.BrokerPluginSupport;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.ConsumerBrokerExchange;
import org.apache.activemq.broker.ProducerBrokerExchange;
import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.BrokerId;
@ -127,14 +129,14 @@ public class UDPTraceBrokerPlugin extends BrokerPluginSupport {
}
}
public void send(ConnectionContext context, Message messageSend) throws Exception {
public void send(ProducerBrokerExchange producerExchange, Message messageSend) throws Exception {
trace(messageSend);
super.send(context, messageSend);
super.send(producerExchange, messageSend);
}
public void acknowledge(ConnectionContext context, MessageAck ack) throws Exception {
public void acknowledge(ConsumerBrokerExchange consumerExchange, MessageAck ack) throws Exception {
trace(ack);
super.acknowledge(context, ack);
super.acknowledge(consumerExchange, ack);
}
public void addConnection(ConnectionContext context, ConnectionInfo info) throws Exception {

View File

@ -31,6 +31,7 @@ import javax.management.ObjectName;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.ProducerBrokerExchange;
import org.apache.activemq.broker.jmx.BrokerViewMBean;
import org.apache.activemq.broker.jmx.ManagementContext;
import org.apache.activemq.broker.jmx.SubscriptionViewMBean;
@ -106,8 +107,8 @@ public class ConnectionDotFileInterceptor extends DotFileInterceptorSupport {
}
}
public void send(ConnectionContext context, Message messageSend) throws Exception {
super.send(context, messageSend);
public void send(ProducerBrokerExchange producerExchange, Message messageSend) throws Exception {
super.send(producerExchange, messageSend);
ProducerId producerId = messageSend.getProducerId();
ActiveMQDestination destination = messageSend.getDestination();
synchronized (lock) {

View File

@ -20,6 +20,7 @@ package org.apache.activemq.security;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.BrokerFilter;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.ProducerBrokerExchange;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.command.ActiveMQDestination;
@ -165,8 +166,8 @@ public class AuthorizationBroker extends BrokerFilter implements SecurityAdminMB
super.addProducer(context, info);
}
public void send(ConnectionContext context, Message messageSend) throws Exception {
SecurityContext subject = (SecurityContext) context.getSecurityContext();
public void send(ProducerBrokerExchange producerExchange, Message messageSend) throws Exception {
SecurityContext subject = (SecurityContext) producerExchange.getConnectionContext().getSecurityContext();
if( subject == null )
throw new SecurityException("User is not authenticated.");
@ -185,7 +186,7 @@ public class AuthorizationBroker extends BrokerFilter implements SecurityAdminMB
subject.getAuthorizedWriteDests().put(messageSend.getDestination(), messageSend.getDestination());
}
super.send(context, messageSend);
super.send(producerExchange, messageSend);
}
// SecurityAdminMBean interface

View File

@ -23,7 +23,6 @@ import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.activemq.command.Command;
import org.apache.activemq.command.Message;
import org.apache.activemq.thread.Task;
import org.apache.activemq.thread.TaskRunner;
import org.apache.activemq.thread.TaskRunnerFactory;
@ -74,9 +73,6 @@ public class VMTransport implements Transport,Task{
}
public void oneway(Object command) throws IOException{
if (command instanceof Message) {
command = ((Message)command).copy();
}
if(disposed){
throw new TransportDisposedIOException("Transport disposed.");
}
@ -94,9 +90,6 @@ public class VMTransport implements Transport,Task{
}
protected void syncOneWay(Object command){
if (command instanceof Message) {
command = ((Message)command).copy();
}
final TransportListener tl=peer.transportListener;
prePeerSetQueue=peer.prePeerSetQueue;
if(tl==null){

View File

@ -17,6 +17,7 @@
package org.apache.activemq.util;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.ProducerBrokerExchange;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.Message;
@ -39,7 +40,10 @@ public class BrokerSupport {
boolean originalFlowControl=context.isProducerFlowControl();
try{
context.setProducerFlowControl(false);
context.getBroker().send(context,message);
ProducerBrokerExchange producerExchange = new ProducerBrokerExchange();
producerExchange.setMutable(true);
producerExchange.setConnectionContext(context);
context.getBroker().send(producerExchange,message);
}finally{
context.setProducerFlowControl(originalFlowControl);
}

View File

@ -173,7 +173,7 @@ public class StubBroker implements Broker {
public void setAdminConnectionContext(ConnectionContext adminConnectionContext) {
}
public void acknowledge(ConnectionContext context, MessageAck ack) throws Exception {
public void acknowledge(ConsumerBrokerExchange consumerExchange, MessageAck ack) throws Exception {
}
public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
@ -212,7 +212,7 @@ public class StubBroker implements Broker {
public void removeSubscription(ConnectionContext context, RemoveSubscriptionInfo info) throws Exception {
}
public void send(ConnectionContext context, Message message) throws Exception {
public void send(ProducerBrokerExchange producerExchange, Message message) throws Exception {
}
public void start() throws Exception {