diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java b/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java index 7d37390aca..d0eb9e9965 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java @@ -421,11 +421,12 @@ public class ManagedRegionBroker extends RegionBroker { ActiveMQTopic topic=new ActiveMQTopic(view.getDestinationName()); TopicMessageStore store=adapter.createTopicMessageStore(topic); store.recover(new MessageRecoveryListener(){ - public void recoverMessage(Message message) throws Exception{ + public boolean recoverMessage(Message message) throws Exception{ result.add(message); + return true; } - public void recoverMessageReference(MessageId messageReference) throws Exception{ + public boolean recoverMessageReference(MessageId messageReference) throws Exception{ throw new RuntimeException("Should not be called."); } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java index 94090248e3..4459500322 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java @@ -135,25 +135,29 @@ public class Queue implements Destination, Task { if(messages.isRecoveryRequired()){ store.recover(new MessageRecoveryListener(){ - public void recoverMessage(Message message){ + public boolean recoverMessage(Message message){ // Message could have expired while it was being loaded.. if(message.isExpired()){ broker.messageExpired(createConnectionContext(),message); destinationStatistics.getMessages().decrement(); - return; + return true; } - message.setRegionDestination(Queue.this); - synchronized(messages){ - try{ - messages.addMessageLast(message); - }catch(Exception e){ - log.fatal("Failed to add message to cursor",e); + if(hasSpace()){ + message.setRegionDestination(Queue.this); + synchronized(messages){ + try{ + messages.addMessageLast(message); + }catch(Exception e){ + log.fatal("Failed to add message to cursor",e); + } } + destinationStatistics.getMessages().increment(); + return true; } - destinationStatistics.getMessages().increment(); + return false; } - public void recoverMessageReference(MessageId messageReference) throws Exception{ + public boolean recoverMessageReference(MessageId messageReference) throws Exception{ throw new RuntimeException("Should not be called."); } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java index 567e5fca26..9164f52d1c 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java @@ -190,7 +190,7 @@ public class Topic implements Destination { msgContext.setDestination(destination); if(subscription.isRecoveryRequired()){ store.recoverSubscription(clientId,subscriptionName,new MessageRecoveryListener(){ - public void recoverMessage(Message message) throws Exception{ + public boolean recoverMessage(Message message) throws Exception{ message.setRegionDestination(Topic.this); try{ msgContext.setMessageReference(message); @@ -203,9 +203,10 @@ public class Topic implements Destination { // TODO: Need to handle this better. e.printStackTrace(); } + return true; } - public void recoverMessageReference(MessageId messageReference) throws Exception{ + public boolean recoverMessageReference(MessageId messageReference) throws Exception{ throw new RuntimeException("Should not be called."); } @@ -426,11 +427,14 @@ public class Topic implements Destination { try{ if(store!=null){ store.recover(new MessageRecoveryListener(){ - public void recoverMessage(Message message) throws Exception{ + public boolean recoverMessage(Message message) throws Exception{ result.add(message); + return true; } - public void recoverMessageReference(MessageId messageReference) throws Exception{} + public boolean recoverMessageReference(MessageId messageReference) throws Exception{ + return true; + } public void finished(){} diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java index 28b1a32d43..e55baaef67 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java @@ -130,16 +130,17 @@ class QueueStorePrefetch extends AbstractPendingMessageCursor implements public void finished(){ } - public void recoverMessage(Message message) throws Exception{ + public boolean recoverMessage(Message message) throws Exception{ message.setRegionDestination(regionDestination); message.incrementReferenceCount(); batchList.addLast(message); + return true; } - public void recoverMessageReference(MessageId messageReference) throws Exception { + public boolean recoverMessageReference(MessageId messageReference) throws Exception { Message msg=store.getMessage(messageReference); if(msg!=null){ - recoverMessage(msg); + return recoverMessage(msg); }else{ String err = "Failed to retrieve message for id: "+messageReference; log.error(err); diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java index 9f58308e1d..a3bf8dd390 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java @@ -159,16 +159,17 @@ class TopicStorePrefetch extends AbstractPendingMessageCursor implements Message public void finished(){ } - public synchronized void recoverMessage(Message message) throws Exception{ + public synchronized boolean recoverMessage(Message message) throws Exception{ message.setRegionDestination(regionDestination); // only increment if count is zero (could have been cached) if(message.getReferenceCount()==0){ message.incrementReferenceCount(); } batchList.addLast(message); + return true; } - public void recoverMessageReference(MessageId messageReference) throws Exception{ + public boolean recoverMessageReference(MessageId messageReference) throws Exception{ // shouldn't get called throw new RuntimeException("Not supported"); } diff --git a/activemq-core/src/main/java/org/apache/activemq/store/MessageRecoveryListener.java b/activemq-core/src/main/java/org/apache/activemq/store/MessageRecoveryListener.java index d4514c552c..c48f1fe219 100755 --- a/activemq-core/src/main/java/org/apache/activemq/store/MessageRecoveryListener.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/MessageRecoveryListener.java @@ -24,8 +24,7 @@ import org.apache.activemq.command.MessageId; * @version $Revision: 1.4 $ */ public interface MessageRecoveryListener { - void recoverMessage(Message message) throws Exception; - void recoverMessageReference(MessageId ref) throws Exception; - void finished(); + boolean recoverMessage(Message message) throws Exception; + boolean recoverMessageReference(MessageId ref) throws Exception; boolean hasSpace(); } diff --git a/activemq-core/src/main/java/org/apache/activemq/store/amq/RecoveryListenerAdapter.java b/activemq-core/src/main/java/org/apache/activemq/store/amq/RecoveryListenerAdapter.java index b04124e4d4..3ade719575 100644 --- a/activemq-core/src/main/java/org/apache/activemq/store/amq/RecoveryListenerAdapter.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/amq/RecoveryListenerAdapter.java @@ -34,27 +34,29 @@ final class RecoveryListenerAdapter implements MessageRecoveryListener{ this.listener=listener; } - public void finished(){ - listener.finished(); - } - + public boolean hasSpace(){ return listener.hasSpace(); } - public void recoverMessage(Message message) throws Exception{ - listener.recoverMessage(message); - lastRecovered=message.getMessageId(); - count++; + public boolean recoverMessage(Message message) throws Exception{ + if(listener.hasSpace()){ + listener.recoverMessage(message); + lastRecovered=message.getMessageId(); + count++; + return true; + } + return false; } - public void recoverMessageReference(MessageId ref) throws Exception{ + public boolean recoverMessageReference(MessageId ref) throws Exception{ Message message=this.store.getMessage(ref); if(message!=null){ - recoverMessage(message); + return recoverMessage(message); }else{ log.error("Message id "+ref+" could not be recovered from the data store!"); } + return false; } MessageId getLastRecoveredMessageId() { diff --git a/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageRecoveryListener.java b/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageRecoveryListener.java index 9ba6adfc5b..c706f8a493 100755 --- a/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageRecoveryListener.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageRecoveryListener.java @@ -23,7 +23,6 @@ package org.apache.activemq.store.jdbc; * @version $Revision: 1.3 $ */ public interface JDBCMessageRecoveryListener { - void recoverMessage(long sequenceId, byte[] message) throws Exception; - void recoverMessageReference(String reference) throws Exception; - void finished(); + boolean recoverMessage(long sequenceId, byte[] message) throws Exception; + boolean recoverMessageReference(String reference) throws Exception; } diff --git a/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java b/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java index c866382561..64c64ee079 100755 --- a/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java @@ -154,16 +154,13 @@ public class JDBCMessageStore implements MessageStore { try { c = persistenceAdapter.getTransactionContext(); adapter.doRecover(c, destination, new JDBCMessageRecoveryListener() { - public void recoverMessage(long sequenceId, byte[] data) throws Exception { + public boolean recoverMessage(long sequenceId, byte[] data) throws Exception { Message msg = (Message) wireFormat.unmarshal(new ByteSequence(data)); msg.getMessageId().setBrokerSequenceId(sequenceId); - listener.recoverMessage(msg); + return listener.recoverMessage(msg); } - public void recoverMessageReference(String reference) throws Exception { - listener.recoverMessageReference(new MessageId(reference)); - } - public void finished(){ - listener.finished(); + public boolean recoverMessageReference(String reference) throws Exception { + return listener.recoverMessageReference(new MessageId(reference)); } }); } catch (SQLException e) { @@ -234,24 +231,25 @@ public class JDBCMessageStore implements MessageStore { adapter.doRecoverNextMessages(c,destination,lastMessageId.get(),maxReturned, new JDBCMessageRecoveryListener(){ - public void recoverMessage(long sequenceId,byte[] data) throws Exception{ + public boolean recoverMessage(long sequenceId,byte[] data) throws Exception{ if(listener.hasSpace()){ Message msg=(Message)wireFormat.unmarshal(new ByteSequence(data)); msg.getMessageId().setBrokerSequenceId(sequenceId); listener.recoverMessage(msg); lastMessageId.set(sequenceId); + return true; } + return false; } - public void recoverMessageReference(String reference) throws Exception{ + public boolean recoverMessageReference(String reference) throws Exception{ if(listener.hasSpace()) { listener.recoverMessageReference(new MessageId(reference)); + return true; } + return false; } - public void finished(){ - listener.finished(); - } }); }catch(SQLException e){ JDBCPersistenceAdapter.log("JDBC Failure: ",e); diff --git a/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java b/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java index 9118ad85c1..5e4c7308ca 100755 --- a/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java @@ -72,18 +72,15 @@ public class JDBCTopicMessageStore extends JDBCMessageStore implements TopicMess try { adapter.doRecoverSubscription(c, destination, clientId, subscriptionName, new JDBCMessageRecoveryListener() { - public void recoverMessage(long sequenceId, byte[] data) throws Exception { + public boolean recoverMessage(long sequenceId, byte[] data) throws Exception { Message msg = (Message) wireFormat.unmarshal(new ByteSequence(data)); msg.getMessageId().setBrokerSequenceId(sequenceId); - listener.recoverMessage(msg); + return listener.recoverMessage(msg); } - public void recoverMessageReference(String reference) throws Exception { - listener.recoverMessageReference(new MessageId(reference)); + public boolean recoverMessageReference(String reference) throws Exception { + return listener.recoverMessageReference(new MessageId(reference)); } - public void finished(){ - listener.finished(); - } }); } catch (SQLException e) { JDBCPersistenceAdapter.log("JDBC Failure: ",e); @@ -108,22 +105,21 @@ public class JDBCTopicMessageStore extends JDBCMessageStore implements TopicMess adapter.doRecoverNextMessages(c,destination,clientId,subscriptionName,last.get(),maxReturned, new JDBCMessageRecoveryListener(){ - public void recoverMessage(long sequenceId,byte[] data) throws Exception{ + public boolean recoverMessage(long sequenceId,byte[] data) throws Exception{ if(listener.hasSpace()){ Message msg=(Message)wireFormat.unmarshal(new ByteSequence(data)); msg.getMessageId().setBrokerSequenceId(sequenceId); listener.recoverMessage(msg); finalLast.set(sequenceId); + return true; } + return false; } - public void recoverMessageReference(String reference) throws Exception{ - listener.recoverMessageReference(new MessageId(reference)); + public boolean recoverMessageReference(String reference) throws Exception{ + return listener.recoverMessageReference(new MessageId(reference)); } - public void finished(){ - listener.finished(); - } }); }catch(SQLException e){ JDBCPersistenceAdapter.log("JDBC Failure: ",e); diff --git a/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java b/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java index c2f567c916..58ede47fcf 100755 --- a/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java @@ -297,17 +297,20 @@ public class DefaultJDBCAdapter implements JDBCAdapter{ rs=s.executeQuery(); if(statements.isUseExternalMessageReferences()){ while(rs.next()){ - listener.recoverMessageReference(rs.getString(2)); + if (!listener.recoverMessageReference(rs.getString(2))) { + break; + } } }else{ while(rs.next()){ - listener.recoverMessage(rs.getLong(1),getBinaryData(rs,2)); + if(!listener.recoverMessage(rs.getLong(1),getBinaryData(rs,2))) { + break; + } } } }finally{ close(rs); close(s); - listener.finished(); } } @@ -350,17 +353,20 @@ public class DefaultJDBCAdapter implements JDBCAdapter{ rs=s.executeQuery(); if(statements.isUseExternalMessageReferences()){ while(rs.next()){ - listener.recoverMessageReference(rs.getString(2)); + if (!listener.recoverMessageReference(rs.getString(2))){ + break; + } } }else{ while(rs.next()){ - listener.recoverMessage(rs.getLong(1),getBinaryData(rs,2)); + if (!listener.recoverMessage(rs.getLong(1),getBinaryData(rs,2))) { + break; + } } } }finally{ close(rs); close(s); - listener.finished(); } } @@ -379,19 +385,24 @@ public class DefaultJDBCAdapter implements JDBCAdapter{ int count=0; if(statements.isUseExternalMessageReferences()){ while(rs.next()&&count