diff --git a/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageAuditNoSync.java b/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageAuditNoSync.java index 3fb95f5a7f..6c6874374c 100755 --- a/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageAuditNoSync.java +++ b/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageAuditNoSync.java @@ -189,6 +189,17 @@ public class ActiveMQMessageAuditNoSync implements Serializable { } } } + + public void rollback(final String id) { + String seed = IdGenerator.getSeedFromId(id); + if (seed != null) { + BitArrayBin bab = map.get(seed); + if (bab != null) { + long index = IdGenerator.getSequenceFromId(id); + bab.setBit(index, false); + } + } + } /** * Check the message is in order diff --git a/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java b/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java index d9c2f87a9c..65d491a03a 100644 --- a/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java @@ -510,12 +510,11 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar try { ObjectInputStream objectIn = new ObjectInputStream(audit.getAudit().newInput()); metadata.producerSequenceIdTracker = (ActiveMQMessageAuditNoSync) objectIn.readObject(); - } catch (ClassNotFoundException cfe) { - IOException ioe = new IOException("Failed to read producerAudit: " + cfe); - ioe.initCause(cfe); - throw ioe; + return journal.getNextLocation(metadata.producerSequenceIdTrackerLocation); + } catch (Exception e) { + LOG.warn("Cannot recover message audit", e); + return journal.getNextLocation(null); } - return journal.getNextLocation(metadata.producerSequenceIdTrackerLocation); } else { // got no audit stored so got to recreate via replay from start of the journal return journal.getNextLocation(null); @@ -546,7 +545,7 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar MessageKeys keys = sd.orderIndex.remove(tx, sequenceId); sd.locationIndex.remove(tx, keys.location); sd.messageIdIndex.remove(tx, keys.messageId); - metadata.producerSequenceIdTracker.rollback(new MessageId(keys.messageId)); + metadata.producerSequenceIdTracker.rollback(keys.messageId); undoCounter++; // TODO: do we need to modify the ack positions for the pub sub case? } diff --git a/activemq-core/src/main/java/org/apache/activemq/util/IdGenerator.java b/activemq-core/src/main/java/org/apache/activemq/util/IdGenerator.java index f7199c75a7..34345665c2 100755 --- a/activemq-core/src/main/java/org/apache/activemq/util/IdGenerator.java +++ b/activemq-core/src/main/java/org/apache/activemq/util/IdGenerator.java @@ -128,7 +128,7 @@ public class IdGenerator { if (id != null) { int index = id.lastIndexOf(':'); if (index > 0 && (index + 1) < id.length()) { - result = id.substring(0, index + 1); + result = id.substring(0, index); } } return result; diff --git a/activemq-core/src/test/java/org/apache/activemq/ActiveMQMessageAuditTest.java b/activemq-core/src/test/java/org/apache/activemq/ActiveMQMessageAuditTest.java index cd75103ca8..59a3ffeb69 100755 --- a/activemq-core/src/test/java/org/apache/activemq/ActiveMQMessageAuditTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/ActiveMQMessageAuditTest.java @@ -16,6 +16,10 @@ */ package org.apache.activemq; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; import java.util.ArrayList; import java.util.List; import junit.framework.TestCase; @@ -119,4 +123,64 @@ public class ActiveMQMessageAuditTest extends TestCase { assertFalse(audit.isDuplicate(id)); } } + + public void testSerialization() throws Exception { + ActiveMQMessageAuditNoSync audit = new ActiveMQMessageAuditNoSync(); + + byte[] bytes = serialize(audit); + System.out.println(bytes.length); + audit = recover(bytes); + + List list = new ArrayList(); + + + for (int j = 0; j < 1000; j++) { + ProducerId pid = new ProducerId(); + pid.setConnectionId("test"); + pid.setSessionId(0); + pid.setValue(j); + System.out.println("producer " + j); + + for (int i = 0; i < 1000; i++) { + MessageId id = new MessageId(); + id.setProducerId(pid); + id.setProducerSequenceId(i); + ActiveMQMessage msg = new ActiveMQMessage(); + msg.setMessageId(id); + list.add(msg); + assertFalse(audit.isDuplicate(msg.getMessageId().toString())); + + if (i % 100 == 0) { + bytes = serialize(audit); + System.out.println(bytes.length); + audit = recover(bytes); + } + + if (i % 250 == 0) { + for (MessageReference message : list) { + audit.rollback(message.getMessageId().toString()); + } + list.clear(); + bytes = serialize(audit); + System.out.println(bytes.length); + audit = recover(bytes); + } + + } + } + + } + + protected byte[] serialize(ActiveMQMessageAuditNoSync audit) throws Exception { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + ObjectOutputStream oout = new ObjectOutputStream(baos); + oout.writeObject(audit); + oout.flush(); + return baos.toByteArray(); + } + + protected ActiveMQMessageAuditNoSync recover(byte[] bytes) throws Exception { + ObjectInputStream objectIn = new ObjectInputStream(new ByteArrayInputStream(bytes)); + return (ActiveMQMessageAuditNoSync)objectIn.readObject(); + } }