make sure next entry will be set properly in recovery

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@504032 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Robert Davies 2007-02-06 08:40:14 +00:00
parent 2e0f676c9b
commit 5c7af47332
2 changed files with 63 additions and 70 deletions

View File

@ -36,9 +36,9 @@ public class KahaMessageStore implements MessageStore{
protected final ActiveMQDestination destination; protected final ActiveMQDestination destination;
protected final MapContainer<MessageId,Message> messageContainer; protected final MapContainer<MessageId,Message> messageContainer;
protected StoreEntry batchEntry=null; protected StoreEntry batchEntry=null;
public KahaMessageStore(MapContainer<MessageId,Message> container,ActiveMQDestination destination) throws IOException{ public KahaMessageStore(MapContainer<MessageId,Message> container,ActiveMQDestination destination)
throws IOException{
this.messageContainer=container; this.messageContainer=container;
this.destination=destination; this.destination=destination;
} }
@ -110,7 +110,7 @@ public class KahaMessageStore implements MessageStore{
/** /**
* @param usageManager The UsageManager that is controlling the destination's memory usage. * @param usageManager The UsageManager that is controlling the destination's memory usage.
*/ */
public void setUsageManager(UsageManager usageManager) { public void setUsageManager(UsageManager usageManager){
} }
/** /**
@ -146,6 +146,9 @@ public class KahaMessageStore implements MessageStore{
}else{ }else{
entry=messageContainer.refresh(entry); entry=messageContainer.refresh(entry);
entry=messageContainer.getNext(entry); entry=messageContainer.getNext(entry);
if(entry==null){
batchEntry=null;
}
} }
if(entry!=null){ if(entry!=null){
int count=0; int count=0;
@ -176,6 +179,4 @@ public class KahaMessageStore implements MessageStore{
public boolean isSupportForCursors(){ public boolean isSupportForCursors(){
return true; return true;
} }
} }

View File

@ -1,20 +1,17 @@
/** /**
* *
* Licensed to the Apache Software Foundation (ASF) under one or more * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
* contributor license agreements. See the NOTICE file distributed with * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
* this work for additional information regarding copyright ownership. * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
* The ASF licenses this file to You under the Apache License, Version 2.0 * License. You may obtain a copy of the License at
* (the "License"); you may not use this file except in compliance with *
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0 * http://www.apache.org/licenses/LICENSE-2.0
* *
* Unless required by applicable law or agreed to in writing, software * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* distributed under the License is distributed on an "AS IS" BASIS, * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * specific language governing permissions and limitations under the License.
* See the License for the specific language governing permissions and
* limitations under the License.
*/ */
package org.apache.activemq.store.kahadaptor; package org.apache.activemq.store.kahadaptor;
import java.io.IOException; import java.io.IOException;
@ -30,46 +27,40 @@ import org.apache.activemq.memory.UsageManager;
import org.apache.activemq.store.MessageRecoveryListener; import org.apache.activemq.store.MessageRecoveryListener;
import org.apache.activemq.store.ReferenceStore; import org.apache.activemq.store.ReferenceStore;
public class KahaReferenceStore implements ReferenceStore { public class KahaReferenceStore implements ReferenceStore{
protected final ActiveMQDestination destination; protected final ActiveMQDestination destination;
protected final MapContainer<MessageId,ReferenceRecord> messageContainer; protected final MapContainer<MessageId,ReferenceRecord> messageContainer;
protected StoreEntry batchEntry=null; protected StoreEntry batchEntry=null;
public KahaReferenceStore(MapContainer container, ActiveMQDestination destination) throws IOException { public KahaReferenceStore(MapContainer container,ActiveMQDestination destination) throws IOException{
this.messageContainer=container; this.messageContainer=container;
this.destination=destination; this.destination=destination;
}
}
public void start(){ public void start(){
} }
public void stop(){ public void stop(){
} }
protected MessageId getMessageId(Object object){
protected MessageId getMessageId(Object object) { return new MessageId(((ReferenceRecord)object).messageId);
return new MessageId(((ReferenceRecord)object).messageId); }
}
public synchronized void addMessage(ConnectionContext context,Message message) throws IOException{
throw new RuntimeException("Use addMessageReference instead");
}
public synchronized Message getMessage(MessageId identity) throws IOException{
throw new RuntimeException("Use addMessageReference instead");
}
protected void recover(MessageRecoveryListener listener,Object msg) throws Exception{
ReferenceRecord record=(ReferenceRecord)msg;
listener.recoverMessageReference(new MessageId(record.messageId));
}
public synchronized void addMessage(ConnectionContext context, Message message) throws IOException {
throw new RuntimeException("Use addMessageReference instead");
}
public synchronized Message getMessage(MessageId identity) throws IOException {
throw new RuntimeException("Use addMessageReference instead");
}
protected void recover(MessageRecoveryListener listener, Object msg) throws Exception {
ReferenceRecord record = (ReferenceRecord) msg;
listener.recoverMessageReference(new MessageId(record.messageId));
}
public synchronized void recover(MessageRecoveryListener listener) throws Exception{ public synchronized void recover(MessageRecoveryListener listener) throws Exception{
for(StoreEntry entry=messageContainer.getFirst();entry!=null;entry=messageContainer.getNext(entry)){ for(StoreEntry entry=messageContainer.getFirst();entry!=null;entry=messageContainer.getNext(entry)){
ReferenceRecord record=messageContainer.getValue(entry); ReferenceRecord record=messageContainer.getValue(entry);
@ -77,7 +68,7 @@ public class KahaReferenceStore implements ReferenceStore {
} }
listener.finished(); listener.finished();
} }
public synchronized void recoverNextMessages(int maxReturned,MessageRecoveryListener listener) throws Exception{ public synchronized void recoverNextMessages(int maxReturned,MessageRecoveryListener listener) throws Exception{
StoreEntry entry=batchEntry; StoreEntry entry=batchEntry;
if(entry==null){ if(entry==null){
@ -85,6 +76,9 @@ public class KahaReferenceStore implements ReferenceStore {
}else{ }else{
entry=messageContainer.refresh(entry); entry=messageContainer.refresh(entry);
entry=messageContainer.getNext(entry); entry=messageContainer.getNext(entry);
if (entry==null) {
batchEntry=null;
}
} }
if(entry!=null){ if(entry!=null){
int count=0; int count=0;
@ -100,38 +94,38 @@ public class KahaReferenceStore implements ReferenceStore {
} }
listener.finished(); listener.finished();
} }
public void addMessageReference(ConnectionContext context, MessageId messageId, ReferenceData data) throws IOException { public void addMessageReference(ConnectionContext context,MessageId messageId,ReferenceData data)
ReferenceRecord record = new ReferenceRecord(messageId.toString(), data); throws IOException{
ReferenceRecord record=new ReferenceRecord(messageId.toString(),data);
messageContainer.put(messageId,record); messageContainer.put(messageId,record);
} }
public ReferenceData getMessageReference(MessageId identity) throws IOException { public ReferenceData getMessageReference(MessageId identity) throws IOException{
ReferenceRecord result=messageContainer.get(identity);
ReferenceRecord result=messageContainer.get(identity); if(result==null)
if( result == null ) return null;
return null;
return result.data; return result.data;
} }
public void addReferenceFileIdsInUse(Set<Integer> rc) { public void addReferenceFileIdsInUse(Set<Integer> rc){
for (StoreEntry entry = messageContainer.getFirst();entry != null; entry = messageContainer.getNext(entry)) { for(StoreEntry entry=messageContainer.getFirst();entry!=null;entry=messageContainer.getNext(entry)){
ReferenceRecord msg=(ReferenceRecord)messageContainer.get(entry); ReferenceRecord msg=(ReferenceRecord)messageContainer.get(entry);
rc.add(msg.data.getFileId()); rc.add(msg.data.getFileId());
} }
} }
public void removeMessage(ConnectionContext context,MessageAck ack) throws IOException{ public void removeMessage(ConnectionContext context,MessageAck ack) throws IOException{
removeMessage(ack.getLastMessageId()); removeMessage(ack.getLastMessageId());
} }
public synchronized void removeMessage(MessageId msgId) throws IOException{ public synchronized void removeMessage(MessageId msgId) throws IOException{
messageContainer.remove(msgId); messageContainer.remove(msgId);
if(messageContainer.isEmpty()){ if(messageContainer.isEmpty()){
resetBatching(); resetBatching();
} }
} }
public synchronized void removeAllMessages(ConnectionContext context) throws IOException{ public synchronized void removeAllMessages(ConnectionContext context) throws IOException{
messageContainer.clear(); messageContainer.clear();
} }
@ -143,21 +137,19 @@ public class KahaReferenceStore implements ReferenceStore {
public synchronized void delete(){ public synchronized void delete(){
messageContainer.clear(); messageContainer.clear();
} }
public void resetBatching(){ public void resetBatching(){
batchEntry=null; batchEntry=null;
} }
public int getMessageCount(){ public int getMessageCount(){
return messageContainer.size(); return messageContainer.size();
} }
public void setUsageManager(UsageManager usageManager) { public void setUsageManager(UsageManager usageManager){
} }
public boolean isSupportForCursors(){ public boolean isSupportForCursors(){
return true; return true;
} }
} }