Fixes AMQ-4529: leveldb store NPEs when you send to a composite destination.

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1481013 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Hiram R. Chirino 2013-05-10 14:03:31 +00:00
parent 2c06326506
commit d344aa7a42
2 changed files with 154 additions and 151 deletions

View File

@ -153,7 +153,7 @@ public class MessageId implements DataStructure, Comparable<MessageId> {
MessageId copy = new MessageId(producerId, producerSequenceId);
copy.key = key;
copy.brokerSequenceId = brokerSequenceId;
copy.dataLocator = new AtomicReference<Object>(dataLocator != null ? dataLocator.get() : null);
copy.dataLocator = dataLocator;
copy.entryLocator = entryLocator;
copy.plistLocator = plistLocator;
return copy;

View File

@ -1200,154 +1200,8 @@ class LevelDBClient(store: LevelDBStore) {
def store(uows: Array[DelayableUOW]) {
retryUsingIndex {
log.appender { appender =>
var syncNeeded = false
index.write(new WriteOptions, max_index_write_latency) { batch =>
var write_message_total = 0L
var write_enqueue_total = 0L
uows.foreach { uow =>
uow.actions.foreach { case (msg, action) =>
val messageRecord = action.messageRecord
var log_info:LogInfo = null
var pos = -1L
var dataLocator:DataLocator = null
if (messageRecord != null && messageRecord.locator==null) {
val start = System.nanoTime()
val p = appender.append(LOG_DATA, messageRecord.data)
pos = p._1
log_info = p._2
dataLocator = DataLocator(pos, messageRecord.data.length)
messageRecord.locator = dataLocator
write_message_total += System.nanoTime() - start
}
action.dequeues.foreach { entry =>
val keyLocation = entry.id.getEntryLocator.asInstanceOf[EntryLocator]
val key = encodeEntryKey(ENTRY_PREFIX, keyLocation.qid, keyLocation.seq)
if( dataLocator==null ) {
dataLocator = entry.id.getDataLocator match {
case x:DataLocator => x
case x:MessageRecord => x.locator
case _ => throw new RuntimeException("Unexpected locator type: "+dataLocator)
}
}
val log_record = new EntryRecord.Bean()
log_record.setCollectionKey(entry.queueKey)
log_record.setEntryKey(new Buffer(key, 9, 8))
log_record.setValueLocation(dataLocator.pos)
appender.append(LOG_REMOVE_ENTRY, encodeEntryRecord(log_record.freeze()))
batch.delete(key)
logRefDecrement(dataLocator.pos)
collectionDecrementSize(entry.queueKey)
}
action.enqueues.foreach { entry =>
if(dataLocator ==null ) {
dataLocator = entry.id.getDataLocator match {
case x:DataLocator => x
case x:MessageRecord => x.locator
case _ =>
throw new RuntimeException("Unexpected locator type")
}
}
val start = System.nanoTime()
val key = encodeEntryKey(ENTRY_PREFIX, entry.queueKey, entry.queueSeq)
assert(entry.id.getDataLocator()!=null)
val log_record = new EntryRecord.Bean()
log_record.setCollectionKey(entry.queueKey)
log_record.setEntryKey(new Buffer(key, 9, 8))
log_record.setValueLocation(dataLocator.pos)
log_record.setValueLength(dataLocator.len)
appender.append(LOG_ADD_ENTRY, encodeEntryRecord(log_record.freeze()))
val index_record = new EntryRecord.Bean()
index_record.setValueLocation(dataLocator.pos)
index_record.setValueLength(dataLocator.len)
batch.put(key, encodeEntryRecord(index_record.freeze()).toByteArray)
val log_data = encodeEntryRecord(log_record.freeze())
val index_data = encodeEntryRecord(index_record.freeze()).toByteArray
appender.append(LOG_ADD_ENTRY, log_data)
batch.put(key, index_data)
for( key <- logRefKey(pos, log_info) ) {
logRefs.getOrElseUpdate(key, new LongCounter()).incrementAndGet()
}
collectionIncrementSize(entry.queueKey, log_record.getEntryKey.toByteArray)
write_enqueue_total += System.nanoTime() - start
}
action.xaAcks.foreach { entry:XaAckRecord =>
val ack = entry.ack
if( dataLocator==null ) {
dataLocator = ack.getLastMessageId.getDataLocator match {
case x:DataLocator => x
case x:MessageRecord => x.locator
case _ =>
throw new RuntimeException("Unexpected locator type")
}
}
println(dataLocator)
val el = ack.getLastMessageId.getEntryLocator.asInstanceOf[EntryLocator];
val os = new DataByteArrayOutputStream()
os.writeLong(dataLocator.pos)
os.writeInt(dataLocator.len)
os.writeLong(el.qid)
os.writeLong(el.seq)
os.writeLong(entry.sub)
store.wireFormat.marshal(ack, os)
var ack_encoded = os.toBuffer
val key = encodeEntryKey(ENTRY_PREFIX, entry.container, entry.seq)
val log_record = new EntryRecord.Bean()
log_record.setCollectionKey(entry.container)
log_record.setEntryKey(new Buffer(key, 9, 8))
log_record.setMeta(ack_encoded)
appender.append(LOG_ADD_ENTRY, encodeEntryRecord(log_record.freeze()))
val index_record = new EntryRecord.Bean()
index_record.setMeta(ack_encoded)
batch.put(key, encodeEntryRecord(log_record.freeze()).toByteArray)
}
}
uow.subAcks.foreach { entry =>
val key = encodeEntryKey(ENTRY_PREFIX, entry.subKey, ACK_POSITION)
val log_record = new EntryRecord.Bean()
log_record.setCollectionKey(entry.subKey)
log_record.setEntryKey(ACK_POSITION)
log_record.setValueLocation(entry.ackPosition)
appender.append(LOG_ADD_ENTRY, encodeEntryRecord(log_record.freeze()))
val index_record = new EntryRecord.Bean()
index_record.setValueLocation(entry.ackPosition)
batch.put(key, encodeEntryRecord(log_record.freeze()).toByteArray)
}
if( !syncNeeded && uow.syncNeeded ) {
syncNeeded = true
}
}
max_write_message_latency.add(write_message_total)
max_write_enqueue_latency.add(write_enqueue_total)
val syncNeeded = index.write(new WriteOptions, max_index_write_latency) { batch =>
write_uows(uows, appender, batch)
}
if( syncNeeded && sync ) {
appender.force
@ -1355,8 +1209,8 @@ class LevelDBClient(store: LevelDBStore) {
} // end of log.appender { block }
// now that data is logged.. locate message from the data in the logs
uows.foreach { uow =>
uow.actions.foreach { case (msg, action) =>
for( uow <- uows ) {
for((msg, action) <- uow.actions ){
val messageRecord = action.messageRecord
if (messageRecord != null) {
messageRecord.id.setDataLocator(messageRecord.locator)
@ -1366,6 +1220,155 @@ class LevelDBClient(store: LevelDBStore) {
}
}
def write_uows(uows: Array[DelayableUOW], appender: RecordLog#LogAppender, batch: WriteBatch) = {
var syncNeeded = false
var write_message_total = 0L
var write_enqueue_total = 0L
for( uow <- uows ) {
for( (msg, action) <- uow.actions ) {
val messageRecord = action.messageRecord
var log_info: LogInfo = null
var dataLocator: DataLocator = null
if (messageRecord != null && messageRecord.locator == null) {
val start = System.nanoTime()
val p = appender.append(LOG_DATA, messageRecord.data)
log_info = p._2
dataLocator = DataLocator(p._1, messageRecord.data.length)
messageRecord.locator = dataLocator
// println("msg: "+messageRecord.id+" -> "+dataLocator)
write_message_total += System.nanoTime() - start
}
for( entry <- action.dequeues) {
val keyLocation = entry.id.getEntryLocator.asInstanceOf[EntryLocator]
val key = encodeEntryKey(ENTRY_PREFIX, keyLocation.qid, keyLocation.seq)
if (dataLocator == null) {
dataLocator = entry.id.getDataLocator match {
case x: DataLocator => x
case x: MessageRecord => x.locator
case _ => throw new RuntimeException("Unexpected locator type: " + dataLocator)
}
}
// println("deq: "+entry.id+" -> "+dataLocator)
val log_record = new EntryRecord.Bean()
log_record.setCollectionKey(entry.queueKey)
log_record.setEntryKey(new Buffer(key, 9, 8))
log_record.setValueLocation(dataLocator.pos)
appender.append(LOG_REMOVE_ENTRY, encodeEntryRecord(log_record.freeze()))
batch.delete(key)
logRefDecrement(dataLocator.pos)
collectionDecrementSize(entry.queueKey)
}
for( entry<- action.enqueues) {
if (dataLocator == null) {
dataLocator = entry.id.getDataLocator match {
case x: DataLocator => x
case x: MessageRecord => x.locator
case _ =>
throw new RuntimeException("Unexpected locator type")
}
}
// println("enq: "+entry.id+" -> "+dataLocator)
val start = System.nanoTime()
val key = encodeEntryKey(ENTRY_PREFIX, entry.queueKey, entry.queueSeq)
assert(entry.id.getDataLocator() != null)
val log_record = new EntryRecord.Bean()
log_record.setCollectionKey(entry.queueKey)
log_record.setEntryKey(new Buffer(key, 9, 8))
log_record.setValueLocation(dataLocator.pos)
log_record.setValueLength(dataLocator.len)
appender.append(LOG_ADD_ENTRY, encodeEntryRecord(log_record.freeze()))
val index_record = new EntryRecord.Bean()
index_record.setValueLocation(dataLocator.pos)
index_record.setValueLength(dataLocator.len)
batch.put(key, encodeEntryRecord(index_record.freeze()).toByteArray)
val log_data = encodeEntryRecord(log_record.freeze())
val index_data = encodeEntryRecord(index_record.freeze()).toByteArray
appender.append(LOG_ADD_ENTRY, log_data)
batch.put(key, index_data)
for (key <- logRefKey(dataLocator.pos, log_info)) {
logRefs.getOrElseUpdate(key, new LongCounter()).incrementAndGet()
}
collectionIncrementSize(entry.queueKey, log_record.getEntryKey.toByteArray)
write_enqueue_total += System.nanoTime() - start
}
for( entry <- action.xaAcks ) {
val ack = entry.ack
if (dataLocator == null) {
dataLocator = ack.getLastMessageId.getDataLocator match {
case x: DataLocator => x
case x: MessageRecord => x.locator
case _ =>
throw new RuntimeException("Unexpected locator type")
}
}
println(dataLocator)
val el = ack.getLastMessageId.getEntryLocator.asInstanceOf[EntryLocator];
val os = new DataByteArrayOutputStream()
os.writeLong(dataLocator.pos)
os.writeInt(dataLocator.len)
os.writeLong(el.qid)
os.writeLong(el.seq)
os.writeLong(entry.sub)
store.wireFormat.marshal(ack, os)
var ack_encoded = os.toBuffer
val key = encodeEntryKey(ENTRY_PREFIX, entry.container, entry.seq)
val log_record = new EntryRecord.Bean()
log_record.setCollectionKey(entry.container)
log_record.setEntryKey(new Buffer(key, 9, 8))
log_record.setMeta(ack_encoded)
appender.append(LOG_ADD_ENTRY, encodeEntryRecord(log_record.freeze()))
val index_record = new EntryRecord.Bean()
index_record.setMeta(ack_encoded)
batch.put(key, encodeEntryRecord(log_record.freeze()).toByteArray)
}
}
for( entry <- uow.subAcks ) {
val key = encodeEntryKey(ENTRY_PREFIX, entry.subKey, ACK_POSITION)
val log_record = new EntryRecord.Bean()
log_record.setCollectionKey(entry.subKey)
log_record.setEntryKey(ACK_POSITION)
log_record.setValueLocation(entry.ackPosition)
appender.append(LOG_ADD_ENTRY, encodeEntryRecord(log_record.freeze()))
val index_record = new EntryRecord.Bean()
index_record.setValueLocation(entry.ackPosition)
batch.put(key, encodeEntryRecord(log_record.freeze()).toByteArray)
}
if (uow.syncNeeded) {
syncNeeded = true
}
}
max_write_message_latency.add(write_message_total)
max_write_enqueue_latency.add(write_enqueue_total)
syncNeeded
}
def getCollectionEntries(collectionKey: Long, firstSeq:Long, lastSeq:Long): Seq[(Buffer, EntryRecord.Buffer)] = {
var rc = ListBuffer[(Buffer, EntryRecord.Buffer)]()
val ro = new ReadOptions