Enhanced the ActiveMQConnection to use the CommandVisitor instead of using a big if swtich

when handling commands from the broker.  This should be slightly more efficient.



git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@516492 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Hiram R. Chirino 2007-03-09 18:23:44 +00:00
parent bdef2f52d0
commit fb3b6dba57
10 changed files with 303 additions and 114 deletions

View File

@ -87,6 +87,7 @@ import org.apache.activemq.management.JMSConnectionStatsImpl;
import org.apache.activemq.management.JMSStatsImpl; import org.apache.activemq.management.JMSStatsImpl;
import org.apache.activemq.management.StatsCapable; import org.apache.activemq.management.StatsCapable;
import org.apache.activemq.management.StatsImpl; import org.apache.activemq.management.StatsImpl;
import org.apache.activemq.state.CommandVisitorAdapter;
import org.apache.activemq.thread.TaskRunnerFactory; import org.apache.activemq.thread.TaskRunnerFactory;
import org.apache.activemq.transport.Transport; import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportListener; import org.apache.activemq.transport.TransportListener;
@ -1540,53 +1541,81 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
public void onCommand(final Object o) { public void onCommand(final Object o) {
final Command command = (Command) o; final Command command = (Command) o;
if (!closed.get() && command != null) { if (!closed.get() && command != null) {
if (command.isMessageDispatch()) { try {
MessageDispatch md = (MessageDispatch) command; command.visit(new CommandVisitorAdapter(){
ActiveMQDispatcher dispatcher = (ActiveMQDispatcher) dispatchers.get(md.getConsumerId()); @Override
if (dispatcher != null) { public Response processMessageDispatch(MessageDispatch md) throws Exception {
// Copy in case a embedded broker is dispatching via vm:// ActiveMQDispatcher dispatcher = (ActiveMQDispatcher) dispatchers.get(md.getConsumerId());
// md.getMessage() == null to signal end of queue browse. if (dispatcher != null) {
Message msg = md.getMessage(); // Copy in case a embedded broker is dispatching via vm://
if( msg!=null ) { // md.getMessage() == null to signal end of queue browse.
msg = msg.copy(); Message msg = md.getMessage();
msg.setReadOnlyBody(true); if( msg!=null ) {
msg.setReadOnlyProperties(true); msg = msg.copy();
msg.setRedeliveryCounter(md.getRedeliveryCounter()); msg.setReadOnlyBody(true);
msg.setConnection(this); msg.setReadOnlyProperties(true);
md.setMessage( msg ); msg.setRedeliveryCounter(md.getRedeliveryCounter());
} msg.setConnection(ActiveMQConnection.this);
dispatcher.dispatch(md); md.setMessage( msg );
} }
} else if (command.getDataStructureType() == ProducerAck.DATA_STRUCTURE_TYPE ) { dispatcher.dispatch(md);
ProducerAck pa = (ProducerAck) command; }
ActiveMQMessageProducer producer = producers.get(pa.getProducerId()); return null;
if( producer!=null ) { }
producer.onProducerAck(pa);
} @Override
} else if ( command.isBrokerInfo() ) { public Response processProducerAck(ProducerAck pa) throws Exception {
this.brokerInfo = (BrokerInfo)command; ActiveMQMessageProducer producer = producers.get(pa.getProducerId());
brokerInfoReceived.countDown(); if( producer!=null ) {
this.optimizeAcknowledge &= !this.brokerInfo.isFaultTolerantConfiguration(); producer.onProducerAck(pa);
getBlobTransferPolicy().setBrokerUploadUrl(brokerInfo.getBrokerUploadUrl()); }
} return null;
else if (command instanceof ControlCommand) { }
onControlCommand((ControlCommand) command);
} @Override
else if (command.getDataStructureType() == ConnectionError.DATA_STRUCTURE_TYPE) { public Response processBrokerInfo(BrokerInfo info) throws Exception {
asyncConnectionThread.execute(new Runnable(){ brokerInfoReceived.countDown();
public void run() { optimizeAcknowledge &= !brokerInfo.isFaultTolerantConfiguration();
onAsyncException(((ConnectionError)command).getException()); getBlobTransferPolicy().setBrokerUploadUrl(info.getBrokerUploadUrl());
} return null;
}); }
new Thread("Async error worker") {
}.start(); @Override
}else if (command instanceof ConnectionControl){ public Response processConnectionError(final ConnectionError error) throws Exception {
onConnectionControl((ConnectionControl) command); asyncConnectionThread.execute(new Runnable(){
}else if (command instanceof ConsumerControl){ public void run() {
onConsumerControl((ConsumerControl) command); onAsyncException(error.getException());
}else if ( command.isWireFormatInfo() ) { }
onWireFormatInfo((WireFormatInfo)command); });
} new Thread("Async error worker") {
}.start();
return null;
}
@Override
public Response processControlCommand(ControlCommand command) throws Exception {
onControlCommand(command);
return null;
}
@Override
public Response processConnectionControl(ConnectionControl control) throws Exception {
onConnectionControl((ConnectionControl) command);
return null;
}
@Override
public Response processConsumerControl(ConsumerControl control) throws Exception {
onConsumerControl((ConsumerControl) command);
return null;
}
@Override
public Response processWireFormat(WireFormatInfo info) throws Exception {
onConsumerControl((ConsumerControl) command);
return null;
}
});
} catch (Exception e) {
onAsyncException(e);
}
} }
for (Iterator iter = transportListeners.iterator(); iter.hasNext();) { for (Iterator iter = transportListeners.iterator(); iter.hasNext();) {
TransportListener listener = (TransportListener) iter.next(); TransportListener listener = (TransportListener) iter.next();

View File

@ -40,8 +40,10 @@ import org.apache.activemq.command.ConnectionControl;
import org.apache.activemq.command.ConnectionError; import org.apache.activemq.command.ConnectionError;
import org.apache.activemq.command.ConnectionId; import org.apache.activemq.command.ConnectionId;
import org.apache.activemq.command.ConnectionInfo; import org.apache.activemq.command.ConnectionInfo;
import org.apache.activemq.command.ConsumerControl;
import org.apache.activemq.command.ConsumerId; import org.apache.activemq.command.ConsumerId;
import org.apache.activemq.command.ConsumerInfo; import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.ControlCommand;
import org.apache.activemq.command.DataArrayResponse; import org.apache.activemq.command.DataArrayResponse;
import org.apache.activemq.command.DestinationInfo; import org.apache.activemq.command.DestinationInfo;
import org.apache.activemq.command.ExceptionResponse; import org.apache.activemq.command.ExceptionResponse;
@ -1178,4 +1180,26 @@ public class TransportConnection implements Service,Connection,Task,CommandVisit
return protocolVersion.get(); return protocolVersion.get();
} }
public Response processControlCommand(ControlCommand command) throws Exception {
if (command.equals("shutdown"))
System.exit(0);
return null;
}
public Response processMessageDispatch(MessageDispatch dispatch) throws Exception {
return null;
}
public Response processConnectionControl(ConnectionControl control) throws Exception {
return null;
}
public Response processConnectionError(ConnectionError error) throws Exception {
return null;
}
public Response processConsumerControl(ConsumerControl control) throws Exception {
return null;
}
} }

View File

@ -38,7 +38,7 @@ public class ConnectionControl extends BaseCommand{
} }
public Response visit(CommandVisitor visitor) throws Exception{ public Response visit(CommandVisitor visitor) throws Exception{
return null; return visitor.processConnectionControl(this);
} }
/** /**

View File

@ -37,7 +37,7 @@ public class ConnectionError extends BaseCommand {
} }
public Response visit(CommandVisitor visitor) throws Exception { public Response visit(CommandVisitor visitor) throws Exception {
return null; return visitor.processConnectionError(this);
} }
/** /**

View File

@ -41,17 +41,10 @@ public class ConsumerControl extends BaseCommand {
} }
public Response visit(CommandVisitor visitor) throws Exception { public Response visit(CommandVisitor visitor) throws Exception {
return null; return visitor.processConsumerControl(this);
} }
/** /**
* @openwire:property version=1 * @openwire:property version=1
* @return Returns the close. * @return Returns the close.

View File

@ -48,8 +48,6 @@ public class ControlCommand extends BaseCommand {
} }
public Response visit(CommandVisitor visitor) throws Exception { public Response visit(CommandVisitor visitor) throws Exception {
if (command.equals("shutdown")) return visitor.processControlCommand(this);
System.exit(0);
return null;
} }
} }

View File

@ -102,7 +102,7 @@ public class MessageDispatch extends BaseCommand {
} }
public Response visit(CommandVisitor visitor) throws Exception { public Response visit(CommandVisitor visitor) throws Exception {
return null; return visitor.processMessageDispatch(this);
} }
public Runnable getTransmitCallback() { public Runnable getTransmitCallback() {

View File

@ -18,15 +18,20 @@
package org.apache.activemq.state; package org.apache.activemq.state;
import org.apache.activemq.command.BrokerInfo; import org.apache.activemq.command.BrokerInfo;
import org.apache.activemq.command.ConnectionControl;
import org.apache.activemq.command.ConnectionError;
import org.apache.activemq.command.ConnectionId; import org.apache.activemq.command.ConnectionId;
import org.apache.activemq.command.ConnectionInfo; import org.apache.activemq.command.ConnectionInfo;
import org.apache.activemq.command.ConsumerControl;
import org.apache.activemq.command.ConsumerId; import org.apache.activemq.command.ConsumerId;
import org.apache.activemq.command.ConsumerInfo; import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.ControlCommand;
import org.apache.activemq.command.DestinationInfo; import org.apache.activemq.command.DestinationInfo;
import org.apache.activemq.command.FlushCommand; import org.apache.activemq.command.FlushCommand;
import org.apache.activemq.command.KeepAliveInfo; import org.apache.activemq.command.KeepAliveInfo;
import org.apache.activemq.command.Message; import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck; import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageDispatch;
import org.apache.activemq.command.MessageDispatchNotification; import org.apache.activemq.command.MessageDispatchNotification;
import org.apache.activemq.command.MessagePull; import org.apache.activemq.command.MessagePull;
import org.apache.activemq.command.ProducerAck; import org.apache.activemq.command.ProducerAck;
@ -77,6 +82,11 @@ public interface CommandVisitor {
Response processEndTransaction(TransactionInfo info) throws Exception; Response processEndTransaction(TransactionInfo info) throws Exception;
Response processMessageDispatchNotification(MessageDispatchNotification notification) throws Exception; Response processMessageDispatchNotification(MessageDispatchNotification notification) throws Exception;
Response processProducerAck(ProducerAck ack) throws Exception; Response processProducerAck(ProducerAck ack) throws Exception;
Response processMessageDispatch(MessageDispatch dispatch) throws Exception;
Response processControlCommand(ControlCommand command) throws Exception;
Response processConnectionError(ConnectionError error) throws Exception;
Response processConnectionControl(ConnectionControl control) throws Exception;
Response processConsumerControl(ConsumerControl control) throws Exception;
} }

View File

@ -0,0 +1,186 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.state;
import org.apache.activemq.command.BrokerInfo;
import org.apache.activemq.command.ConnectionControl;
import org.apache.activemq.command.ConnectionError;
import org.apache.activemq.command.ConnectionId;
import org.apache.activemq.command.ConnectionInfo;
import org.apache.activemq.command.ConsumerControl;
import org.apache.activemq.command.ConsumerId;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.ControlCommand;
import org.apache.activemq.command.DestinationInfo;
import org.apache.activemq.command.FlushCommand;
import org.apache.activemq.command.KeepAliveInfo;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageDispatch;
import org.apache.activemq.command.MessageDispatchNotification;
import org.apache.activemq.command.MessagePull;
import org.apache.activemq.command.ProducerAck;
import org.apache.activemq.command.ProducerId;
import org.apache.activemq.command.ProducerInfo;
import org.apache.activemq.command.RemoveSubscriptionInfo;
import org.apache.activemq.command.Response;
import org.apache.activemq.command.SessionId;
import org.apache.activemq.command.SessionInfo;
import org.apache.activemq.command.ShutdownInfo;
import org.apache.activemq.command.TransactionInfo;
import org.apache.activemq.command.WireFormatInfo;
public class CommandVisitorAdapter implements CommandVisitor {
public Response processAddConnection(ConnectionInfo info) throws Exception {
return null;
}
public Response processAddConsumer(ConsumerInfo info) throws Exception {
return null;
}
public Response processAddDestination(DestinationInfo info) throws Exception {
return null;
}
public Response processAddProducer(ProducerInfo info) throws Exception {
return null;
}
public Response processAddSession(SessionInfo info) throws Exception {
return null;
}
public Response processBeginTransaction(TransactionInfo info) throws Exception {
return null;
}
public Response processBrokerInfo(BrokerInfo info) throws Exception {
return null;
}
public Response processCommitTransactionOnePhase(TransactionInfo info) throws Exception {
return null;
}
public Response processCommitTransactionTwoPhase(TransactionInfo info) throws Exception {
return null;
}
public Response processEndTransaction(TransactionInfo info) throws Exception {
return null;
}
public Response processFlush(FlushCommand command) throws Exception {
return null;
}
public Response processForgetTransaction(TransactionInfo info) throws Exception {
return null;
}
public Response processKeepAlive(KeepAliveInfo info) throws Exception {
return null;
}
public Response processMessage(Message send) throws Exception {
return null;
}
public Response processMessageAck(MessageAck ack) throws Exception {
return null;
}
public Response processMessageDispatchNotification(MessageDispatchNotification notification) throws Exception {
return null;
}
public Response processMessagePull(MessagePull pull) throws Exception {
return null;
}
public Response processPrepareTransaction(TransactionInfo info) throws Exception {
return null;
}
public Response processProducerAck(ProducerAck ack) throws Exception {
return null;
}
public Response processRecoverTransactions(TransactionInfo info) throws Exception {
return null;
}
public Response processRemoveConnection(ConnectionId id) throws Exception {
return null;
}
public Response processRemoveConsumer(ConsumerId id) throws Exception {
return null;
}
public Response processRemoveDestination(DestinationInfo info) throws Exception {
return null;
}
public Response processRemoveProducer(ProducerId id) throws Exception {
return null;
}
public Response processRemoveSession(SessionId id) throws Exception {
return null;
}
public Response processRemoveSubscription(RemoveSubscriptionInfo info) throws Exception {
return null;
}
public Response processRollbackTransaction(TransactionInfo info) throws Exception {
return null;
}
public Response processShutdown(ShutdownInfo info) throws Exception {
return null;
}
public Response processWireFormat(WireFormatInfo info) throws Exception {
return null;
}
public Response processMessageDispatch(MessageDispatch dispatch) throws Exception {
return null;
}
public Response processControlCommand(ControlCommand command) throws Exception {
return null;
}
public Response processConnectionControl(ConnectionControl control) throws Exception {
return null;
}
public Response processConnectionError(ConnectionError error) throws Exception {
return null;
}
public Response processConsumerControl(ConsumerControl control) throws Exception {
return null;
}
}

View File

@ -19,42 +19,32 @@ package org.apache.activemq.state;
import java.io.IOException; import java.io.IOException;
import java.util.Iterator; import java.util.Iterator;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.activemq.command.BrokerInfo;
import org.apache.activemq.command.Command; import org.apache.activemq.command.Command;
import org.apache.activemq.command.ConnectionId; import org.apache.activemq.command.ConnectionId;
import org.apache.activemq.command.ConnectionInfo; import org.apache.activemq.command.ConnectionInfo;
import org.apache.activemq.command.ConsumerId; import org.apache.activemq.command.ConsumerId;
import org.apache.activemq.command.ConsumerInfo; import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.DestinationInfo; import org.apache.activemq.command.DestinationInfo;
import org.apache.activemq.command.FlushCommand;
import org.apache.activemq.command.KeepAliveInfo;
import org.apache.activemq.command.Message; import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck; import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageDispatchNotification;
import org.apache.activemq.command.MessagePull;
import org.apache.activemq.command.ProducerAck;
import org.apache.activemq.command.ProducerId; import org.apache.activemq.command.ProducerId;
import org.apache.activemq.command.ProducerInfo; import org.apache.activemq.command.ProducerInfo;
import org.apache.activemq.command.RemoveSubscriptionInfo;
import org.apache.activemq.command.Response; import org.apache.activemq.command.Response;
import org.apache.activemq.command.SessionId; import org.apache.activemq.command.SessionId;
import org.apache.activemq.command.SessionInfo; import org.apache.activemq.command.SessionInfo;
import org.apache.activemq.command.ShutdownInfo;
import org.apache.activemq.command.TransactionInfo; import org.apache.activemq.command.TransactionInfo;
import org.apache.activemq.command.WireFormatInfo;
import org.apache.activemq.transport.Transport; import org.apache.activemq.transport.Transport;
import org.apache.activemq.util.IOExceptionSupport; import org.apache.activemq.util.IOExceptionSupport;
import java.util.concurrent.ConcurrentHashMap;
/** /**
* Tracks the state of a connection so a newly established transport can * Tracks the state of a connection so a newly established transport can
* be re-initialized to the state that was tracked. * be re-initialized to the state that was tracked.
* *
* @version $Revision$ * @version $Revision$
*/ */
public class ConnectionStateTracker implements CommandVisitor { public class ConnectionStateTracker extends CommandVisitorAdapter {
private final static Tracked TRACKED_RESPONSE_MARKER = new Tracked(null); private final static Tracked TRACKED_RESPONSE_MARKER = new Tracked(null);
@ -311,9 +301,6 @@ public class ConnectionStateTracker implements CommandVisitor {
return TRACKED_RESPONSE_MARKER; return TRACKED_RESPONSE_MARKER;
} }
public Response processRemoveSubscription(RemoveSubscriptionInfo info) throws Exception {
return null;
}
public Response processMessage(Message send) throws Exception{ public Response processMessage(Message send) throws Exception{
if(trackTransactions&&send!=null&&send.getTransactionId()!=null){ if(trackTransactions&&send!=null&&send.getTransactionId()!=null){
@ -448,43 +435,6 @@ public class ConnectionStateTracker implements CommandVisitor {
return null; return null;
} }
public Response processRecoverTransactions(TransactionInfo info) {
return null;
}
public Response processForgetTransaction(TransactionInfo info) throws Exception {
return null;
}
public Response processWireFormat(WireFormatInfo info) throws Exception {
return null;
}
public Response processKeepAlive(KeepAliveInfo info) throws Exception {
return null;
}
public Response processShutdown(ShutdownInfo info) throws Exception {
return null;
}
public Response processBrokerInfo(BrokerInfo info) throws Exception {
return null;
}
public Response processFlush(FlushCommand command) throws Exception {
return null;
}
public Response processMessageDispatchNotification(MessageDispatchNotification notification) throws Exception{
return null;
}
public Response processMessagePull(MessagePull pull) throws Exception {
return null;
}
public Response processProducerAck(ProducerAck ack) throws Exception {
return null;
}
public boolean isRestoreConsumers() { public boolean isRestoreConsumers() {
return restoreConsumers; return restoreConsumers;
} }
@ -525,5 +475,4 @@ public class ConnectionStateTracker implements CommandVisitor {
this.restoreTransaction = restoreTransaction; this.restoreTransaction = restoreTransaction;
} }
} }