From 3fbf75b2ff79cd6d3e4847c1bbfe85956689375d Mon Sep 17 00:00:00 2001 From: Clebert Suconic Date: Fri, 28 Aug 2015 14:00:04 -0400 Subject: [PATCH] removing dead code on openwire implementation --- .../protocol/openwire/OpenWireConnection.java | 63 +---- .../amq/AMQConsumerBrokerExchange.java | 15 -- .../protocol/openwire/amq/AMQDestination.java | 241 ------------------ .../amq/AMQProducerBrokerExchange.java | 20 -- .../openwire/amq/AMQSlowConsumerStrategy.java | 39 --- .../openwire/amq/AMQSubscription.java | 22 +- 6 files changed, 8 insertions(+), 392 deletions(-) delete mode 100644 artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQDestination.java delete mode 100644 artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSlowConsumerStrategy.java diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java index ffe2b38241..35861a93ff 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java @@ -97,7 +97,6 @@ import org.apache.activemq.state.ConnectionState; import org.apache.activemq.state.ConsumerState; import org.apache.activemq.state.ProducerState; import org.apache.activemq.state.SessionState; -import org.apache.activemq.thread.TaskRunner; import org.apache.activemq.thread.TaskRunnerFactory; import org.apache.activemq.transport.TransmitCallback; import org.apache.activemq.util.ByteSequence; @@ -134,8 +133,6 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor, S private AMQMessageAuthorizationPolicy messageAuthorizationPolicy; - private boolean networkConnection; - private boolean manageable; private boolean pendingStop; @@ -153,18 +150,10 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor, S private final CountDownLatch stopped = new CountDownLatch(1); - protected TaskRunner taskRunner; - private boolean active; protected final List dispatchQueue = new LinkedList(); - private boolean markedCandidate; - - private boolean blockedCandidate; - - private long timeStamp; - private boolean inServiceException; private final AtomicBoolean asyncException = new AtomicBoolean(false); @@ -575,7 +564,6 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor, S // it should be related to activemq's Acceptor context.setConnector(this.acceptorUsed); context.setMessageAuthorizationPolicy(getMessageAuthorizationPolicy()); - context.setNetworkConnection(networkConnection); context.setFaultTolerant(faultTolerantConnection); context.setTransactions(new ConcurrentHashMap()); context.setUserName(info.getUserName()); @@ -612,30 +600,12 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor, S } public void dispatchAsync(Command message) { - if (!stopping.get()) { - if (taskRunner == null) { - dispatchSync(message); - } - else { - synchronized (dispatchQueue) { - dispatchQueue.add(message); - } - try { - taskRunner.wakeup(); - } - catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } - } - } - else { - if (message.isMessageDispatch()) { - MessageDispatch md = (MessageDispatch) message; - TransmitCallback sub = md.getTransmitCallback(); - protocolManager.postProcessDispatch(md); - if (sub != null) { - sub.onFailure(); - } + if (message.isMessageDispatch()) { + MessageDispatch md = (MessageDispatch) message; + TransmitCallback sub = md.getTransmitCallback(); + protocolManager.postProcessDispatch(md); + if (sub != null) { + sub.onFailure(); } } } @@ -722,22 +692,8 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor, S */ } - public void setMarkedCandidate(boolean markedCandidate) { - this.markedCandidate = markedCandidate; - if (!markedCandidate) { - timeStamp = 0; - blockedCandidate = false; - } - } - protected void dispatch(Command command) throws IOException { - try { - setMarkedCandidate(true); - this.physicalSend(command); - } - finally { - setMarkedCandidate(false); - } + this.physicalSend(command); } protected void processDispatch(Command command) throws IOException { @@ -854,11 +810,6 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor, S // log } - if (taskRunner != null) { - taskRunner.shutdown(1); - taskRunner = null; - } - active = false; // Run the MessageDispatch callbacks so that message references get // cleaned up. diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumerBrokerExchange.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumerBrokerExchange.java index 1e87db335e..d92cd1579a 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumerBrokerExchange.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumerBrokerExchange.java @@ -23,7 +23,6 @@ public abstract class AMQConsumerBrokerExchange { protected final AMQSession amqSession; private AMQConnectionContext connectionContext; - private AMQDestination regionDestination; private AMQSubscription subscription; private boolean wildcard; @@ -45,20 +44,6 @@ public abstract class AMQConsumerBrokerExchange { this.connectionContext = connectionContext; } - /** - * @return the regionDestination - */ - public AMQDestination getRegionDestination() { - return this.regionDestination; - } - - /** - * @param regionDestination the regionDestination to set - */ - public void setRegionDestination(AMQDestination regionDestination) { - this.regionDestination = regionDestination; - } - /** * @return the subscription */ diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQDestination.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQDestination.java deleted file mode 100644 index ca061034ae..0000000000 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQDestination.java +++ /dev/null @@ -1,241 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.artemis.core.protocol.openwire.amq; - -import java.io.IOException; -import java.util.List; - -import org.apache.activemq.broker.region.MessageReference; -import org.apache.activemq.command.ActiveMQDestination; -import org.apache.activemq.command.Message; -import org.apache.activemq.command.MessageAck; -import org.apache.activemq.command.MessageDispatchNotification; -import org.apache.activemq.command.ProducerInfo; -import org.apache.activemq.usage.MemoryUsage; -import org.apache.activemq.usage.Usage; - -public interface AMQDestination { - - AMQDeadLetterStrategy DEFAULT_DEAD_LETTER_STRATEGY = new AMQSharedDeadLetterStrategy(); - long DEFAULT_BLOCKED_PRODUCER_WARNING_INTERVAL = 30000; - - void addSubscription(AMQConnectionContext context, AMQSubscription sub) throws Exception; - - void removeSubscription(AMQConnectionContext context, - AMQSubscription sub, - long lastDeliveredSequenceId) throws Exception; - - void addProducer(AMQConnectionContext context, ProducerInfo info) throws Exception; - - void removeProducer(AMQConnectionContext context, ProducerInfo info) throws Exception; - - void send(AMQProducerBrokerExchange producerExchange, Message messageSend) throws Exception; - - void acknowledge(AMQConnectionContext context, - AMQSubscription sub, - final MessageAck ack, - final MessageReference node) throws IOException; - - long getInactiveTimoutBeforeGC(); - - void markForGC(long timeStamp); - - boolean canGC(); - - void gc(); - - ActiveMQDestination getActiveMQDestination(); - - MemoryUsage getMemoryUsage(); - - void setMemoryUsage(MemoryUsage memoryUsage); - - void dispose(AMQConnectionContext context) throws IOException; - - boolean isDisposed(); - - AMQDestinationStatistics getDestinationStatistics(); - - AMQDeadLetterStrategy getDeadLetterStrategy(); - - Message[] browse(); - - String getName(); - - AMQMessageStore getMessageStore(); - - boolean isProducerFlowControl(); - - void setProducerFlowControl(boolean value); - - boolean isAlwaysRetroactive(); - - void setAlwaysRetroactive(boolean value); - - /** - * Set's the interval at which warnings about producers being blocked by - * resource usage will be triggered. Values of 0 or less will disable - * warnings - * - * @param blockedProducerWarningInterval the interval at which warning about blocked producers will be - * triggered. - */ - void setBlockedProducerWarningInterval(long blockedProducerWarningInterval); - - /** - * @return the interval at which warning about blocked producers will be - * triggered. - */ - long getBlockedProducerWarningInterval(); - - int getMaxProducersToAudit(); - - void setMaxProducersToAudit(int maxProducersToAudit); - - int getMaxAuditDepth(); - - void setMaxAuditDepth(int maxAuditDepth); - - boolean isEnableAudit(); - - void setEnableAudit(boolean enableAudit); - - boolean isActive(); - - int getMaxPageSize(); - - void setMaxPageSize(int maxPageSize); - - int getMaxBrowsePageSize(); - - void setMaxBrowsePageSize(int maxPageSize); - - boolean isUseCache(); - - void setUseCache(boolean useCache); - - int getMinimumMessageSize(); - - void setMinimumMessageSize(int minimumMessageSize); - - int getCursorMemoryHighWaterMark(); - - void setCursorMemoryHighWaterMark(int cursorMemoryHighWaterMark); - - /** - * optionally called by a Subscriber - to inform the Destination its ready - * for more messages - */ - void wakeup(); - - /** - * @return true if lazyDispatch is enabled - */ - boolean isLazyDispatch(); - - /** - * set the lazy dispatch - default is false - * - * @param value - */ - void setLazyDispatch(boolean value); - - /** - * Inform the Destination a message has expired - * - * @param context - * @param subs - * @param node - */ - void messageExpired(AMQConnectionContext context, AMQSubscription subs, MessageReference node); - - /** - * called when message is consumed - * - * @param context - * @param messageReference - */ - void messageConsumed(AMQConnectionContext context, MessageReference messageReference); - - /** - * Called when message is delivered to the broker - * - * @param context - * @param messageReference - */ - void messageDelivered(AMQConnectionContext context, MessageReference messageReference); - - /** - * Called when a message is discarded - e.g. running low on memory This will - * happen only if the policy is enabled - e.g. non durable topics - * - * @param context - * @param messageReference - * @param sub - */ - void messageDiscarded(AMQConnectionContext context, AMQSubscription sub, MessageReference messageReference); - - /** - * Called when there is a slow consumer - * - * @param context - * @param subs - */ - void slowConsumer(AMQConnectionContext context, AMQSubscription subs); - - /** - * Called to notify a producer is too fast - * - * @param context - * @param producerInfo - */ - void fastProducer(AMQConnectionContext context, ProducerInfo producerInfo); - - /** - * Called when a Usage reaches a limit - * - * @param context - * @param usage - */ - void isFull(AMQConnectionContext context, Usage usage); - - List getConsumers(); - - /** - * called on Queues in slave mode to allow dispatch to follow subscription - * choice of master - * - * @param messageDispatchNotification - * @throws Exception - */ - void processDispatchNotification(MessageDispatchNotification messageDispatchNotification) throws Exception; - - boolean isPrioritizedMessages(); - - AMQSlowConsumerStrategy getSlowConsumerStrategy(); - - boolean isDoOptimzeMessageStorage(); - - void setDoOptimzeMessageStorage(boolean doOptimzeMessageStorage); - - void clearPendingMessages(); - - boolean isDLQ(); - - void duplicateFromStore(Message message, AMQSubscription subscription); - -} diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQProducerBrokerExchange.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQProducerBrokerExchange.java index d7648cc1c3..f94c119c4c 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQProducerBrokerExchange.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQProducerBrokerExchange.java @@ -26,7 +26,6 @@ import org.apache.activemq.state.ProducerState; public class AMQProducerBrokerExchange { private AMQConnectionContext connectionContext; - private AMQDestination regionDestination; private ProducerState producerState; private boolean mutable = true; private AtomicLong lastSendSequenceNumber = new AtomicLong(-1); @@ -40,7 +39,6 @@ public class AMQProducerBrokerExchange { public AMQProducerBrokerExchange copy() { AMQProducerBrokerExchange rc = new AMQProducerBrokerExchange(); rc.connectionContext = connectionContext.copy(); - rc.regionDestination = regionDestination; rc.producerState = producerState; rc.mutable = mutable; return rc; @@ -74,20 +72,6 @@ public class AMQProducerBrokerExchange { this.mutable = mutable; } - /** - * @return the regionDestination - */ - public AMQDestination getRegionDestination() { - return this.regionDestination; - } - - /** - * @param regionDestination the regionDestination to set - */ - public void setRegionDestination(AMQDestination regionDestination) { - this.regionDestination = regionDestination; - } - /** * @return the producerState */ @@ -149,10 +133,6 @@ public class AMQProducerBrokerExchange { flowControlInfo.setBlockingOnFlowControl(blockingOnFlowControl); } - public void incrementTimeBlocked(AMQDestination destination, long timeBlocked) { - flowControlInfo.incrementTimeBlocked(timeBlocked); - } - public boolean isBlockedForFlowControl() { return flowControlInfo.isBlockingOnFlowControl(); } diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSlowConsumerStrategy.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSlowConsumerStrategy.java deleted file mode 100644 index 333dd0408d..0000000000 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSlowConsumerStrategy.java +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.artemis.core.protocol.openwire.amq; - -public interface AMQSlowConsumerStrategy { - - /** - * Slow consumer event. - * - * @param context Connection context of the subscription. - * @param subs The subscription object for the slow consumer. - */ - void slowConsumer(AMQConnectionContext context, AMQSubscription subs); - - /** - * For Strategies that need to examine assigned destination for slow consumers - * periodically the destination is assigned here. - * - * If the strategy doesn't is event driven it can just ignore assigned destination. - * - * @param destination A destination to add to a watch list. - */ - void addDestination(AMQDestination destination); - -} diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSubscription.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSubscription.java index a23d675a6d..abf492d943 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSubscription.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSubscription.java @@ -16,11 +16,9 @@ */ package org.apache.activemq.artemis.core.protocol.openwire.amq; -import java.io.IOException; -import java.util.List; - import javax.jms.InvalidSelectorException; import javax.management.ObjectName; +import java.io.IOException; import org.apache.activemq.broker.region.MessageReference; import org.apache.activemq.command.ActiveMQDestination; @@ -81,24 +79,6 @@ public interface AMQSubscription extends AMQSubscriptionRecovery { */ boolean matches(ActiveMQDestination destination); - /** - * The subscription will be receiving messages from the destination. - * - * @param context - * @param destination - * @throws Exception - */ - void add(AMQConnectionContext context, AMQDestination destination) throws Exception; - - /** - * The subscription will be no longer be receiving messages from the destination. - * - * @param context - * @param destination - * @return a list of un-acked messages that were added to the subscription. - */ - List remove(AMQConnectionContext context, AMQDestination destination) throws Exception; - /** * The ConsumerInfo object that created the subscription. */