Tidied up locking around cursor iterators

git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@479156 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Robert Davies 2006-11-25 17:58:41 +00:00
parent 24b005c3da
commit ec63977e81
8 changed files with 228 additions and 200 deletions

View File

@ -118,15 +118,19 @@ public class DurableTopicSubscription extends PrefetchSubscription {
} }
} }
if( !keepDurableSubsActive ) { if(!keepDurableSubsActive){
synchronized(pending) { synchronized(pending){
pending.reset(); try{
while(pending.hasNext()) { pending.reset();
MessageReference node = pending.next(); while(pending.hasNext()){
node.decrementReferenceCount(); MessageReference node=pending.next();
pending.remove(); node.decrementReferenceCount();
} pending.remove();
} }
}finally{
pending.release();
}
}
} }
prefetchExtension=0; prefetchExtension=0;
} }
@ -195,22 +199,24 @@ public class DurableTopicSubscription extends PrefetchSubscription {
/** /**
* Release any references that we are holding. * Release any references that we are holding.
*/ */
public void destroy() { public void destroy(){
synchronized(pending) { try{
pending.reset(); synchronized(pending){
while(pending.hasNext()) { pending.reset();
MessageReference node = pending.next(); while(pending.hasNext()){
node.decrementReferenceCount(); MessageReference node=pending.next();
} node.decrementReferenceCount();
pending.clear(); }
} }
}finally{
for (Iterator iter = dispatched.iterator(); iter.hasNext();) { pending.release();
MessageReference node = (MessageReference) iter.next(); pending.clear();
}
for(Iterator iter=dispatched.iterator();iter.hasNext();){
MessageReference node=(MessageReference)iter.next();
node.decrementReferenceCount(); node.decrementReferenceCount();
} }
dispatched.clear(); dispatched.clear();
} }
} }

View File

@ -123,6 +123,7 @@ abstract public class PrefetchSubscription extends AbstractSubscription{
} }
public void add(MessageReference node) throws Exception{ public void add(MessageReference node) throws Exception{
try {
boolean pendingEmpty = false; boolean pendingEmpty = false;
synchronized(pending){ synchronized(pending){
pendingEmpty=pending.isEmpty(); pendingEmpty=pending.isEmpty();
@ -139,21 +140,30 @@ abstract public class PrefetchSubscription extends AbstractSubscription{
pending.addMessageLast(node); pending.addMessageLast(node);
} }
} }
}catch(Throwable e) {
e.printStackTrace();
}
} }
public void processMessageDispatchNotification(MessageDispatchNotification mdn) throws Exception { public void processMessageDispatchNotification(MessageDispatchNotification mdn) throws Exception{
synchronized(pending){ synchronized(pending){
pending.reset(); try{
while(pending.hasNext()){ pending.reset();
MessageReference node=pending.next(); while(pending.hasNext()){
if(node.getMessageId().equals(mdn.getMessageId())){ MessageReference node=pending.next();
pending.remove(); if(node.getMessageId().equals(mdn.getMessageId())){
createMessageDispatch(node,node.getMessage()); pending.remove();
dispatched.addLast(node); createMessageDispatch(node,node.getMessage());
return; dispatched.addLast(node);
return;
}
} }
}finally{
pending.release();
} }
throw new JMSException("Slave broker out of sync with master: Dispatched message ("+mdn.getMessageId()+") was not in the pending list: "+pending); throw new JMSException("Slave broker out of sync with master: Dispatched message ("+mdn.getMessageId()
+") was not in the pending list: "+pending);
} }
} }
@ -387,6 +397,7 @@ abstract public class PrefetchSubscription extends AbstractSubscription{
dispatch(node); dispatch(node);
} }
}finally{ }finally{
pending.release();
dispatching=false; dispatching=false;
} }
} }

View File

@ -545,54 +545,58 @@ public class Queue implements Destination, Task {
} }
} }
} }
synchronized (messages) { synchronized(messages){
messages.reset(); try{
while(messages.hasNext()) { messages.reset();
try { while(messages.hasNext()){
MessageReference r = messages.next(); try{
r.incrementReferenceCount(); MessageReference r=messages.next();
try { r.incrementReferenceCount();
Message m = r.getMessage(); try{
if (m != null) { Message m=r.getMessage();
l.add(m); if(m!=null){
l.add(m);
}
}finally{
r.decrementReferenceCount();
} }
} }catch(IOException e){
finally { log.error("caught an exception brwsing "+this,e);
r.decrementReferenceCount();
} }
} }
catch (IOException e) { }finally{
log.error("caught an exception brwsing " + this,e); messages.release();
}
} }
} }
return (Message[]) l.toArray(new Message[l.size()]); return (Message[]) l.toArray(new Message[l.size()]);
} }
public Message getMessage(String messageId) { public Message getMessage(String messageId){
synchronized (messages) { synchronized(messages){
messages.reset(); try{
while(messages.hasNext()) { messages.reset();
try { while(messages.hasNext()){
MessageReference r = messages.next(); try{
if (messageId.equals(r.getMessageId().toString())) { MessageReference r=messages.next();
r.incrementReferenceCount(); if(messageId.equals(r.getMessageId().toString())){
try { r.incrementReferenceCount();
Message m = r.getMessage(); try{
if (m != null) { Message m=r.getMessage();
return m; if(m!=null){
return m;
}
}finally{
r.decrementReferenceCount();
} }
break;
} }
finally { }catch(IOException e){
r.decrementReferenceCount(); log.error("got an exception retrieving message "+messageId);
}
break;
} }
} }
catch (IOException e) { }finally{
log.error("got an exception retrieving message " + messageId); messages.release();
}
} }
} }
return null; return null;
@ -868,13 +872,17 @@ public class Queue implements Destination, Task {
int count=0; int count=0;
result=new ArrayList(toPageIn); result=new ArrayList(toPageIn);
synchronized(messages){ synchronized(messages){
messages.reset(); try{
while(messages.hasNext()&&count<toPageIn){ messages.reset();
MessageReference node=messages.next(); while(messages.hasNext()&&count<toPageIn){
messages.remove(); MessageReference node=messages.next();
node=createMessageReference(node.getMessage()); messages.remove();
result.add(node); node=createMessageReference(node.getMessage());
count++; result.add(node);
count++;
}
}finally{
messages.release();
} }
} }
synchronized(pagedInMessages){ synchronized(pagedInMessages){

View File

@ -135,36 +135,42 @@ public class TopicSubscription extends AbstractSubscription{
* Discard any expired messages from the matched list. Called from a synchronized block. * Discard any expired messages from the matched list. Called from a synchronized block.
* @throws IOException * @throws IOException
*/ */
protected void removeExpiredMessages() throws IOException { protected void removeExpiredMessages() throws IOException{
matched.reset(); try{
while(matched.hasNext()) {
MessageReference node=matched.next();
if (node.isExpired()) {
matched.remove();
dispatched.incrementAndGet();
node.decrementReferenceCount();
break;
}
}
matched.release();
}
public void processMessageDispatchNotification(MessageDispatchNotification mdn){
synchronized(matchedListMutex){
matched.reset(); matched.reset();
while(matched.hasNext()) { while(matched.hasNext()){
MessageReference node=matched.next(); MessageReference node=matched.next();
if(node.getMessageId().equals(mdn.getMessageId())){ if(node.isExpired()){
matched.remove(); matched.remove();
dispatched.incrementAndGet(); dispatched.incrementAndGet();
node.decrementReferenceCount(); node.decrementReferenceCount();
break; break;
} }
} }
}finally{
matched.release(); matched.release();
} }
} }
public void processMessageDispatchNotification(MessageDispatchNotification mdn){
synchronized(matchedListMutex){
try{
matched.reset();
while(matched.hasNext()){
MessageReference node=matched.next();
if(node.getMessageId().equals(mdn.getMessageId())){
matched.remove();
dispatched.incrementAndGet();
node.decrementReferenceCount();
break;
}
}
}finally{
matched.release();
}
}
}
synchronized public void acknowledge(final ConnectionContext context,final MessageAck ack) throws Exception{ synchronized public void acknowledge(final ConnectionContext context,final MessageAck ack) throws Exception{
// Handle the standard acknowledgment case. // Handle the standard acknowledgment case.
@ -335,21 +341,22 @@ public class TopicSubscription extends AbstractSubscription{
private void dispatchMatched() throws IOException{ private void dispatchMatched() throws IOException{
synchronized(matchedListMutex){ synchronized(matchedListMutex){
matched.reset(); try{
while(matched.hasNext()) { matched.reset();
MessageReference message=(MessageReference) matched.next(); while(matched.hasNext()){
matched.remove(); MessageReference message=(MessageReference)matched.next();
matched.remove();
// Message may have been sitting in the matched list a while // Message may have been sitting in the matched list a while
// waiting for the consumer to ak the message. // waiting for the consumer to ak the message.
if( message.isExpired() ) { if(message.isExpired()){
message.decrementReferenceCount(); message.decrementReferenceCount();
continue; // just drop it. continue; // just drop it.
} }
dispatch(message);
dispatch(message); }
}finally{
matched.release();
} }
matched.release();
} }
} }

View File

@ -110,4 +110,8 @@ public class AbstractPendingMessageCursor implements PendingMessageCursor{
public boolean isFull() { public boolean isFull() {
return usageManager != null ? usageManager.isFull() : false; return usageManager != null ? usageManager.isFull() : false;
} }
public void release(){
}
} }

View File

@ -18,7 +18,6 @@ import java.io.IOException;
import java.util.Iterator; import java.util.Iterator;
import java.util.LinkedList; import java.util.LinkedList;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantLock;
import org.apache.activemq.broker.region.Destination; import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.MessageReference; import org.apache.activemq.broker.region.MessageReference;
@ -38,6 +37,7 @@ import org.apache.commons.logging.LogFactory;
* @version $Revision$ * @version $Revision$
*/ */
public class FilePendingMessageCursor extends AbstractPendingMessageCursor implements UsageListener{ public class FilePendingMessageCursor extends AbstractPendingMessageCursor implements UsageListener{
static private final Log log=LogFactory.getLog(FilePendingMessageCursor.class); static private final Log log=LogFactory.getLog(FilePendingMessageCursor.class);
private Store store; private Store store;
private String name; private String name;
@ -45,8 +45,7 @@ public class FilePendingMessageCursor extends AbstractPendingMessageCursor imple
private ListContainer diskList; private ListContainer diskList;
private Iterator iter=null; private Iterator iter=null;
private Destination regionDestination; private Destination regionDestination;
private Lock iterLock=new ReentrantLock(); private ReentrantLock iterLock=new ReentrantLock();
private Object mutex=new Object();
/** /**
* @param name * @param name
@ -60,10 +59,8 @@ public class FilePendingMessageCursor extends AbstractPendingMessageCursor imple
/** /**
* @return true if there are no pending messages * @return true if there are no pending messages
*/ */
public boolean isEmpty(){ public synchronized boolean isEmpty(){
synchronized(mutex){ return memoryList.isEmpty()&&isDiskListEmpty();
return memoryList.isEmpty()&&isDiskListEmpty();
}
} }
/** /**
@ -71,9 +68,11 @@ public class FilePendingMessageCursor extends AbstractPendingMessageCursor imple
* *
*/ */
public void reset(){ public void reset(){
iterLock.lock(); try{
synchronized(mutex){ iterLock.lockInterruptibly();
iter=isSpaceInMemoryList()?memoryList.iterator():diskList.listIterator(); iter=isDiskListEmpty()?memoryList.iterator():getDiskList().listIterator();
}catch(InterruptedException e){
log.warn("Failed to get lock ",e);
} }
} }
@ -81,7 +80,7 @@ public class FilePendingMessageCursor extends AbstractPendingMessageCursor imple
iterLock.unlock(); iterLock.unlock();
} }
public void destroy(){ public synchronized void destroy(){
for(Iterator i=memoryList.iterator();i.hasNext();){ for(Iterator i=memoryList.iterator();i.hasNext();){
Message node=(Message)i.next(); Message node=(Message)i.next();
node.decrementReferenceCount(); node.decrementReferenceCount();
@ -92,23 +91,21 @@ public class FilePendingMessageCursor extends AbstractPendingMessageCursor imple
} }
} }
public LinkedList pageInList(int maxItems){ public synchronized LinkedList pageInList(int maxItems){
LinkedList result=new LinkedList(); LinkedList result=new LinkedList();
synchronized(mutex){ int count=0;
int count=0; for(Iterator i=memoryList.iterator();i.hasNext()&&count<maxItems;){
for(Iterator i=memoryList.iterator();i.hasNext()&&count<maxItems;){ result.add(i.next());
result.add(i.next()); count++;
}
if(count<maxItems&&!isDiskListEmpty()){
for(Iterator i=getDiskList().iterator();i.hasNext()&&count<maxItems;){
Message message=(Message)i.next();
message.setRegionDestination(regionDestination);
message.incrementReferenceCount();
result.add(message);
count++; count++;
} }
if(count<maxItems&&!isDiskListEmpty()){
for(Iterator i=getDiskList().iterator();i.hasNext()&&count<maxItems;){
Message message=(Message)i.next();
message.setRegionDestination(regionDestination);
message.incrementReferenceCount();
result.add(message);
count++;
}
}
} }
return result; return result;
} }
@ -118,20 +115,18 @@ public class FilePendingMessageCursor extends AbstractPendingMessageCursor imple
* *
* @param node * @param node
*/ */
public void addMessageLast(MessageReference node){ public synchronized void addMessageLast(MessageReference node){
synchronized(mutex){ try{
try{ regionDestination=node.getMessage().getRegionDestination();
regionDestination=node.getMessage().getRegionDestination(); if(isSpaceInMemoryList()){
if(isSpaceInMemoryList()){ memoryList.add(node);
memoryList.add(node); }else{
}else{ flushToDisk();
flushToDisk(); node.decrementReferenceCount();
node.decrementReferenceCount(); getDiskList().addLast(node);
getDiskList().addLast(node);
}
}catch(IOException e){
throw new RuntimeException(e);
} }
}catch(IOException e){
throw new RuntimeException(e);
} }
} }
@ -140,93 +135,79 @@ public class FilePendingMessageCursor extends AbstractPendingMessageCursor imple
* *
* @param node * @param node
*/ */
public void addMessageFirst(MessageReference node){ public synchronized void addMessageFirst(MessageReference node){
synchronized(mutex){ try{
try{ regionDestination=node.getMessage().getRegionDestination();
regionDestination=node.getMessage().getRegionDestination(); if(isSpaceInMemoryList()){
if(isSpaceInMemoryList()){ memoryList.addFirst(node);
memoryList.addFirst(node); }else{
}else{ flushToDisk();
flushToDisk(); node.decrementReferenceCount();
node.decrementReferenceCount(); getDiskList().addFirst(node);
getDiskList().addFirst(node);
}
}catch(IOException e){
throw new RuntimeException(e);
} }
}catch(IOException e){
throw new RuntimeException(e);
} }
} }
/** /**
* @return true if there pending messages to dispatch * @return true if there pending messages to dispatch
*/ */
public boolean hasNext(){ public synchronized boolean hasNext(){
synchronized(mutex){ return iter.hasNext();
return iter.hasNext();
}
} }
/** /**
* @return the next pending message * @return the next pending message
*/ */
public MessageReference next(){ public synchronized MessageReference next(){
synchronized(mutex){ Message message=(Message)iter.next();
Message message=(Message)iter.next(); if(!isDiskListEmpty()){
if(!isDiskListEmpty()){ // got from disk
// got from disk message.setRegionDestination(regionDestination);
message.setRegionDestination(regionDestination); message.incrementReferenceCount();
message.incrementReferenceCount();
}
return message;
} }
return message;
} }
/** /**
* remove the message at the cursor position * remove the message at the cursor position
* *
*/ */
public void remove(){ public synchronized void remove(){
synchronized(mutex){ iter.remove();
iter.remove();
}
} }
/** /**
* @param node * @param node
* @see org.apache.activemq.broker.region.cursors.AbstractPendingMessageCursor#remove(org.apache.activemq.broker.region.MessageReference) * @see org.apache.activemq.broker.region.cursors.AbstractPendingMessageCursor#remove(org.apache.activemq.broker.region.MessageReference)
*/ */
public void remove(MessageReference node){ public synchronized void remove(MessageReference node){
synchronized(mutex){ memoryList.remove(node);
memoryList.remove(node); if(!isDiskListEmpty()){
if(!isDiskListEmpty()){ getDiskList().remove(node);
getDiskList().remove(node);
}
} }
} }
/** /**
* @return the number of pending messages * @return the number of pending messages
*/ */
public int size(){ public synchronized int size(){
synchronized(mutex){ return memoryList.size()+(isDiskListEmpty()?0:getDiskList().size());
return memoryList.size()+(isDiskListEmpty()?0:getDiskList().size());
}
} }
/** /**
* clear all pending messages * clear all pending messages
* *
*/ */
public void clear(){ public synchronized void clear(){
synchronized(mutex){ memoryList.clear();
memoryList.clear(); if(!isDiskListEmpty()){
if(!isDiskListEmpty()){ getDiskList().clear();
getDiskList().clear();
}
} }
} }
public boolean isFull(){ public synchronized boolean isFull(){
// we always have space - as we can persist to disk // we always have space - as we can persist to disk
return false; return false;
} }
@ -253,15 +234,13 @@ public class FilePendingMessageCursor extends AbstractPendingMessageCursor imple
return hasSpace()&&isDiskListEmpty(); return hasSpace()&&isDiskListEmpty();
} }
protected void flushToDisk(){ protected synchronized void flushToDisk(){
synchronized(mutex){ for(Iterator i=memoryList.iterator();i.hasNext();){
for(Iterator i=memoryList.iterator();i.hasNext();){ MessageReference node=(MessageReference)i.next();
MessageReference node=(MessageReference)i.next(); node.decrementReferenceCount();
node.decrementReferenceCount(); getDiskList().addLast(node);
getDiskList().addLast(node);
}
memoryList.clear();
} }
memoryList.clear();
} }
protected boolean isDiskListEmpty(){ protected boolean isDiskListEmpty(){

View File

@ -54,6 +54,13 @@ public interface PendingMessageCursor extends Service{
* *
*/ */
public void reset(); public void reset();
/**
* hint to the cursor to release any locks it might have
* grabbed after a reset
*
*/
public void release();
/** /**
* add message to await dispatch * add message to await dispatch

View File

@ -185,12 +185,18 @@ public class StoreDurableSubscriberCursor extends AbstractPendingMessageCursor{
} }
public synchronized void reset(){ public synchronized void reset(){
nonPersistent.reset();
for(Iterator i=storePrefetches.iterator();i.hasNext();){ for(Iterator i=storePrefetches.iterator();i.hasNext();){
AbstractPendingMessageCursor tsp=(AbstractPendingMessageCursor)i.next(); AbstractPendingMessageCursor tsp=(AbstractPendingMessageCursor)i.next();
tsp.reset(); tsp.reset();
} }
} }
public synchronized void release(){
for(Iterator i=storePrefetches.iterator();i.hasNext();){
AbstractPendingMessageCursor tsp=(AbstractPendingMessageCursor)i.next();
tsp.release();
}
}
public int size(){ public int size(){
return pendingCount; return pendingCount;