git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@957881 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Robert Davies 2010-06-25 10:28:17 +00:00
parent eeacd65a2b
commit 00879cf683
34 changed files with 637 additions and 273 deletions

View File

@ -22,7 +22,7 @@ import java.util.List;
import org.apache.activemq.command.MessageDispatch;
public class SimplePriorityMessageDispatchChannel implements MessageDispatchChannel {
private static Integer MAX_PRIORITY = 10;
private static final Integer MAX_PRIORITY = 10;
private final Object mutex = new Object();
private final LinkedList<MessageDispatch>[] lists;
private boolean closed;
@ -234,7 +234,7 @@ public class SimplePriorityMessageDispatchChannel implements MessageDispatchChan
}
protected int getPriority(MessageDispatch message) {
int priority = Message.DEFAULT_PRIORITY;
int priority = javax.jms.Message.DEFAULT_PRIORITY;
if (message.getMessage() != null) {
Math.max(message.getMessage().getPriority(), 0);
priority = Math.min(priority, 9);

View File

@ -16,19 +16,6 @@
*/
package org.apache.activemq.broker.jmx;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.jmx.OpenTypeSupport.OpenTypeFactory;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.ActiveMQTextMessage;
import org.apache.activemq.command.Message;
import org.apache.activemq.filter.BooleanExpression;
import org.apache.activemq.filter.MessageEvaluationContext;
import org.apache.activemq.selector.SelectorParser;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
@ -48,6 +35,19 @@ import javax.management.openmbean.OpenDataException;
import javax.management.openmbean.TabularData;
import javax.management.openmbean.TabularDataSupport;
import javax.management.openmbean.TabularType;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.jmx.OpenTypeSupport.OpenTypeFactory;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.ActiveMQTextMessage;
import org.apache.activemq.command.Message;
import org.apache.activemq.filter.BooleanExpression;
import org.apache.activemq.filter.MessageEvaluationContext;
import org.apache.activemq.selector.SelectorParser;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
public class DestinationView implements DestinationViewMBean {
private static final Log LOG = LogFactory.getLog(DestinationViewMBean.class);
@ -126,6 +126,10 @@ public class DestinationView implements DestinationViewMBean {
public long getMinEnqueueTime() {
return destination.getDestinationStatistics().getProcessTime().getMinTime();
}
public boolean isPrioritizedMessages() {
return destination.isPrioritizedMessages();
}
public CompositeData[] browse() throws OpenDataException {
try {

View File

@ -16,16 +16,15 @@
*/
package org.apache.activemq.broker.jmx;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.io.IOException;
import javax.jms.InvalidSelectorException;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;
import javax.management.openmbean.CompositeData;
import javax.management.openmbean.OpenDataException;
import javax.management.openmbean.TabularData;
import javax.management.ObjectName;
import javax.management.MalformedObjectNameException;
public interface DestinationViewMBean {
@ -313,6 +312,12 @@ public interface DestinationViewMBean {
@MBeanInfo("Caching is enabled")
public boolean isUseCache();
/**
* @return true if prioritized messages are enabled for the destination
*/
@MBeanInfo("Prioritized messages is enabled")
public boolean isPrioritizedMessages();
/**
* @param value
* enable/disable caching on the destination

View File

@ -81,6 +81,7 @@ public abstract class BaseDestination implements Destination {
protected int cursorMemoryHighWaterMark = 70;
protected int storeUsageHighWaterMark = 100;
private SlowConsumerStrategy slowConsumerStrategy;
private boolean prioritizedMessages;
/**
* @param broker
@ -580,5 +581,14 @@ public abstract class BaseDestination implements Destination {
public void setSlowConsumerStrategy(SlowConsumerStrategy slowConsumerStrategy) {
this.slowConsumerStrategy = slowConsumerStrategy;
}
public boolean isPrioritizedMessages() {
return this.prioritizedMessages;
}
public void setPrioritizedMessages(boolean prioritizedMessages) {
this.prioritizedMessages = prioritizedMessages;
}
}

View File

@ -18,14 +18,12 @@ package org.apache.activemq.broker.region;
import java.io.IOException;
import java.util.List;
import org.apache.activemq.Service;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.ProducerBrokerExchange;
import org.apache.activemq.broker.region.policy.DeadLetterStrategy;
import org.apache.activemq.broker.region.policy.SharedDeadLetterStrategy;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageDispatchNotification;
@ -215,4 +213,6 @@ public interface Destination extends Service, Task {
* @throws Exception
*/
void processDispatchNotification(MessageDispatchNotification messageDispatchNotification) throws Exception;
boolean isPrioritizedMessages();
}

View File

@ -19,7 +19,6 @@ package org.apache.activemq.broker.region;
import java.io.IOException;
import java.util.List;
import java.util.Set;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.ProducerBrokerExchange;
@ -39,7 +38,7 @@ import org.apache.activemq.usage.Usage;
*/
public class DestinationFilter implements Destination {
private Destination next;
private final Destination next;
public DestinationFilter(Destination next) {
this.next = next;
@ -270,4 +269,8 @@ public class DestinationFilter implements Destination {
public void setCursorMemoryHighWaterMark(int cursorMemoryHighWaterMark) {
next.setCursorMemoryHighWaterMark(cursorMemoryHighWaterMark);
}
public boolean isPrioritizedMessages() {
return next.isPrioritizedMessages();
}
}

View File

@ -16,8 +16,6 @@
*/
package org.apache.activemq.broker.region;
import java.io.IOException;
import org.apache.activemq.command.ConsumerId;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageId;
@ -33,7 +31,7 @@ public interface MessageReference {
MessageId getMessageId();
Message getMessageHardRef();
Message getMessage() throws IOException;
Message getMessage();
boolean isPersistent();
Destination getRegionDestination();

View File

@ -16,8 +16,6 @@
*/
package org.apache.activemq.broker.region;
import java.io.IOException;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.ConsumerId;
import org.apache.activemq.command.Message;
@ -28,7 +26,7 @@ import org.apache.activemq.command.MessageId;
*/
final class NullMessageReference implements QueueMessageReference {
private ActiveMQMessage message = new ActiveMQMessage();
private final ActiveMQMessage message = new ActiveMQMessage();
private volatile int references;
public void drop() {
@ -75,7 +73,7 @@ final class NullMessageReference implements QueueMessageReference {
return 0;
}
public Message getMessage() throws IOException {
public Message getMessage() {
return message;
}

View File

@ -78,7 +78,7 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
}
public PrefetchSubscription(Broker broker,SystemUsage usageManager, ConnectionContext context, ConsumerInfo info) throws InvalidSelectorException {
this(broker,usageManager,context, info, new VMPendingMessageCursor());
this(broker,usageManager,context, info, new VMPendingMessageCursor(false));
}
/**

View File

@ -236,7 +236,7 @@ public class Queue extends BaseDestination implements Task, UsageListener {
public void initialize() throws Exception {
if (this.messages == null) {
if (destination.isTemporary() || broker == null || store == null) {
this.messages = new VMPendingMessageCursor();
this.messages = new VMPendingMessageCursor(isPrioritizedMessages());
} else {
this.messages = new StoreQueueCursor(broker, this);
}
@ -951,38 +951,30 @@ public class Queue extends BaseDestination implements Task, UsageListener {
public Message getMessage(String id) {
MessageId msgId = new MessageId(id);
try {
synchronized (pagedInMessages) {
QueueMessageReference r = this.pagedInMessages.get(msgId);
if (r != null) {
return r.getMessage();
}
synchronized (pagedInMessages) {
QueueMessageReference r = this.pagedInMessages.get(msgId);
if (r != null) {
return r.getMessage();
}
synchronized (messages) {
try {
messages.reset();
while (messages.hasNext()) {
try {
MessageReference r = messages.next();
r.decrementReferenceCount();
messages.rollback(r.getMessageId());
if (msgId.equals(r.getMessageId())) {
Message m = r.getMessage();
if (m != null) {
return m;
}
break;
}
} catch (IOException e) {
LOG.error("got an exception retrieving message " + id);
}
synchronized (messages) {
try {
messages.reset();
while (messages.hasNext()) {
MessageReference r = messages.next();
r.decrementReferenceCount();
messages.rollback(r.getMessageId());
if (msgId.equals(r.getMessageId())) {
Message m = r.getMessage();
if (m != null) {
return m;
}
break;
}
} finally {
messages.release();
}
} finally {
messages.release();
}
} catch (IOException e) {
LOG.error("got an exception retrieving message " + id);
}
return null;
}

View File

@ -56,7 +56,7 @@ public class TempQueue extends Queue{
@Override
public void initialize() throws Exception {
this.messages=new VMPendingMessageCursor();
this.messages=new VMPendingMessageCursor(false);
this.messages.setMemoryUsageHighWaterMark(getCursorMemoryHighWaterMark());
this.systemUsage = brokerService.getSystemUsage();
memoryUsage.setParent(systemUsage.getMemoryUsage());

View File

@ -72,9 +72,9 @@ public class TopicSubscription extends AbstractSubscription {
this.usageManager = usageManager;
String matchedName = "TopicSubscription:" + CURSOR_NAME_COUNTER.getAndIncrement() + "[" + info.getConsumerId().toString() + "]";
if (info.getDestination().isTemporary() || broker == null || broker.getTempDataStore()==null ) {
this.matched = new VMPendingMessageCursor();
this.matched = new VMPendingMessageCursor(false);
} else {
this.matched = new FilePendingMessageCursor(broker,matchedName);
this.matched = new FilePendingMessageCursor(broker,matchedName,false);
}
}

View File

@ -19,11 +19,14 @@ package org.apache.activemq.broker.region.cursors;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Set;
import org.apache.activemq.ActiveMQMessageAudit;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.BaseDestination;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.MessageReference;
import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.usage.SystemUsage;
@ -44,6 +47,11 @@ public abstract class AbstractPendingMessageCursor implements PendingMessageCurs
protected boolean useCache=true;
private boolean started=false;
protected MessageReference last = null;
protected final boolean prioritizedMessages;
public AbstractPendingMessageCursor(boolean prioritizedMessages) {
this.prioritizedMessages=prioritizedMessages;
}
public synchronized void start() throws Exception {
@ -304,4 +312,19 @@ public abstract class AbstractPendingMessageCursor implements PendingMessageCurs
protected synchronized boolean isStarted() {
return started;
}
public static boolean isPrioritizedMessageSubscriber(Broker broker,Subscription sub) {
boolean result = false;
Set<Destination> destinations = broker.getDestinations(sub.getActiveMQDestination());
if (destinations != null) {
for (Destination dest:destinations) {
if (dest.isPrioritizedMessages()) {
result = true;
break;
}
}
}
return result;
}
}

View File

@ -17,8 +17,6 @@
package org.apache.activemq.broker.region.cursors;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map.Entry;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.MessageReference;
import org.apache.activemq.command.Message;
@ -34,8 +32,8 @@ import org.apache.commons.logging.LogFactory;
public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor implements MessageRecoveryListener {
private static final Log LOG = LogFactory.getLog(AbstractStoreCursor.class);
protected final Destination regionDestination;
private final LinkedHashMap<MessageId,Message> batchList = new LinkedHashMap<MessageId,Message> ();
private Iterator<Entry<MessageId, Message>> iterator = null;
private final PendingList batchList;
private Iterator<MessageReference> iterator = null;
private boolean cacheEnabled=false;
protected boolean batchResetNeeded = true;
protected boolean storeHasMessages = false;
@ -43,10 +41,16 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i
private MessageId lastCachedId;
protected AbstractStoreCursor(Destination destination) {
super((destination != null ? destination.isPrioritizedMessages():false));
this.regionDestination=destination;
if (this.prioritizedMessages) {
this.batchList= new PrioritizedPendingList();
}else {
this.batchList = new OrderedPendingList();
}
}
@Override
public final synchronized void start() throws Exception{
if (!isStarted()) {
super.start();
@ -60,7 +64,7 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i
}
}
@Override
public final synchronized void stop() throws Exception {
resetBatch();
super.stop();
@ -82,7 +86,7 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i
}
}
message.incrementReferenceCount();
batchList.put(message.getMessageId(), message);
batchList.addMessageLast(message);
clearIterator(true);
recovered = true;
} else {
@ -99,7 +103,7 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i
return recovered;
}
@Override
public final void reset() {
if (batchList.isEmpty()) {
try {
@ -113,7 +117,7 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i
size();
}
@Override
public synchronized void release() {
clearIterator(false);
}
@ -129,7 +133,7 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i
private synchronized void ensureIterator() {
if(this.iterator==null) {
this.iterator=this.batchList.entrySet().iterator();
this.iterator=this.batchList.iterator();
}
}
@ -137,7 +141,7 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i
public final void finished() {
}
@Override
public final synchronized boolean hasNext() {
if (batchList.isEmpty()) {
try {
@ -151,11 +155,11 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i
return this.iterator.hasNext();
}
@Override
public final synchronized MessageReference next() {
MessageReference result = null;
if (!this.batchList.isEmpty()&&this.iterator.hasNext()) {
result = this.iterator.next().getValue();
result = this.iterator.next();
}
last = result;
if (result != null) {
@ -164,7 +168,7 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i
return result;
}
@Override
public final synchronized void addMessageLast(MessageReference node) throws Exception {
if (cacheEnabled && hasSpace()) {
recoverMessage(node.getMessage(),true);
@ -189,13 +193,13 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i
protected void setBatch(MessageId messageId) throws Exception {
}
@Override
public final synchronized void addMessageFirst(MessageReference node) throws Exception {
cacheEnabled=false;
size++;
}
@Override
public final synchronized void remove() {
size--;
if (iterator!=null) {
@ -212,21 +216,22 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i
}
}
@Override
public final synchronized void remove(MessageReference node) {
size--;
cacheEnabled=false;
batchList.remove(node.getMessageId());
batchList.remove(node);
}
@Override
public final synchronized void clear() {
gc();
}
@Override
public final synchronized void gc() {
for (Message msg : batchList.values()) {
for (Iterator<MessageReference>i = batchList.iterator();i.hasNext();) {
MessageReference msg = i.next();
rollback(msg.getMessageId());
msg.decrementReferenceCount();
}
@ -241,7 +246,7 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i
}
}
@Override
protected final synchronized void fillBatch() {
if (batchResetNeeded) {
resetBatch();
@ -261,18 +266,18 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i
}
}
@Override
public final synchronized boolean isEmpty() {
// negative means more messages added to store through queue.send since last reset
return size == 0;
}
@Override
public final synchronized boolean hasMessagesBufferedToDeliver() {
return !batchList.isEmpty();
}
@Override
public final synchronized int size() {
if (size < 0) {
this.size = getStoreSize();

View File

@ -63,9 +63,11 @@ public class FilePendingMessageCursor extends AbstractPendingMessageCursor imple
/**
* @param broker
* @param name
* @param prioritizedMessages
* @param store
*/
public FilePendingMessageCursor(Broker broker, String name) {
public FilePendingMessageCursor(Broker broker, String name, boolean prioritizedMessages) {
super(prioritizedMessages);
this.useCache = false;
this.broker = broker;
// the store can be null if the BrokerService has persistence
@ -190,6 +192,7 @@ public class FilePendingMessageCursor extends AbstractPendingMessageCursor imple
tryAddMessageLast(node, 0);
}
@Override
public synchronized boolean tryAddMessageLast(MessageReference node, long maxWaitTime) throws Exception {
if (!node.isExpired()) {
try {

View File

@ -0,0 +1,131 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.broker.region.cursors;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.activemq.broker.region.MessageReference;
import org.apache.activemq.command.MessageId;
public class OrderedPendingList implements PendingList {
PendingNode root = null;
PendingNode tail = null;
final Map<MessageId, PendingNode> map = new HashMap<MessageId, PendingNode>();
public PendingNode addMessageFirst(MessageReference message) {
PendingNode node = new PendingNode(this, message);
if (root == null) {
root = node;
tail = node;
} else {
root.linkBefore(node);
}
this.map.put(message.getMessageId(), node);
return node;
}
public PendingNode addMessageLast(MessageReference message) {
PendingNode node = new PendingNode(this, message);
if (root == null) {
root = node;
} else {
tail.linkAfter(node);
}
tail = node;
this.map.put(message.getMessageId(), node);
return node;
}
public void clear() {
this.root = null;
this.tail = null;
this.map.clear();
}
public boolean isEmpty() {
return this.map.isEmpty();
}
public Iterator<MessageReference> iterator() {
return new Iterator<MessageReference>() {
private PendingNode current = null;
private PendingNode next = root;
public boolean hasNext() {
return next != null;
}
public MessageReference next() {
MessageReference result = null;
this.current = this.next;
result = this.current.getMessage();
this.next = (PendingNode) this.next.getNext();
return result;
}
public void remove() {
if (this.current != null && this.current.getMessage() != null) {
map.remove(this.current.getMessage().getMessageId());
}
removeNode(this.current);
}
};
}
public void remove(MessageReference message) {
if (message != null) {
PendingNode node = this.map.remove(message.getMessageId());
removeNode(node);
}
}
public int size() {
return this.map.size();
}
void removeNode(PendingNode node) {
if (node != null) {
map.remove(node.getMessage().getMessageId());
if (root == node) {
root = (PendingNode) node.getNext();
}
if (tail == node) {
tail = (PendingNode) node.getPrevious();
}
node.unlink();
}
}
List<PendingNode> getAsList() {
List<PendingNode> result = new ArrayList<PendingNode>(size());
PendingNode node = root;
while (node != null) {
result.add(node);
node = (PendingNode) node.getNext();
}
return result;
}
@Override
public String toString() {
return "OrderedPendingList(" + System.identityHashCode(this) + ")";
}
}

View File

@ -0,0 +1,31 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.broker.region.cursors;
import java.util.Iterator;
import org.apache.activemq.broker.region.MessageReference;
public interface PendingList {
public boolean isEmpty();
public void clear();
public PendingNode addMessageFirst(MessageReference message);
public PendingNode addMessageLast(MessageReference message);
public void remove(MessageReference message);
public int size();
public Iterator<MessageReference> iterator();
}

View File

@ -0,0 +1,46 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.broker.region.cursors;
import org.apache.activemq.broker.region.MessageReference;
import org.apache.activemq.util.LinkedNode;
public class PendingNode extends LinkedNode {
private final MessageReference message;
private final OrderedPendingList list;
public PendingNode(OrderedPendingList list,MessageReference message) {
this.list = list;
this.message = message;
}
MessageReference getMessage() {
return this.message;
}
OrderedPendingList getList() {
return this.list;
}
@Override
public String toString() {
PendingNode n = (PendingNode) getNext();
String str = "PendingNode(";
str += System.identityHashCode(this) + "),root="+isHeadNode()+",next="+(n != null ?System.identityHashCode(n):"NULL");
return str;
}
}

View File

@ -0,0 +1,130 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.broker.region.cursors;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.activemq.broker.region.MessageReference;
import org.apache.activemq.command.MessageId;
public class PrioritizedPendingList implements PendingList {
static final Integer MAX_PRIORITY = 10;
private final OrderedPendingList[] lists = new OrderedPendingList[MAX_PRIORITY];
final Map<MessageId, PendingNode> map = new HashMap<MessageId, PendingNode>();
public PrioritizedPendingList() {
for (int i = 0; i < MAX_PRIORITY; i++) {
this.lists[i] = new OrderedPendingList();
}
}
public PendingNode addMessageFirst(MessageReference message) {
PendingNode node = getList(message).addMessageFirst(message);
this.map.put(message.getMessageId(), node);
return node;
}
public PendingNode addMessageLast(MessageReference message) {
PendingNode node = getList(message).addMessageLast(message);
this.map.put(message.getMessageId(), node);
return node;
}
public void clear() {
for (int i = 0; i < MAX_PRIORITY; i++) {
this.lists[i].clear();
}
this.map.clear();
}
public boolean isEmpty() {
return this.map.isEmpty();
}
public Iterator<MessageReference> iterator() {
return new PrioritizedPendingListIterator();
}
public void remove(MessageReference message) {
if (message != null) {
PendingNode node = this.map.remove(message.getMessageId());
if (node != null) {
node.getList().removeNode(node);
}
}
}
public int size() {
return this.map.size();
}
@Override
public String toString() {
return "PrioritizedPendingList(" + System.identityHashCode(this) + ")";
}
protected int getPriority(MessageReference message) {
int priority = javax.jms.Message.DEFAULT_PRIORITY;
if (message.getMessageId() != null) {
Math.max(message.getMessage().getPriority(), 0);
priority = Math.min(priority, 9);
}
return priority;
}
protected OrderedPendingList getList(MessageReference msg) {
return lists[getPriority(msg)];
}
private class PrioritizedPendingListIterator implements Iterator<MessageReference> {
private int index = 0;
private int currentIndex = 0;
List<PendingNode> list = new ArrayList<PendingNode>(size());
PrioritizedPendingListIterator() {
for (int i = MAX_PRIORITY - 1; i >= 0; i--) {
OrderedPendingList orderedPendingList = lists[i];
if (!orderedPendingList.isEmpty()) {
list.addAll(orderedPendingList.getAsList());
}
}
}
public boolean hasNext() {
return list.size() > index;
}
public MessageReference next() {
PendingNode node = list.get(this.index);
this.currentIndex = this.index;
this.index++;
return node.getMessage();
}
public void remove() {
PendingNode node = list.get(this.currentIndex);
if (node != null) {
map.remove(node.getMessage().getMessageId());
node.getList().removeNode(node);
}
}
}
}

View File

@ -21,7 +21,6 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
import org.apache.activemq.advisory.AdvisorySupport;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.ConnectionContext;
@ -58,13 +57,14 @@ public class StoreDurableSubscriberCursor extends AbstractPendingMessageCursor {
* @param subscription subscription for this cursor
*/
public StoreDurableSubscriberCursor(Broker broker,String clientId, String subscriberName,int maxBatchSize, Subscription subscription) {
super(AbstractPendingMessageCursor.isPrioritizedMessageSubscriber(broker,subscription));
this.subscription=subscription;
this.clientId = clientId;
this.subscriberName = subscriberName;
if (broker.getBrokerService().isPersistent()) {
this.nonPersistent = new FilePendingMessageCursor(broker,clientId + subscriberName);
this.nonPersistent = new FilePendingMessageCursor(broker,clientId + subscriberName,this.prioritizedMessages);
}else {
this.nonPersistent = new VMPendingMessageCursor();
this.nonPersistent = new VMPendingMessageCursor(this.prioritizedMessages);
}
this.nonPersistent.setMaxBatchSize(maxBatchSize);
@ -72,6 +72,7 @@ public class StoreDurableSubscriberCursor extends AbstractPendingMessageCursor {
this.storePrefetches.add(this.nonPersistent);
}
@Override
public synchronized void start() throws Exception {
if (!isStarted()) {
super.start();
@ -82,6 +83,7 @@ public class StoreDurableSubscriberCursor extends AbstractPendingMessageCursor {
}
}
@Override
public synchronized void stop() throws Exception {
if (isStarted()) {
super.stop();
@ -98,6 +100,7 @@ public class StoreDurableSubscriberCursor extends AbstractPendingMessageCursor {
* @param destination
* @throws Exception
*/
@Override
public synchronized void add(ConnectionContext context, Destination destination) throws Exception {
if (destination != null && !AdvisorySupport.isAdvisoryTopic(destination.getActiveMQDestination())) {
TopicStorePrefetch tsp = new TopicStorePrefetch(this.subscription,(Topic)destination, clientId, subscriberName);
@ -122,6 +125,7 @@ public class StoreDurableSubscriberCursor extends AbstractPendingMessageCursor {
* @param destination
* @throws Exception
*/
@Override
public synchronized List<MessageReference> remove(ConnectionContext context, Destination destination) throws Exception {
PendingMessageCursor tsp = topics.remove(destination);
if (tsp != null) {
@ -133,6 +137,7 @@ public class StoreDurableSubscriberCursor extends AbstractPendingMessageCursor {
/**
* @return true if there are no pending messages
*/
@Override
public synchronized boolean isEmpty() {
for (PendingMessageCursor tsp : storePrefetches) {
if( !tsp.isEmpty() )
@ -141,6 +146,7 @@ public class StoreDurableSubscriberCursor extends AbstractPendingMessageCursor {
return true;
}
@Override
public synchronized boolean isEmpty(Destination destination) {
boolean result = true;
TopicStorePrefetch tsp = topics.get(destination);
@ -157,10 +163,12 @@ public class StoreDurableSubscriberCursor extends AbstractPendingMessageCursor {
* @see org.apache.activemq.broker.region.cursors.AbstractPendingMessageCursor
* @return true if recovery required
*/
@Override
public boolean isRecoveryRequired() {
return false;
}
@Override
public synchronized void addMessageLast(MessageReference node) throws Exception {
if (node != null) {
Message msg = node.getMessage();
@ -179,16 +187,19 @@ public class StoreDurableSubscriberCursor extends AbstractPendingMessageCursor {
}
}
@Override
public synchronized void addRecoveredMessage(MessageReference node) throws Exception {
nonPersistent.addMessageLast(node);
}
@Override
public synchronized void clear() {
for (PendingMessageCursor tsp : storePrefetches) {
tsp.clear();
}
}
@Override
public synchronized boolean hasNext() {
boolean result = true;
if (result) {
@ -203,35 +214,41 @@ public class StoreDurableSubscriberCursor extends AbstractPendingMessageCursor {
return result;
}
@Override
public synchronized MessageReference next() {
MessageReference result = currentCursor != null ? currentCursor.next() : null;
return result;
}
@Override
public synchronized void remove() {
if (currentCursor != null) {
currentCursor.remove();
}
}
@Override
public synchronized void remove(MessageReference node) {
if (currentCursor != null) {
currentCursor.remove(node);
}
}
@Override
public synchronized void reset() {
for (PendingMessageCursor storePrefetch : storePrefetches) {
storePrefetch.reset();
}
}
@Override
public synchronized void release() {
for (PendingMessageCursor storePrefetch : storePrefetches) {
storePrefetch.release();
}
}
@Override
public synchronized int size() {
int pendingCount=0;
for (PendingMessageCursor tsp : storePrefetches) {
@ -240,6 +257,7 @@ public class StoreDurableSubscriberCursor extends AbstractPendingMessageCursor {
return pendingCount;
}
@Override
public void setMaxBatchSize(int maxBatchSize) {
for (PendingMessageCursor storePrefetch : storePrefetches) {
storePrefetch.setMaxBatchSize(maxBatchSize);
@ -247,12 +265,14 @@ public class StoreDurableSubscriberCursor extends AbstractPendingMessageCursor {
super.setMaxBatchSize(maxBatchSize);
}
@Override
public synchronized void gc() {
for (PendingMessageCursor tsp : storePrefetches) {
tsp.gc();
}
}
@Override
public void setSystemUsage(SystemUsage usageManager) {
super.setSystemUsage(usageManager);
for (PendingMessageCursor tsp : storePrefetches) {
@ -260,6 +280,7 @@ public class StoreDurableSubscriberCursor extends AbstractPendingMessageCursor {
}
}
@Override
public void setMemoryUsageHighWaterMark(int memoryUsageHighWaterMark) {
super.setMemoryUsageHighWaterMark(memoryUsageHighWaterMark);
for (PendingMessageCursor cursor : storePrefetches) {
@ -267,6 +288,7 @@ public class StoreDurableSubscriberCursor extends AbstractPendingMessageCursor {
}
}
@Override
public void setMaxProducersToAudit(int maxProducersToAudit) {
super.setMaxProducersToAudit(maxProducersToAudit);
for (PendingMessageCursor cursor : storePrefetches) {
@ -274,6 +296,7 @@ public class StoreDurableSubscriberCursor extends AbstractPendingMessageCursor {
}
}
@Override
public void setMaxAuditDepth(int maxAuditDepth) {
super.setMaxAuditDepth(maxAuditDepth);
for (PendingMessageCursor cursor : storePrefetches) {
@ -281,6 +304,7 @@ public class StoreDurableSubscriberCursor extends AbstractPendingMessageCursor {
}
}
@Override
public void setEnableAudit(boolean enableAudit) {
super.setEnableAudit(enableAudit);
for (PendingMessageCursor cursor : storePrefetches) {
@ -288,6 +312,7 @@ public class StoreDurableSubscriberCursor extends AbstractPendingMessageCursor {
}
}
@Override
public void setUseCache(boolean useCache) {
super.setUseCache(useCache);
for (PendingMessageCursor cursor : storePrefetches) {
@ -313,6 +338,7 @@ public class StoreDurableSubscriberCursor extends AbstractPendingMessageCursor {
return currentCursor;
}
@Override
public String toString() {
return "StoreDurableSubscriber(" + clientId + ":" + subscriberName + ")";
}

View File

@ -32,21 +32,21 @@ import org.apache.commons.logging.LogFactory;
public class StoreQueueCursor extends AbstractPendingMessageCursor {
private static final Log LOG = LogFactory.getLog(StoreQueueCursor.class);
private Broker broker;
private final Broker broker;
private int pendingCount;
private Queue queue;
private final Queue queue;
private PendingMessageCursor nonPersistent;
private QueueStorePrefetch persistent;
private final QueueStorePrefetch persistent;
private boolean started;
private PendingMessageCursor currentCursor;
/**
* Construct
*
* @param broker
* @param queue
* @param tmpStore
*/
public StoreQueueCursor(Broker broker,Queue queue) {
super((queue != null ? queue.isPrioritizedMessages():false));
this.broker=broker;
this.queue = queue;
this.persistent = new QueueStorePrefetch(queue);
@ -58,9 +58,9 @@ public class StoreQueueCursor extends AbstractPendingMessageCursor {
super.start();
if (nonPersistent == null) {
if (broker.getBrokerService().isPersistent()) {
nonPersistent = new FilePendingMessageCursor(broker,queue.getName());
nonPersistent = new FilePendingMessageCursor(broker,queue.getName(),this.prioritizedMessages);
}else {
nonPersistent = new VMPendingMessageCursor();
nonPersistent = new VMPendingMessageCursor(this.prioritizedMessages);
}
nonPersistent.setMaxBatchSize(getMaxBatchSize());
nonPersistent.setSystemUsage(systemUsage);
@ -101,7 +101,7 @@ public class StoreQueueCursor extends AbstractPendingMessageCursor {
}
}
}
public synchronized void addMessageFirst(MessageReference node) throws Exception {
if (node != null) {
Message msg = node.getMessage();
@ -240,6 +240,7 @@ public class StoreQueueCursor extends AbstractPendingMessageCursor {
}
}
@Override
public void setUseCache(boolean useCache) {
super.setUseCache(useCache);
if (persistent != null) {
@ -250,6 +251,7 @@ public class StoreQueueCursor extends AbstractPendingMessageCursor {
}
}
@Override
public void setMemoryUsageHighWaterMark(int memoryUsageHighWaterMark) {
super.setMemoryUsageHighWaterMark(memoryUsageHighWaterMark);
if (persistent != null) {

View File

@ -32,13 +32,20 @@ import org.apache.activemq.broker.region.QueueMessageReference;
* @version $Revision$
*/
public class VMPendingMessageCursor extends AbstractPendingMessageCursor {
private final LinkedList<MessageReference> list = new LinkedList<MessageReference>();
private final PendingList list;
private Iterator<MessageReference> iter;
public VMPendingMessageCursor() {
public VMPendingMessageCursor(boolean prioritizedMessages) {
super(prioritizedMessages);
this.useCache = false;
if (this.prioritizedMessages) {
this.list= new PrioritizedPendingList();
}else {
this.list = new OrderedPendingList();
}
}
@Override
public synchronized List<MessageReference> remove(ConnectionContext context, Destination destination)
throws Exception {
List<MessageReference> rc = new ArrayList<MessageReference>();
@ -56,7 +63,7 @@ public class VMPendingMessageCursor extends AbstractPendingMessageCursor {
/**
* @return true if there are no pending messages
*/
@Override
public synchronized boolean isEmpty() {
if (list.isEmpty()) {
return true;
@ -79,9 +86,9 @@ public class VMPendingMessageCursor extends AbstractPendingMessageCursor {
/**
* reset the cursor
*/
@Override
public synchronized void reset() {
iter = list.listIterator();
iter = list.iterator();
last = null;
}
@ -90,10 +97,10 @@ public class VMPendingMessageCursor extends AbstractPendingMessageCursor {
*
* @param node
*/
@Override
public synchronized void addMessageLast(MessageReference node) {
node.incrementReferenceCount();
list.addLast(node);
list.addMessageLast(node);
}
/**
@ -102,16 +109,16 @@ public class VMPendingMessageCursor extends AbstractPendingMessageCursor {
* @param position
* @param node
*/
@Override
public synchronized void addMessageFirst(MessageReference node) {
node.incrementReferenceCount();
list.addFirst(node);
list.addMessageFirst(node);
}
/**
* @return true if there pending messages to dispatch
*/
@Override
public synchronized boolean hasNext() {
return iter.hasNext();
}
@ -119,7 +126,7 @@ public class VMPendingMessageCursor extends AbstractPendingMessageCursor {
/**
* @return the next pending message
*/
@Override
public synchronized MessageReference next() {
last = iter.next();
if (last != null) {
@ -131,7 +138,7 @@ public class VMPendingMessageCursor extends AbstractPendingMessageCursor {
/**
* remove the message at the cursor position
*/
@Override
public synchronized void remove() {
if (last != null) {
last.decrementReferenceCount();
@ -142,7 +149,7 @@ public class VMPendingMessageCursor extends AbstractPendingMessageCursor {
/**
* @return the number of pending messages
*/
@Override
public synchronized int size() {
return list.size();
}
@ -150,7 +157,7 @@ public class VMPendingMessageCursor extends AbstractPendingMessageCursor {
/**
* clear all pending messages
*/
@Override
public synchronized void clear() {
for (Iterator<MessageReference> i = list.iterator(); i.hasNext();) {
MessageReference ref = i.next();
@ -159,16 +166,10 @@ public class VMPendingMessageCursor extends AbstractPendingMessageCursor {
list.clear();
}
@Override
public synchronized void remove(MessageReference node) {
for (Iterator<MessageReference> i = list.iterator(); i.hasNext();) {
MessageReference ref = i.next();
if (node.getMessageId().equals(ref.getMessageId())) {
ref.decrementReferenceCount();
i.remove();
break;
}
}
list.remove(node);
node.decrementReferenceCount();
}
/**
@ -177,10 +178,11 @@ public class VMPendingMessageCursor extends AbstractPendingMessageCursor {
* @param maxItems
* @return a list of paged in messages
*/
@Override
public LinkedList<MessageReference> pageInList(int maxItems) {
LinkedList<MessageReference> result = new LinkedList<MessageReference>();
for (MessageReference ref: list) {
for (Iterator<MessageReference>i = list.iterator();i.hasNext();) {
MessageReference ref = i.next();
ref.incrementReferenceCount();
result.add(ref);
if (result.size() >= maxItems) {
@ -190,12 +192,12 @@ public class VMPendingMessageCursor extends AbstractPendingMessageCursor {
return result;
}
@Override
public boolean isTransient() {
return true;
}
@Override
public void destroy() throws Exception {
super.destroy();
clear();

View File

@ -18,6 +18,7 @@ package org.apache.activemq.broker.region.policy;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.broker.region.cursors.AbstractPendingMessageCursor;
import org.apache.activemq.broker.region.cursors.FilePendingMessageCursor;
import org.apache.activemq.broker.region.cursors.PendingMessageCursor;
@ -42,6 +43,6 @@ public class FilePendingDurableSubscriberMessageStoragePolicy implements Pending
* @return the Pending Message cursor
*/
public PendingMessageCursor getSubscriberPendingMessageCursor(Broker broker,String clientId, String name, int maxBatchSize, Subscription sub) {
return new FilePendingMessageCursor(broker,name);
return new FilePendingMessageCursor(broker,name,AbstractPendingMessageCursor.isPrioritizedMessageSubscriber(broker, sub));
}
}

View File

@ -39,7 +39,7 @@ public class FilePendingQueueMessageStoragePolicy implements PendingQueueMessage
* org.apache.activemq.kaha.Store)
*/
public PendingMessageCursor getQueuePendingMessageCursor(Broker broker,Queue queue) {
return new FilePendingMessageCursor(broker,"PendingCursor:" + queue.getName());
return new FilePendingMessageCursor(broker,"PendingCursor:" + queue.getName(),queue.isPrioritizedMessages());
}
}

View File

@ -17,6 +17,8 @@
package org.apache.activemq.broker.region.policy;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.broker.region.cursors.AbstractPendingMessageCursor;
import org.apache.activemq.broker.region.cursors.FilePendingMessageCursor;
import org.apache.activemq.broker.region.cursors.PendingMessageCursor;
@ -31,14 +33,16 @@ import org.apache.activemq.broker.region.cursors.PendingMessageCursor;
public class FilePendingSubscriberMessageStoragePolicy implements PendingSubscriberMessageStoragePolicy {
/**
* @param broker
* @param broker
* @param name
* @param maxBatchSize
* @return a Cursor
* @see org.apache.activemq.broker.region.policy.PendingSubscriberMessageStoragePolicy#getSubscriberPendingMessageCursor(java.lang.String,
* org.apache.activemq.kaha.Store, int)
*/
public PendingMessageCursor getSubscriberPendingMessageCursor(Broker broker,String name,int maxBatchSize) {
return new FilePendingMessageCursor(broker,"PendingCursor:" + name);
public PendingMessageCursor getSubscriberPendingMessageCursor(Broker broker, String name, int maxBatchSize,
Subscription subs) {
return new FilePendingMessageCursor(broker, "PendingCursor:" + name, AbstractPendingMessageCursor
.isPrioritizedMessageSubscriber(broker, subs));
}
}

View File

@ -17,6 +17,7 @@
package org.apache.activemq.broker.region.policy;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.broker.region.cursors.PendingMessageCursor;
/**
@ -35,5 +36,5 @@ public interface PendingSubscriberMessageStoragePolicy {
* @param maxBatchSize
* @return the Pending Message cursor
*/
PendingMessageCursor getSubscriberPendingMessageCursor(Broker broker,String name,int maxBatchSize);
PendingMessageCursor getSubscriberPendingMessageCursor(Broker broker,String name,int maxBatchSize,Subscription subs);
}

View File

@ -87,6 +87,7 @@ public class PolicyEntry extends DestinationMapEntry {
private int cursorMemoryHighWaterMark = 70;
private int storeUsageHighWaterMark = 100;
private SlowConsumerStrategy slowConsumerStrategy;
private boolean prioritizedMessages;
public void configure(Broker broker,Queue queue) {
@ -155,6 +156,7 @@ public class PolicyEntry extends DestinationMapEntry {
scs.setScheduler(broker.getScheduler());
}
destination.setSlowConsumerStrategy(scs);
destination.setPrioritizedMessages(isPrioritizedMessages());
}
public void configure(Broker broker, SystemUsage memoryManager, TopicSubscription subscription) {
@ -184,7 +186,7 @@ public class PolicyEntry extends DestinationMapEntry {
if (pendingSubscriberPolicy != null) {
String name = subscription.getContext().getClientId() + "_" + subscription.getConsumerInfo().getConsumerId();
int maxBatchSize = subscription.getConsumerInfo().getPrefetchSize();
subscription.setMatched(pendingSubscriberPolicy.getSubscriberPendingMessageCursor(broker,name, maxBatchSize));
subscription.setMatched(pendingSubscriberPolicy.getSubscriberPendingMessageCursor(broker,name, maxBatchSize,subscription));
}
if (enableAudit) {
subscription.setEnableAudit(enableAudit);
@ -739,5 +741,14 @@ public class PolicyEntry extends DestinationMapEntry {
public SlowConsumerStrategy getSlowConsumerStrategy() {
return this.slowConsumerStrategy;
}
public boolean isPrioritizedMessages() {
return this.prioritizedMessages;
}
public void setPrioritizedMessages(boolean prioritizedMessages) {
this.prioritizedMessages = prioritizedMessages;
}
}

View File

@ -18,6 +18,7 @@ package org.apache.activemq.broker.region.policy;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.broker.region.cursors.AbstractPendingMessageCursor;
import org.apache.activemq.broker.region.cursors.PendingMessageCursor;
import org.apache.activemq.broker.region.cursors.VMPendingMessageCursor;
@ -40,6 +41,6 @@ public class VMPendingDurableSubscriberMessageStoragePolicy implements PendingDu
* @return the Pending Message cursor
*/
public PendingMessageCursor getSubscriberPendingMessageCursor(Broker broker,String clientId, String name,int maxBatchSize, Subscription sub) {
return new VMPendingMessageCursor();
return new VMPendingMessageCursor(AbstractPendingMessageCursor.isPrioritizedMessageSubscriber(broker, sub));
}
}

View File

@ -37,6 +37,6 @@ public class VMPendingQueueMessageStoragePolicy implements PendingQueueMessageSt
* @return the cursor
*/
public PendingMessageCursor getQueuePendingMessageCursor(Broker broker,Queue queue) {
return new VMPendingMessageCursor();
return new VMPendingMessageCursor(queue.isPrioritizedMessages());
}
}

View File

@ -17,6 +17,8 @@
package org.apache.activemq.broker.region.policy;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.broker.region.cursors.AbstractPendingMessageCursor;
import org.apache.activemq.broker.region.cursors.PendingMessageCursor;
import org.apache.activemq.broker.region.cursors.VMPendingMessageCursor;
@ -38,7 +40,7 @@ public class VMPendingSubscriberMessageStoragePolicy implements PendingSubscribe
* @see org.apache.activemq.broker.region.policy.PendingSubscriberMessageStoragePolicy#getSubscriberPendingMessageCursor(java.lang.String,
* org.apache.activemq.kaha.Store, int)
*/
public PendingMessageCursor getSubscriberPendingMessageCursor(Broker broker,String name,int maxBatchSize) {
return new VMPendingMessageCursor();
public PendingMessageCursor getSubscriberPendingMessageCursor(Broker broker,String name,int maxBatchSize,Subscription subs) {
return new VMPendingMessageCursor(AbstractPendingMessageCursor.isPrioritizedMessageSubscriber(broker, subs));
}
}

View File

@ -16,7 +16,6 @@
*/
package org.apache.activemq.broker.util;
import java.io.IOException;
import java.util.Set;
import javax.annotation.PostConstruct;
import org.apache.activemq.broker.BrokerPluginSupport;
@ -65,7 +64,7 @@ public class LoggingBrokerPlugin extends BrokerPluginSupport {
private boolean logInternalEvents = false;
/**
*
*
* @throws Exception
* @org.apache.xbean.InitMethod
*/
@ -77,7 +76,7 @@ public class LoggingBrokerPlugin extends BrokerPluginSupport {
public boolean isLogAll() {
return logAll;
}
/**
* Log all Events that go through the Plugin
*/
@ -152,15 +151,12 @@ public class LoggingBrokerPlugin extends BrokerPluginSupport {
}
@Override
public void acknowledge(ConsumerBrokerExchange consumerExchange,
MessageAck ack) throws Exception {
public void acknowledge(ConsumerBrokerExchange consumerExchange, MessageAck ack) throws Exception {
if (isLogAll() || isLogConsumerEvents()) {
LOG.info("Acknowledging message for client ID : "
+ consumerExchange.getConnectionContext().getClientId()
LOG.info("Acknowledging message for client ID : " + consumerExchange.getConnectionContext().getClientId()
+ (ack.getMessageCount() == 1 ? ", " + ack.getLastMessageId() : ""));
if (LOG.isTraceEnabled() && ack.getMessageCount() > 1) {
LOG.trace("Message count: " + ack.getMessageCount()
+ ", First Message Id: " + ack.getFirstMessageId()
LOG.trace("Message count: " + ack.getMessageCount() + ", First Message Id: " + ack.getFirstMessageId()
+ ", Last Message Id: " + ack.getLastMessageId());
}
}
@ -168,18 +164,15 @@ public class LoggingBrokerPlugin extends BrokerPluginSupport {
}
@Override
public Response messagePull(ConnectionContext context, MessagePull pull)
throws Exception {
public Response messagePull(ConnectionContext context, MessagePull pull) throws Exception {
if (isLogAll() || isLogConsumerEvents()) {
LOG.info("Message Pull from : " + context.getClientId() + " on "
+ pull.getDestination().getPhysicalName());
LOG.info("Message Pull from : " + context.getClientId() + " on " + pull.getDestination().getPhysicalName());
}
return super.messagePull(context, pull);
}
@Override
public void addConnection(ConnectionContext context, ConnectionInfo info)
throws Exception {
public void addConnection(ConnectionContext context, ConnectionInfo info) throws Exception {
if (isLogAll() || isLogConnectionEvents()) {
LOG.info("Adding Connection : " + context);
}
@ -187,8 +180,7 @@ public class LoggingBrokerPlugin extends BrokerPluginSupport {
}
@Override
public Subscription addConsumer(ConnectionContext context, ConsumerInfo info)
throws Exception {
public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
if (isLogAll() || isLogConsumerEvents()) {
LOG.info("Adding Consumer : " + info);
}
@ -196,8 +188,7 @@ public class LoggingBrokerPlugin extends BrokerPluginSupport {
}
@Override
public void addProducer(ConnectionContext context, ProducerInfo info)
throws Exception {
public void addProducer(ConnectionContext context, ProducerInfo info) throws Exception {
if (isLogAll() || isLogProducerEvents()) {
LOG.info("Adding Producer :" + info);
}
@ -205,8 +196,7 @@ public class LoggingBrokerPlugin extends BrokerPluginSupport {
}
@Override
public void commitTransaction(ConnectionContext context, TransactionId xid,
boolean onePhase) throws Exception {
public void commitTransaction(ConnectionContext context, TransactionId xid, boolean onePhase) throws Exception {
if (isLogAll() || isLogTransactionEvents()) {
LOG.info("Commiting transaction : " + xid.getTransactionKey());
}
@ -214,8 +204,7 @@ public class LoggingBrokerPlugin extends BrokerPluginSupport {
}
@Override
public void removeSubscription(ConnectionContext context,
RemoveSubscriptionInfo info) throws Exception {
public void removeSubscription(ConnectionContext context, RemoveSubscriptionInfo info) throws Exception {
if (isLogAll() || isLogConsumerEvents()) {
LOG.info("Removing subscription : " + info);
}
@ -223,8 +212,7 @@ public class LoggingBrokerPlugin extends BrokerPluginSupport {
}
@Override
public TransactionId[] getPreparedTransactions(ConnectionContext context)
throws Exception {
public TransactionId[] getPreparedTransactions(ConnectionContext context) throws Exception {
TransactionId[] result = super.getPreparedTransactions(context);
if ((isLogAll() || isLogTransactionEvents()) && result != null) {
@ -241,8 +229,7 @@ public class LoggingBrokerPlugin extends BrokerPluginSupport {
}
@Override
public int prepareTransaction(ConnectionContext context, TransactionId xid)
throws Exception {
public int prepareTransaction(ConnectionContext context, TransactionId xid) throws Exception {
if (isLogAll() || isLogTransactionEvents()) {
LOG.info("Preparing transaction : " + xid.getTransactionKey());
}
@ -250,8 +237,7 @@ public class LoggingBrokerPlugin extends BrokerPluginSupport {
}
@Override
public void removeConnection(ConnectionContext context,
ConnectionInfo info, Throwable error) throws Exception {
public void removeConnection(ConnectionContext context, ConnectionInfo info, Throwable error) throws Exception {
if (isLogAll() || isLogConnectionEvents()) {
LOG.info("Removing Connection : " + info);
}
@ -259,8 +245,7 @@ public class LoggingBrokerPlugin extends BrokerPluginSupport {
}
@Override
public void removeConsumer(ConnectionContext context, ConsumerInfo info)
throws Exception {
public void removeConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
if (isLogAll() || isLogConsumerEvents()) {
LOG.info("Removing Consumer : " + info);
}
@ -268,8 +253,7 @@ public class LoggingBrokerPlugin extends BrokerPluginSupport {
}
@Override
public void removeProducer(ConnectionContext context, ProducerInfo info)
throws Exception {
public void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception {
if (isLogAll() || isLogProducerEvents()) {
LOG.info("Removing Producer : " + info);
}
@ -277,8 +261,7 @@ public class LoggingBrokerPlugin extends BrokerPluginSupport {
}
@Override
public void rollbackTransaction(ConnectionContext context, TransactionId xid)
throws Exception {
public void rollbackTransaction(ConnectionContext context, TransactionId xid) throws Exception {
if (isLogAll() || isLogTransactionEvents()) {
LOG.info("Rolling back Transaction : " + xid.getTransactionKey());
}
@ -286,8 +269,7 @@ public class LoggingBrokerPlugin extends BrokerPluginSupport {
}
@Override
public void send(ProducerBrokerExchange producerExchange,
Message messageSend) throws Exception {
public void send(ProducerBrokerExchange producerExchange, Message messageSend) throws Exception {
if (isLogAll() || isLogProducerEvents()) {
LOG.info("Sending message : " + messageSend);
}
@ -295,8 +277,7 @@ public class LoggingBrokerPlugin extends BrokerPluginSupport {
}
@Override
public void beginTransaction(ConnectionContext context, TransactionId xid)
throws Exception {
public void beginTransaction(ConnectionContext context, TransactionId xid) throws Exception {
if (isLogAll() || isLogTransactionEvents()) {
LOG.info("Beginning transaction : " + xid.getTransactionKey());
}
@ -304,11 +285,9 @@ public class LoggingBrokerPlugin extends BrokerPluginSupport {
}
@Override
public void forgetTransaction(ConnectionContext context,
TransactionId transactionId) throws Exception {
public void forgetTransaction(ConnectionContext context, TransactionId transactionId) throws Exception {
if (isLogAll() || isLogTransactionEvents()) {
LOG.info("Forgetting transaction : "
+ transactionId.getTransactionKey());
LOG.info("Forgetting transaction : " + transactionId.getTransactionKey());
}
super.forgetTransaction(context, transactionId);
}
@ -333,23 +312,20 @@ public class LoggingBrokerPlugin extends BrokerPluginSupport {
}
@Override
public org.apache.activemq.broker.region.Destination addDestination(
ConnectionContext context, ActiveMQDestination destination,boolean create)
throws Exception {
public org.apache.activemq.broker.region.Destination addDestination(ConnectionContext context,
ActiveMQDestination destination, boolean create) throws Exception {
if (isLogAll() || isLogInternalEvents()) {
LOG.info("Adding destination : "
+ destination.getDestinationTypeAsString() + ":"
LOG.info("Adding destination : " + destination.getDestinationTypeAsString() + ":"
+ destination.getPhysicalName());
}
return super.addDestination(context, destination,create);
return super.addDestination(context, destination, create);
}
@Override
public void removeDestination(ConnectionContext context,
ActiveMQDestination destination, long timeout) throws Exception {
public void removeDestination(ConnectionContext context, ActiveMQDestination destination, long timeout)
throws Exception {
if (isLogAll() || isLogInternalEvents()) {
LOG.info("Removing destination : "
+ destination.getDestinationTypeAsString() + ":"
LOG.info("Removing destination : " + destination.getDestinationTypeAsString() + ":"
+ destination.getPhysicalName());
}
super.removeDestination(context, destination, timeout);
@ -390,8 +366,7 @@ public class LoggingBrokerPlugin extends BrokerPluginSupport {
}
@Override
public void addSession(ConnectionContext context, SessionInfo info)
throws Exception {
public void addSession(ConnectionContext context, SessionInfo info) throws Exception {
if (isLogAll() || isLogConnectionEvents()) {
LOG.info("Adding Session : " + info);
}
@ -399,8 +374,7 @@ public class LoggingBrokerPlugin extends BrokerPluginSupport {
}
@Override
public void removeSession(ConnectionContext context, SessionInfo info)
throws Exception {
public void removeSession(ConnectionContext context, SessionInfo info) throws Exception {
if (isLogAll() || isLogConnectionEvents()) {
LOG.info("Removing Session : " + info);
}
@ -458,12 +432,9 @@ public class LoggingBrokerPlugin extends BrokerPluginSupport {
}
@Override
public void processDispatchNotification(
MessageDispatchNotification messageDispatchNotification)
throws Exception {
public void processDispatchNotification(MessageDispatchNotification messageDispatchNotification) throws Exception {
if (isLogAll() || isLogInternalEvents() || isLogConsumerEvents()) {
LOG.info("ProcessDispatchNotification :"
+ messageDispatchNotification);
LOG.info("ProcessDispatchNotification :" + messageDispatchNotification);
}
super.processDispatchNotification(messageDispatchNotification);
}
@ -487,8 +458,7 @@ public class LoggingBrokerPlugin extends BrokerPluginSupport {
}
@Override
public void addDestinationInfo(ConnectionContext context,
DestinationInfo info) throws Exception {
public void addDestinationInfo(ConnectionContext context, DestinationInfo info) throws Exception {
if (isLogAll() || isLogInternalEvents()) {
LOG.info("Adding destination info : " + info);
}
@ -496,8 +466,7 @@ public class LoggingBrokerPlugin extends BrokerPluginSupport {
}
@Override
public void removeDestinationInfo(ConnectionContext context,
DestinationInfo info) throws Exception {
public void removeDestinationInfo(ConnectionContext context, DestinationInfo info) throws Exception {
if (isLogAll() || isLogInternalEvents()) {
LOG.info("Removing destination info : " + info);
}
@ -505,36 +474,31 @@ public class LoggingBrokerPlugin extends BrokerPluginSupport {
}
@Override
public void messageExpired(ConnectionContext context,
MessageReference message) {
public void messageExpired(ConnectionContext context, MessageReference message) {
if (isLogAll() || isLogInternalEvents()) {
String msg = "Unable to display message.";
try {
msg = message.getMessage().toString();
} catch (IOException ioe) {
}
msg = message.getMessage().toString();
LOG.info("Message has expired : " + msg);
}
super.messageExpired(context, message);
}
@Override
public void sendToDeadLetterQueue(ConnectionContext context,
MessageReference messageReference) {
public void sendToDeadLetterQueue(ConnectionContext context, MessageReference messageReference) {
if (isLogAll() || isLogInternalEvents()) {
String msg = "Unable to display message.";
try {
msg = messageReference.getMessage().toString();
} catch (IOException ioe) {
}
msg = messageReference.getMessage().toString();
LOG.info("Sending to DLQ : " + msg);
}
super.sendToDeadLetterQueue(context, messageReference);
}
@Override
public void fastProducer(ConnectionContext context,
ProducerInfo producerInfo) {
public void fastProducer(ConnectionContext context, ProducerInfo producerInfo) {
if (isLogAll() || isLogProducerEvents() || isLogInternalEvents()) {
LOG.info("Fast Producer : " + producerInfo);
}
@ -542,8 +506,7 @@ public class LoggingBrokerPlugin extends BrokerPluginSupport {
}
@Override
public void isFull(ConnectionContext context, Destination destination,
Usage usage) {
public void isFull(ConnectionContext context, Destination destination, Usage usage) {
if (isLogAll() || isLogProducerEvents() || isLogInternalEvents()) {
LOG.info("Destination is full : " + destination.getName());
}
@ -551,50 +514,43 @@ public class LoggingBrokerPlugin extends BrokerPluginSupport {
}
@Override
public void messageConsumed(ConnectionContext context,
MessageReference messageReference) {
public void messageConsumed(ConnectionContext context, MessageReference messageReference) {
if (isLogAll() || isLogConsumerEvents() || isLogInternalEvents()) {
String msg = "Unable to display message.";
try {
msg = messageReference.getMessage().toString();
} catch (IOException ioe) {
}
msg = messageReference.getMessage().toString();
LOG.info("Message consumed : " + msg);
}
super.messageConsumed(context, messageReference);
}
@Override
public void messageDelivered(ConnectionContext context,
MessageReference messageReference) {
public void messageDelivered(ConnectionContext context, MessageReference messageReference) {
if (isLogAll() || isLogConsumerEvents() || isLogInternalEvents()) {
String msg = "Unable to display message.";
try {
msg = messageReference.getMessage().toString();
} catch (IOException ioe) {
}
msg = messageReference.getMessage().toString();
LOG.info("Message delivered : " + msg);
}
super.messageDelivered(context, messageReference);
}
@Override
public void messageDiscarded(ConnectionContext context, Subscription sub,
MessageReference messageReference) {
public void messageDiscarded(ConnectionContext context, Subscription sub, MessageReference messageReference) {
if (isLogAll() || isLogInternalEvents()) {
String msg = "Unable to display message.";
try {
msg = messageReference.getMessage().toString();
} catch (IOException ioe) {
}
msg = messageReference.getMessage().toString();
LOG.info("Message discarded : " + msg);
}
super.messageDiscarded(context, sub, messageReference);
}
@Override
public void slowConsumer(ConnectionContext context,
Destination destination, Subscription subs) {
public void slowConsumer(ConnectionContext context, Destination destination, Subscription subs) {
if (isLogAll() || isLogConsumerEvents() || isLogInternalEvents()) {
LOG.info("Detected slow consumer on " + destination.getName());
StringBuffer buf = new StringBuffer("Connection(");

View File

@ -564,7 +564,7 @@ public abstract class Message extends BaseCommand implements MarshallAware, Mess
return this;
}
public Message getMessage() throws IOException {
public Message getMessage() {
return this;
}

View File

@ -16,7 +16,6 @@
*/
package org.apache.activemq.memory.list;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedList;
@ -38,10 +37,10 @@ import org.apache.commons.logging.LogFactory;
*/
public class SimpleMessageList implements MessageList {
private static final Log LOG = LogFactory.getLog(SimpleMessageList.class);
private LinkedList<MessageReference> list = new LinkedList<MessageReference>();
private final LinkedList<MessageReference> list = new LinkedList<MessageReference>();
private int maximumSize = 100 * 64 * 1024;
private int size;
private Object lock = new Object();
private final Object lock = new Object();
public SimpleMessageList() {
}
@ -73,13 +72,9 @@ public class SimpleMessageList implements MessageList {
for (Iterator<MessageReference> i = list.iterator(); i.hasNext();) {
MessageReference ref = i.next();
Message msg;
try {
msg = ref.getMessage();
if (filter.matches(msg.getDestination())) {
result.add(msg);
}
} catch (IOException e) {
LOG.error("Failed to get Message from MessageReference: " + ref, e);
msg = ref.getMessage();
if (filter.matches(msg.getDestination())) {
result.add(msg);
}
}

View File

@ -16,9 +16,7 @@
*/
package org.apache.activemq.plugin;
import java.io.IOException;
import java.util.regex.Pattern;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.BrokerFilter;
import org.apache.activemq.broker.ConnectionContext;
@ -48,25 +46,15 @@ public class DiscardingDLQBroker extends BrokerFilter {
@Override
public void sendToDeadLetterQueue(ConnectionContext ctx, MessageReference msgRef) {
if (log.isTraceEnabled()) {
try {
log.trace("Discarding DLQ BrokerFilter[pass through] - skipping message:" + (msgRef != null ? msgRef.getMessage() : null));
} catch (IOException x) {
log.trace("Discarding DLQ BrokerFilter[pass through] - skipping message:" + msgRef != null ? msgRef : null, x);
}
log.trace("Discarding DLQ BrokerFilter[pass through] - skipping message:" + (msgRef != null ? msgRef.getMessage() : null));
}
boolean dropped = true;
Message msg = null;
ActiveMQDestination dest = null;
String destName = null;
try {
msg = msgRef.getMessage();
dest = msg.getDestination();
destName = dest.getPhysicalName();
}catch (IOException x) {
if (log.isDebugEnabled()) {
log.debug("Unable to retrieve message or destination for message going to Dead Letter Queue. message skipped.", x);
}
}
msg = msgRef.getMessage();
dest = msg.getDestination();
destName = dest.getPhysicalName();
if (dest == null || destName == null ) {
//do nothing, no need to forward it
@ -105,12 +93,8 @@ public class DiscardingDLQBroker extends BrokerFilter {
private void skipMessage(String prefix, MessageReference msgRef) {
if (log.isDebugEnabled()) {
try {
String lmsg = "Discarding DLQ BrokerFilter["+prefix+"] - skipping message:" + (msgRef!=null?msgRef.getMessage():null);
log.debug(lmsg);
}catch (IOException x) {
log.debug("Discarding DLQ BrokerFilter["+prefix+"] - skipping message:" + (msgRef!=null?msgRef:null),x);
}
String lmsg = "Discarding DLQ BrokerFilter["+prefix+"] - skipping message:" + (msgRef!=null?msgRef.getMessage():null);
log.debug(lmsg);
}
}