Added NonCachedMessageContext

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@633800 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Robert Davies 2008-03-05 09:35:31 +00:00
parent 26aeb46493
commit 3b0afd6b71
14 changed files with 85 additions and 27 deletions

View File

@ -55,14 +55,20 @@ public class ConnectionContext {
private boolean networkConnection; private boolean networkConnection;
private boolean faultTolerant; private boolean faultTolerant;
private final AtomicBoolean stopping = new AtomicBoolean(); private final AtomicBoolean stopping = new AtomicBoolean();
private final MessageEvaluationContext messageEvaluationContext = new MessageEvaluationContext(); private final MessageEvaluationContext messageEvaluationContext;
private boolean dontSendReponse; private boolean dontSendReponse;
private boolean clientMaster = true; private boolean clientMaster = true;
public ConnectionContext() { public ConnectionContext() {
this.messageEvaluationContext = new MessageEvaluationContext();
} }
public ConnectionContext(MessageEvaluationContext messageEvaluationContext) {
this.messageEvaluationContext=messageEvaluationContext;
}
public ConnectionContext(ConnectionInfo info) { public ConnectionContext(ConnectionInfo info) {
this();
setClientId(info.getClientId()); setClientId(info.getClientId());
setUserName(info.getUserName()); setUserName(info.getUserName());
setConnectionId(info.getConnectionId()); setConnectionId(info.getConnectionId());

View File

@ -57,6 +57,7 @@ import org.apache.activemq.command.ProducerInfo;
import org.apache.activemq.command.Response; import org.apache.activemq.command.Response;
import org.apache.activemq.filter.BooleanExpression; import org.apache.activemq.filter.BooleanExpression;
import org.apache.activemq.filter.MessageEvaluationContext; import org.apache.activemq.filter.MessageEvaluationContext;
import org.apache.activemq.filter.NonCachedMessageEvaluationContext;
import org.apache.activemq.selector.SelectorParser; import org.apache.activemq.selector.SelectorParser;
import org.apache.activemq.store.MessageRecoveryListener; import org.apache.activemq.store.MessageRecoveryListener;
import org.apache.activemq.store.MessageStore; import org.apache.activemq.store.MessageStore;
@ -195,7 +196,7 @@ public class Queue extends BaseDestination implements Task {
try { try {
sub.add(context, this); sub.add(context, this);
destinationStatistics.getConsumers().increment(); destinationStatistics.getConsumers().increment();
MessageEvaluationContext msgContext = new MessageEvaluationContext(); MessageEvaluationContext msgContext = new NonCachedMessageEvaluationContext();
// needs to be synchronized - so no contention with dispatching // needs to be synchronized - so no contention with dispatching
synchronized (consumers) { synchronized (consumers) {
@ -940,7 +941,7 @@ public class Queue extends BaseDestination implements Task {
protected ConnectionContext createConnectionContext() { protected ConnectionContext createConnectionContext() {
ConnectionContext answer = new ConnectionContext(); ConnectionContext answer = new ConnectionContext(new NonCachedMessageEvaluationContext());
answer.getMessageEvaluationContext().setDestination(getActiveMQDestination()); answer.getMessageEvaluationContext().setDestination(getActiveMQDestination());
return answer; return answer;
} }

View File

@ -200,8 +200,17 @@ public class RegionBroker implements Broker {
synchronized (clientIdSet) { synchronized (clientIdSet) {
ConnectionContext oldContext = clientIdSet.get(clientId); ConnectionContext oldContext = clientIdSet.get(clientId);
if (oldContext != null) { if (oldContext != null) {
if (context.isFaultTolerant() || context.isNetworkConnection()){
//remove the old connection
try{
removeConnection(oldContext, info, new Exception("remove stale client"));
}catch(Exception e){
LOG.warn("Failed to remove stale connection ",e);
}
}else{
throw new InvalidClientIDException("Broker: " + getBrokerName() + " - Client: " + clientId + " already connected from " throw new InvalidClientIDException("Broker: " + getBrokerName() + " - Client: " + clientId + " already connected from "
+ oldContext.getConnection().getRemoteAddress()); + oldContext.getConnection().getRemoteAddress());
}
} else { } else {
clientIdSet.put(clientId, context); clientIdSet.put(clientId, context);
} }
@ -673,7 +682,6 @@ public class RegionBroker implements Broker {
public void sendToDeadLetterQueue(ConnectionContext context, public void sendToDeadLetterQueue(ConnectionContext context,
MessageReference node){ MessageReference node){
try{ try{
boolean sent=false;
if(node!=null){ if(node!=null){
Message message=node.getMessage(); Message message=node.getMessage();
if(message!=null&&node.getRegionDestination()!=null){ if(message!=null&&node.getRegionDestination()!=null){
@ -703,18 +711,10 @@ public class RegionBroker implements Broker {
} }
BrokerSupport.resend(context,message, BrokerSupport.resend(context,message,
deadLetterDestination); deadLetterDestination);
sent=true;
} }
}else {
//don't want to warn about failing to send
// if there isn't a dead letter strategy
sent=true;
} }
} }
} }
if(sent==false){
LOG.warn("Failed to send "+node+" to DLQ");
}
}catch(Exception e){ }catch(Exception e){
LOG.warn("Caught an exception sending to DLQ: "+node,e); LOG.warn("Caught an exception sending to DLQ: "+node,e);
} }

View File

@ -47,6 +47,7 @@ import org.apache.activemq.command.ProducerInfo;
import org.apache.activemq.command.Response; import org.apache.activemq.command.Response;
import org.apache.activemq.command.SubscriptionInfo; import org.apache.activemq.command.SubscriptionInfo;
import org.apache.activemq.filter.MessageEvaluationContext; import org.apache.activemq.filter.MessageEvaluationContext;
import org.apache.activemq.filter.NonCachedMessageEvaluationContext;
import org.apache.activemq.state.ProducerState; import org.apache.activemq.state.ProducerState;
import org.apache.activemq.store.MessageRecoveryListener; import org.apache.activemq.store.MessageRecoveryListener;
import org.apache.activemq.store.TopicMessageStore; import org.apache.activemq.store.TopicMessageStore;
@ -213,7 +214,7 @@ public class Topic extends BaseDestination implements Task{
topicStore.addSubsciption(info,subscription.getConsumerInfo().isRetroactive()); topicStore.addSubsciption(info,subscription.getConsumerInfo().isRetroactive());
} }
final MessageEvaluationContext msgContext = new MessageEvaluationContext(); final MessageEvaluationContext msgContext = new NonCachedMessageEvaluationContext();
msgContext.setDestination(destination); msgContext.setDestination(destination);
if (subscription.isRecoveryRequired()) { if (subscription.isRecoveryRequired()) {
topicStore.recoverSubscription(clientId, subscriptionName, new MessageRecoveryListener() { topicStore.recoverSubscription(clientId, subscriptionName, new MessageRecoveryListener() {

View File

@ -28,6 +28,7 @@ import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.MessageReference; import org.apache.activemq.broker.region.MessageReference;
import org.apache.activemq.broker.region.QueueMessageReference; import org.apache.activemq.broker.region.QueueMessageReference;
import org.apache.activemq.command.Message; import org.apache.activemq.command.Message;
import org.apache.activemq.filter.NonCachedMessageEvaluationContext;
import org.apache.activemq.kaha.CommandMarshaller; import org.apache.activemq.kaha.CommandMarshaller;
import org.apache.activemq.kaha.ListContainer; import org.apache.activemq.kaha.ListContainer;
import org.apache.activemq.kaha.Store; import org.apache.activemq.kaha.Store;
@ -393,6 +394,6 @@ public class FilePendingMessageCursor extends AbstractPendingMessageCursor imple
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("Discarding message " + message); LOG.debug("Discarding message " + message);
} }
broker.getRoot().sendToDeadLetterQueue(new ConnectionContext(), message); broker.getRoot().sendToDeadLetterQueue(new ConnectionContext(new NonCachedMessageEvaluationContext()), message);
} }
} }

View File

@ -25,6 +25,7 @@ import org.apache.activemq.broker.region.Topic;
import org.apache.activemq.command.Message; import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageId; import org.apache.activemq.command.MessageId;
import org.apache.activemq.filter.MessageEvaluationContext; import org.apache.activemq.filter.MessageEvaluationContext;
import org.apache.activemq.filter.NonCachedMessageEvaluationContext;
import org.apache.activemq.store.TopicMessageStore; import org.apache.activemq.store.TopicMessageStore;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
@ -78,7 +79,7 @@ class TopicStorePrefetch extends AbstractStoreCursor {
} }
public synchronized boolean recoverMessage(Message message, boolean cached) throws Exception { public synchronized boolean recoverMessage(Message message, boolean cached) throws Exception {
MessageEvaluationContext messageEvaluationContext = new MessageEvaluationContext(); MessageEvaluationContext messageEvaluationContext = new NonCachedMessageEvaluationContext();
messageEvaluationContext.setMessageReference(message); messageEvaluationContext.setMessageReference(message);
if (this.subscription.matches(message, messageEvaluationContext)) { if (this.subscription.matches(message, messageEvaluationContext)) {
return super.recoverMessage(message, cached); return super.recoverMessage(message, cached);

View File

@ -7,6 +7,7 @@ import org.apache.activemq.broker.region.MessageReference;
import org.apache.activemq.broker.region.Subscription; import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.filter.MessageEvaluationContext; import org.apache.activemq.filter.MessageEvaluationContext;
import org.apache.activemq.filter.NonCachedMessageEvaluationContext;
/** /**
* Simple dispatch policy that determines if a message can be sent to a subscription * Simple dispatch policy that determines if a message can be sent to a subscription
@ -26,7 +27,7 @@ public class SimpleDispatchSelector implements DispatchSelector {
} }
public boolean canDispatch(Subscription subscription, MessageReference node) throws Exception { public boolean canDispatch(Subscription subscription, MessageReference node) throws Exception {
MessageEvaluationContext msgContext = new MessageEvaluationContext(); MessageEvaluationContext msgContext = new NonCachedMessageEvaluationContext();
msgContext.setDestination(this.destination); msgContext.setDestination(this.destination);
msgContext.setMessageReference(node); msgContext.setMessageReference(node);
return subscription.matches(node, msgContext); return subscription.matches(node, msgContext);

View File

@ -25,6 +25,7 @@ import org.apache.activemq.broker.region.DestinationFilter;
import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.Message; import org.apache.activemq.command.Message;
import org.apache.activemq.filter.MessageEvaluationContext; import org.apache.activemq.filter.MessageEvaluationContext;
import org.apache.activemq.filter.NonCachedMessageEvaluationContext;
/** /**
* Represents a composite {@link Destination} where send()s are replicated to * Represents a composite {@link Destination} where send()s are replicated to
@ -55,7 +56,7 @@ public class CompositeDestinationFilter extends DestinationFilter {
if (value instanceof FilteredDestination) { if (value instanceof FilteredDestination) {
FilteredDestination filteredDestination = (FilteredDestination)value; FilteredDestination filteredDestination = (FilteredDestination)value;
if (messageContext == null) { if (messageContext == null) {
messageContext = new MessageEvaluationContext(); messageContext = new NonCachedMessageEvaluationContext();
messageContext.setMessageReference(message); messageContext.setMessageReference(message);
} }
messageContext.setDestination(filteredDestination.getDestination()); messageContext.setDestination(filteredDestination.getDestination());

View File

@ -33,11 +33,11 @@ import org.apache.activemq.command.Message;
*/ */
public class MessageEvaluationContext { public class MessageEvaluationContext {
private MessageReference messageReference; protected MessageReference messageReference;
private boolean loaded; protected boolean loaded;
private boolean dropped; protected boolean dropped;
private Message message; protected Message message;
private ActiveMQDestination destination; protected ActiveMQDestination destination;
public MessageEvaluationContext() { public MessageEvaluationContext() {
} }

View File

@ -0,0 +1,42 @@
package org.apache.activemq.filter;
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import java.io.IOException;
import org.apache.activemq.broker.region.MessageReference;
import org.apache.activemq.command.Message;
/**
* NonCached version of the MessageEvaluationContext
*
* @version $Revision: 1.4 $
*/
public class NonCachedMessageEvaluationContext extends MessageEvaluationContext {
public Message getMessage() throws IOException {
return messageReference != null ? messageReference.getMessage():null;
}
public void setMessageReference(MessageReference messageReference) {
this.messageReference = messageReference;
}
protected void clearMessageCache() {
}
}

View File

@ -38,6 +38,7 @@ import org.apache.activemq.command.JournalQueueAck;
import org.apache.activemq.command.Message; import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck; import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageId; import org.apache.activemq.command.MessageId;
import org.apache.activemq.filter.NonCachedMessageEvaluationContext;
import org.apache.activemq.kaha.impl.async.Location; import org.apache.activemq.kaha.impl.async.Location;
import org.apache.activemq.store.MessageRecoveryListener; import org.apache.activemq.store.MessageRecoveryListener;
import org.apache.activemq.store.MessageStore; import org.apache.activemq.store.MessageStore;
@ -87,7 +88,7 @@ public class AMQMessageStore implements MessageStore {
this.transactionStore = adapter.getTransactionStore(); this.transactionStore = adapter.getTransactionStore();
this.referenceStore = referenceStore; this.referenceStore = referenceStore;
this.destination = destination; this.destination = destination;
this.transactionTemplate = new TransactionTemplate(adapter, new ConnectionContext()); this.transactionTemplate = new TransactionTemplate(adapter, new ConnectionContext(new NonCachedMessageEvaluationContext()));
asyncWriteTask = adapter.getTaskRunnerFactory().createTaskRunner(new Task() { asyncWriteTask = adapter.getTaskRunnerFactory().createTaskRunner(new Task() {
public boolean iterate() { public boolean iterate() {

View File

@ -45,6 +45,7 @@ import org.apache.activemq.command.JournalTopicAck;
import org.apache.activemq.command.JournalTrace; import org.apache.activemq.command.JournalTrace;
import org.apache.activemq.command.JournalTransaction; import org.apache.activemq.command.JournalTransaction;
import org.apache.activemq.command.Message; import org.apache.activemq.command.Message;
import org.apache.activemq.filter.NonCachedMessageEvaluationContext;
import org.apache.activemq.kaha.impl.async.AsyncDataManager; import org.apache.activemq.kaha.impl.async.AsyncDataManager;
import org.apache.activemq.kaha.impl.async.Location; import org.apache.activemq.kaha.impl.async.Location;
import org.apache.activemq.kaha.impl.index.hash.HashIndex; import org.apache.activemq.kaha.impl.index.hash.HashIndex;
@ -502,7 +503,7 @@ public class AMQPersistenceAdapter implements PersistenceAdapter, UsageListener,
int redoCounter = 0; int redoCounter = 0;
LOG.info("Journal Recovery Started from: " + asyncDataManager); LOG.info("Journal Recovery Started from: " + asyncDataManager);
long start = System.currentTimeMillis(); long start = System.currentTimeMillis();
ConnectionContext context = new ConnectionContext(); ConnectionContext context = new ConnectionContext(new NonCachedMessageEvaluationContext());
// While we have records in the journal. // While we have records in the journal.
while ((pos = asyncDataManager.getNextLocation(pos)) != null) { while ((pos = asyncDataManager.getNextLocation(pos)) != null) {
ByteSequence data = asyncDataManager.read(pos); ByteSequence data = asyncDataManager.read(pos);

View File

@ -33,6 +33,7 @@ import org.apache.activemq.command.JournalQueueAck;
import org.apache.activemq.command.Message; import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck; import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageId; import org.apache.activemq.command.MessageId;
import org.apache.activemq.filter.NonCachedMessageEvaluationContext;
import org.apache.activemq.store.MessageRecoveryListener; import org.apache.activemq.store.MessageRecoveryListener;
import org.apache.activemq.store.MessageStore; import org.apache.activemq.store.MessageStore;
import org.apache.activemq.store.PersistenceAdapter; import org.apache.activemq.store.PersistenceAdapter;
@ -75,7 +76,7 @@ public class JournalMessageStore implements MessageStore {
this.transactionStore = adapter.getTransactionStore(); this.transactionStore = adapter.getTransactionStore();
this.longTermStore = checkpointStore; this.longTermStore = checkpointStore;
this.destination = destination; this.destination = destination;
this.transactionTemplate = new TransactionTemplate(adapter, new ConnectionContext()); this.transactionTemplate = new TransactionTemplate(adapter, new ConnectionContext(new NonCachedMessageEvaluationContext()));
} }

View File

@ -49,6 +49,7 @@ import org.apache.activemq.command.JournalTrace;
import org.apache.activemq.command.JournalTransaction; import org.apache.activemq.command.JournalTransaction;
import org.apache.activemq.command.Message; import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck; import org.apache.activemq.command.MessageAck;
import org.apache.activemq.filter.NonCachedMessageEvaluationContext;
import org.apache.activemq.openwire.OpenWireFormat; import org.apache.activemq.openwire.OpenWireFormat;
import org.apache.activemq.store.MessageStore; import org.apache.activemq.store.MessageStore;
import org.apache.activemq.store.PersistenceAdapter; import org.apache.activemq.store.PersistenceAdapter;
@ -469,7 +470,7 @@ public class JournalPersistenceAdapter implements PersistenceAdapter, JournalEve
int transactionCounter = 0; int transactionCounter = 0;
LOG.info("Journal Recovery Started from: " + journal); LOG.info("Journal Recovery Started from: " + journal);
ConnectionContext context = new ConnectionContext(); ConnectionContext context = new ConnectionContext(new NonCachedMessageEvaluationContext());
// While we have records in the journal. // While we have records in the journal.
while ((pos = journal.getNextRecordLocation(pos)) != null) { while ((pos = journal.getNextRecordLocation(pos)) != null) {