Use in mmeory list in FilePendingMessageCursor - until memory limit reached - then use disk.
USe FilePendingMessageCursor in TopicSubscription instead of LinkedList

git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@479094 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Robert Davies 2006-11-25 07:10:17 +00:00
parent 118c806907
commit 9359e9f6ca
4 changed files with 221 additions and 56 deletions

View File

@ -26,6 +26,7 @@ import javax.jms.JMSException;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.cursors.FilePendingMessageCursor;
import org.apache.activemq.broker.region.policy.MessageEvictionStrategy;
import org.apache.activemq.broker.region.policy.OldestMessageEvictionStrategy;
import org.apache.activemq.command.ActiveMQDestination;
@ -50,7 +51,7 @@ public class TopicSubscription extends AbstractSubscription{
private static final Log log=LogFactory.getLog(TopicSubscription.class);
final protected LinkedList matched=new LinkedList();
final protected FilePendingMessageCursor matched;
final protected ActiveMQDestination dlqDestination=new ActiveMQQueue("ActiveMQ.DLQ");
final protected UsageManager usageManager;
protected AtomicLong dispatched=new AtomicLong();
@ -69,6 +70,7 @@ public class TopicSubscription extends AbstractSubscription{
throws InvalidSelectorException{
super(broker,context,info);
this.usageManager=usageManager;
this.matched = new FilePendingMessageCursor(info.getConsumerId().toString(), broker.getTempDataStore());
}
public void add(MessageReference node) throws InterruptedException,IOException{
@ -84,7 +86,7 @@ public class TopicSubscription extends AbstractSubscription{
}else{
if(maximumPendingMessages!=0){
synchronized(matchedListMutex){
matched.addLast(node);
matched.addMessageLast(node);
// NOTE - be careful about the slaveBroker!
if (maximumPendingMessages > 0) {
@ -94,15 +96,22 @@ public class TopicSubscription extends AbstractSubscription{
max = maximumPendingMessages;
}
if (!matched.isEmpty() && matched.size() > max) {
removeExpiredMessages(matched);
removeExpiredMessages();
}
// lets discard old messages as we are a slow consumer
while (!matched.isEmpty() && matched.size() > maximumPendingMessages) {
MessageReference[] oldMessages = messageEvictionStrategy.evictMessages(matched);
int pageInSize = matched.size() - maximumPendingMessages;
//only page in a 1000 at a time - else we could blow da memory
pageInSize = Math.max(1000,pageInSize);
LinkedList list = matched.pageInList(pageInSize);
MessageReference[] oldMessages = messageEvictionStrategy.evictMessages(list);
int messagesToEvict = oldMessages.length;
for(int i = 0; i < messagesToEvict; i++) {
oldMessages[i].decrementReferenceCount();
MessageReference oldMessage = oldMessages[i];
oldMessage.decrementReferenceCount();
matched.remove(oldMessage);
discarded++;
if (log.isDebugEnabled()) {
log.debug("Discarding message " + oldMessages[i]);
@ -126,29 +135,33 @@ public class TopicSubscription extends AbstractSubscription{
* Discard any expired messages from the matched list. Called from a synchronized block.
* @throws IOException
*/
protected void removeExpiredMessages(LinkedList messages) throws IOException {
for(Iterator i=matched.iterator();i.hasNext();){
MessageReference node=(MessageReference) i.next();
protected void removeExpiredMessages() throws IOException {
matched.reset();
while(matched.hasNext()) {
MessageReference node=matched.next();
if (node.isExpired()) {
i.remove();
matched.remove();
dispatched.incrementAndGet();
node.decrementReferenceCount();
break;
}
}
matched.release();
}
public void processMessageDispatchNotification(MessageDispatchNotification mdn){
synchronized(matchedListMutex){
for(Iterator i=matched.iterator();i.hasNext();){
MessageReference node=(MessageReference) i.next();
matched.reset();
while(matched.hasNext()) {
MessageReference node=matched.next();
if(node.getMessageId().equals(mdn.getMessageId())){
i.remove();
matched.remove();
dispatched.incrementAndGet();
node.decrementReferenceCount();
break;
}
}
matched.release();
}
}
@ -322,9 +335,10 @@ public class TopicSubscription extends AbstractSubscription{
private void dispatchMatched() throws IOException{
synchronized(matchedListMutex){
for(Iterator iter=matched.iterator();iter.hasNext()&&!isFull();){
MessageReference message=(MessageReference) iter.next();
iter.remove();
matched.reset();
while(matched.hasNext()) {
MessageReference message=(MessageReference) matched.next();
matched.remove();
// Message may have been sitting in the matched list a while
// waiting for the consumer to ak the message.
@ -335,6 +349,7 @@ public class TopicSubscription extends AbstractSubscription{
dispatch(message);
}
matched.release();
}
}
@ -380,11 +395,7 @@ public class TopicSubscription extends AbstractSubscription{
public void destroy() {
synchronized(matchedListMutex){
for (Iterator iter = matched.iterator(); iter.hasNext();) {
MessageReference node = (MessageReference) iter.next();
node.decrementReferenceCount();
}
matched.clear();
matched.destroy();
}
}

View File

@ -106,4 +106,8 @@ public class AbstractPendingMessageCursor implements PendingMessageCursor{
public boolean hasSpace() {
return usageManager != null ? !usageManager.isFull() : true;
}
public boolean isFull() {
return usageManager != null ? usageManager.isFull() : false;
}
}

View File

@ -11,47 +11,59 @@
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package org.apache.activemq.broker.region.cursors;
import java.io.IOException;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.MessageReference;
import org.apache.activemq.command.Message;
import org.apache.activemq.kaha.ListContainer;
import org.apache.activemq.kaha.Store;
import org.apache.activemq.memory.UsageListener;
import org.apache.activemq.memory.UsageManager;
import org.apache.activemq.openwire.OpenWireFormat;
import org.apache.activemq.store.kahadaptor.CommandMarshaller;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
/**
* perist pending messages pending message (messages awaiting disptach to a consumer) cursor
*
*
* @version $Revision$
*/
public class FilePendingMessageCursor extends AbstractPendingMessageCursor{
private ListContainer list;
public class FilePendingMessageCursor extends AbstractPendingMessageCursor implements UsageListener{
static private final Log log=LogFactory.getLog(FilePendingMessageCursor.class);
private Store store;
private String name;
private LinkedList memoryList=new LinkedList();
private ListContainer diskList;
private Iterator iter=null;
private Destination regionDestination;
private Lock iterLock=new ReentrantLock();
private Object mutex=new Object();
/**
* @param name
* @param store
* @throws IOException
*/
public FilePendingMessageCursor(String name,Store store){
try{
list=store.getListContainer(name);
list.setMarshaller(new CommandMarshaller(new OpenWireFormat()));
list.setMaximumCacheSize(0);
}catch(IOException e){
throw new RuntimeException(e);
}
this.name=name;
this.store=store;
}
/**
* @return true if there are no pending messages
*/
public boolean isEmpty(){
return list.isEmpty();
synchronized(mutex){
return memoryList.isEmpty()&&isDiskListEmpty();
}
}
/**
@ -59,7 +71,46 @@ public class FilePendingMessageCursor extends AbstractPendingMessageCursor{
*
*/
public void reset(){
iter=list.listIterator();
iterLock.lock();
synchronized(mutex){
iter=isSpaceInMemoryList()?memoryList.iterator():diskList.listIterator();
}
}
public void release(){
iterLock.unlock();
}
public void destroy(){
for(Iterator i=memoryList.iterator();i.hasNext();){
Message node=(Message)i.next();
node.decrementReferenceCount();
}
memoryList.clear();
if(!isDiskListEmpty()){
getDiskList().clear();
}
}
public LinkedList pageInList(int maxItems){
LinkedList result=new LinkedList();
synchronized(mutex){
int count=0;
for(Iterator i=memoryList.iterator();i.hasNext()&&count<maxItems;){
result.add(i.next());
count++;
}
if(count<maxItems&&!isDiskListEmpty()){
for(Iterator i=getDiskList().iterator();i.hasNext()&&count<maxItems;){
Message message=(Message)i.next();
message.setRegionDestination(regionDestination);
message.incrementReferenceCount();
result.add(message);
count++;
}
}
}
return result;
}
/**
@ -68,46 +119,66 @@ public class FilePendingMessageCursor extends AbstractPendingMessageCursor{
* @param node
*/
public void addMessageLast(MessageReference node){
try{
regionDestination=node.getMessage().getRegionDestination();
node.decrementReferenceCount();
}catch(IOException e){
throw new RuntimeException(e);
synchronized(mutex){
try{
regionDestination=node.getMessage().getRegionDestination();
if(isSpaceInMemoryList()){
memoryList.add(node);
}else{
flushToDisk();
node.decrementReferenceCount();
getDiskList().addLast(node);
}
}catch(IOException e){
throw new RuntimeException(e);
}
}
list.addLast(node);
}
/**
* add message to await dispatch
*
* @param position
* @param node
*/
public void addMessageFirst(MessageReference node){
try{
regionDestination=node.getMessage().getRegionDestination();
node.decrementReferenceCount();
}catch(IOException e){
throw new RuntimeException(e);
synchronized(mutex){
try{
regionDestination=node.getMessage().getRegionDestination();
if(isSpaceInMemoryList()){
memoryList.addFirst(node);
}else{
flushToDisk();
node.decrementReferenceCount();
getDiskList().addFirst(node);
}
}catch(IOException e){
throw new RuntimeException(e);
}
}
list.addFirst(node);
}
/**
* @return true if there pending messages to dispatch
*/
public boolean hasNext(){
return iter.hasNext();
synchronized(mutex){
return iter.hasNext();
}
}
/**
* @return the next pending message
*/
public MessageReference next(){
Message message=(Message) iter.next();
message.setRegionDestination(regionDestination);
message.incrementReferenceCount();
return message;
synchronized(mutex){
Message message=(Message)iter.next();
if(!isDiskListEmpty()){
// got from disk
message.setRegionDestination(regionDestination);
message.incrementReferenceCount();
}
return message;
}
}
/**
@ -115,17 +186,31 @@ public class FilePendingMessageCursor extends AbstractPendingMessageCursor{
*
*/
public void remove(){
iter.remove();
synchronized(mutex){
iter.remove();
}
}
/**
* @param node
* @see org.apache.activemq.broker.region.cursors.AbstractPendingMessageCursor#remove(org.apache.activemq.broker.region.MessageReference)
*/
public void remove(MessageReference node){
list.remove(node);
synchronized(mutex){
memoryList.remove(node);
if(!isDiskListEmpty()){
getDiskList().remove(node);
}
}
}
/**
* @return the number of pending messages
*/
public int size(){
return list.size();
synchronized(mutex){
return memoryList.size()+(isDiskListEmpty()?0:getDiskList().size());
}
}
/**
@ -133,6 +218,66 @@ public class FilePendingMessageCursor extends AbstractPendingMessageCursor{
*
*/
public void clear(){
list.clear();
synchronized(mutex){
memoryList.clear();
if(!isDiskListEmpty()){
getDiskList().clear();
}
}
}
public boolean isFull(){
// we always have space - as we can persist to disk
return false;
}
public void setUsageManager(UsageManager usageManager){
super.setUsageManager(usageManager);
usageManager.addUsageListener(this);
}
public void onMemoryUseChanged(UsageManager memoryManager,int oldPercentUsage,int newPercentUsage){
if(newPercentUsage>=100){
try{
if(iterLock.tryLock(500,TimeUnit.MILLISECONDS)){
flushToDisk();
iterLock.unlock();
}
}catch(InterruptedException e){
log.warn("caught an exception aquiring lock",e);
}
}
}
protected boolean isSpaceInMemoryList(){
return hasSpace()&&isDiskListEmpty();
}
protected void flushToDisk(){
synchronized(mutex){
for(Iterator i=memoryList.iterator();i.hasNext();){
MessageReference node=(MessageReference)i.next();
node.decrementReferenceCount();
getDiskList().addLast(node);
}
memoryList.clear();
}
}
protected boolean isDiskListEmpty(){
return diskList==null||diskList.isEmpty();
}
protected ListContainer getDiskList(){
if(diskList==null){
try{
diskList=store.getListContainer(name);
diskList.setMarshaller(new CommandMarshaller(new OpenWireFormat()));
diskList.setMaximumCacheSize(0);
}catch(IOException e){
throw new RuntimeException(e);
}
}
return diskList;
}
}

View File

@ -140,4 +140,9 @@ public interface PendingMessageCursor extends Service{
* @see org.apache.activemq.memory.UsageManager
*/
public void setUsageManager(UsageManager usageManager);
/**
* @return true if the cursor is full
*/
public boolean isFull();
}