Define specific region classes for TempQueue and TempTopic

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@633976 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Robert Davies 2008-03-05 18:59:58 +00:00
parent c1e560e7ef
commit cd54a162f6
7 changed files with 268 additions and 196 deletions

View File

@ -77,18 +77,9 @@ public class DestinationFactoryImpl extends DestinationFactory {
if (destination.isQueue()) { if (destination.isQueue()) {
if (destination.isTemporary()) { if (destination.isTemporary()) {
final ActiveMQTempDestination tempDest = (ActiveMQTempDestination)destination; final ActiveMQTempDestination tempDest = (ActiveMQTempDestination)destination;
return new Queue(brokerService, destination, null, destinationStatistics, taskRunnerFactory) { Queue queue = new TempQueue(brokerService, destination, null, destinationStatistics, taskRunnerFactory);
queue.initialize();
public void addSubscription(ConnectionContext context, Subscription sub) throws Exception { return queue;
// Only consumers on the same connection can consume
// from
// the temporary destination
if (!tempDest.getConnectionId().equals(sub.getConsumerInfo().getConsumerId().getConnectionId())) {
throw new JMSException("Cannot subscribe to remote temporary destination: " + tempDest);
}
super.addSubscription(context, sub);
};
};
} else { } else {
MessageStore store = persistenceAdapter.createQueueMessageStore((ActiveMQQueue)destination); MessageStore store = persistenceAdapter.createQueueMessageStore((ActiveMQQueue)destination);
Queue queue = new Queue(brokerService, destination, store, destinationStatistics, taskRunnerFactory); Queue queue = new Queue(brokerService, destination, store, destinationStatistics, taskRunnerFactory);
@ -97,18 +88,10 @@ public class DestinationFactoryImpl extends DestinationFactory {
return queue; return queue;
} }
} else if (destination.isTemporary()) { } else if (destination.isTemporary()) {
final ActiveMQTempDestination tempDest = (ActiveMQTempDestination)destination;
return new Topic(brokerService, destination, null, destinationStatistics, taskRunnerFactory) {
public void addSubscription(ConnectionContext context, Subscription sub) throws Exception { Topic topic = new Topic(brokerService, destination, null, destinationStatistics, taskRunnerFactory);
// Only consumers on the same connection can consume from topic.initialize();
// the temporary destination return topic;
if (!tempDest.getConnectionId().equals(sub.getConsumerInfo().getConsumerId().getConnectionId())) {
throw new JMSException("Cannot subscribe to remote temporary destination: " + tempDest);
}
super.addSubscription(context, sub);
};
};
} else { } else {
TopicMessageStore store = null; TopicMessageStore store = null;
if (!AdvisorySupport.isAdvisoryTopic(destination)) { if (!AdvisorySupport.isAdvisoryTopic(destination)) {

View File

@ -16,12 +16,9 @@
*/ */
package org.apache.activemq.broker.region; package org.apache.activemq.broker.region;
import java.io.IOException;
import org.apache.activemq.command.ConsumerId; import org.apache.activemq.command.ConsumerId;
import org.apache.activemq.command.Message; import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageId; import org.apache.activemq.command.MessageId;
import org.apache.activemq.store.MessageStore;
/** /**
* Keeps track of a message that is flowing through the Broker. This object may * Keeps track of a message that is flowing through the Broker. This object may
@ -32,21 +29,6 @@ import org.apache.activemq.store.MessageStore;
*/ */
public class IndirectMessageReference implements QueueMessageReference { public class IndirectMessageReference implements QueueMessageReference {
/** The destination that is managing the message */
private final Destination regionDestination;
private final MessageStore destinationStore;
/** The id of the message is always valid */
private final MessageId messageId;
/** Is the message persistent? */
private final boolean persistent;
private final String groupID;
private final int groupSequence;
private final ConsumerId targetConsumerId;
/** The number of times the message has been delivered. */
private short redeliveryCounter;
/** The subscription that has locked the message */ /** The subscription that has locked the message */
private LockOwner lockOwner; private LockOwner lockOwner;
/** Has the message been dropped? */ /** Has the message been dropped? */
@ -54,76 +36,44 @@ public class IndirectMessageReference implements QueueMessageReference {
/** Has the message been acked? */ /** Has the message been acked? */
private boolean acked; private boolean acked;
/** Direct reference to the message */ /** Direct reference to the message */
private Message message; private final Message message;
/** The number of times the message has requested being hardened */
private int referenceCount;
/** the size of the message * */
private int cachedSize;
/** the expiration time of the message */
private long expiration;
public IndirectMessageReference(Queue destination, MessageStore destinationStore, Message message) { /**
this.regionDestination = destination; * @param message
this.destinationStore = destinationStore; */
public IndirectMessageReference(final Message message) {
this.message = message; this.message = message;
this.messageId = message.getMessageId(); message.getMessageId();
this.persistent = message.isPersistent() && destination.getMessageStore() != null; message.getGroupID();
this.groupID = message.getGroupID(); message.getGroupSequence();
this.groupSequence = message.getGroupSequence();
this.targetConsumerId = message.getTargetConsumerId();
this.expiration = message.getExpiration();
this.referenceCount = 1;
message.incrementReferenceCount();
this.cachedSize = message.getSize();
} }
public synchronized Message getMessageHardRef() { public Message getMessageHardRef() {
return message; return message;
} }
public synchronized int getReferenceCount() { public int getReferenceCount() {
return referenceCount; return message.getReferenceCount();
} }
public synchronized int incrementReferenceCount() { public int incrementReferenceCount() {
int rc = ++referenceCount; return message.incrementReferenceCount();
if (persistent && rc == 1 && message == null) {
try {
message = destinationStore.getMessage(messageId);
if (message == null) {
dropped = true;
} else {
message.setRegionDestination(regionDestination);
message.incrementReferenceCount();
}
} catch (IOException e) {
throw new RuntimeException(e);
}
}
return rc;
} }
public synchronized int decrementReferenceCount() { public int decrementReferenceCount() {
int rc = --referenceCount; return message.decrementReferenceCount();
if (persistent && rc == 0 && message != null) {
message.decrementReferenceCount();
// message=null;
}
return rc;
} }
public synchronized Message getMessage() { public Message getMessage() {
return message; return message;
} }
public String toString() { public String toString() {
return "Message " + messageId + " dropped=" + dropped + " locked=" + (lockOwner != null); return "Message " + message.getMessageId() + " dropped=" + dropped + " locked=" + (lockOwner != null);
} }
public synchronized void incrementRedeliveryCounter() { public void incrementRedeliveryCounter() {
this.redeliveryCounter++; message.incrementRedeliveryCounter();
} }
public synchronized boolean isDropped() { public synchronized boolean isDropped() {
@ -133,10 +83,7 @@ public class IndirectMessageReference implements QueueMessageReference {
public synchronized void drop() { public synchronized void drop() {
dropped = true; dropped = true;
lockOwner = null; lockOwner = null;
if (!persistent && message != null) {
message.decrementReferenceCount(); message.decrementReferenceCount();
message = null;
}
} }
public boolean lock(LockOwner subscription) { public boolean lock(LockOwner subscription) {
@ -159,20 +106,20 @@ public class IndirectMessageReference implements QueueMessageReference {
return lockOwner; return lockOwner;
} }
public synchronized int getRedeliveryCounter() { public int getRedeliveryCounter() {
return redeliveryCounter; return message.getRedeliveryCounter();
} }
public MessageId getMessageId() { public MessageId getMessageId() {
return messageId; return message.getMessageId();
} }
public Destination getRegionDestination() { public Destination getRegionDestination() {
return regionDestination; return message.getRegionDestination();
} }
public boolean isPersistent() { public boolean isPersistent() {
return persistent; return message.isPersistent();
} }
public synchronized boolean isLocked() { public synchronized boolean isLocked() {
@ -188,34 +135,26 @@ public class IndirectMessageReference implements QueueMessageReference {
} }
public String getGroupID() { public String getGroupID() {
return groupID; return message.getGroupID();
} }
public int getGroupSequence() { public int getGroupSequence() {
return groupSequence; return message.getGroupSequence();
} }
public ConsumerId getTargetConsumerId() { public ConsumerId getTargetConsumerId() {
return targetConsumerId; return message.getTargetConsumerId();
} }
public long getExpiration() { public long getExpiration() {
return expiration; return message.getExpiration();
} }
public boolean isExpired() { public boolean isExpired() {
long expireTime = getExpiration(); return message.isExpired();
if (expireTime > 0 && System.currentTimeMillis() > expireTime) {
return true;
}
return false;
} }
public synchronized int getSize() { public synchronized int getSize() {
Message msg = message; return message.getSize();
if (msg != null) {
return msg.getSize();
}
return cachedSize;
} }
} }

View File

@ -77,18 +77,18 @@ import org.apache.commons.logging.LogFactory;
* @version $Revision: 1.28 $ * @version $Revision: 1.28 $
*/ */
public class Queue extends BaseDestination implements Task { public class Queue extends BaseDestination implements Task {
private final Log log; protected final Log log;
private final List<Subscription> consumers = new ArrayList<Subscription>(50); protected TaskRunner taskRunner;
private PendingMessageCursor messages; protected final List<Subscription> consumers = new ArrayList<Subscription>(50);
private final LinkedHashMap<MessageId,MessageReference> pagedInMessages = new LinkedHashMap<MessageId,MessageReference>(); protected PendingMessageCursor messages;
private final LinkedHashMap<MessageId,QueueMessageReference> pagedInMessages = new LinkedHashMap<MessageId,QueueMessageReference>();
private MessageGroupMap messageGroupOwners; private MessageGroupMap messageGroupOwners;
private DispatchPolicy dispatchPolicy = new RoundRobinDispatchPolicy(); private DispatchPolicy dispatchPolicy = new RoundRobinDispatchPolicy();
private DeadLetterStrategy deadLetterStrategy = new SharedDeadLetterStrategy(); private DeadLetterStrategy deadLetterStrategy = new SharedDeadLetterStrategy();
private MessageGroupMapFactory messageGroupMapFactory = new MessageGroupHashBucketFactory(); private MessageGroupMapFactory messageGroupMapFactory = new MessageGroupHashBucketFactory();
private final Object sendLock = new Object(); private final Object sendLock = new Object();
private final ExecutorService executor; private ExecutorService executor;
private final TaskRunner taskRunner; protected final LinkedList<Runnable> messagesWaitingForSpace = new LinkedList<Runnable>();
private final LinkedList<Runnable> messagesWaitingForSpace = new LinkedList<Runnable>();
private final ReentrantLock dispatchLock = new ReentrantLock(); private final ReentrantLock dispatchLock = new ReentrantLock();
private boolean useConsumerPriority=true; private boolean useConsumerPriority=true;
private boolean strictOrderDispatch=false; private boolean strictOrderDispatch=false;
@ -110,11 +110,29 @@ public class Queue extends BaseDestination implements Task {
TaskRunnerFactory taskFactory) throws Exception { TaskRunnerFactory taskFactory) throws Exception {
super(brokerService, store, destination, parentStats); super(brokerService, store, destination, parentStats);
if (destination.isTemporary() || broker == null || store==null ) {
this.log = LogFactory.getLog(getClass().getName() + "." + destination.getPhysicalName());
this.dispatchSelector=new QueueDispatchSelector(destination);
}
public void initialize() throws Exception {
if (this.messages == null) {
if (destination.isTemporary() || broker == null || store == null) {
this.messages = new VMPendingMessageCursor(); this.messages = new VMPendingMessageCursor();
} else { } else {
this.messages = new StoreQueueCursor(broker,this); this.messages = new StoreQueueCursor(broker, this);
} }
}
// If a VMPendingMessageCursor don't use the default Producer System Usage
// since it turns into a shared blocking queue which can lead to a network deadlock.
// If we are ccursoring to disk..it's not and issue because it does not block due
// to large disk sizes.
if( messages instanceof VMPendingMessageCursor ) {
this.systemUsage = brokerService.getSystemUsage();
memoryUsage.setParent(systemUsage.getMemoryUsage());
}
this.executor = Executors.newSingleThreadExecutor(new ThreadFactory() { this.executor = Executors.newSingleThreadExecutor(new ThreadFactory() {
public Thread newThread(Runnable runnable) { public Thread newThread(Runnable runnable) {
@ -126,20 +144,6 @@ public class Queue extends BaseDestination implements Task {
}); });
this.taskRunner = new DeterministicTaskRunner(this.executor,this); this.taskRunner = new DeterministicTaskRunner(this.executor,this);
this.log = LogFactory.getLog(getClass().getName() + "." + destination.getPhysicalName());
this.dispatchSelector=new QueueDispatchSelector(destination);
}
public void initialize() throws Exception {
// If a VMPendingMessageCursor don't use the default Producer System Usage
// since it turns into a shared blocking queue which can lead to a network deadlock.
// If we are ccursoring to disk..it's not and issue because it does not block due
// to large disk sizes.
if( messages instanceof VMPendingMessageCursor ) {
this.systemUsage = brokerService.getSystemUsage();
memoryUsage.setParent(systemUsage.getMemoryUsage());
}
super.initialize(); super.initialize();
if (store != null) { if (store != null) {
// Restore the persistent messages. // Restore the persistent messages.
@ -222,10 +226,7 @@ public class Queue extends BaseDestination implements Task {
// Add all the matching messages in the queue to the // Add all the matching messages in the queue to the
// subscription. // subscription.
for (Iterator<MessageReference> i = pagedInMessages.values() for (QueueMessageReference node:pagedInMessages.values()){
.iterator(); i.hasNext();) {
QueueMessageReference node = (QueueMessageReference) i
.next();
if (!node.isDropped() && !node.isAcked() && (!node.isDropped() ||sub.getConsumerInfo().isBrowser())) { if (!node.isDropped() && !node.isAcked() && (!node.isDropped() ||sub.getConsumerInfo().isBrowser())) {
msgContext.setMessageReference(node); msgContext.setMessageReference(node);
if (sub.matches(node, msgContext)) { if (sub.matches(node, msgContext)) {
@ -274,11 +275,8 @@ public class Queue extends BaseDestination implements Task {
// redeliver inflight messages // redeliver inflight messages
sub.remove(context, this); sub.remove(context, this);
List<MessageReference> list = new ArrayList<MessageReference>(); List<QueueMessageReference> list = new ArrayList<QueueMessageReference>();
for (Iterator<MessageReference> i = pagedInMessages.values() for (QueueMessageReference node:pagedInMessages.values()){
.iterator(); i.hasNext();) {
QueueMessageReference node = (QueueMessageReference) i
.next();
if (!node.isDropped() && !node.isAcked() if (!node.isDropped() && !node.isAcked()
&& node.getLockOwner() == sub) { && node.getLockOwner() == sub) {
if (node.unlock()) { if (node.unlock()) {
@ -583,9 +581,8 @@ public class Queue extends BaseDestination implements Task {
// Implementation methods // Implementation methods
// ------------------------------------------------------------------------- // -------------------------------------------------------------------------
private MessageReference createMessageReference(Message message) { private QueueMessageReference createMessageReference(Message message) {
MessageReference result = new IndirectMessageReference(this, store, message); QueueMessageReference result = new IndirectMessageReference(message);
result.decrementReferenceCount();
return result; return result;
} }
@ -597,18 +594,17 @@ public class Queue extends BaseDestination implements Task {
log.error("caught an exception browsing " + this, e); log.error("caught an exception browsing " + this, e);
} }
synchronized (pagedInMessages) { synchronized (pagedInMessages) {
for (Iterator<MessageReference> i = pagedInMessages.values().iterator(); i.hasNext();) { for (QueueMessageReference node:pagedInMessages.values()){
MessageReference r = i.next(); node.incrementReferenceCount();
r.incrementReferenceCount();
try { try {
Message m = r.getMessage(); Message m = node.getMessage();
if (m != null) { if (m != null) {
l.add(m); l.add(m);
} }
} catch (IOException e) { } catch (IOException e) {
log.error("caught an exception browsing " + this, e); log.error("caught an exception browsing " + this, e);
} finally { } finally {
r.decrementReferenceCount(); node.decrementReferenceCount();
} }
} }
} }
@ -886,7 +882,6 @@ public class Queue extends BaseDestination implements Task {
log.error("Failed to page in more queue messages ", e); log.error("Failed to page in more queue messages ", e);
} }
} }
synchronized(messagesWaitingForSpace) { synchronized(messagesWaitingForSpace) {
while (!messagesWaitingForSpace.isEmpty() && !memoryUsage.isFull()) { while (!messagesWaitingForSpace.isEmpty() && !memoryUsage.isFull()) {
Runnable op = messagesWaitingForSpace.removeFirst(); Runnable op = messagesWaitingForSpace.removeFirst();
@ -921,7 +916,7 @@ public class Queue extends BaseDestination implements Task {
}; };
} }
protected void removeMessage(ConnectionContext c, IndirectMessageReference r) throws IOException { protected void removeMessage(ConnectionContext c, QueueMessageReference r) throws IOException {
MessageAck ack = new MessageAck(); MessageAck ack = new MessageAck();
ack.setAckType(MessageAck.STANDARD_ACK_TYPE); ack.setAckType(MessageAck.STANDARD_ACK_TYPE);
ack.setDestination(destination); ack.setDestination(destination);
@ -955,7 +950,7 @@ public class Queue extends BaseDestination implements Task {
wakeup(); wakeup();
} }
final void wakeup() { protected void wakeup() {
try { try {
taskRunner.wakeup(); taskRunner.wakeup();
} catch (InterruptedException e) { } catch (InterruptedException e) {
@ -963,8 +958,8 @@ public class Queue extends BaseDestination implements Task {
} }
} }
private List<MessageReference> doPageIn(boolean force) throws Exception { private List<QueueMessageReference> doPageIn(boolean force) throws Exception {
List<MessageReference> result = null; List<QueueMessageReference> result = null;
dispatchLock.lock(); dispatchLock.lock();
try{ try{
@ -972,7 +967,7 @@ public class Queue extends BaseDestination implements Task {
if ((force || !consumers.isEmpty()) && toPageIn > 0) { if ((force || !consumers.isEmpty()) && toPageIn > 0) {
messages.setMaxBatchSize(toPageIn); messages.setMaxBatchSize(toPageIn);
int count = 0; int count = 0;
result = new ArrayList<MessageReference>(toPageIn); result = new ArrayList<QueueMessageReference>(toPageIn);
synchronized (messages) { synchronized (messages) {
try { try {
messages.reset(); messages.reset();
@ -980,8 +975,8 @@ public class Queue extends BaseDestination implements Task {
MessageReference node = messages.next(); MessageReference node = messages.next();
messages.remove(); messages.remove();
if (!broker.isExpired(node)) { if (!broker.isExpired(node)) {
node = createMessageReference(node.getMessage()); QueueMessageReference ref = createMessageReference(node.getMessage());
result.add(node); result.add(ref);
count++; count++;
} else { } else {
broker.messageExpired(createConnectionContext(), broker.messageExpired(createConnectionContext(),
@ -994,7 +989,7 @@ public class Queue extends BaseDestination implements Task {
} }
} }
synchronized (pagedInMessages) { synchronized (pagedInMessages) {
for(MessageReference ref:result) { for(QueueMessageReference ref:result) {
pagedInMessages.put(ref.getMessageId(), ref); pagedInMessages.put(ref.getMessageId(), ref);
} }
} }
@ -1005,7 +1000,7 @@ public class Queue extends BaseDestination implements Task {
return result; return result;
} }
private void doDispatch(List<MessageReference> list) throws Exception { private void doDispatch(List<QueueMessageReference> list) throws Exception {
if (list != null) { if (list != null) {
synchronized (consumers) { synchronized (consumers) {
for (MessageReference node : list) { for (MessageReference node : list) {
@ -1053,7 +1048,7 @@ public class Queue extends BaseDestination implements Task {
pageInMessages(true); pageInMessages(true);
} }
private void pageInMessages(boolean force) throws Exception { protected void pageInMessages(boolean force) throws Exception {
doDispatch(doPageIn(force)); doDispatch(doPageIn(force));
} }

View File

@ -0,0 +1,99 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.broker.region;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.cursors.VMPendingMessageCursor;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQTempDestination;
import org.apache.activemq.store.MessageStore;
import org.apache.activemq.thread.TaskRunnerFactory;
/**
* The Queue is a List of MessageEntry objects that are dispatched to matching
* subscriptions.
*
* @version $Revision: 1.28 $
*/
public class TempQueue extends Queue{
private final ActiveMQTempDestination tempDest;
private TaskRunnerFactory taskFactory;
/**
* @param brokerService
* @param destination
* @param store
* @param parentStats
* @param taskFactory
* @throws Exception
*/
public TempQueue(BrokerService brokerService,
ActiveMQDestination destination, MessageStore store,
DestinationStatistics parentStats, TaskRunnerFactory taskFactory)
throws Exception {
super(brokerService, destination, store, parentStats, taskFactory);
this.tempDest = (ActiveMQTempDestination) destination;
this.taskFactory=taskFactory;
}
public void initialize() throws Exception {
this.messages=new VMPendingMessageCursor();
this.systemUsage = brokerService.getSystemUsage();
memoryUsage.setParent(systemUsage.getMemoryUsage());
this.taskRunner = taskFactory.createTaskRunner(this, "TempQueue: " + destination.getPhysicalName());
}
public void addSubscription(ConnectionContext context, Subscription sub) throws Exception {
// Only consumers on the same connection can consume from
// the temporary destination
// However, we could have failed over - and we do this
// check client side anyways ....
if (!context.isFaultTolerant()
&& (!context.isNetworkConnection() && !tempDest
.getConnectionId().equals(
sub.getConsumerInfo().getConsumerId()
.getConnectionId()))) {
tempDest.setConnectionId(sub.getConsumerInfo().getConsumerId().getConnectionId());
log.debug(" changed ownership of " + this + " to "+ tempDest.getConnectionId());
}
super.addSubscription(context, sub);
}
protected void wakeup() {
boolean result = false;
synchronized (messages) {
result = !messages.isEmpty();
}
if (result) {
try {
pageInMessages(false);
} catch (Throwable e) {
log.error("Failed to page in more queue messages ", e);
}
}
if (!messagesWaitingForSpace.isEmpty()) {
try {
taskRunner.wakeup();
} catch (InterruptedException e) {
log.warn("Task Runner failed to wakeup ", e);
}
}
}
}

View File

@ -45,26 +45,9 @@ public class TempQueueRegion extends AbstractTempRegion {
} }
protected Destination doCreateDestination(ConnectionContext context, ActiveMQDestination destination) throws Exception { protected Destination doCreateDestination(ConnectionContext context, ActiveMQDestination destination) throws Exception {
final ActiveMQTempDestination tempDest = (ActiveMQTempDestination)destination; TempQueue result = new TempQueue(brokerService, destination, null, destinationStatistics, taskRunnerFactory);
return new Queue(brokerService, destination, null, destinationStatistics, taskRunnerFactory) { result.initialize();
return result;
public void addSubscription(ConnectionContext context, Subscription sub) throws Exception {
// Only consumers on the same connection can consume from
// the temporary destination
// However, we could have failed over - and we do this
// check client side anyways ....
if (!context.isFaultTolerant()
&& (!context.isNetworkConnection() && !tempDest
.getConnectionId().equals(
sub.getConsumerInfo().getConsumerId()
.getConnectionId()))) {
tempDest.setConnectionId(sub.getConsumerInfo().getConsumerId().getConnectionId());
LOG.debug(" changed ownership of " + this + " to "+ tempDest.getConnectionId());
}
super.addSubscription(context, sub);
};
};
} }
protected Subscription createSubscription(ConnectionContext context, ConsumerInfo info) throws InvalidSelectorException { protected Subscription createSubscription(ConnectionContext context, ConsumerInfo info) throws InvalidSelectorException {

View File

@ -0,0 +1,72 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.broker.region;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQTempDestination;
import org.apache.activemq.store.TopicMessageStore;
import org.apache.activemq.thread.Task;
import org.apache.activemq.thread.TaskRunnerFactory;
/**
* The Topic is a destination that sends a copy of a message to every active
* Subscription registered.
*
* @version $Revision: 1.21 $
*/
public class TempTopic extends Topic implements Task{
private final ActiveMQTempDestination tempDest;
/**
* @param brokerService
* @param destination
* @param store
* @param parentStats
* @param taskFactory
* @throws Exception
*/
public TempTopic(BrokerService brokerService,
ActiveMQDestination destination, TopicMessageStore store,
DestinationStatistics parentStats, TaskRunnerFactory taskFactory)
throws Exception {
super(brokerService, destination, store, parentStats, taskFactory);
this.tempDest = (ActiveMQTempDestination) destination;
}
public void addSubscription(ConnectionContext context, Subscription sub) throws Exception {
// Only consumers on the same connection can consume from
// the temporary destination
// However, we could have failed over - and we do this
// check client side anyways ....
if (!context.isFaultTolerant()
&& (!context.isNetworkConnection() && !tempDest
.getConnectionId().equals(
sub.getConsumerInfo().getConsumerId()
.getConnectionId()))) {
tempDest.setConnectionId(sub.getConsumerInfo().getConsumerId().getConnectionId());
log.debug(" changed ownership of " + this + " to "+ tempDest.getConnectionId());
}
super.addSubscription(context, sub);
}
public void initialize() {
}
}

View File

@ -65,7 +65,7 @@ import org.apache.commons.logging.LogFactory;
* @version $Revision: 1.21 $ * @version $Revision: 1.21 $
*/ */
public class Topic extends BaseDestination implements Task{ public class Topic extends BaseDestination implements Task{
private static final Log LOG = LogFactory.getLog(Topic.class); protected final Log log;
private final TopicMessageStore topicStore; private final TopicMessageStore topicStore;
protected final CopyOnWriteArrayList<Subscription> consumers = new CopyOnWriteArrayList<Subscription>(); protected final CopyOnWriteArrayList<Subscription> consumers = new CopyOnWriteArrayList<Subscription>();
protected final Valve dispatchValve = new Valve(true); protected final Valve dispatchValve = new Valve(true);
@ -90,6 +90,7 @@ public class Topic extends BaseDestination implements Task{
TaskRunnerFactory taskFactory) throws Exception { TaskRunnerFactory taskFactory) throws Exception {
super(brokerService, store, destination, parentStats); super(brokerService, store, destination, parentStats);
this.topicStore=store; this.topicStore=store;
this.log = LogFactory.getLog(getClass().getName() + "." + destination.getPhysicalName());
//set default subscription recovery policy //set default subscription recovery policy
if (destination.isTemporary() || AdvisorySupport.isAdvisoryTopic(destination) ){ if (destination.isTemporary() || AdvisorySupport.isAdvisoryTopic(destination) ){
subscriptionRecoveryPolicy= new NoSubscriptionRecoveryPolicy(); subscriptionRecoveryPolicy= new NoSubscriptionRecoveryPolicy();
@ -345,15 +346,15 @@ public class Topic extends BaseDestination implements Task{
if (count > 2 && context.isInTransaction()) { if (count > 2 && context.isInTransaction()) {
count =0; count =0;
int size = context.getTransaction().size(); int size = context.getTransaction().size();
LOG.warn("Waiting for space to send transacted message - transaction elements = " + size + " need more space to commit. Message = " + message); log.warn("Waiting for space to send transacted message - transaction elements = " + size + " need more space to commit. Message = " + message);
} }
} }
// The usage manager could have delayed us by the time // The usage manager could have delayed us by the time
// we unblock the message could have expired.. // we unblock the message could have expired..
if (message.isExpired()) { if (message.isExpired()) {
if (LOG.isDebugEnabled()) { if (log.isDebugEnabled()) {
LOG.debug("Expired message: " + message); log.debug("Expired message: " + message);
} }
return; return;
} }
@ -499,7 +500,7 @@ public class Topic extends BaseDestination implements Task{
} }
} }
} catch (Throwable e) { } catch (Throwable e) {
LOG.warn("Failed to browse Topic: " + getActiveMQDestination().getPhysicalName(), e); log.warn("Failed to browse Topic: " + getActiveMQDestination().getPhysicalName(), e);
} }
return result.toArray(new Message[result.size()]); return result.toArray(new Message[result.size()]);
} }