https://issues.apache.org/jira/browse/AMQ-3422 - producer audit needs string rollback, plus don't fail if we cannot recover producer audit

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1153125 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Bosanac Dejan 2011-08-02 13:46:07 +00:00
parent 44ef96e6d3
commit 14755a19d5
4 changed files with 81 additions and 7 deletions

View File

@ -190,6 +190,17 @@ public class ActiveMQMessageAuditNoSync implements Serializable {
} }
} }
public void rollback(final String id) {
String seed = IdGenerator.getSeedFromId(id);
if (seed != null) {
BitArrayBin bab = map.get(seed);
if (bab != null) {
long index = IdGenerator.getSequenceFromId(id);
bab.setBit(index, false);
}
}
}
/** /**
* Check the message is in order * Check the message is in order
* @param msg * @param msg

View File

@ -510,12 +510,11 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar
try { try {
ObjectInputStream objectIn = new ObjectInputStream(audit.getAudit().newInput()); ObjectInputStream objectIn = new ObjectInputStream(audit.getAudit().newInput());
metadata.producerSequenceIdTracker = (ActiveMQMessageAuditNoSync) objectIn.readObject(); metadata.producerSequenceIdTracker = (ActiveMQMessageAuditNoSync) objectIn.readObject();
} catch (ClassNotFoundException cfe) {
IOException ioe = new IOException("Failed to read producerAudit: " + cfe);
ioe.initCause(cfe);
throw ioe;
}
return journal.getNextLocation(metadata.producerSequenceIdTrackerLocation); return journal.getNextLocation(metadata.producerSequenceIdTrackerLocation);
} catch (Exception e) {
LOG.warn("Cannot recover message audit", e);
return journal.getNextLocation(null);
}
} else { } else {
// got no audit stored so got to recreate via replay from start of the journal // got no audit stored so got to recreate via replay from start of the journal
return journal.getNextLocation(null); return journal.getNextLocation(null);
@ -546,7 +545,7 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar
MessageKeys keys = sd.orderIndex.remove(tx, sequenceId); MessageKeys keys = sd.orderIndex.remove(tx, sequenceId);
sd.locationIndex.remove(tx, keys.location); sd.locationIndex.remove(tx, keys.location);
sd.messageIdIndex.remove(tx, keys.messageId); sd.messageIdIndex.remove(tx, keys.messageId);
metadata.producerSequenceIdTracker.rollback(new MessageId(keys.messageId)); metadata.producerSequenceIdTracker.rollback(keys.messageId);
undoCounter++; undoCounter++;
// TODO: do we need to modify the ack positions for the pub sub case? // TODO: do we need to modify the ack positions for the pub sub case?
} }

View File

@ -128,7 +128,7 @@ public class IdGenerator {
if (id != null) { if (id != null) {
int index = id.lastIndexOf(':'); int index = id.lastIndexOf(':');
if (index > 0 && (index + 1) < id.length()) { if (index > 0 && (index + 1) < id.length()) {
result = id.substring(0, index + 1); result = id.substring(0, index);
} }
} }
return result; return result;

View File

@ -16,6 +16,10 @@
*/ */
package org.apache.activemq; package org.apache.activemq;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import junit.framework.TestCase; import junit.framework.TestCase;
@ -119,4 +123,64 @@ public class ActiveMQMessageAuditTest extends TestCase {
assertFalse(audit.isDuplicate(id)); assertFalse(audit.isDuplicate(id));
} }
} }
public void testSerialization() throws Exception {
ActiveMQMessageAuditNoSync audit = new ActiveMQMessageAuditNoSync();
byte[] bytes = serialize(audit);
System.out.println(bytes.length);
audit = recover(bytes);
List<MessageReference> list = new ArrayList<MessageReference>();
for (int j = 0; j < 1000; j++) {
ProducerId pid = new ProducerId();
pid.setConnectionId("test");
pid.setSessionId(0);
pid.setValue(j);
System.out.println("producer " + j);
for (int i = 0; i < 1000; i++) {
MessageId id = new MessageId();
id.setProducerId(pid);
id.setProducerSequenceId(i);
ActiveMQMessage msg = new ActiveMQMessage();
msg.setMessageId(id);
list.add(msg);
assertFalse(audit.isDuplicate(msg.getMessageId().toString()));
if (i % 100 == 0) {
bytes = serialize(audit);
System.out.println(bytes.length);
audit = recover(bytes);
}
if (i % 250 == 0) {
for (MessageReference message : list) {
audit.rollback(message.getMessageId().toString());
}
list.clear();
bytes = serialize(audit);
System.out.println(bytes.length);
audit = recover(bytes);
}
}
}
}
protected byte[] serialize(ActiveMQMessageAuditNoSync audit) throws Exception {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
ObjectOutputStream oout = new ObjectOutputStream(baos);
oout.writeObject(audit);
oout.flush();
return baos.toByteArray();
}
protected ActiveMQMessageAuditNoSync recover(byte[] bytes) throws Exception {
ObjectInputStream objectIn = new ObjectInputStream(new ByteArrayInputStream(bytes));
return (ActiveMQMessageAuditNoSync)objectIn.readObject();
}
} }