removing dead code on openwire implementation

This commit is contained in:
Clebert Suconic 2015-08-28 14:00:04 -04:00
parent 8d98fc395f
commit 3fbf75b2ff
6 changed files with 8 additions and 392 deletions

View File

@ -97,7 +97,6 @@ import org.apache.activemq.state.ConnectionState;
import org.apache.activemq.state.ConsumerState;
import org.apache.activemq.state.ProducerState;
import org.apache.activemq.state.SessionState;
import org.apache.activemq.thread.TaskRunner;
import org.apache.activemq.thread.TaskRunnerFactory;
import org.apache.activemq.transport.TransmitCallback;
import org.apache.activemq.util.ByteSequence;
@ -134,8 +133,6 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor, S
private AMQMessageAuthorizationPolicy messageAuthorizationPolicy;
private boolean networkConnection;
private boolean manageable;
private boolean pendingStop;
@ -153,18 +150,10 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor, S
private final CountDownLatch stopped = new CountDownLatch(1);
protected TaskRunner taskRunner;
private boolean active;
protected final List<Command> dispatchQueue = new LinkedList<Command>();
private boolean markedCandidate;
private boolean blockedCandidate;
private long timeStamp;
private boolean inServiceException;
private final AtomicBoolean asyncException = new AtomicBoolean(false);
@ -575,7 +564,6 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor, S
// it should be related to activemq's Acceptor
context.setConnector(this.acceptorUsed);
context.setMessageAuthorizationPolicy(getMessageAuthorizationPolicy());
context.setNetworkConnection(networkConnection);
context.setFaultTolerant(faultTolerantConnection);
context.setTransactions(new ConcurrentHashMap<TransactionId, AMQTransaction>());
context.setUserName(info.getUserName());
@ -612,30 +600,12 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor, S
}
public void dispatchAsync(Command message) {
if (!stopping.get()) {
if (taskRunner == null) {
dispatchSync(message);
}
else {
synchronized (dispatchQueue) {
dispatchQueue.add(message);
}
try {
taskRunner.wakeup();
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
else {
if (message.isMessageDispatch()) {
MessageDispatch md = (MessageDispatch) message;
TransmitCallback sub = md.getTransmitCallback();
protocolManager.postProcessDispatch(md);
if (sub != null) {
sub.onFailure();
}
if (message.isMessageDispatch()) {
MessageDispatch md = (MessageDispatch) message;
TransmitCallback sub = md.getTransmitCallback();
protocolManager.postProcessDispatch(md);
if (sub != null) {
sub.onFailure();
}
}
}
@ -722,22 +692,8 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor, S
*/
}
public void setMarkedCandidate(boolean markedCandidate) {
this.markedCandidate = markedCandidate;
if (!markedCandidate) {
timeStamp = 0;
blockedCandidate = false;
}
}
protected void dispatch(Command command) throws IOException {
try {
setMarkedCandidate(true);
this.physicalSend(command);
}
finally {
setMarkedCandidate(false);
}
this.physicalSend(command);
}
protected void processDispatch(Command command) throws IOException {
@ -854,11 +810,6 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor, S
// log
}
if (taskRunner != null) {
taskRunner.shutdown(1);
taskRunner = null;
}
active = false;
// Run the MessageDispatch callbacks so that message references get
// cleaned up.

View File

@ -23,7 +23,6 @@ public abstract class AMQConsumerBrokerExchange {
protected final AMQSession amqSession;
private AMQConnectionContext connectionContext;
private AMQDestination regionDestination;
private AMQSubscription subscription;
private boolean wildcard;
@ -45,20 +44,6 @@ public abstract class AMQConsumerBrokerExchange {
this.connectionContext = connectionContext;
}
/**
* @return the regionDestination
*/
public AMQDestination getRegionDestination() {
return this.regionDestination;
}
/**
* @param regionDestination the regionDestination to set
*/
public void setRegionDestination(AMQDestination regionDestination) {
this.regionDestination = regionDestination;
}
/**
* @return the subscription
*/

View File

@ -1,241 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.protocol.openwire.amq;
import java.io.IOException;
import java.util.List;
import org.apache.activemq.broker.region.MessageReference;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageDispatchNotification;
import org.apache.activemq.command.ProducerInfo;
import org.apache.activemq.usage.MemoryUsage;
import org.apache.activemq.usage.Usage;
public interface AMQDestination {
AMQDeadLetterStrategy DEFAULT_DEAD_LETTER_STRATEGY = new AMQSharedDeadLetterStrategy();
long DEFAULT_BLOCKED_PRODUCER_WARNING_INTERVAL = 30000;
void addSubscription(AMQConnectionContext context, AMQSubscription sub) throws Exception;
void removeSubscription(AMQConnectionContext context,
AMQSubscription sub,
long lastDeliveredSequenceId) throws Exception;
void addProducer(AMQConnectionContext context, ProducerInfo info) throws Exception;
void removeProducer(AMQConnectionContext context, ProducerInfo info) throws Exception;
void send(AMQProducerBrokerExchange producerExchange, Message messageSend) throws Exception;
void acknowledge(AMQConnectionContext context,
AMQSubscription sub,
final MessageAck ack,
final MessageReference node) throws IOException;
long getInactiveTimoutBeforeGC();
void markForGC(long timeStamp);
boolean canGC();
void gc();
ActiveMQDestination getActiveMQDestination();
MemoryUsage getMemoryUsage();
void setMemoryUsage(MemoryUsage memoryUsage);
void dispose(AMQConnectionContext context) throws IOException;
boolean isDisposed();
AMQDestinationStatistics getDestinationStatistics();
AMQDeadLetterStrategy getDeadLetterStrategy();
Message[] browse();
String getName();
AMQMessageStore getMessageStore();
boolean isProducerFlowControl();
void setProducerFlowControl(boolean value);
boolean isAlwaysRetroactive();
void setAlwaysRetroactive(boolean value);
/**
* Set's the interval at which warnings about producers being blocked by
* resource usage will be triggered. Values of 0 or less will disable
* warnings
*
* @param blockedProducerWarningInterval the interval at which warning about blocked producers will be
* triggered.
*/
void setBlockedProducerWarningInterval(long blockedProducerWarningInterval);
/**
* @return the interval at which warning about blocked producers will be
* triggered.
*/
long getBlockedProducerWarningInterval();
int getMaxProducersToAudit();
void setMaxProducersToAudit(int maxProducersToAudit);
int getMaxAuditDepth();
void setMaxAuditDepth(int maxAuditDepth);
boolean isEnableAudit();
void setEnableAudit(boolean enableAudit);
boolean isActive();
int getMaxPageSize();
void setMaxPageSize(int maxPageSize);
int getMaxBrowsePageSize();
void setMaxBrowsePageSize(int maxPageSize);
boolean isUseCache();
void setUseCache(boolean useCache);
int getMinimumMessageSize();
void setMinimumMessageSize(int minimumMessageSize);
int getCursorMemoryHighWaterMark();
void setCursorMemoryHighWaterMark(int cursorMemoryHighWaterMark);
/**
* optionally called by a Subscriber - to inform the Destination its ready
* for more messages
*/
void wakeup();
/**
* @return true if lazyDispatch is enabled
*/
boolean isLazyDispatch();
/**
* set the lazy dispatch - default is false
*
* @param value
*/
void setLazyDispatch(boolean value);
/**
* Inform the Destination a message has expired
*
* @param context
* @param subs
* @param node
*/
void messageExpired(AMQConnectionContext context, AMQSubscription subs, MessageReference node);
/**
* called when message is consumed
*
* @param context
* @param messageReference
*/
void messageConsumed(AMQConnectionContext context, MessageReference messageReference);
/**
* Called when message is delivered to the broker
*
* @param context
* @param messageReference
*/
void messageDelivered(AMQConnectionContext context, MessageReference messageReference);
/**
* Called when a message is discarded - e.g. running low on memory This will
* happen only if the policy is enabled - e.g. non durable topics
*
* @param context
* @param messageReference
* @param sub
*/
void messageDiscarded(AMQConnectionContext context, AMQSubscription sub, MessageReference messageReference);
/**
* Called when there is a slow consumer
*
* @param context
* @param subs
*/
void slowConsumer(AMQConnectionContext context, AMQSubscription subs);
/**
* Called to notify a producer is too fast
*
* @param context
* @param producerInfo
*/
void fastProducer(AMQConnectionContext context, ProducerInfo producerInfo);
/**
* Called when a Usage reaches a limit
*
* @param context
* @param usage
*/
void isFull(AMQConnectionContext context, Usage<?> usage);
List<AMQSubscription> getConsumers();
/**
* called on Queues in slave mode to allow dispatch to follow subscription
* choice of master
*
* @param messageDispatchNotification
* @throws Exception
*/
void processDispatchNotification(MessageDispatchNotification messageDispatchNotification) throws Exception;
boolean isPrioritizedMessages();
AMQSlowConsumerStrategy getSlowConsumerStrategy();
boolean isDoOptimzeMessageStorage();
void setDoOptimzeMessageStorage(boolean doOptimzeMessageStorage);
void clearPendingMessages();
boolean isDLQ();
void duplicateFromStore(Message message, AMQSubscription subscription);
}

View File

@ -26,7 +26,6 @@ import org.apache.activemq.state.ProducerState;
public class AMQProducerBrokerExchange {
private AMQConnectionContext connectionContext;
private AMQDestination regionDestination;
private ProducerState producerState;
private boolean mutable = true;
private AtomicLong lastSendSequenceNumber = new AtomicLong(-1);
@ -40,7 +39,6 @@ public class AMQProducerBrokerExchange {
public AMQProducerBrokerExchange copy() {
AMQProducerBrokerExchange rc = new AMQProducerBrokerExchange();
rc.connectionContext = connectionContext.copy();
rc.regionDestination = regionDestination;
rc.producerState = producerState;
rc.mutable = mutable;
return rc;
@ -74,20 +72,6 @@ public class AMQProducerBrokerExchange {
this.mutable = mutable;
}
/**
* @return the regionDestination
*/
public AMQDestination getRegionDestination() {
return this.regionDestination;
}
/**
* @param regionDestination the regionDestination to set
*/
public void setRegionDestination(AMQDestination regionDestination) {
this.regionDestination = regionDestination;
}
/**
* @return the producerState
*/
@ -149,10 +133,6 @@ public class AMQProducerBrokerExchange {
flowControlInfo.setBlockingOnFlowControl(blockingOnFlowControl);
}
public void incrementTimeBlocked(AMQDestination destination, long timeBlocked) {
flowControlInfo.incrementTimeBlocked(timeBlocked);
}
public boolean isBlockedForFlowControl() {
return flowControlInfo.isBlockingOnFlowControl();
}

View File

@ -1,39 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.protocol.openwire.amq;
public interface AMQSlowConsumerStrategy {
/**
* Slow consumer event.
*
* @param context Connection context of the subscription.
* @param subs The subscription object for the slow consumer.
*/
void slowConsumer(AMQConnectionContext context, AMQSubscription subs);
/**
* For Strategies that need to examine assigned destination for slow consumers
* periodically the destination is assigned here.
*
* If the strategy doesn't is event driven it can just ignore assigned destination.
*
* @param destination A destination to add to a watch list.
*/
void addDestination(AMQDestination destination);
}

View File

@ -16,11 +16,9 @@
*/
package org.apache.activemq.artemis.core.protocol.openwire.amq;
import java.io.IOException;
import java.util.List;
import javax.jms.InvalidSelectorException;
import javax.management.ObjectName;
import java.io.IOException;
import org.apache.activemq.broker.region.MessageReference;
import org.apache.activemq.command.ActiveMQDestination;
@ -81,24 +79,6 @@ public interface AMQSubscription extends AMQSubscriptionRecovery {
*/
boolean matches(ActiveMQDestination destination);
/**
* The subscription will be receiving messages from the destination.
*
* @param context
* @param destination
* @throws Exception
*/
void add(AMQConnectionContext context, AMQDestination destination) throws Exception;
/**
* The subscription will be no longer be receiving messages from the destination.
*
* @param context
* @param destination
* @return a list of un-acked messages that were added to the subscription.
*/
List<MessageReference> remove(AMQConnectionContext context, AMQDestination destination) throws Exception;
/**
* The ConsumerInfo object that created the subscription.
*/