git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@560783 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Robert Davies 2007-07-29 19:01:29 +00:00
parent eeb8d5bafc
commit 3a5f48d277
17 changed files with 128 additions and 103 deletions

View File

@ -421,11 +421,12 @@ public class ManagedRegionBroker extends RegionBroker {
ActiveMQTopic topic=new ActiveMQTopic(view.getDestinationName());
TopicMessageStore store=adapter.createTopicMessageStore(topic);
store.recover(new MessageRecoveryListener(){
public void recoverMessage(Message message) throws Exception{
public boolean recoverMessage(Message message) throws Exception{
result.add(message);
return true;
}
public void recoverMessageReference(MessageId messageReference) throws Exception{
public boolean recoverMessageReference(MessageId messageReference) throws Exception{
throw new RuntimeException("Should not be called.");
}

View File

@ -135,25 +135,29 @@ public class Queue implements Destination, Task {
if(messages.isRecoveryRequired()){
store.recover(new MessageRecoveryListener(){
public void recoverMessage(Message message){
public boolean recoverMessage(Message message){
// Message could have expired while it was being loaded..
if(message.isExpired()){
broker.messageExpired(createConnectionContext(),message);
destinationStatistics.getMessages().decrement();
return;
return true;
}
message.setRegionDestination(Queue.this);
synchronized(messages){
try{
messages.addMessageLast(message);
}catch(Exception e){
log.fatal("Failed to add message to cursor",e);
if(hasSpace()){
message.setRegionDestination(Queue.this);
synchronized(messages){
try{
messages.addMessageLast(message);
}catch(Exception e){
log.fatal("Failed to add message to cursor",e);
}
}
destinationStatistics.getMessages().increment();
return true;
}
destinationStatistics.getMessages().increment();
return false;
}
public void recoverMessageReference(MessageId messageReference) throws Exception{
public boolean recoverMessageReference(MessageId messageReference) throws Exception{
throw new RuntimeException("Should not be called.");
}

View File

@ -190,7 +190,7 @@ public class Topic implements Destination {
msgContext.setDestination(destination);
if(subscription.isRecoveryRequired()){
store.recoverSubscription(clientId,subscriptionName,new MessageRecoveryListener(){
public void recoverMessage(Message message) throws Exception{
public boolean recoverMessage(Message message) throws Exception{
message.setRegionDestination(Topic.this);
try{
msgContext.setMessageReference(message);
@ -203,9 +203,10 @@ public class Topic implements Destination {
// TODO: Need to handle this better.
e.printStackTrace();
}
return true;
}
public void recoverMessageReference(MessageId messageReference) throws Exception{
public boolean recoverMessageReference(MessageId messageReference) throws Exception{
throw new RuntimeException("Should not be called.");
}
@ -426,11 +427,14 @@ public class Topic implements Destination {
try{
if(store!=null){
store.recover(new MessageRecoveryListener(){
public void recoverMessage(Message message) throws Exception{
public boolean recoverMessage(Message message) throws Exception{
result.add(message);
return true;
}
public void recoverMessageReference(MessageId messageReference) throws Exception{}
public boolean recoverMessageReference(MessageId messageReference) throws Exception{
return true;
}
public void finished(){}

View File

@ -130,16 +130,17 @@ class QueueStorePrefetch extends AbstractPendingMessageCursor implements
public void finished(){
}
public void recoverMessage(Message message) throws Exception{
public boolean recoverMessage(Message message) throws Exception{
message.setRegionDestination(regionDestination);
message.incrementReferenceCount();
batchList.addLast(message);
return true;
}
public void recoverMessageReference(MessageId messageReference) throws Exception {
public boolean recoverMessageReference(MessageId messageReference) throws Exception {
Message msg=store.getMessage(messageReference);
if(msg!=null){
recoverMessage(msg);
return recoverMessage(msg);
}else{
String err = "Failed to retrieve message for id: "+messageReference;
log.error(err);

View File

@ -159,16 +159,17 @@ class TopicStorePrefetch extends AbstractPendingMessageCursor implements Message
public void finished(){
}
public synchronized void recoverMessage(Message message) throws Exception{
public synchronized boolean recoverMessage(Message message) throws Exception{
message.setRegionDestination(regionDestination);
// only increment if count is zero (could have been cached)
if(message.getReferenceCount()==0){
message.incrementReferenceCount();
}
batchList.addLast(message);
return true;
}
public void recoverMessageReference(MessageId messageReference) throws Exception{
public boolean recoverMessageReference(MessageId messageReference) throws Exception{
// shouldn't get called
throw new RuntimeException("Not supported");
}

View File

@ -24,8 +24,7 @@ import org.apache.activemq.command.MessageId;
* @version $Revision: 1.4 $
*/
public interface MessageRecoveryListener {
void recoverMessage(Message message) throws Exception;
void recoverMessageReference(MessageId ref) throws Exception;
void finished();
boolean recoverMessage(Message message) throws Exception;
boolean recoverMessageReference(MessageId ref) throws Exception;
boolean hasSpace();
}

View File

@ -34,27 +34,29 @@ final class RecoveryListenerAdapter implements MessageRecoveryListener{
this.listener=listener;
}
public void finished(){
listener.finished();
}
public boolean hasSpace(){
return listener.hasSpace();
}
public void recoverMessage(Message message) throws Exception{
listener.recoverMessage(message);
lastRecovered=message.getMessageId();
count++;
public boolean recoverMessage(Message message) throws Exception{
if(listener.hasSpace()){
listener.recoverMessage(message);
lastRecovered=message.getMessageId();
count++;
return true;
}
return false;
}
public void recoverMessageReference(MessageId ref) throws Exception{
public boolean recoverMessageReference(MessageId ref) throws Exception{
Message message=this.store.getMessage(ref);
if(message!=null){
recoverMessage(message);
return recoverMessage(message);
}else{
log.error("Message id "+ref+" could not be recovered from the data store!");
}
return false;
}
MessageId getLastRecoveredMessageId() {

View File

@ -23,7 +23,6 @@ package org.apache.activemq.store.jdbc;
* @version $Revision: 1.3 $
*/
public interface JDBCMessageRecoveryListener {
void recoverMessage(long sequenceId, byte[] message) throws Exception;
void recoverMessageReference(String reference) throws Exception;
void finished();
boolean recoverMessage(long sequenceId, byte[] message) throws Exception;
boolean recoverMessageReference(String reference) throws Exception;
}

View File

@ -154,16 +154,13 @@ public class JDBCMessageStore implements MessageStore {
try {
c = persistenceAdapter.getTransactionContext();
adapter.doRecover(c, destination, new JDBCMessageRecoveryListener() {
public void recoverMessage(long sequenceId, byte[] data) throws Exception {
public boolean recoverMessage(long sequenceId, byte[] data) throws Exception {
Message msg = (Message) wireFormat.unmarshal(new ByteSequence(data));
msg.getMessageId().setBrokerSequenceId(sequenceId);
listener.recoverMessage(msg);
return listener.recoverMessage(msg);
}
public void recoverMessageReference(String reference) throws Exception {
listener.recoverMessageReference(new MessageId(reference));
}
public void finished(){
listener.finished();
public boolean recoverMessageReference(String reference) throws Exception {
return listener.recoverMessageReference(new MessageId(reference));
}
});
} catch (SQLException e) {
@ -234,24 +231,25 @@ public class JDBCMessageStore implements MessageStore {
adapter.doRecoverNextMessages(c,destination,lastMessageId.get(),maxReturned,
new JDBCMessageRecoveryListener(){
public void recoverMessage(long sequenceId,byte[] data) throws Exception{
public boolean recoverMessage(long sequenceId,byte[] data) throws Exception{
if(listener.hasSpace()){
Message msg=(Message)wireFormat.unmarshal(new ByteSequence(data));
msg.getMessageId().setBrokerSequenceId(sequenceId);
listener.recoverMessage(msg);
lastMessageId.set(sequenceId);
return true;
}
return false;
}
public void recoverMessageReference(String reference) throws Exception{
public boolean recoverMessageReference(String reference) throws Exception{
if(listener.hasSpace()) {
listener.recoverMessageReference(new MessageId(reference));
return true;
}
return false;
}
public void finished(){
listener.finished();
}
});
}catch(SQLException e){
JDBCPersistenceAdapter.log("JDBC Failure: ",e);

View File

@ -72,18 +72,15 @@ public class JDBCTopicMessageStore extends JDBCMessageStore implements TopicMess
try {
adapter.doRecoverSubscription(c, destination, clientId, subscriptionName,
new JDBCMessageRecoveryListener() {
public void recoverMessage(long sequenceId, byte[] data) throws Exception {
public boolean recoverMessage(long sequenceId, byte[] data) throws Exception {
Message msg = (Message) wireFormat.unmarshal(new ByteSequence(data));
msg.getMessageId().setBrokerSequenceId(sequenceId);
listener.recoverMessage(msg);
return listener.recoverMessage(msg);
}
public void recoverMessageReference(String reference) throws Exception {
listener.recoverMessageReference(new MessageId(reference));
public boolean recoverMessageReference(String reference) throws Exception {
return listener.recoverMessageReference(new MessageId(reference));
}
public void finished(){
listener.finished();
}
});
} catch (SQLException e) {
JDBCPersistenceAdapter.log("JDBC Failure: ",e);
@ -108,22 +105,21 @@ public class JDBCTopicMessageStore extends JDBCMessageStore implements TopicMess
adapter.doRecoverNextMessages(c,destination,clientId,subscriptionName,last.get(),maxReturned,
new JDBCMessageRecoveryListener(){
public void recoverMessage(long sequenceId,byte[] data) throws Exception{
public boolean recoverMessage(long sequenceId,byte[] data) throws Exception{
if(listener.hasSpace()){
Message msg=(Message)wireFormat.unmarshal(new ByteSequence(data));
msg.getMessageId().setBrokerSequenceId(sequenceId);
listener.recoverMessage(msg);
finalLast.set(sequenceId);
return true;
}
return false;
}
public void recoverMessageReference(String reference) throws Exception{
listener.recoverMessageReference(new MessageId(reference));
public boolean recoverMessageReference(String reference) throws Exception{
return listener.recoverMessageReference(new MessageId(reference));
}
public void finished(){
listener.finished();
}
});
}catch(SQLException e){
JDBCPersistenceAdapter.log("JDBC Failure: ",e);

View File

@ -297,17 +297,20 @@ public class DefaultJDBCAdapter implements JDBCAdapter{
rs=s.executeQuery();
if(statements.isUseExternalMessageReferences()){
while(rs.next()){
listener.recoverMessageReference(rs.getString(2));
if (!listener.recoverMessageReference(rs.getString(2))) {
break;
}
}
}else{
while(rs.next()){
listener.recoverMessage(rs.getLong(1),getBinaryData(rs,2));
if(!listener.recoverMessage(rs.getLong(1),getBinaryData(rs,2))) {
break;
}
}
}
}finally{
close(rs);
close(s);
listener.finished();
}
}
@ -350,17 +353,20 @@ public class DefaultJDBCAdapter implements JDBCAdapter{
rs=s.executeQuery();
if(statements.isUseExternalMessageReferences()){
while(rs.next()){
listener.recoverMessageReference(rs.getString(2));
if (!listener.recoverMessageReference(rs.getString(2))){
break;
}
}
}else{
while(rs.next()){
listener.recoverMessage(rs.getLong(1),getBinaryData(rs,2));
if (!listener.recoverMessage(rs.getLong(1),getBinaryData(rs,2))) {
break;
}
}
}
}finally{
close(rs);
close(s);
listener.finished();
}
}
@ -379,19 +385,24 @@ public class DefaultJDBCAdapter implements JDBCAdapter{
int count=0;
if(statements.isUseExternalMessageReferences()){
while(rs.next()&&count<maxReturned){
listener.recoverMessageReference(rs.getString(1));
count++;
if(listener.recoverMessageReference(rs.getString(1))){
count++;
}else{
break;
}
}
}else{
while(rs.next()&&count<maxReturned){
listener.recoverMessage(rs.getLong(1),getBinaryData(rs,2));
count++;
if(listener.recoverMessage(rs.getLong(1),getBinaryData(rs,2))){
count++;
}else{
break;
}
}
}
}finally{
close(rs);
close(s);
listener.finished();
}
}
@ -657,7 +668,8 @@ public class DefaultJDBCAdapter implements JDBCAdapter{
}
public void doRecoverNextMessages(TransactionContext c,ActiveMQDestination destination,long nextSeq,int maxReturned,JDBCMessageRecoveryListener listener) throws Exception{
public void doRecoverNextMessages(TransactionContext c,ActiveMQDestination destination,long nextSeq,
int maxReturned,JDBCMessageRecoveryListener listener) throws Exception{
PreparedStatement s=null;
ResultSet rs=null;
try{
@ -669,23 +681,27 @@ public class DefaultJDBCAdapter implements JDBCAdapter{
int count=0;
if(statements.isUseExternalMessageReferences()){
while(rs.next()&&count<maxReturned){
listener.recoverMessageReference(rs.getString(1));
count++;
if(listener.recoverMessageReference(rs.getString(1))){
count++;
}else{
log.debug("Stopped recover next messages");
}
}
}else{
while(rs.next()&&count<maxReturned){
listener.recoverMessage(rs.getLong(1),getBinaryData(rs,2));
count++;
if(listener.recoverMessage(rs.getLong(1),getBinaryData(rs,2))){
count++;
}else{
log.debug("Stopped recover next messages");
}
}
}
}catch(Exception e) {
}catch(Exception e){
e.printStackTrace();
}finally {
}finally{
close(rs);
close(s);
listener.finished();
}
}
/*
* Useful for debugging. public void dumpTables(Connection c, String destinationName, String clientId, String

View File

@ -66,8 +66,12 @@ public class KahaMessageStore implements MessageStore{
return result;
}
protected void recoverMessage(MessageRecoveryListener listener,Message msg) throws Exception{
listener.recoverMessage(msg);
protected boolean recoverMessage(MessageRecoveryListener listener,Message msg) throws Exception{
if(listener.hasSpace()){
listener.recoverMessage(msg);
return true;
}
return false;
}
public void removeMessage(ConnectionContext context,MessageAck ack) throws IOException{
@ -89,9 +93,10 @@ public class KahaMessageStore implements MessageStore{
public synchronized void recover(MessageRecoveryListener listener) throws Exception{
for(StoreEntry entry=messageContainer.getFirst();entry!=null;entry=messageContainer.getNext(entry)){
Message msg=(Message)messageContainer.getValue(entry);
recoverMessage(listener,msg);
if(!recoverMessage(listener,msg)) {
break;
}
}
listener.finished();
}
public void start(){
@ -167,7 +172,6 @@ public class KahaMessageStore implements MessageStore{
entry=messageContainer.getNext(entry);
}while(entry!=null&&count<maxReturned&&listener.hasSpace());
}
listener.finished();
}
/**

View File

@ -58,16 +58,21 @@ public class KahaReferenceStore implements ReferenceStore{
throw new RuntimeException("Use addMessageReference instead");
}
protected final void recoverReference(MessageRecoveryListener listener,ReferenceRecord record) throws Exception{
listener.recoverMessageReference(new MessageId(record.getMessageId()));
protected final boolean recoverReference(MessageRecoveryListener listener,ReferenceRecord record) throws Exception{
if (listener.hasSpace()) {
listener.recoverMessageReference(new MessageId(record.getMessageId()));
return true;
}
return false;
}
public synchronized void recover(MessageRecoveryListener listener) throws Exception{
for(StoreEntry entry=messageContainer.getFirst();entry!=null;entry=messageContainer.getNext(entry)){
ReferenceRecord record=messageContainer.getValue(entry);
recoverReference(listener,record);
if (!recoverReference(listener,record)) {
break;
}
}
listener.finished();
}
public synchronized void recoverNextMessages(int maxReturned,MessageRecoveryListener listener) throws Exception{
@ -95,7 +100,6 @@ public class KahaReferenceStore implements ReferenceStore{
entry=messageContainer.getNext(entry);
}while(entry!=null&&count<maxReturned&&listener.hasSpace());
}
listener.finished();
}
public synchronized void addMessageReference(ConnectionContext context,MessageId messageId,ReferenceData data)

View File

@ -148,11 +148,12 @@ public class KahaTopicMessageStore extends KahaMessageStore implements TopicMess
ConsumerMessageRef ref=(ConsumerMessageRef)i.next();
Message msg=messageContainer.get(ref.getMessageEntry());
if(msg!=null){
recoverMessage(listener, msg);
if(!recoverMessage(listener,msg)){
break;
}
}
}
}
listener.finished();
}
public void recoverNextMessages(String clientId,String subscriptionName,int maxReturned,
@ -186,7 +187,6 @@ public class KahaTopicMessageStore extends KahaMessageStore implements TopicMess
}while(entry!=null&&count<maxReturned&&listener.hasSpace());
}
}
listener.finished();
}
public void delete(){

View File

@ -226,12 +226,10 @@ public class KahaTopicReferenceStore extends KahaReferenceStore implements Topic
}while(entry!=null&&count<maxReturned&&listener.hasSpace());
}
}
listener.finished();
}
public void recoverSubscription(String clientId,String subscriptionName,MessageRecoveryListener listener)
throws Exception{
String key=getSubscriptionKey(clientId,subscriptionName);
TopicSubContainer container=(TopicSubContainer)subscriberMessages.get(key);
if(container!=null){
@ -239,11 +237,12 @@ public class KahaTopicReferenceStore extends KahaReferenceStore implements Topic
ConsumerMessageRef ref=(ConsumerMessageRef)i.next();
ReferenceRecord msg=messageContainer.get(ref.getMessageEntry());
if(msg!=null){
recoverReference(listener,msg);
if(!recoverReference(listener,msg)){
break;
}
}
}
}
listener.finished();
}
public synchronized void resetBatching(String clientId,String subscriptionName){

View File

@ -95,7 +95,6 @@ public class MemoryMessageStore implements MessageStore{
listener.recoverMessage((Message)msg);
}
}
listener.finished();
}
}
@ -150,7 +149,6 @@ public class MemoryMessageStore implements MessageStore{
pastLackBatch=entry.getKey().equals(lastBatchId);
}
}
listener.finished();
}
}

View File

@ -57,7 +57,6 @@ class MemoryTopicSub{
listener.recoverMessage((Message)msg);
}
}
listener.finished();
}
void recoverNextMessages(int maxReturned,MessageRecoveryListener listener) throws Exception{
@ -83,7 +82,7 @@ class MemoryTopicSub{
if(lastId!=null){
lastBatch=lastId;
}
listener.finished();
}
void resetBatching(){