diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/ConnectionContext.java b/activemq-core/src/main/java/org/apache/activemq/broker/ConnectionContext.java index fd260420ee..f3e5fe6cf4 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/ConnectionContext.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/ConnectionContext.java @@ -55,14 +55,20 @@ public class ConnectionContext { private boolean networkConnection; private boolean faultTolerant; private final AtomicBoolean stopping = new AtomicBoolean(); - private final MessageEvaluationContext messageEvaluationContext = new MessageEvaluationContext(); + private final MessageEvaluationContext messageEvaluationContext; private boolean dontSendReponse; private boolean clientMaster = true; public ConnectionContext() { + this.messageEvaluationContext = new MessageEvaluationContext(); } - + + public ConnectionContext(MessageEvaluationContext messageEvaluationContext) { + this.messageEvaluationContext=messageEvaluationContext; + } + public ConnectionContext(ConnectionInfo info) { + this(); setClientId(info.getClientId()); setUserName(info.getUserName()); setConnectionId(info.getConnectionId()); diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java index b3c6659587..a4812398d2 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java @@ -57,6 +57,7 @@ import org.apache.activemq.command.ProducerInfo; import org.apache.activemq.command.Response; import org.apache.activemq.filter.BooleanExpression; import org.apache.activemq.filter.MessageEvaluationContext; +import org.apache.activemq.filter.NonCachedMessageEvaluationContext; import org.apache.activemq.selector.SelectorParser; import org.apache.activemq.store.MessageRecoveryListener; import org.apache.activemq.store.MessageStore; @@ -195,7 +196,7 @@ public class Queue extends BaseDestination implements Task { try { sub.add(context, this); destinationStatistics.getConsumers().increment(); - MessageEvaluationContext msgContext = new MessageEvaluationContext(); + MessageEvaluationContext msgContext = new NonCachedMessageEvaluationContext(); // needs to be synchronized - so no contention with dispatching synchronized (consumers) { @@ -940,7 +941,7 @@ public class Queue extends BaseDestination implements Task { protected ConnectionContext createConnectionContext() { - ConnectionContext answer = new ConnectionContext(); + ConnectionContext answer = new ConnectionContext(new NonCachedMessageEvaluationContext()); answer.getMessageEvaluationContext().setDestination(getActiveMQDestination()); return answer; } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java index ec42d1c12b..fb415a8b64 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java @@ -200,8 +200,17 @@ public class RegionBroker implements Broker { synchronized (clientIdSet) { ConnectionContext oldContext = clientIdSet.get(clientId); if (oldContext != null) { + if (context.isFaultTolerant() || context.isNetworkConnection()){ + //remove the old connection + try{ + removeConnection(oldContext, info, new Exception("remove stale client")); + }catch(Exception e){ + LOG.warn("Failed to remove stale connection ",e); + } + }else{ throw new InvalidClientIDException("Broker: " + getBrokerName() + " - Client: " + clientId + " already connected from " + oldContext.getConnection().getRemoteAddress()); + } } else { clientIdSet.put(clientId, context); } @@ -673,7 +682,6 @@ public class RegionBroker implements Broker { public void sendToDeadLetterQueue(ConnectionContext context, MessageReference node){ try{ - boolean sent=false; if(node!=null){ Message message=node.getMessage(); if(message!=null&&node.getRegionDestination()!=null){ @@ -703,18 +711,10 @@ public class RegionBroker implements Broker { } BrokerSupport.resend(context,message, deadLetterDestination); - sent=true; } - }else { - //don't want to warn about failing to send - // if there isn't a dead letter strategy - sent=true; } } } - if(sent==false){ - LOG.warn("Failed to send "+node+" to DLQ"); - } }catch(Exception e){ LOG.warn("Caught an exception sending to DLQ: "+node,e); } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java index 3cd05f420e..0cbae2a3e6 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java @@ -47,6 +47,7 @@ import org.apache.activemq.command.ProducerInfo; import org.apache.activemq.command.Response; import org.apache.activemq.command.SubscriptionInfo; import org.apache.activemq.filter.MessageEvaluationContext; +import org.apache.activemq.filter.NonCachedMessageEvaluationContext; import org.apache.activemq.state.ProducerState; import org.apache.activemq.store.MessageRecoveryListener; import org.apache.activemq.store.TopicMessageStore; @@ -213,7 +214,7 @@ public class Topic extends BaseDestination implements Task{ topicStore.addSubsciption(info,subscription.getConsumerInfo().isRetroactive()); } - final MessageEvaluationContext msgContext = new MessageEvaluationContext(); + final MessageEvaluationContext msgContext = new NonCachedMessageEvaluationContext(); msgContext.setDestination(destination); if (subscription.isRecoveryRequired()) { topicStore.recoverSubscription(clientId, subscriptionName, new MessageRecoveryListener() { diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java index ed1e06d22d..250b800074 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java @@ -28,6 +28,7 @@ import org.apache.activemq.broker.region.Destination; import org.apache.activemq.broker.region.MessageReference; import org.apache.activemq.broker.region.QueueMessageReference; import org.apache.activemq.command.Message; +import org.apache.activemq.filter.NonCachedMessageEvaluationContext; import org.apache.activemq.kaha.CommandMarshaller; import org.apache.activemq.kaha.ListContainer; import org.apache.activemq.kaha.Store; @@ -393,6 +394,6 @@ public class FilePendingMessageCursor extends AbstractPendingMessageCursor imple if (LOG.isDebugEnabled()) { LOG.debug("Discarding message " + message); } - broker.getRoot().sendToDeadLetterQueue(new ConnectionContext(), message); + broker.getRoot().sendToDeadLetterQueue(new ConnectionContext(new NonCachedMessageEvaluationContext()), message); } } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java index f2eda95a59..809ba2d531 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java @@ -25,6 +25,7 @@ import org.apache.activemq.broker.region.Topic; import org.apache.activemq.command.Message; import org.apache.activemq.command.MessageId; import org.apache.activemq.filter.MessageEvaluationContext; +import org.apache.activemq.filter.NonCachedMessageEvaluationContext; import org.apache.activemq.store.TopicMessageStore; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -78,7 +79,7 @@ class TopicStorePrefetch extends AbstractStoreCursor { } public synchronized boolean recoverMessage(Message message, boolean cached) throws Exception { - MessageEvaluationContext messageEvaluationContext = new MessageEvaluationContext(); + MessageEvaluationContext messageEvaluationContext = new NonCachedMessageEvaluationContext(); messageEvaluationContext.setMessageReference(message); if (this.subscription.matches(message, messageEvaluationContext)) { return super.recoverMessage(message, cached); diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/SimpleDispatchSelector.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/SimpleDispatchSelector.java index 1d949ba34f..c2f87bd837 100644 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/SimpleDispatchSelector.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/SimpleDispatchSelector.java @@ -7,6 +7,7 @@ import org.apache.activemq.broker.region.MessageReference; import org.apache.activemq.broker.region.Subscription; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.filter.MessageEvaluationContext; +import org.apache.activemq.filter.NonCachedMessageEvaluationContext; /** * Simple dispatch policy that determines if a message can be sent to a subscription @@ -26,7 +27,7 @@ public class SimpleDispatchSelector implements DispatchSelector { } public boolean canDispatch(Subscription subscription, MessageReference node) throws Exception { - MessageEvaluationContext msgContext = new MessageEvaluationContext(); + MessageEvaluationContext msgContext = new NonCachedMessageEvaluationContext(); msgContext.setDestination(this.destination); msgContext.setMessageReference(node); return subscription.matches(node, msgContext); diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/CompositeDestinationFilter.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/CompositeDestinationFilter.java index f9992e3e07..a61a6a1bac 100644 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/CompositeDestinationFilter.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/CompositeDestinationFilter.java @@ -25,6 +25,7 @@ import org.apache.activemq.broker.region.DestinationFilter; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.Message; import org.apache.activemq.filter.MessageEvaluationContext; +import org.apache.activemq.filter.NonCachedMessageEvaluationContext; /** * Represents a composite {@link Destination} where send()s are replicated to @@ -55,7 +56,7 @@ public class CompositeDestinationFilter extends DestinationFilter { if (value instanceof FilteredDestination) { FilteredDestination filteredDestination = (FilteredDestination)value; if (messageContext == null) { - messageContext = new MessageEvaluationContext(); + messageContext = new NonCachedMessageEvaluationContext(); messageContext.setMessageReference(message); } messageContext.setDestination(filteredDestination.getDestination()); diff --git a/activemq-core/src/main/java/org/apache/activemq/filter/MessageEvaluationContext.java b/activemq-core/src/main/java/org/apache/activemq/filter/MessageEvaluationContext.java index 5734d16d95..8d5f6f3559 100755 --- a/activemq-core/src/main/java/org/apache/activemq/filter/MessageEvaluationContext.java +++ b/activemq-core/src/main/java/org/apache/activemq/filter/MessageEvaluationContext.java @@ -33,11 +33,11 @@ import org.apache.activemq.command.Message; */ public class MessageEvaluationContext { - private MessageReference messageReference; - private boolean loaded; - private boolean dropped; - private Message message; - private ActiveMQDestination destination; + protected MessageReference messageReference; + protected boolean loaded; + protected boolean dropped; + protected Message message; + protected ActiveMQDestination destination; public MessageEvaluationContext() { } diff --git a/activemq-core/src/main/java/org/apache/activemq/filter/NonCachedMessageEvaluationContext.java b/activemq-core/src/main/java/org/apache/activemq/filter/NonCachedMessageEvaluationContext.java new file mode 100644 index 0000000000..1ea1193d39 --- /dev/null +++ b/activemq-core/src/main/java/org/apache/activemq/filter/NonCachedMessageEvaluationContext.java @@ -0,0 +1,42 @@ +package org.apache.activemq.filter; +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +import java.io.IOException; + +import org.apache.activemq.broker.region.MessageReference; +import org.apache.activemq.command.Message; + +/** + * NonCached version of the MessageEvaluationContext + * + * @version $Revision: 1.4 $ + */ +public class NonCachedMessageEvaluationContext extends MessageEvaluationContext { + + + public Message getMessage() throws IOException { + return messageReference != null ? messageReference.getMessage():null; + } + + public void setMessageReference(MessageReference messageReference) { + this.messageReference = messageReference; + } + + + protected void clearMessageCache() { + } +} diff --git a/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQMessageStore.java b/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQMessageStore.java index 66b10937b9..d3d90bc127 100644 --- a/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQMessageStore.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQMessageStore.java @@ -38,6 +38,7 @@ import org.apache.activemq.command.JournalQueueAck; import org.apache.activemq.command.Message; import org.apache.activemq.command.MessageAck; import org.apache.activemq.command.MessageId; +import org.apache.activemq.filter.NonCachedMessageEvaluationContext; import org.apache.activemq.kaha.impl.async.Location; import org.apache.activemq.store.MessageRecoveryListener; import org.apache.activemq.store.MessageStore; @@ -87,7 +88,7 @@ public class AMQMessageStore implements MessageStore { this.transactionStore = adapter.getTransactionStore(); this.referenceStore = referenceStore; this.destination = destination; - this.transactionTemplate = new TransactionTemplate(adapter, new ConnectionContext()); + this.transactionTemplate = new TransactionTemplate(adapter, new ConnectionContext(new NonCachedMessageEvaluationContext())); asyncWriteTask = adapter.getTaskRunnerFactory().createTaskRunner(new Task() { public boolean iterate() { diff --git a/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java b/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java index e00e8dc220..0d7d015d11 100644 --- a/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java @@ -45,6 +45,7 @@ import org.apache.activemq.command.JournalTopicAck; import org.apache.activemq.command.JournalTrace; import org.apache.activemq.command.JournalTransaction; import org.apache.activemq.command.Message; +import org.apache.activemq.filter.NonCachedMessageEvaluationContext; import org.apache.activemq.kaha.impl.async.AsyncDataManager; import org.apache.activemq.kaha.impl.async.Location; import org.apache.activemq.kaha.impl.index.hash.HashIndex; @@ -502,7 +503,7 @@ public class AMQPersistenceAdapter implements PersistenceAdapter, UsageListener, int redoCounter = 0; LOG.info("Journal Recovery Started from: " + asyncDataManager); long start = System.currentTimeMillis(); - ConnectionContext context = new ConnectionContext(); + ConnectionContext context = new ConnectionContext(new NonCachedMessageEvaluationContext()); // While we have records in the journal. while ((pos = asyncDataManager.getNextLocation(pos)) != null) { ByteSequence data = asyncDataManager.read(pos); diff --git a/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalMessageStore.java b/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalMessageStore.java index b46bdbe199..9b70ddb82f 100755 --- a/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalMessageStore.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalMessageStore.java @@ -33,6 +33,7 @@ import org.apache.activemq.command.JournalQueueAck; import org.apache.activemq.command.Message; import org.apache.activemq.command.MessageAck; import org.apache.activemq.command.MessageId; +import org.apache.activemq.filter.NonCachedMessageEvaluationContext; import org.apache.activemq.store.MessageRecoveryListener; import org.apache.activemq.store.MessageStore; import org.apache.activemq.store.PersistenceAdapter; @@ -75,7 +76,7 @@ public class JournalMessageStore implements MessageStore { this.transactionStore = adapter.getTransactionStore(); this.longTermStore = checkpointStore; this.destination = destination; - this.transactionTemplate = new TransactionTemplate(adapter, new ConnectionContext()); + this.transactionTemplate = new TransactionTemplate(adapter, new ConnectionContext(new NonCachedMessageEvaluationContext())); } diff --git a/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java b/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java index 6c7aaceea3..a57dd1fbab 100755 --- a/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java @@ -49,6 +49,7 @@ import org.apache.activemq.command.JournalTrace; import org.apache.activemq.command.JournalTransaction; import org.apache.activemq.command.Message; import org.apache.activemq.command.MessageAck; +import org.apache.activemq.filter.NonCachedMessageEvaluationContext; import org.apache.activemq.openwire.OpenWireFormat; import org.apache.activemq.store.MessageStore; import org.apache.activemq.store.PersistenceAdapter; @@ -469,7 +470,7 @@ public class JournalPersistenceAdapter implements PersistenceAdapter, JournalEve int transactionCounter = 0; LOG.info("Journal Recovery Started from: " + journal); - ConnectionContext context = new ConnectionContext(); + ConnectionContext context = new ConnectionContext(new NonCachedMessageEvaluationContext()); // While we have records in the journal. while ((pos = journal.getNextRecordLocation(pos)) != null) {