log which Persistence Adaptor we are using

git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@480924 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Robert Davies 2006-11-30 13:46:08 +00:00
parent a9d1f21c05
commit 415d46cf46
8 changed files with 67 additions and 14 deletions

View File

@ -385,7 +385,7 @@ public class BrokerService implements Service, Serializable {
startDestinations(); startDestinations();
addShutdownHook(); addShutdownHook();
log.info("Using Persistence Adaptor " + getPersistenceAdapter()); log.info("Using Persistence Adaptor: " + getPersistenceAdapter());
if (deleteAllMessagesOnStartup) { if (deleteAllMessagesOnStartup) {
deleteAllMessages(); deleteAllMessages();
} }

View File

@ -59,7 +59,7 @@ public class StoreQueueCursor extends AbstractPendingMessageCursor{
} }
nonPersistent.start(); nonPersistent.start();
persistent.start(); persistent.start();
pendingCount=persistent.size(); pendingCount=persistent.size() + nonPersistent.size();
} }
public synchronized void stop() throws Exception{ public synchronized void stop() throws Exception{
@ -87,12 +87,28 @@ public class StoreQueueCursor extends AbstractPendingMessageCursor{
} }
} }
} }
public void addMessageFirst(MessageReference node) throws Exception{
if(node!=null){
Message msg=node.getMessage();
if(started){
pendingCount++;
if(!msg.isPersistent()){
nonPersistent.addMessageFirst(node);
}
}
if(msg.isPersistent()){
persistent.addMessageFirst(node);
}
}
}
public void clear(){ public void clear(){
pendingCount=0; pendingCount=0;
} }
public synchronized boolean hasNext(){ public synchronized boolean hasNext(){
boolean result=pendingCount>0; boolean result=pendingCount>0;
if(result){ if(result){
try{ try{
@ -107,7 +123,8 @@ public class StoreQueueCursor extends AbstractPendingMessageCursor{
} }
public synchronized MessageReference next(){ public synchronized MessageReference next(){
return currentCursor!=null?currentCursor.next():null; MessageReference result = currentCursor!=null?currentCursor.next():null;
return result;
} }
public synchronized void remove(){ public synchronized void remove(){
@ -118,6 +135,11 @@ public class StoreQueueCursor extends AbstractPendingMessageCursor{
} }
public void remove(MessageReference node){ public void remove(MessageReference node){
if (!node.isPersistent()) {
nonPersistent.remove(node);
}else {
persistent.remove(node);
}
pendingCount--; pendingCount--;
} }

View File

@ -88,5 +88,9 @@ public class DataSourceSupport {
ds.setCreateDatabase("create"); ds.setCreateDatabase("create");
return ds; return ds;
} }
public String toString(){
return ""+dataSource;
}
} }

View File

@ -482,4 +482,8 @@ public class JDBCPersistenceAdapter extends DataSourceSupport implements Persist
protected DatabaseLocker createDatabaseLocker() throws IOException { protected DatabaseLocker createDatabaseLocker() throws IOException {
return new DefaultDatabaseLocker(getDataSource(), getStatements()); return new DefaultDatabaseLocker(getDataSource(), getStatements());
} }
public String toString(){
return "JDBCPersistenceAdaptor("+super.toString()+")";
}
} }

View File

@ -669,5 +669,9 @@ public class JournalPersistenceAdapter implements PersistenceAdapter, JournalEve
org.apache.activeio.packet.ByteSequence sequence = packet.asByteSequence(); org.apache.activeio.packet.ByteSequence sequence = packet.asByteSequence();
return new ByteSequence(sequence.getData(), sequence.getOffset(), sequence.getLength()); return new ByteSequence(sequence.getData(), sequence.getOffset(), sequence.getLength());
} }
public String toString(){
return "JournalPersistenceAdapator(" + longTermPersistence + ")";
}
} }

View File

@ -270,4 +270,8 @@ public class KahaPersistenceAdapter implements PersistenceAdapter{
String name=dir.getAbsolutePath()+File.separator+"kahadb"; String name=dir.getAbsolutePath()+File.separator+"kahadb";
return name; return name;
} }
public String toString(){
return "KahaPersistenceAdapter(" + getStoreName() +")";
}
} }

View File

@ -50,12 +50,16 @@ public class MemoryMessageStore implements MessageStore{
} }
public synchronized void addMessage(ConnectionContext context,Message message) throws IOException{ public synchronized void addMessage(ConnectionContext context,Message message) throws IOException{
messageTable.put(message.getMessageId(),message); synchronized(messageTable){
messageTable.put(message.getMessageId(),message);
}
} }
public void addMessageReference(ConnectionContext context,MessageId messageId,long expirationTime,String messageRef) public void addMessageReference(ConnectionContext context,MessageId messageId,long expirationTime,String messageRef)
throws IOException{ throws IOException{
messageTable.put(messageId,messageRef); synchronized(messageTable){
messageTable.put(messageId,messageRef);
}
} }
public Message getMessage(MessageId identity) throws IOException{ public Message getMessage(MessageId identity) throws IOException{
@ -67,11 +71,16 @@ public class MemoryMessageStore implements MessageStore{
} }
public void removeMessage(ConnectionContext context,MessageAck ack) throws IOException{ public void removeMessage(ConnectionContext context,MessageAck ack) throws IOException{
messageTable.remove(ack.getLastMessageId()); removeMessage(ack.getLastMessageId());
} }
public void removeMessage(MessageId msgId) throws IOException{ public void removeMessage(MessageId msgId) throws IOException{
messageTable.remove(msgId); synchronized(messageTable){
messageTable.remove(msgId);
if(lastBatchId!=null&lastBatchId.equals(msgId)){
lastBatchId=null;
}
}
} }
public void recover(MessageRecoveryListener listener) throws Exception{ public void recover(MessageRecoveryListener listener) throws Exception{
@ -96,7 +105,9 @@ public class MemoryMessageStore implements MessageStore{
} }
public void removeAllMessages(ConnectionContext context) throws IOException{ public void removeAllMessages(ConnectionContext context) throws IOException{
messageTable.clear(); synchronized(messageTable){
messageTable.clear();
}
} }
public ActiveMQDestination getDestination(){ public ActiveMQDestination getDestination(){
@ -104,7 +115,9 @@ public class MemoryMessageStore implements MessageStore{
} }
public void delete(){ public void delete(){
messageTable.clear(); synchronized(messageTable){
messageTable.clear();
}
} }
/** /**
@ -117,18 +130,16 @@ public class MemoryMessageStore implements MessageStore{
return messageTable.size(); return messageTable.size();
} }
public void recoverNextMessages(int maxReturned,MessageRecoveryListener listener) throws Exception{ public void recoverNextMessages(int maxReturned,MessageRecoveryListener listener) throws Exception{
synchronized(messageTable){ synchronized(messageTable){
boolean pastLackBatch=lastBatchId==null; boolean pastLackBatch=lastBatchId==null;
int count = 0; int count=0;
for(Iterator iter=messageTable.entrySet().iterator();iter.hasNext();){ for(Iterator iter=messageTable.entrySet().iterator();iter.hasNext();){
Map.Entry entry=(Entry)iter.next(); Map.Entry entry=(Entry)iter.next();
if(pastLackBatch){ if(pastLackBatch){
count++; count++;
Object msg=entry.getValue(); Object msg=entry.getValue();
lastBatchId = (MessageId)entry.getKey(); lastBatchId=(MessageId)entry.getKey();
if(msg.getClass()==String.class){ if(msg.getClass()==String.class){
listener.recoverMessageReference((String)msg); listener.recoverMessageReference((String)msg);
}else{ }else{
@ -143,6 +154,6 @@ public class MemoryMessageStore implements MessageStore{
} }
public void resetBatching(){ public void resetBatching(){
lastBatchId = null; lastBatchId=null;
} }
} }

View File

@ -154,4 +154,8 @@ public class MemoryPersistenceAdapter implements PersistenceAdapter {
*/ */
public void setUsageManager(UsageManager usageManager) { public void setUsageManager(UsageManager usageManager) {
} }
public String toString(){
return "MemoryPersistenceAdapter";
}
} }