From fb3b6dba571b9dbffaac45ac920037760ceb6dbc Mon Sep 17 00:00:00 2001 From: "Hiram R. Chirino" Date: Fri, 9 Mar 2007 18:23:44 +0000 Subject: [PATCH] Enhanced the ActiveMQConnection to use the CommandVisitor instead of using a big if swtich when handling commands from the broker. This should be slightly more efficient. git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@516492 13f79535-47bb-0310-9956-ffa450edef68 --- .../apache/activemq/ActiveMQConnection.java | 123 +++++++----- .../activemq/broker/TransportConnection.java | 24 +++ .../activemq/command/ConnectionControl.java | 2 +- .../activemq/command/ConnectionError.java | 2 +- .../activemq/command/ConsumerControl.java | 9 +- .../activemq/command/ControlCommand.java | 4 +- .../activemq/command/MessageDispatch.java | 2 +- .../apache/activemq/state/CommandVisitor.java | 10 + .../activemq/state/CommandVisitorAdapter.java | 186 ++++++++++++++++++ .../state/ConnectionStateTracker.java | 55 +----- 10 files changed, 303 insertions(+), 114 deletions(-) create mode 100644 activemq-core/src/main/java/org/apache/activemq/state/CommandVisitorAdapter.java diff --git a/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java b/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java index 54ede5196c..a1c242d449 100755 --- a/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java +++ b/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java @@ -87,6 +87,7 @@ import org.apache.activemq.management.JMSConnectionStatsImpl; import org.apache.activemq.management.JMSStatsImpl; import org.apache.activemq.management.StatsCapable; import org.apache.activemq.management.StatsImpl; +import org.apache.activemq.state.CommandVisitorAdapter; import org.apache.activemq.thread.TaskRunnerFactory; import org.apache.activemq.transport.Transport; import org.apache.activemq.transport.TransportListener; @@ -1540,53 +1541,81 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon public void onCommand(final Object o) { final Command command = (Command) o; if (!closed.get() && command != null) { - if (command.isMessageDispatch()) { - MessageDispatch md = (MessageDispatch) command; - ActiveMQDispatcher dispatcher = (ActiveMQDispatcher) dispatchers.get(md.getConsumerId()); - if (dispatcher != null) { - // Copy in case a embedded broker is dispatching via vm:// - // md.getMessage() == null to signal end of queue browse. - Message msg = md.getMessage(); - if( msg!=null ) { - msg = msg.copy(); - msg.setReadOnlyBody(true); - msg.setReadOnlyProperties(true); - msg.setRedeliveryCounter(md.getRedeliveryCounter()); - msg.setConnection(this); - md.setMessage( msg ); - } - dispatcher.dispatch(md); - } - } else if (command.getDataStructureType() == ProducerAck.DATA_STRUCTURE_TYPE ) { - ProducerAck pa = (ProducerAck) command; - ActiveMQMessageProducer producer = producers.get(pa.getProducerId()); - if( producer!=null ) { - producer.onProducerAck(pa); - } - } else if ( command.isBrokerInfo() ) { - this.brokerInfo = (BrokerInfo)command; - brokerInfoReceived.countDown(); - this.optimizeAcknowledge &= !this.brokerInfo.isFaultTolerantConfiguration(); - getBlobTransferPolicy().setBrokerUploadUrl(brokerInfo.getBrokerUploadUrl()); - } - else if (command instanceof ControlCommand) { - onControlCommand((ControlCommand) command); - } - else if (command.getDataStructureType() == ConnectionError.DATA_STRUCTURE_TYPE) { - asyncConnectionThread.execute(new Runnable(){ - public void run() { - onAsyncException(((ConnectionError)command).getException()); - } - }); - new Thread("Async error worker") { - }.start(); - }else if (command instanceof ConnectionControl){ - onConnectionControl((ConnectionControl) command); - }else if (command instanceof ConsumerControl){ - onConsumerControl((ConsumerControl) command); - }else if ( command.isWireFormatInfo() ) { - onWireFormatInfo((WireFormatInfo)command); - } + try { + command.visit(new CommandVisitorAdapter(){ + @Override + public Response processMessageDispatch(MessageDispatch md) throws Exception { + ActiveMQDispatcher dispatcher = (ActiveMQDispatcher) dispatchers.get(md.getConsumerId()); + if (dispatcher != null) { + // Copy in case a embedded broker is dispatching via vm:// + // md.getMessage() == null to signal end of queue browse. + Message msg = md.getMessage(); + if( msg!=null ) { + msg = msg.copy(); + msg.setReadOnlyBody(true); + msg.setReadOnlyProperties(true); + msg.setRedeliveryCounter(md.getRedeliveryCounter()); + msg.setConnection(ActiveMQConnection.this); + md.setMessage( msg ); + } + dispatcher.dispatch(md); + } + return null; + } + + @Override + public Response processProducerAck(ProducerAck pa) throws Exception { + ActiveMQMessageProducer producer = producers.get(pa.getProducerId()); + if( producer!=null ) { + producer.onProducerAck(pa); + } + return null; + } + + @Override + public Response processBrokerInfo(BrokerInfo info) throws Exception { + brokerInfoReceived.countDown(); + optimizeAcknowledge &= !brokerInfo.isFaultTolerantConfiguration(); + getBlobTransferPolicy().setBrokerUploadUrl(info.getBrokerUploadUrl()); + return null; + } + + @Override + public Response processConnectionError(final ConnectionError error) throws Exception { + asyncConnectionThread.execute(new Runnable(){ + public void run() { + onAsyncException(error.getException()); + } + }); + new Thread("Async error worker") { + }.start(); + return null; + } + @Override + public Response processControlCommand(ControlCommand command) throws Exception { + onControlCommand(command); + return null; + } + @Override + public Response processConnectionControl(ConnectionControl control) throws Exception { + onConnectionControl((ConnectionControl) command); + return null; + } + @Override + public Response processConsumerControl(ConsumerControl control) throws Exception { + onConsumerControl((ConsumerControl) command); + return null; + } + @Override + public Response processWireFormat(WireFormatInfo info) throws Exception { + onConsumerControl((ConsumerControl) command); + return null; + } + }); + } catch (Exception e) { + onAsyncException(e); + } + } for (Iterator iter = transportListeners.iterator(); iter.hasNext();) { TransportListener listener = (TransportListener) iter.next(); diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java b/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java index b4c8551ec2..01b6e1315a 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java @@ -40,8 +40,10 @@ import org.apache.activemq.command.ConnectionControl; import org.apache.activemq.command.ConnectionError; import org.apache.activemq.command.ConnectionId; import org.apache.activemq.command.ConnectionInfo; +import org.apache.activemq.command.ConsumerControl; import org.apache.activemq.command.ConsumerId; import org.apache.activemq.command.ConsumerInfo; +import org.apache.activemq.command.ControlCommand; import org.apache.activemq.command.DataArrayResponse; import org.apache.activemq.command.DestinationInfo; import org.apache.activemq.command.ExceptionResponse; @@ -1178,4 +1180,26 @@ public class TransportConnection implements Service,Connection,Task,CommandVisit return protocolVersion.get(); } + public Response processControlCommand(ControlCommand command) throws Exception { + if (command.equals("shutdown")) + System.exit(0); + return null; + } + + public Response processMessageDispatch(MessageDispatch dispatch) throws Exception { + return null; + } + + public Response processConnectionControl(ConnectionControl control) throws Exception { + return null; + } + + public Response processConnectionError(ConnectionError error) throws Exception { + return null; + } + + public Response processConsumerControl(ConsumerControl control) throws Exception { + return null; + } + } diff --git a/activemq-core/src/main/java/org/apache/activemq/command/ConnectionControl.java b/activemq-core/src/main/java/org/apache/activemq/command/ConnectionControl.java index cf6382fc38..b99bfdd568 100644 --- a/activemq-core/src/main/java/org/apache/activemq/command/ConnectionControl.java +++ b/activemq-core/src/main/java/org/apache/activemq/command/ConnectionControl.java @@ -38,7 +38,7 @@ public class ConnectionControl extends BaseCommand{ } public Response visit(CommandVisitor visitor) throws Exception{ - return null; + return visitor.processConnectionControl(this); } /** diff --git a/activemq-core/src/main/java/org/apache/activemq/command/ConnectionError.java b/activemq-core/src/main/java/org/apache/activemq/command/ConnectionError.java index 2f93ba7da0..25259c5b15 100644 --- a/activemq-core/src/main/java/org/apache/activemq/command/ConnectionError.java +++ b/activemq-core/src/main/java/org/apache/activemq/command/ConnectionError.java @@ -37,7 +37,7 @@ public class ConnectionError extends BaseCommand { } public Response visit(CommandVisitor visitor) throws Exception { - return null; + return visitor.processConnectionError(this); } /** diff --git a/activemq-core/src/main/java/org/apache/activemq/command/ConsumerControl.java b/activemq-core/src/main/java/org/apache/activemq/command/ConsumerControl.java index 5d63442ce5..afaf31ea3d 100644 --- a/activemq-core/src/main/java/org/apache/activemq/command/ConsumerControl.java +++ b/activemq-core/src/main/java/org/apache/activemq/command/ConsumerControl.java @@ -41,17 +41,10 @@ public class ConsumerControl extends BaseCommand { } - - - public Response visit(CommandVisitor visitor) throws Exception { - return null; + return visitor.processConsumerControl(this); } - - - - /** * @openwire:property version=1 * @return Returns the close. diff --git a/activemq-core/src/main/java/org/apache/activemq/command/ControlCommand.java b/activemq-core/src/main/java/org/apache/activemq/command/ControlCommand.java index 52b09c9c7d..3fad2045f9 100644 --- a/activemq-core/src/main/java/org/apache/activemq/command/ControlCommand.java +++ b/activemq-core/src/main/java/org/apache/activemq/command/ControlCommand.java @@ -48,8 +48,6 @@ public class ControlCommand extends BaseCommand { } public Response visit(CommandVisitor visitor) throws Exception { - if (command.equals("shutdown")) - System.exit(0); - return null; + return visitor.processControlCommand(this); } } diff --git a/activemq-core/src/main/java/org/apache/activemq/command/MessageDispatch.java b/activemq-core/src/main/java/org/apache/activemq/command/MessageDispatch.java index b840483248..aa35bc9d49 100755 --- a/activemq-core/src/main/java/org/apache/activemq/command/MessageDispatch.java +++ b/activemq-core/src/main/java/org/apache/activemq/command/MessageDispatch.java @@ -102,7 +102,7 @@ public class MessageDispatch extends BaseCommand { } public Response visit(CommandVisitor visitor) throws Exception { - return null; + return visitor.processMessageDispatch(this); } public Runnable getTransmitCallback() { diff --git a/activemq-core/src/main/java/org/apache/activemq/state/CommandVisitor.java b/activemq-core/src/main/java/org/apache/activemq/state/CommandVisitor.java index 9a822f2626..67b1507112 100755 --- a/activemq-core/src/main/java/org/apache/activemq/state/CommandVisitor.java +++ b/activemq-core/src/main/java/org/apache/activemq/state/CommandVisitor.java @@ -18,15 +18,20 @@ package org.apache.activemq.state; import org.apache.activemq.command.BrokerInfo; +import org.apache.activemq.command.ConnectionControl; +import org.apache.activemq.command.ConnectionError; import org.apache.activemq.command.ConnectionId; import org.apache.activemq.command.ConnectionInfo; +import org.apache.activemq.command.ConsumerControl; import org.apache.activemq.command.ConsumerId; import org.apache.activemq.command.ConsumerInfo; +import org.apache.activemq.command.ControlCommand; import org.apache.activemq.command.DestinationInfo; import org.apache.activemq.command.FlushCommand; import org.apache.activemq.command.KeepAliveInfo; import org.apache.activemq.command.Message; import org.apache.activemq.command.MessageAck; +import org.apache.activemq.command.MessageDispatch; import org.apache.activemq.command.MessageDispatchNotification; import org.apache.activemq.command.MessagePull; import org.apache.activemq.command.ProducerAck; @@ -77,6 +82,11 @@ public interface CommandVisitor { Response processEndTransaction(TransactionInfo info) throws Exception; Response processMessageDispatchNotification(MessageDispatchNotification notification) throws Exception; Response processProducerAck(ProducerAck ack) throws Exception; + Response processMessageDispatch(MessageDispatch dispatch) throws Exception; + Response processControlCommand(ControlCommand command) throws Exception; + Response processConnectionError(ConnectionError error) throws Exception; + Response processConnectionControl(ConnectionControl control) throws Exception; + Response processConsumerControl(ConsumerControl control) throws Exception; } diff --git a/activemq-core/src/main/java/org/apache/activemq/state/CommandVisitorAdapter.java b/activemq-core/src/main/java/org/apache/activemq/state/CommandVisitorAdapter.java new file mode 100644 index 0000000000..1a0341d28b --- /dev/null +++ b/activemq-core/src/main/java/org/apache/activemq/state/CommandVisitorAdapter.java @@ -0,0 +1,186 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.state; + +import org.apache.activemq.command.BrokerInfo; +import org.apache.activemq.command.ConnectionControl; +import org.apache.activemq.command.ConnectionError; +import org.apache.activemq.command.ConnectionId; +import org.apache.activemq.command.ConnectionInfo; +import org.apache.activemq.command.ConsumerControl; +import org.apache.activemq.command.ConsumerId; +import org.apache.activemq.command.ConsumerInfo; +import org.apache.activemq.command.ControlCommand; +import org.apache.activemq.command.DestinationInfo; +import org.apache.activemq.command.FlushCommand; +import org.apache.activemq.command.KeepAliveInfo; +import org.apache.activemq.command.Message; +import org.apache.activemq.command.MessageAck; +import org.apache.activemq.command.MessageDispatch; +import org.apache.activemq.command.MessageDispatchNotification; +import org.apache.activemq.command.MessagePull; +import org.apache.activemq.command.ProducerAck; +import org.apache.activemq.command.ProducerId; +import org.apache.activemq.command.ProducerInfo; +import org.apache.activemq.command.RemoveSubscriptionInfo; +import org.apache.activemq.command.Response; +import org.apache.activemq.command.SessionId; +import org.apache.activemq.command.SessionInfo; +import org.apache.activemq.command.ShutdownInfo; +import org.apache.activemq.command.TransactionInfo; +import org.apache.activemq.command.WireFormatInfo; + +public class CommandVisitorAdapter implements CommandVisitor { + + public Response processAddConnection(ConnectionInfo info) throws Exception { + return null; + } + + public Response processAddConsumer(ConsumerInfo info) throws Exception { + return null; + } + + public Response processAddDestination(DestinationInfo info) throws Exception { + return null; + } + + public Response processAddProducer(ProducerInfo info) throws Exception { + return null; + } + + public Response processAddSession(SessionInfo info) throws Exception { + return null; + } + + public Response processBeginTransaction(TransactionInfo info) throws Exception { + return null; + } + + public Response processBrokerInfo(BrokerInfo info) throws Exception { + return null; + } + + public Response processCommitTransactionOnePhase(TransactionInfo info) throws Exception { + return null; + } + + public Response processCommitTransactionTwoPhase(TransactionInfo info) throws Exception { + return null; + } + + public Response processEndTransaction(TransactionInfo info) throws Exception { + return null; + } + + public Response processFlush(FlushCommand command) throws Exception { + return null; + } + + public Response processForgetTransaction(TransactionInfo info) throws Exception { + return null; + } + + public Response processKeepAlive(KeepAliveInfo info) throws Exception { + return null; + } + + public Response processMessage(Message send) throws Exception { + return null; + } + + public Response processMessageAck(MessageAck ack) throws Exception { + return null; + } + + public Response processMessageDispatchNotification(MessageDispatchNotification notification) throws Exception { + return null; + } + + public Response processMessagePull(MessagePull pull) throws Exception { + return null; + } + + public Response processPrepareTransaction(TransactionInfo info) throws Exception { + return null; + } + + public Response processProducerAck(ProducerAck ack) throws Exception { + return null; + } + + public Response processRecoverTransactions(TransactionInfo info) throws Exception { + return null; + } + + public Response processRemoveConnection(ConnectionId id) throws Exception { + return null; + } + + public Response processRemoveConsumer(ConsumerId id) throws Exception { + return null; + } + + public Response processRemoveDestination(DestinationInfo info) throws Exception { + return null; + } + + public Response processRemoveProducer(ProducerId id) throws Exception { + return null; + } + + public Response processRemoveSession(SessionId id) throws Exception { + return null; + } + + public Response processRemoveSubscription(RemoveSubscriptionInfo info) throws Exception { + return null; + } + + public Response processRollbackTransaction(TransactionInfo info) throws Exception { + return null; + } + + public Response processShutdown(ShutdownInfo info) throws Exception { + return null; + } + + public Response processWireFormat(WireFormatInfo info) throws Exception { + return null; + } + + public Response processMessageDispatch(MessageDispatch dispatch) throws Exception { + return null; + } + + public Response processControlCommand(ControlCommand command) throws Exception { + return null; + } + + public Response processConnectionControl(ConnectionControl control) throws Exception { + return null; + } + + public Response processConnectionError(ConnectionError error) throws Exception { + return null; + } + + public Response processConsumerControl(ConsumerControl control) throws Exception { + return null; + } + +} diff --git a/activemq-core/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java b/activemq-core/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java index 97ed30a8f3..672b692635 100755 --- a/activemq-core/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java +++ b/activemq-core/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java @@ -19,42 +19,32 @@ package org.apache.activemq.state; import java.io.IOException; import java.util.Iterator; +import java.util.concurrent.ConcurrentHashMap; -import org.apache.activemq.command.BrokerInfo; import org.apache.activemq.command.Command; import org.apache.activemq.command.ConnectionId; import org.apache.activemq.command.ConnectionInfo; import org.apache.activemq.command.ConsumerId; import org.apache.activemq.command.ConsumerInfo; import org.apache.activemq.command.DestinationInfo; -import org.apache.activemq.command.FlushCommand; -import org.apache.activemq.command.KeepAliveInfo; import org.apache.activemq.command.Message; import org.apache.activemq.command.MessageAck; -import org.apache.activemq.command.MessageDispatchNotification; -import org.apache.activemq.command.MessagePull; -import org.apache.activemq.command.ProducerAck; import org.apache.activemq.command.ProducerId; import org.apache.activemq.command.ProducerInfo; -import org.apache.activemq.command.RemoveSubscriptionInfo; import org.apache.activemq.command.Response; import org.apache.activemq.command.SessionId; import org.apache.activemq.command.SessionInfo; -import org.apache.activemq.command.ShutdownInfo; import org.apache.activemq.command.TransactionInfo; -import org.apache.activemq.command.WireFormatInfo; import org.apache.activemq.transport.Transport; import org.apache.activemq.util.IOExceptionSupport; -import java.util.concurrent.ConcurrentHashMap; - /** * Tracks the state of a connection so a newly established transport can * be re-initialized to the state that was tracked. * * @version $Revision$ */ -public class ConnectionStateTracker implements CommandVisitor { +public class ConnectionStateTracker extends CommandVisitorAdapter { private final static Tracked TRACKED_RESPONSE_MARKER = new Tracked(null); @@ -311,9 +301,6 @@ public class ConnectionStateTracker implements CommandVisitor { return TRACKED_RESPONSE_MARKER; } - public Response processRemoveSubscription(RemoveSubscriptionInfo info) throws Exception { - return null; - } public Response processMessage(Message send) throws Exception{ if(trackTransactions&&send!=null&&send.getTransactionId()!=null){ @@ -448,43 +435,6 @@ public class ConnectionStateTracker implements CommandVisitor { return null; } - public Response processRecoverTransactions(TransactionInfo info) { - return null; - } - public Response processForgetTransaction(TransactionInfo info) throws Exception { - return null; - } - - - public Response processWireFormat(WireFormatInfo info) throws Exception { - return null; - } - public Response processKeepAlive(KeepAliveInfo info) throws Exception { - return null; - } - public Response processShutdown(ShutdownInfo info) throws Exception { - return null; - } - public Response processBrokerInfo(BrokerInfo info) throws Exception { - return null; - } - - public Response processFlush(FlushCommand command) throws Exception { - return null; - } - - public Response processMessageDispatchNotification(MessageDispatchNotification notification) throws Exception{ - return null; - } - - public Response processMessagePull(MessagePull pull) throws Exception { - return null; - } - - public Response processProducerAck(ProducerAck ack) throws Exception { - return null; - } - public boolean isRestoreConsumers() { return restoreConsumers; } @@ -525,5 +475,4 @@ public class ConnectionStateTracker implements CommandVisitor { this.restoreTransaction = restoreTransaction; } - }