Timothy A. Bish 2013-04-15 18:08:15 +00:00
parent e6c6951bed
commit 3bf9d0cceb
7 changed files with 783 additions and 104 deletions

MessageDatabase.java (org.apache.activemq.store.kahadb)

@@ -54,9 +54,9 @@ import org.apache.activemq.ActiveMQMessageAuditNoSync;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.BrokerServiceAware;
import org.apache.activemq.command.MessageAck;
-import org.apache.activemq.command.SubscriptionInfo;
import org.apache.activemq.command.TransactionId;
import org.apache.activemq.protobuf.Buffer;
+import org.apache.activemq.store.kahadb.data.KahaAckMessageFileMapCommand;
import org.apache.activemq.store.kahadb.data.KahaAddMessageCommand;
import org.apache.activemq.store.kahadb.data.KahaCommitCommand;
import org.apache.activemq.store.kahadb.data.KahaDestination;
@@ -85,7 +85,12 @@ import org.apache.activemq.store.kahadb.disk.util.Sequence;
import org.apache.activemq.store.kahadb.disk.util.SequenceSet;
import org.apache.activemq.store.kahadb.disk.util.StringMarshaller;
import org.apache.activemq.store.kahadb.disk.util.VariableMarshaller;
-import org.apache.activemq.util.*;
+import org.apache.activemq.util.ByteSequence;
+import org.apache.activemq.util.DataByteArrayInputStream;
+import org.apache.activemq.util.DataByteArrayOutputStream;
+import org.apache.activemq.util.IOHelper;
+import org.apache.activemq.util.ServiceStopper;
+import org.apache.activemq.util.ServiceSupport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -101,13 +106,12 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerServiceAware {
        UNMATCHED = new Buffer(new byte[]{});
    }

    private static final Logger LOG = LoggerFactory.getLogger(MessageDatabase.class);
-   private static final int DEFAULT_DATABASE_LOCKED_WAIT_DELAY = 10 * 1000;

    static final int CLOSED_STATE = 1;
    static final int OPEN_STATE = 2;
    static final long NOT_ACKED = -1;
-   static final int VERSION = 4;
+   static final int VERSION = 5;

    protected class Metadata {
        protected Page<Metadata> page;
@@ -116,7 +120,9 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerServiceAware {
        protected Location lastUpdate;
        protected Location firstInProgressTransactionLocation;
        protected Location producerSequenceIdTrackerLocation = null;
+       protected Location ackMessageFileMapLocation = null;
        protected transient ActiveMQMessageAuditNoSync producerSequenceIdTracker = new ActiveMQMessageAuditNoSync();
+       protected transient Map<Integer, Set<Integer>> ackMessageFileMap = new HashMap<Integer, Set<Integer>>();
        protected int version = VERSION;

        public void read(DataInput is) throws IOException {
            state = is.readInt();
@@ -144,6 +150,11 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerServiceAware {
            } catch (EOFException expectedOnUpgrade) {
                version = 1;
            }
+           if (version >= 5 && is.readBoolean()) {
+               ackMessageFileMapLocation = LocationMarshaller.INSTANCE.readPayload(is);
+           } else {
+               ackMessageFileMapLocation = null;
+           }
            LOG.info("KahaDB is version " + version);
        }
@@ -172,6 +183,12 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerServiceAware {
                os.writeBoolean(false);
            }
            os.writeInt(VERSION);
+           if (ackMessageFileMapLocation != null) {
+               os.writeBoolean(true);
+               LocationMarshaller.INSTANCE.writePayload(ackMessageFileMapLocation, os);
+           } else {
+               os.writeBoolean(false);
+           }
        }
    }
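The write/read pair above is how this Metadata block grows across store versions: each new field is appended behind a presence flag, and the reader only consults that flag when the stored version says the field can exist. A minimal, self-contained sketch of the same scheme; the int-pair stand-in for Location and all names below are illustrative, not KahaDB's actual marshalling code:

import java.io.*;

public class OptionalFieldDemo {

    // Stand-in for KahaDB's Location: a (data file id, offset) pair.
    static void writeOptionalLocation(DataOutput os, int[] loc) throws IOException {
        if (loc != null) {
            os.writeBoolean(true);   // presence flag
            os.writeInt(loc[0]);     // data file id
            os.writeInt(loc[1]);     // offset within that file
        } else {
            os.writeBoolean(false);  // field absent
        }
    }

    static int[] readOptionalLocation(DataInput is, int version) throws IOException {
        // Older stores never wrote the flag, so only consult it for version >= 5.
        if (version >= 5 && is.readBoolean()) {
            return new int[] { is.readInt(), is.readInt() };
        }
        return null;
    }

    public static void main(String[] args) throws IOException {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        writeOptionalLocation(new DataOutputStream(baos), new int[] { 7, 1024 });
        int[] loc = readOptionalLocation(new DataInputStream(new ByteArrayInputStream(baos.toByteArray())), 5);
        System.out.println("fileId=" + loc[0] + " offset=" + loc[1]);
    }
}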
@@ -452,6 +469,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerServiceAware {
        return range;
    }

+   @SuppressWarnings("rawtypes")
    private void trackMaxAndMin(Location[] range, List<Operation> ops) {
        Location t = ops.get(0).getLocation();
        if (range[0] == null || t.compareTo(range[0]) <= 0) {
@@ -473,6 +491,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerServiceAware {
        }

        HashMap<KahaDestination, opCount> destinationOpCount = new HashMap<KahaDestination, opCount>();

+       @SuppressWarnings("rawtypes")
        public void track(Operation operation) {
            if (location == null) {
                location = operation.getLocation();
@@ -543,9 +562,11 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerServiceAware {
        long start = System.currentTimeMillis();
        Location producerAuditPosition = recoverProducerAudit();
+       Location ackMessageFileLocation = recoverAckMessageFileMap();
        Location lastIndoubtPosition = getRecoveryPosition();

-       Location recoveryPosition = minimum(producerAuditPosition, lastIndoubtPosition);
+       Location recoveryPosition = minimum(producerAuditPosition, ackMessageFileLocation);
+       recoveryPosition = minimum(recoveryPosition, lastIndoubtPosition);

        if (recoveryPosition != null) {
            int redoCounter = 0;
@@ -631,6 +652,24 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerServiceAware {
        }
    }

+   @SuppressWarnings("unchecked")
+   private Location recoverAckMessageFileMap() throws IOException {
+       if (metadata.ackMessageFileMapLocation != null) {
+           KahaAckMessageFileMapCommand audit = (KahaAckMessageFileMapCommand) load(metadata.ackMessageFileMapLocation);
+           try {
+               ObjectInputStream objectIn = new ObjectInputStream(audit.getAckMessageFileMap().newInput());
+               metadata.ackMessageFileMap = (Map<Integer, Set<Integer>>) objectIn.readObject();
+               return journal.getNextLocation(metadata.ackMessageFileMapLocation);
+           } catch (Exception e) {
+               LOG.warn("Cannot recover ackMessageFileMap", e);
+               return journal.getNextLocation(null);
+           }
+       } else {
+           // No ackMessageFileMap stored, so we have to recreate it by replaying from the start of the journal.
+           return journal.getNextLocation(null);
+       }
+   }
+
    protected void recoverIndex(Transaction tx) throws IOException {
        long start = System.currentTimeMillis();
        // It is possible index updates got applied before the journal updates..
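Recovery above is the read half of a round trip whose write half appears later in checkpointAckMessageFileMap(): the map is Java-serialized, wrapped in a protobuf Buffer, and journaled as a KahaAckMessageFileMapCommand. A compilable sketch of both directions, assuming the generated command class and Buffer are on the classpath; the setAckMessageFileMap(Buffer) and getAckMessageFileMap().newInput() calls are the same ones used in this diff:

import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.Collections;
import java.util.Map;
import java.util.Set;

import org.apache.activemq.protobuf.Buffer;
import org.apache.activemq.store.kahadb.data.KahaAckMessageFileMapCommand;

public class AckMapCommandDemo {
    public static void main(String[] args) throws Exception {
        // ack data file id -> ids of the data files its acks reference
        Map<Integer, Set<Integer>> map = Collections.singletonMap(5, Collections.singleton(2));

        // Write side: Java-serialize the map and hand the bytes to the command.
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        ObjectOutputStream oout = new ObjectOutputStream(baos);
        oout.writeObject(map);
        oout.close();
        KahaAckMessageFileMapCommand command =
            new KahaAckMessageFileMapCommand().setAckMessageFileMap(new Buffer(baos.toByteArray()));

        // Read side: newInput() streams the Buffer back out, as recoverAckMessageFileMap does.
        ObjectInputStream oin = new ObjectInputStream(command.getAckMessageFileMap().newInput());
        System.out.println(oin.readObject()); // {5=[2]}
    }
}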
@@ -760,7 +799,6 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerServiceAware {
                    undoCounter++;
                    // TODO: do we need to modify the ack positions for the pub sub case?
                }
-
            } else {
                throw new IOException("Detected missing/corrupt journal files. " + matches.size() + " messages affected.");
            }
@@ -947,6 +985,9 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerServiceAware {
        byte readByte = is.readByte();
        KahaEntryType type = KahaEntryType.valueOf(readByte);
        if (type == null) {
+           try {
+               is.close();
+           } catch (IOException e) {}
            throw new IOException("Could not load journal record. Invalid location: " + location);
        }
        JournalCommand<?> message = (JournalCommand<?>) type.createMessage();
@@ -1023,6 +1064,11 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerServiceAware {
            processLocation(location);
        }

+       @Override
+       public void visit(KahaAckMessageFileMapCommand command) throws IOException {
+           processLocation(location);
+       }
+
        @Override
        public void visit(KahaTraceCommand command) {
            processLocation(location);
@@ -1223,20 +1269,16 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerServiceAware {
                }
            } else {
                // If the message ID was indexed, then the broker asked us to
-               // store a DUP
-               // message. Bad BOY! Don't do it, and log a warning.
+               // store a DUP message. Bad BOY! Don't do it, and log a warning.
                LOG.warn("Duplicate message add attempt rejected. Destination: " + command.getDestination().getName() + ", Message id: " + command.getMessageId());
                sd.messageIdIndex.put(tx, command.getMessageId(), previous);
                sd.locationIndex.remove(tx, location);
                rollbackStatsOnDuplicate(command.getDestination());
            }
        } else {
-           // restore the previous value.. Looks like this was a redo of a
-           // previously
-           // added message. We don't want to assign it a new id as the other
-           // indexes would
+           // restore the previous value.. Looks like this was a redo of a previously
+           // added message. We don't want to assign it a new id as the other indexes would
            // be wrong..
-           //
            sd.locationIndex.put(tx, location, previous);
        }
        // record this id in any event, initial send or recovery
@@ -1276,6 +1318,11 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerServiceAware {
                byte priority = sd.orderIndex.lastGetPriority();
                sd.subscriptionAcks.put(tx, subscriptionKey, new LastAck(sequence, priority));
            }
+
+           MessageKeys keys = sd.orderIndex.get(tx, sequence);
+           if (keys != null) {
+               recordAckMessageReferenceLocation(ackLocation, keys.location);
+           }
            // The following method handles deleting un-referenced messages.
            removeAckLocation(tx, sd, subscriptionKey, sequence);
        } else if (LOG.isDebugEnabled()) {
@@ -1286,13 +1333,12 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerServiceAware {
            metadata.lastUpdate = ackLocation;
        }

-   Map<Integer, Set<Integer>> ackMessageFileMap = new HashMap<Integer, Set<Integer>>();
    private void recordAckMessageReferenceLocation(Location ackLocation, Location messageLocation) {
-       Set<Integer> referenceFileIds = ackMessageFileMap.get(Integer.valueOf(ackLocation.getDataFileId()));
+       Set<Integer> referenceFileIds = metadata.ackMessageFileMap.get(Integer.valueOf(ackLocation.getDataFileId()));
        if (referenceFileIds == null) {
            referenceFileIds = new HashSet<Integer>();
            referenceFileIds.add(messageLocation.getDataFileId());
-           ackMessageFileMap.put(ackLocation.getDataFileId(), referenceFileIds);
+           metadata.ackMessageFileMap.put(ackLocation.getDataFileId(), referenceFileIds);
        } else {
            Integer id = Integer.valueOf(messageLocation.getDataFileId());
            if (!referenceFileIds.contains(id)) {
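The map maintained here answers a single question for the cleanup pass: which data files does each file's set of acks point into? A tiny sketch of the same bookkeeping with plain collections (file ids invented for illustration):

import java.util.*;

public class AckReferenceDemo {

    static final Map<Integer, Set<Integer>> ackMessageFileMap = new HashMap<Integer, Set<Integer>>();

    // Same idea as recordAckMessageReferenceLocation: when an ack written to one
    // data file removes a message stored in another, remember the cross-reference.
    static void record(int ackFileId, int messageFileId) {
        Set<Integer> refs = ackMessageFileMap.get(ackFileId);
        if (refs == null) {
            refs = new HashSet<Integer>();
            ackMessageFileMap.put(ackFileId, refs);
        }
        refs.add(messageFileId);
    }

    public static void main(String[] args) {
        record(5, 2); // ack in file 5 for a message in file 2
        record(5, 3); // another ack in file 5, message in file 3
        record(9, 5); // file 9 acks a message living in file 5
        System.out.println(ackMessageFileMap); // {5=[2, 3], 9=[5]}
    }
}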
@@ -1325,6 +1371,10 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerServiceAware {
            sd.ackPositions.clear(tx);
            sd.ackPositions.unload(tx);
            tx.free(sd.ackPositions.getHeadPageId());
+
+           sd.subLocations.clear(tx);
+           sd.subLocations.unload(tx);
+           tx.free(sd.subLocations.getHeadPageId());
        }

        String key = key(command.getDestination());
@@ -1339,6 +1389,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerServiceAware {
        // If set then we are creating it.. otherwise we are destroying the sub
        if (command.hasSubscriptionInfo()) {
            sd.subscriptions.put(tx, subscriptionKey, command);
+           sd.subLocations.put(tx, subscriptionKey, location);
            long ackLocation = NOT_ACKED;
            if (!command.getRetroactive()) {
                ackLocation = sd.orderIndex.nextMessageId - 1;
@@ -1350,6 +1401,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerServiceAware {
        } else {
            // delete the sub...
            sd.subscriptions.remove(tx, subscriptionKey);
+           sd.subLocations.remove(tx, subscriptionKey);
            sd.subscriptionAcks.remove(tx, subscriptionKey);
            sd.subscriptionCache.remove(subscriptionKey);
            removeAckLocationsForSub(tx, sd, subscriptionKey);
@@ -1394,6 +1446,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerServiceAware {
            metadata.state = OPEN_STATE;
            metadata.producerSequenceIdTrackerLocation = checkpointProducerAudit();
+           metadata.ackMessageFileMapLocation = checkpointAckMessageFileMap();
            Location[] inProgressTxRange = getInProgressTxLocationRange();
            metadata.firstInProgressTransactionLocation = inProgressTxRange[0];
            tx.store(metadata.page, metadataMarshaller, true);
@@ -1432,6 +1485,14 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerServiceAware {
            }
        }

+       if (metadata.ackMessageFileMapLocation != null) {
+           int dataFileId = metadata.ackMessageFileMapLocation.getDataFileId();
+           gcCandidateSet.remove(dataFileId);
+           if (LOG.isTraceEnabled()) {
+               LOG.trace("gc candidates after ackMessageFileMapLocation:" + dataFileId + ", " + gcCandidateSet);
+           }
+       }
+
        // Don't GC files referenced by in-progress tx
        if (inProgressTxRange[0] != null) {
            for (int pendingTx = inProgressTxRange[0].getDataFileId(); pendingTx <= inProgressTxRange[1].getDataFileId(); pendingTx++) {
@@ -1488,6 +1549,45 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerServiceAware {
                    }
                }
            });

+           // Durable Subscription
+           if (entry.getValue().subLocations != null) {
+               Iterator<Entry<String, Location>> iter = entry.getValue().subLocations.iterator(tx);
+               while (iter.hasNext()) {
+                   Entry<String, Location> subscription = iter.next();
+                   int dataFileId = subscription.getValue().getDataFileId();
+
+                   // Move the subscription along if it has no outstanding messages that need ack'd
+                   // and it's in the last log file in the journal.
+                   if (!gcCandidateSet.isEmpty() && gcCandidateSet.first() == dataFileId) {
+                       final StoredDestination destination = entry.getValue();
+                       final String subscriptionKey = subscription.getKey();
+                       SequenceSet pendingAcks = destination.ackPositions.get(tx, subscriptionKey);
+
+                       // When the pending acks are of size one, that entry is just the next
+                       // message id, meaning there are no pending messages currently.
+                       if (pendingAcks == null || pendingAcks.size() <= 1) {
+                           if (LOG.isTraceEnabled()) {
+                               LOG.trace("Found candidate for rewrite: {} from file {}", entry.getKey(), dataFileId);
+                           }
+
+                           final KahaSubscriptionCommand kahaSub =
+                               destination.subscriptions.get(tx, subscriptionKey);
+                           destination.subLocations.put(
+                               tx, subscriptionKey, checkpointSubscriptionCommand(kahaSub));
+
+                           // Skip the remove from candidates if we rewrote the subscription,
+                           // in order to prevent duplicate subscription commands on recover.
+                           // If another subscription is on the same file and isn't rewritten,
+                           // then it will remove the file from the set.
+                           continue;
+                       }
+                   }
+
+                   gcCandidateSet.remove(dataFileId);
+               }
+           }
+
            if (LOG.isTraceEnabled()) {
                LOG.trace("gc candidates after dest:" + entry.getKey() + ", " + gcCandidateSet);
            }
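This loop keeps a durable subscription from pinning the oldest journal file forever: once the sub has no pending acks and sits in the oldest GC candidate, its subscription command is re-appended at the head of the journal (via checkpointSubscriptionCommand) and subLocations is pointed at the fresh copy, so the old file can be collected on a later sweep. A toy model of that decision using plain collections; the file ids, the pendingAckCount stand-in for ackPositions, and journalHeadFile are all invented for illustration:

import java.util.*;

public class SubRewriteDemo {
    public static void main(String[] args) {
        TreeSet<Integer> gcCandidateSet = new TreeSet<Integer>(Arrays.asList(1, 2, 3));
        Map<String, Integer> subLocations = new HashMap<String, Integer>(); // sub key -> data file id
        subLocations.put("clientA:sub1", 1);
        Map<String, Integer> pendingAckCount = Collections.singletonMap("clientA:sub1", 0);
        int journalHeadFile = 7;

        for (Map.Entry<String, Integer> sub : subLocations.entrySet()) {
            int dataFileId = sub.getValue();
            if (!gcCandidateSet.isEmpty() && gcCandidateSet.first() == dataFileId
                    && pendingAckCount.get(sub.getKey()) == 0) {
                // "Rewrite": re-append the subscription record at the journal head so
                // the oldest file no longer holds its only copy; leave the file in the
                // candidate set so it can be reclaimed once nothing else needs it.
                sub.setValue(journalHeadFile);
                continue;
            }
            gcCandidateSet.remove(dataFileId);
        }
        System.out.println("candidates: " + gcCandidateSet + ", subLocations: " + subLocations);
        // prints: candidates: [1, 2, 3], subLocations: {clientA:sub1=7}
    }
}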
@@ -1501,7 +1601,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerServiceAware {
        Iterator<Integer> candidates = gcCandidateSet.iterator();
        while (candidates.hasNext()) {
            Integer candidate = candidates.next();
-           Set<Integer> referencedFileIds = ackMessageFileMap.get(candidate);
+           Set<Integer> referencedFileIds = metadata.ackMessageFileMap.get(candidate);
            if (referencedFileIds != null) {
                for (Integer referencedFileId : referencedFileIds) {
                    if (completeFileSet.contains(referencedFileId) && !gcCandidates.contains(referencedFileId)) {

@@ -1511,7 +1611,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerServiceAware {
                }
            }
            if (gcCandidateSet.contains(candidate)) {
-               ackMessageFileMap.remove(candidate);
+               metadata.ackMessageFileMap.remove(candidate);
            } else {
                if (LOG.isTraceEnabled()) {
                    LOG.trace("not removing data file: " + candidate
@@ -1537,6 +1637,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerServiceAware {
        public void run() {
        }
    };
+
    private Location checkpointProducerAudit() throws IOException {
        if (metadata.producerSequenceIdTracker == null || metadata.producerSequenceIdTracker.modified()) {
            ByteArrayOutputStream baos = new ByteArrayOutputStream();
@@ -1556,6 +1657,35 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerServiceAware {
        return metadata.producerSequenceIdTrackerLocation;
    }

+   private Location checkpointAckMessageFileMap() throws IOException {
+       ByteArrayOutputStream baos = new ByteArrayOutputStream();
+       ObjectOutputStream oout = new ObjectOutputStream(baos);
+       oout.writeObject(metadata.ackMessageFileMap);
+       oout.flush();
+       oout.close();
+       // using completion callback allows a disk sync to be avoided when enableJournalDiskSyncs = false
+       Location location = store(new KahaAckMessageFileMapCommand().setAckMessageFileMap(new Buffer(baos.toByteArray())), nullCompletionCallback);
+       try {
+           location.getLatch().await();
+       } catch (InterruptedException e) {
+           throw new InterruptedIOException(e.toString());
+       }
+       return location;
+   }
+
+   private Location checkpointSubscriptionCommand(KahaSubscriptionCommand subscription) throws IOException {
+       ByteSequence sequence = toByteSequence(subscription);
+       Location location = journal.write(sequence, nullCompletionCallback);
+       try {
+           location.getLatch().await();
+       } catch (InterruptedException e) {
+           throw new InterruptedIOException(e.toString());
+       }
+       return location;
+   }
+
    public HashSet<Integer> getJournalFilesBeingReplicated() {
        return journalFilesBeingReplicated;
    }
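Both checkpoint helpers append their record asynchronously and then park on the returned Location's latch, guaranteeing the bytes have been handed to the log before the position is recorded, while still letting a disk sync be skipped when journal syncs are disabled. A minimal sketch of that rendezvous; the Location class and asyncWrite below are stand-ins, not the journal's real API:

import java.io.InterruptedIOException;
import java.util.concurrent.CountDownLatch;

public class WriteLatchDemo {

    // Stand-in for a journal Location whose latch trips once the append completes.
    static class Location {
        final CountDownLatch latch = new CountDownLatch(1);
    }

    // Stand-in for an asynchronous journal append.
    static Location asyncWrite(final byte[] data) {
        final Location location = new Location();
        new Thread(new Runnable() {
            @Override
            public void run() {
                // ... append data to the log file here ...
                location.latch.countDown();
            }
        }).start();
        return location;
    }

    public static void main(String[] args) throws Exception {
        Location location = asyncWrite(new byte[16]);
        try {
            // Same wait as in the checkpoint helpers: don't record the
            // checkpoint location until the append has completed.
            location.latch.await();
        } catch (InterruptedException e) {
            throw new InterruptedIOException(e.toString());
        }
        System.out.println("record written, safe to store its location");
    }
}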
@@ -1566,13 +1696,6 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerServiceAware {
    private final HashMap<String, StoredDestination> storedDestinations = new HashMap<String, StoredDestination>();

-   class StoredSubscription {
-       SubscriptionInfo subscriptionInfo;
-       String lastAckId;
-       Location lastAckLocation;
-       Location cursor;
-   }
-
    static class MessageKeys {
        final String messageId;
        final Location location;
@@ -1677,6 +1800,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerServiceAware {
        BTreeIndex<String, LastAck> subscriptionAcks;
        HashMap<String, MessageOrderCursor> subscriptionCursors;
        ListIndex<String, SequenceSet> ackPositions;
+       ListIndex<String, Location> subLocations;

        // Transient data used to track which Messages are no longer needed.
        final TreeMap<Long, Long> messageReferences = new TreeMap<Long, Long>();
@@ -1730,6 +1854,9 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerServiceAware {
                    // Now move the pending messages to ack data into the store backed
                    // structure.
                    value.ackPositions = new ListIndex<String, SequenceSet>(pageFile, tx.allocate());
+                   value.ackPositions.setKeyMarshaller(StringMarshaller.INSTANCE);
+                   value.ackPositions.setValueMarshaller(SequenceSet.Marshaller.INSTANCE);
+                   value.ackPositions.load(tx);
                    for (String subscriptionKey : temp.keySet()) {
                        value.ackPositions.put(tx, subscriptionKey, temp.get(subscriptionKey));
                    }
@@ -1737,26 +1864,41 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerServiceAware {
                    }
                });
            }
+           if (metadata.version >= 5) {
+               value.subLocations = new ListIndex<String, Location>(pageFile, dataIn.readLong());
+           } else {
+               // upgrade
+               pageFile.tx().execute(new Transaction.Closure<IOException>() {
+                   @Override
+                   public void execute(Transaction tx) throws IOException {
+                       value.subLocations = new ListIndex<String, Location>(pageFile, tx.allocate());
+                       value.subLocations.setKeyMarshaller(StringMarshaller.INSTANCE);
+                       value.subLocations.setValueMarshaller(LocationMarshaller.INSTANCE);
+                       value.subLocations.load(tx);
+                   }
+               });
+           }
        }
        if (metadata.version >= 2) {
            value.orderIndex.lowPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, dataIn.readLong());
            value.orderIndex.highPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, dataIn.readLong());
        } else {
            // upgrade
            pageFile.tx().execute(new Transaction.Closure<IOException>() {
                @Override
                public void execute(Transaction tx) throws IOException {
                    value.orderIndex.lowPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, tx.allocate());
                    value.orderIndex.lowPriorityIndex.setKeyMarshaller(LongMarshaller.INSTANCE);
                    value.orderIndex.lowPriorityIndex.setValueMarshaller(MessageKeysMarshaller.INSTANCE);
                    value.orderIndex.lowPriorityIndex.load(tx);

                    value.orderIndex.highPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, tx.allocate());
                    value.orderIndex.highPriorityIndex.setKeyMarshaller(LongMarshaller.INSTANCE);
                    value.orderIndex.highPriorityIndex.setValueMarshaller(MessageKeysMarshaller.INSTANCE);
                    value.orderIndex.highPriorityIndex.load(tx);
                }
            });
        }

        return value;
@@ -1772,6 +1914,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerServiceAware {
            dataOut.writeLong(value.subscriptions.getPageId());
            dataOut.writeLong(value.subscriptionAcks.getPageId());
            dataOut.writeLong(value.ackPositions.getHeadPageId());
+           dataOut.writeLong(value.subLocations.getHeadPageId());
        } else {
            dataOut.writeBoolean(false);
        }
@@ -1840,6 +1983,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerServiceAware {
            rc.subscriptions = new BTreeIndex<String, KahaSubscriptionCommand>(pageFile, tx.allocate());
            rc.subscriptionAcks = new BTreeIndex<String, LastAck>(pageFile, tx.allocate());
            rc.ackPositions = new ListIndex<String, SequenceSet>(pageFile, tx.allocate());
+           rc.subLocations = new ListIndex<String, Location>(pageFile, tx.allocate());
        }
        metadata.destinations.put(tx, key, rc);
    }
@@ -1873,6 +2017,10 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerServiceAware {
        rc.ackPositions.setValueMarshaller(SequenceSet.Marshaller.INSTANCE);
        rc.ackPositions.load(tx);

+       rc.subLocations.setKeyMarshaller(StringMarshaller.INSTANCE);
+       rc.subLocations.setValueMarshaller(LocationMarshaller.INSTANCE);
+       rc.subLocations.load(tx);
+
        rc.subscriptionCursors = new HashMap<String, MessageOrderCursor>();

        if (metadata.version < 3) {

Visitor.java (org.apache.activemq.store.kahadb)

@@ -18,6 +18,7 @@ package org.apache.activemq.store.kahadb;
import java.io.IOException;

+import org.apache.activemq.store.kahadb.data.KahaAckMessageFileMapCommand;
import org.apache.activemq.store.kahadb.data.KahaAddMessageCommand;
import org.apache.activemq.store.kahadb.data.KahaCommitCommand;
import org.apache.activemq.store.kahadb.data.KahaPrepareCommand;

@@ -53,8 +54,10 @@ public class Visitor {
    public void visit(KahaSubscriptionCommand kahaUpdateSubscriptionCommand) throws IOException {
    }

    public void visit(KahaProducerAuditCommand kahaProducerAuditCommand) throws IOException {
    }
+
+   public void visit(KahaAckMessageFileMapCommand kahaAckMessageFileMapCommand) throws IOException {
+   }
}
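A no-op default here keeps every existing Visitor subclass compiling while the ones that care, like the recovery visitor in MessageDatabase, override only the command types they handle. A minimal sketch of the double-dispatch shape; all types below are simplified stand-ins for the generated command classes:

import java.io.IOException;

public class VisitorDemo {

    interface Command {
        void visit(Visitor v) throws IOException;
    }

    static class AckMessageFileMapCommand implements Command {
        @Override
        public void visit(Visitor v) throws IOException {
            v.visit(this); // dispatch to the overload for this concrete type
        }
    }

    static class Visitor {
        // No-op default: subclasses override only the commands they handle.
        public void visit(AckMessageFileMapCommand command) throws IOException {
        }
    }

    public static void main(String[] args) throws IOException {
        Command cmd = new AckMessageFileMapCommand();
        cmd.visit(new Visitor() {
            @Override
            public void visit(AckMessageFileMapCommand command) {
                System.out.println("recovered an ack message file map record");
            }
        });
    }
}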

KahaDB journal command definitions (.proto)

@@ -30,6 +30,7 @@ enum KahaEntryType {
  KAHA_REMOVE_DESTINATION_COMMAND = 6;
  KAHA_SUBSCRIPTION_COMMAND = 7;
  KAHA_PRODUCER_AUDIT_COMMAND = 8;
+  KAHA_ACK_MESSAGE_FILE_MAP_COMMAND = 9;
}

message KahaTraceCommand {
@@ -40,7 +41,7 @@ message KahaTraceCommand {
  //| option java_implments = "org.apache.activemq.store.kahadb.JournalCommand<KahaTraceCommand>";
  //| option java_visitor = "org.apache.activemq.store.kahadb.Visitor:void:java.io.IOException";
  //| option java_type_method = "KahaEntryType";

  required string message = 1;
}
@@ -48,7 +49,7 @@ message KahaAddMessageCommand {
  //| option java_implments = "org.apache.activemq.store.kahadb.JournalCommand<KahaAddMessageCommand>";
  //| option java_visitor = "org.apache.activemq.store.kahadb.Visitor:void:java.io.IOException";
  //| option java_type_method = "KahaEntryType";

  optional KahaTransactionInfo transaction_info = 1;
  required KahaDestination destination = 2;
  required string messageId = 3;
@@ -120,10 +121,22 @@ message KahaProducerAuditCommand {
  //| option java_implments = "org.apache.activemq.store.kahadb.JournalCommand<KahaProducerAuditCommand>";
  //| option java_visitor = "org.apache.activemq.store.kahadb.Visitor:void:java.io.IOException";
  //| option java_type_method = "KahaEntryType";

  required bytes audit = 1;
}

+message KahaAckMessageFileMapCommand {
+  // We make use of the wonky comment style below because the following options
+  // are not valid for protoc, but they are valid for the ActiveMQ proto compiler.
+  // In the ActiveMQ proto compiler, comments terminate with the pipe character: |
+  //| option java_implments = "org.apache.activemq.store.kahadb.JournalCommand<KahaAckMessageFileMapCommand>";
+  //| option java_visitor = "org.apache.activemq.store.kahadb.Visitor:void:java.io.IOException";
+  //| option java_type_method = "KahaEntryType";
+
+  required bytes ackMessageFileMap = 1;
+}
+
message KahaDestination {
  enum DestinationType {
    QUEUE = 0;
@@ -154,8 +167,8 @@ message KahaXATransactionId {
}

message KahaLocation {
  required int32 log_id = 1;
  required int32 offset = 2;
}

// TODO things to ponder
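The new enum value is what lets recovery map a type byte read from the journal back to the right command object; MessageDatabase.load() does exactly this via KahaEntryType.valueOf(readByte) followed by type.createMessage(). A toy model of that dispatch, with invented mini-types standing in for the generated classes:

public class EntryTypeDemo {

    interface JournalCommand { }
    static class ProducerAuditCommand implements JournalCommand { }
    static class AckMessageFileMapCommand implements JournalCommand { }

    enum EntryType {
        PRODUCER_AUDIT_COMMAND(8) {
            @Override JournalCommand createMessage() { return new ProducerAuditCommand(); }
        },
        ACK_MESSAGE_FILE_MAP_COMMAND(9) {
            @Override JournalCommand createMessage() { return new AckMessageFileMapCommand(); }
        };

        final int typeByte;
        EntryType(int typeByte) { this.typeByte = typeByte; }
        abstract JournalCommand createMessage();

        // Mirrors the valueOf lookup: an unknown byte yields null, and the
        // loader then raises "Could not load journal record".
        static EntryType of(int typeByte) {
            for (EntryType t : values()) {
                if (t.typeByte == typeByte) {
                    return t;
                }
            }
            return null;
        }
    }

    public static void main(String[] args) {
        System.out.println(EntryType.of(9).createMessage().getClass().getSimpleName());
        // prints: AckMessageFileMapCommand
    }
}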

AMQ2832Test.java (org.apache.activemq.bugs)

@@ -16,14 +16,13 @@
 */
package org.apache.activemq.bugs;

-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.ActiveMQSession;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.command.ActiveMQQueue;
-import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.junit.Test;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.concurrent.TimeUnit;

import javax.jms.Connection;
import javax.jms.Destination;
@@ -31,51 +30,95 @@ import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
+import javax.jms.Queue;
import javax.jms.Session;
+import javax.jms.Topic;

-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.ActiveMQSession;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
+import org.apache.activemq.store.kahadb.disk.journal.DataFile;
+import org.apache.activemq.util.Wait;
+import org.junit.After;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;

public class AMQ2832Test {

    private static final Logger LOG = LoggerFactory.getLogger(AMQ2832Test.class);

    BrokerService broker = null;
+   private ActiveMQConnectionFactory cf;
    private final Destination destination = new ActiveMQQueue("AMQ2832Test");
+   private String connectionUri;

-   protected void startBroker(boolean delete) throws Exception {
+   protected void startBroker() throws Exception {
+       doStartBroker(true, false);
+   }
+
+   protected void restartBroker() throws Exception {
+       if (broker != null) {
+           broker.stop();
+           broker.waitUntilStopped();
+       }
+
+       doStartBroker(false, false);
+   }
+
+   protected void recoverBroker() throws Exception {
+       if (broker != null) {
+           broker.stop();
+           broker.waitUntilStopped();
+       }
+
+       doStartBroker(false, true);
+   }
+
+   private void doStartBroker(boolean delete, boolean recover) throws Exception {
        broker = new BrokerService();
        broker.setDeleteAllMessagesOnStartup(delete);
        broker.setPersistent(true);
-       broker.setUseJmx(false);
+       broker.setUseJmx(true);
        broker.addConnector("tcp://localhost:0");
-       configurePersistence(broker, delete);
+       configurePersistence(broker, recover);
+
+       connectionUri = "vm://localhost?create=false";
+       cf = new ActiveMQConnectionFactory(connectionUri);
+
        broker.start();
        LOG.info("Starting broker..");
    }

-   protected void configurePersistence(BrokerService brokerService, boolean deleteAllOnStart) throws Exception {
+   protected void configurePersistence(BrokerService brokerService, boolean recover) throws Exception {
        KahaDBPersistenceAdapter adapter = (KahaDBPersistenceAdapter) brokerService.getPersistenceAdapter();

        // ensure there are a bunch of data files but multiple entries in each
        adapter.setJournalMaxFileLength(1024 * 20);

        // speed up the test case, checkpoint and cleanup early and often
-       adapter.setCheckpointInterval(500);
-       adapter.setCleanupInterval(500);
+       adapter.setCheckpointInterval(5000);
+       adapter.setCleanupInterval(5000);

-       if (!deleteAllOnStart) {
+       if (recover) {
            adapter.setForceRecoverIndex(true);
        }
+   }
+
+   @After
+   public void tearDown() throws Exception {
+       if (broker != null) {
+           broker.stop();
+           broker.waitUntilStopped();
+       }
    }

    @Test
    public void testAckRemovedMessageReplayedAfterRecovery() throws Exception {
-       startBroker(true);
+       startBroker();

        StagedConsumer consumer = new StagedConsumer();
        int numMessagesAvailable = produceMessagesToConsumeMultipleDataFiles(20);
@@ -102,9 +145,9 @@ public class AMQ2832Test {
        broker.stop();
        broker.waitUntilStopped();

-       startBroker(false);
+       recoverBroker();

        consumer = new StagedConsumer();
        // need to force recovery?

        Message msg = consumer.receive(1, 5);
@@ -115,10 +158,99 @@ public class AMQ2832Test {
        msg = consumer.receive(1, 5);
        assertEquals("Only one message left after recovery: " + msg, null, msg);
        consumer.close();
    }

+   @Test
+   public void testAlternateLossScenario() throws Exception {
+
+       startBroker();
+
+       ActiveMQQueue queue = new ActiveMQQueue("MyQueue");
+       ActiveMQQueue disposable = new ActiveMQQueue("MyDisposableQueue");
+       ActiveMQTopic topic = new ActiveMQTopic("MyDurableTopic");
+
+       // This ensures that data file 1 never goes away.
+       createInactiveDurableSub(topic);
+       assertEquals(1, getNumberOfJournalFiles());
+
+       // One Queue Message that will be acked in another data file.
+       produceMessages(queue, 1);
+       assertEquals(1, getNumberOfJournalFiles());
+
+       // Add some messages to consume space
+       produceMessages(disposable, 50);
+
+       int dataFilesCount = getNumberOfJournalFiles();
+       assertTrue(dataFilesCount > 1);
+
+       // Create an ack for the single message on this queue
+       drainQueue(queue);
+
+       // Add some more messages to consume space beyond the data file with the ack
+       produceMessages(disposable, 50);
+
+       assertTrue(dataFilesCount < getNumberOfJournalFiles());
+       dataFilesCount = getNumberOfJournalFiles();
+
+       restartBroker();
+
+       // Clear out all queue data
+       broker.getAdminView().removeQueue(disposable.getQueueName());
+
+       // Once this becomes true our ack could be lost.
+       assertTrue("No more than three journal files expected, was " + getNumberOfJournalFiles(), Wait.waitFor(new Wait.Condition() {
+           @Override
+           public boolean isSatisified() throws Exception {
+               return getNumberOfJournalFiles() <= 3;
+           }
+       }, TimeUnit.MINUTES.toMillis(3)));
+
+       // Recover: the Message should not be replayed, but it could be if the old MessageAck was lost.
+       recoverBroker();
+
+       assertTrue(drainQueue(queue) == 0);
+   }
+
+   private int getNumberOfJournalFiles() throws IOException {
+       Collection<DataFile> files =
+           ((KahaDBPersistenceAdapter) broker.getPersistenceAdapter()).getStore().getJournal().getFileMap().values();
+       int reality = 0;
+       for (DataFile file : files) {
+           if (file != null) {
+               reality++;
+           }
+       }
+       return reality;
+   }
+
+   private void createInactiveDurableSub(Topic topic) throws Exception {
+       Connection connection = cf.createConnection();
+       connection.setClientID("Inactive");
+       Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+       MessageConsumer consumer = session.createDurableSubscriber(topic, "Inactive");
+       consumer.close();
+       connection.close();
+       produceMessages(topic, 1);
+   }
+
+   private int drainQueue(Queue queue) throws Exception {
+       Connection connection = cf.createConnection();
+       connection.setClientID("Inactive");
+       connection.start();
+       Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+       MessageConsumer consumer = session.createConsumer(queue);
+       int count = 0;
+       while (consumer.receive(5000) != null) {
+           count++;
+       }
+       consumer.close();
+       connection.close();
+       return count;
+   }
+
-   private int produceMessagesToConsumeMultipleDataFiles(int numToSend) throws Exception {
+   private int produceMessages(Destination destination, int numToSend) throws Exception {
        int sent = 0;
        Connection connection = new ActiveMQConnectionFactory(
            broker.getTransportConnectors().get(0).getConnectUri()).createConnection();
@@ -133,10 +265,14 @@ public class AMQ2832Test {
        } finally {
            connection.close();
        }

        return sent;
    }

+   private int produceMessagesToConsumeMultipleDataFiles(int numToSend) throws Exception {
+       return produceMessages(destination, numToSend);
+   }
+
    final String payload = new String(new byte[1024]);

    private Message createMessage(Session session, int i) throws Exception {

AMQ4212Test.java (org.apache.activemq.bugs), new file

@@ -0,0 +1,358 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq.bugs;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.io.File;
import java.io.IOException;
import java.util.Collection;
import java.util.concurrent.TimeUnit;

import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.management.ObjectName;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
import org.apache.activemq.store.kahadb.disk.journal.DataFile;
import org.apache.activemq.util.Wait;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class AMQ4212Test {

    private static final Logger LOG = LoggerFactory.getLogger(AMQ4212Test.class);

    private BrokerService service;
    private String connectionUri;
    private ActiveMQConnectionFactory cf;

    private final int MSG_COUNT = 256;

    @Before
    public void setUp() throws IOException, Exception {
        createBroker(true, false);
    }

    public void createBroker(boolean deleteAllMessages, boolean recover) throws Exception {
        service = new BrokerService();
        service.setBrokerName("InactiveSubTest");
        service.setDeleteAllMessagesOnStartup(deleteAllMessages);
        service.setAdvisorySupport(false);
        service.setPersistent(true);
        service.setUseJmx(true);
        service.setKeepDurableSubsActive(false);

        KahaDBPersistenceAdapter pa = new KahaDBPersistenceAdapter();
        File dataFile = new File("KahaDB");
        pa.setDirectory(dataFile);
        pa.setJournalMaxFileLength(10 * 1024);
        pa.setCheckpointInterval(TimeUnit.SECONDS.toMillis(5));
        pa.setCleanupInterval(TimeUnit.SECONDS.toMillis(5));
        pa.setForceRecoverIndex(recover);

        service.setPersistenceAdapter(pa);
        service.start();
        service.waitUntilStarted();

        connectionUri = "vm://InactiveSubTest?create=false";
        cf = new ActiveMQConnectionFactory(connectionUri);
    }

    private void restartBroker() throws Exception {
        stopBroker();
        createBroker(false, false);
    }

    private void recoverBroker() throws Exception {
        stopBroker();
        createBroker(false, true);
    }

    @After
    public void stopBroker() throws Exception {
        if (service != null) {
            service.stop();
            service.waitUntilStopped();
            service = null;
        }
    }
    @Test
    public void testDirableSubPrefetchRecovered() throws Exception {

        ActiveMQQueue queue = new ActiveMQQueue("MyQueue");
        ActiveMQTopic topic = new ActiveMQTopic("MyDurableTopic");

        // Send to a Queue to create some journal files
        sendMessages(queue);

        LOG.info("There are currently [{}] journal log files.", getNumberOfJournalFiles());

        createInactiveDurableSub(topic);

        assertTrue("Should have an inactive durable sub", Wait.waitFor(new Wait.Condition() {
            @Override
            public boolean isSatisified() throws Exception {
                ObjectName[] subs = service.getAdminView().getInactiveDurableTopicSubscribers();
                return subs != null && subs.length == 1 ? true : false;
            }
        }));

        // Now send some more to the queue to create even more files.
        sendMessages(queue);

        LOG.info("There are currently [{}] journal log files.", getNumberOfJournalFiles());
        assertTrue(getNumberOfJournalFiles() > 1);

        LOG.info("Restarting the broker.");
        restartBroker();
        LOG.info("Restarted the broker.");

        LOG.info("There are currently [{}] journal log files.", getNumberOfJournalFiles());
        assertTrue(getNumberOfJournalFiles() > 1);

        assertTrue("Should have an inactive durable sub", Wait.waitFor(new Wait.Condition() {
            @Override
            public boolean isSatisified() throws Exception {
                ObjectName[] subs = service.getAdminView().getInactiveDurableTopicSubscribers();
                return subs != null && subs.length == 1 ? true : false;
            }
        }));

        // Clear out all queue data
        service.getAdminView().removeQueue(queue.getQueueName());

        assertTrue("No more than two journal files expected, was " + getNumberOfJournalFiles(), Wait.waitFor(new Wait.Condition() {
            @Override
            public boolean isSatisified() throws Exception {
                return getNumberOfJournalFiles() <= 2;
            }
        }, TimeUnit.MINUTES.toMillis(2)));

        LOG.info("Sending {} Messages to the Topic.", MSG_COUNT);
        // Send some messages to the inactive destination
        sendMessages(topic);

        LOG.info("Attempt to consume {} messages from the Topic.", MSG_COUNT);
        assertEquals(MSG_COUNT, consumeFromInactiveDurableSub(topic));

        LOG.info("Recovering the broker.");
        recoverBroker();
        LOG.info("Recovered the broker.");

        assertTrue("Should have an inactive durable sub", Wait.waitFor(new Wait.Condition() {
            @Override
            public boolean isSatisified() throws Exception {
                ObjectName[] subs = service.getAdminView().getInactiveDurableTopicSubscribers();
                return subs != null && subs.length == 1 ? true : false;
            }
        }));
    }
    @Test
    public void testDurableAcksNotDropped() throws Exception {

        ActiveMQQueue queue = new ActiveMQQueue("MyQueue");
        ActiveMQTopic topic = new ActiveMQTopic("MyDurableTopic");

        // Create durable sub in first data file.
        createInactiveDurableSub(topic);

        assertTrue("Should have an inactive durable sub", Wait.waitFor(new Wait.Condition() {
            @Override
            public boolean isSatisified() throws Exception {
                ObjectName[] subs = service.getAdminView().getInactiveDurableTopicSubscribers();
                return subs != null && subs.length == 1 ? true : false;
            }
        }));

        // Send to a Topic
        sendMessages(topic, 1);

        // Send to a Queue to create some journal files
        sendMessages(queue);

        LOG.info("Before consume there are currently [{}] journal log files.", getNumberOfJournalFiles());

        // Consume all the Messages leaving acks behind.
        consumeDurableMessages(topic, 1);

        LOG.info("After consume there are currently [{}] journal log files.", getNumberOfJournalFiles());

        // Now send some more to the queue to create even more files.
        sendMessages(queue);

        LOG.info("More Queued. There are currently [{}] journal log files.", getNumberOfJournalFiles());
        assertTrue(getNumberOfJournalFiles() > 1);

        LOG.info("Restarting the broker.");
        restartBroker();
        LOG.info("Restarted the broker.");

        LOG.info("There are currently [{}] journal log files.", getNumberOfJournalFiles());
        assertTrue(getNumberOfJournalFiles() > 1);

        assertTrue("Should have an inactive durable sub", Wait.waitFor(new Wait.Condition() {
            @Override
            public boolean isSatisified() throws Exception {
                ObjectName[] subs = service.getAdminView().getInactiveDurableTopicSubscribers();
                return subs != null && subs.length == 1 ? true : false;
            }
        }));

        // Clear out all queue data
        service.getAdminView().removeQueue(queue.getQueueName());

        assertTrue("No more than three journal files expected, was " + getNumberOfJournalFiles(), Wait.waitFor(new Wait.Condition() {
            @Override
            public boolean isSatisified() throws Exception {
                return getNumberOfJournalFiles() <= 3;
            }
        }, TimeUnit.MINUTES.toMillis(3)));

        // See if we receive any messages; they should all be acked.
        tryConsumeExpectNone(topic);

        LOG.info("There are currently [{}] journal log files.", getNumberOfJournalFiles());

        LOG.info("Recovering the broker.");
        recoverBroker();
        LOG.info("Recovered the broker.");

        LOG.info("There are currently [{}] journal log files.", getNumberOfJournalFiles());

        assertTrue("Should have an inactive durable sub", Wait.waitFor(new Wait.Condition() {
            @Override
            public boolean isSatisified() throws Exception {
                ObjectName[] subs = service.getAdminView().getInactiveDurableTopicSubscribers();
                return subs != null && subs.length == 1 ? true : false;
            }
        }));

        // See if we receive any messages; they should all be acked.
        tryConsumeExpectNone(topic);

        assertTrue("One journal file expected, was " + getNumberOfJournalFiles(), Wait.waitFor(new Wait.Condition() {
            @Override
            public boolean isSatisified() throws Exception {
                return getNumberOfJournalFiles() == 1;
            }
        }, TimeUnit.MINUTES.toMillis(1)));
    }
    private int getNumberOfJournalFiles() throws IOException {
        Collection<DataFile> files =
            ((KahaDBPersistenceAdapter) service.getPersistenceAdapter()).getStore().getJournal().getFileMap().values();
        int reality = 0;
        for (DataFile file : files) {
            if (file != null) {
                reality++;
            }
        }
        return reality;
    }

    private void createInactiveDurableSub(Topic topic) throws Exception {
        Connection connection = cf.createConnection();
        connection.setClientID("Inactive");
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        MessageConsumer consumer = session.createDurableSubscriber(topic, "Inactive");
        consumer.close();
        connection.close();
    }

    private void consumeDurableMessages(Topic topic, int count) throws Exception {
        Connection connection = cf.createConnection();
        connection.setClientID("Inactive");
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        MessageConsumer consumer = session.createDurableSubscriber(topic, "Inactive");
        connection.start();
        for (int i = 0; i < count; ++i) {
            if (consumer.receive(TimeUnit.SECONDS.toMillis(10)) == null) {
                fail("should have received a message");
            }
        }
        consumer.close();
        connection.close();
    }

    private void tryConsumeExpectNone(Topic topic) throws Exception {
        Connection connection = cf.createConnection();
        connection.setClientID("Inactive");
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        MessageConsumer consumer = session.createDurableSubscriber(topic, "Inactive");
        connection.start();
        if (consumer.receive(TimeUnit.SECONDS.toMillis(10)) != null) {
            fail("Should be no messages for this durable.");
        }
        consumer.close();
        connection.close();
    }

    private int consumeFromInactiveDurableSub(Topic topic) throws Exception {
        Connection connection = cf.createConnection();
        connection.setClientID("Inactive");
        connection.start();
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        MessageConsumer consumer = session.createDurableSubscriber(topic, "Inactive");

        int count = 0;
        while (consumer.receive(10000) != null) {
            count++;
        }

        consumer.close();
        connection.close();

        return count;
    }

    private void sendMessages(Destination destination) throws Exception {
        sendMessages(destination, MSG_COUNT);
    }

    private void sendMessages(Destination destination, int count) throws Exception {
        Connection connection = cf.createConnection();
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        MessageProducer producer = session.createProducer(destination);
        producer.setDeliveryMode(DeliveryMode.PERSISTENT);
        for (int i = 0; i < count; ++i) {
            TextMessage message = session.createTextMessage("Message #" + i + " for destination: " + destination);
            producer.send(message);
        }
        connection.close();
    }
}

KahaDBVersionTest.java (org.apache.activemq.store.kahadb)

@@ -16,18 +16,27 @@
 */
package org.apache.activemq.store.kahadb;

-import junit.framework.TestCase;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.util.IOHelper;
-import javax.jms.*;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import java.io.File;
import java.io.IOException;
import java.security.ProtectionDomain;

+import javax.jms.Connection;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
+
+import junit.framework.TestCase;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.util.IOHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;

/**
 * @author chirino
 */
@@ -43,36 +52,36 @@ public class KahaDBVersionTest extends TestCase {
    }

    static final Logger LOG = LoggerFactory.getLogger(KahaDBVersionTest.class);
-   final static File VERSION_1_DB= new File(basedir+"/src/test/resources/org/apache/activemq/store/kahadb/KahaDBVersion1");
-   final static File VERSION_2_DB= new File(basedir+"/src/test/resources/org/apache/activemq/store/kahadb/KahaDBVersion2");
-   final static File VERSION_3_DB= new File(basedir+"/src/test/resources/org/apache/activemq/store/kahadb/KahaDBVersion3");
+   final static File VERSION_1_DB = new File(basedir + "/src/test/resources/org/apache/activemq/store/kahadb/KahaDBVersion1");
+   final static File VERSION_2_DB = new File(basedir + "/src/test/resources/org/apache/activemq/store/kahadb/KahaDBVersion2");
+   final static File VERSION_3_DB = new File(basedir + "/src/test/resources/org/apache/activemq/store/kahadb/KahaDBVersion3");
+   final static File VERSION_4_DB = new File(basedir + "/src/test/resources/org/apache/activemq/store/kahadb/KahaDBVersion4");

    BrokerService broker = null;

    protected BrokerService createBroker(KahaDBPersistenceAdapter kaha) throws Exception {
        broker = new BrokerService();
        broker.setUseJmx(false);
        broker.setPersistenceAdapter(kaha);
        broker.start();
        return broker;
    }

+   @Override
    protected void tearDown() throws Exception {
        if (broker != null) {
            broker.stop();
        }
    }

    public void XtestCreateStore() throws Exception {
        KahaDBPersistenceAdapter kaha = new KahaDBPersistenceAdapter();
-       File dir = new File("src/test/resources/org/apache/activemq/store/kahadb/KahaDBVersionX");
+       File dir = new File("src/test/resources/org/apache/activemq/store/kahadb/KahaDBVersion4");
        IOHelper.deleteFile(dir);
        kaha.setDirectory(dir);
-       kaha.setJournalMaxFileLength(1024*1024);
+       kaha.setJournalMaxFileLength(1024 * 1024);
        BrokerService broker = createBroker(kaha);
        ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("vm://localhost");
        Connection connection = cf.createConnection();
        connection.setClientID("test");
        connection.start();
@@ -85,33 +94,37 @@ public class KahaDBVersionTest extends TestCase {
         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
         Topic topic = session.createTopic("test.topic");
         Queue queue = session.createQueue("test.queue");
-        MessageConsumer consumer = session.createDurableSubscriber(topic,"test");
+        MessageConsumer consumer = session.createDurableSubscriber(topic, "test");
         consumer.close();
         MessageProducer producer = session.createProducer(topic);
         producer.setPriority(9);
-        for (int i =0; i < numToSend; i++) {
-            Message msg = session.createTextMessage("test message:"+i);
+        for (int i = 0; i < numToSend; i++) {
+            Message msg = session.createTextMessage("test message:" + i);
             producer.send(msg);
         }
-        LOG.info("sent " + numToSend +" to topic");
+        LOG.info("sent " + numToSend + " to topic");
         producer = session.createProducer(queue);
-        for (int i =0; i < numToSend; i++) {
-            Message msg = session.createTextMessage("test message:"+i);
+        for (int i = 0; i < numToSend; i++) {
+            Message msg = session.createTextMessage("test message:" + i);
             producer.send(msg);
         }
-        LOG.info("sent " + numToSend +" to queue");
+        LOG.info("sent " + numToSend + " to queue");
     }
-    public void testVersion1Conversion() throws Exception{
+    public void testVersion1Conversion() throws Exception {
         doConvertRestartCycle(VERSION_1_DB);
     }

-    public void testVersion2Conversion() throws Exception{
+    public void testVersion2Conversion() throws Exception {
         doConvertRestartCycle(VERSION_2_DB);
     }

-    public void testVersion3Conversion() throws Exception{
+    public void testVersion3Conversion() throws Exception {
         doConvertRestartCycle(VERSION_3_DB);
     }

+    public void testVersion4Conversion() throws Exception {
+        doConvertRestartCycle(VERSION_4_DB);
+    }

     public void doConvertRestartCycle(File existingStore) throws Exception {
@@ -145,7 +158,7 @@ public class KahaDBVersionTest extends TestCase {
         for (int i = 0; i < (repeats == 0 ? 1000 : numToSend); i++) {
             TextMessage msg = (TextMessage) queueConsumer.receive(10000);
             count++;
-            //System.err.println(msg.getText());
+            // System.err.println(msg.getText());
             assertNotNull(msg);
         }
         LOG.info("Consumed " + count + " from queue");
@@ -154,12 +167,12 @@ public class KahaDBVersionTest extends TestCase {
         for (int i = 0; i < (repeats == 0 ? 1000 : numToSend); i++) {
             TextMessage msg = (TextMessage) topicConsumer.receive(10000);
             count++;
-            //System.err.println(msg.getText());
+            // System.err.println(msg.getText());
             assertNotNull(msg);
         }
         LOG.info("Consumed " + count + " from topic");
         connection.close();
         broker.stop();
     }
 }
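
The new testVersion4Conversion case reuses doConvertRestartCycle: a store directory written by an older broker is staged, a broker is started over it so the KahaDB index is rebuilt at the current format version, and the pre-loaded queue and topic messages are then drained to prove the upgrade lost nothing. A minimal sketch of that cycle, assuming the staging step uses the IOHelper copy/delete utilities (the test's actual staging code falls outside the hunks shown here, and the target path below is illustrative):

    // Sketch only: stage a legacy store, then let broker start-up migrate it.
    // Assumptions: IOHelper.copyFile stages the directory; target path is made up.
    File target = new File(IOHelper.getDefaultDataDirectory(), "KahaDBVersionUpgrade");
    IOHelper.deleteFile(target);
    IOHelper.copyFile(VERSION_4_DB, target);       // store produced by an older broker

    KahaDBPersistenceAdapter kaha = new KahaDBPersistenceAdapter();
    kaha.setDirectory(target);

    BrokerService broker = createBroker(kaha);     // index is upgraded in place on start
    Connection connection = new ActiveMQConnectionFactory("vm://localhost").createConnection();
    connection.setClientID("test");                // required to recover the durable subscription
    connection.start();
    // ... consume the pre-existing queue and durable topic messages here ...
    connection.close();
    broker.stop();
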

View File

@@ -20,13 +20,16 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
+
 import javax.jms.Connection;
 import javax.jms.Message;
 import javax.jms.MessageConsumer;
 import javax.jms.MessageListener;
 import javax.jms.MessageProducer;
 import javax.jms.Session;
+
 import junit.framework.Test;
+
 import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.broker.BrokerFactory;
 import org.apache.activemq.broker.BrokerService;
@@ -44,8 +47,9 @@ public class DurableSubsOfflineSelectorConcurrentConsumeIndexUseTest extends org
     public int messageCount = 10000;
     private BrokerService broker;
     private ActiveMQTopic topic;
-    private List<Throwable> exceptions = new ArrayList<Throwable>();
+    private final List<Throwable> exceptions = new ArrayList<Throwable>();

+    @Override
     protected ActiveMQConnectionFactory createConnectionFactory() throws Exception {
         ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://" + getName(true));
         connectionFactory.setWatchTopicAdvisories(false);
@@ -68,6 +72,7 @@ public class DurableSubsOfflineSelectorConcurrentConsumeIndexUseTest extends org
         return suite(DurableSubsOfflineSelectorConcurrentConsumeIndexUseTest.class);
     }

+    @Override
     protected void setUp() throws Exception {
         exceptions.clear();
         topic = (ActiveMQTopic) createDestination();
@@ -75,6 +80,7 @@ public class DurableSubsOfflineSelectorConcurrentConsumeIndexUseTest extends org
         super.setUp();
     }

+    @Override
     protected void tearDown() throws Exception {
         super.tearDown();
         destroyBroker();
@@ -128,6 +134,7 @@ public class DurableSubsOfflineSelectorConcurrentConsumeIndexUseTest extends org
         final CountDownLatch goOn = new CountDownLatch(1);

         Thread sendThread = new Thread() {
+            @Override
             public void run() {
                 try {
@@ -208,10 +215,10 @@ public class DurableSubsOfflineSelectorConcurrentConsumeIndexUseTest extends org
         LOG.info("Store free page count: " + store.getPageFile().getFreePageCount());
         LOG.info("Store page in-use: " + (store.getPageFile().getPageCount() - store.getPageFile().getFreePageCount()));

-        assertTrue("no leak of pages, always use just 10", Wait.waitFor(new Wait.Condition() {
+        assertTrue("no leak of pages, always use just 11", Wait.waitFor(new Wait.Condition() {
             @Override
             public boolean isSatisified() throws Exception {
-                return 10 == store.getPageFile().getPageCount() -
+                return 11 == store.getPageFile().getPageCount() -
                     store.getPageFile().getFreePageCount();
             }
         }, TimeUnit.SECONDS.toMillis(10)));
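
The assertion above tightens the steady-state accounting from 10 to 11 index pages: pages in use equals getPageCount() minus getFreePageCount() on the store's PageFile, and the store format change evidently leaves one more index page permanently occupied (the diff itself does not spell out which structure uses it). Condensed, the polling idiom looks like the sketch below, assuming store exposes the KahaDB PageFile as in the surrounding test:

    // Poll until the in-use page count settles; 'store' is assumed to be the
    // KahaDB store used elsewhere in this test.
    final PageFile pageFile = store.getPageFile();
    boolean settled = Wait.waitFor(new Wait.Condition() {
        @Override
        public boolean isSatisified() throws Exception { // ActiveMQ's spelling of the callback
            long inUse = pageFile.getPageCount() - pageFile.getFreePageCount();
            return inUse == 11;                          // expected steady-state figure
        }
    }, TimeUnit.SECONDS.toMillis(10));
    assertTrue("no leak of pages, always use just 11", settled);
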
@@ -236,6 +243,7 @@ public class DurableSubsOfflineSelectorConcurrentConsumeIndexUseTest extends org
         Listener() {
         }

+        @Override
         public void onMessage(Message message) {
             count++;
             if (id != null) {