mirror of https://github.com/apache/activemq.git
Fixes AMQ-4251: Scala compile warnings - Compiling activemq-leveldb-store
git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1480811 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
540b1c6a89
commit
2c06326506
|
@ -34,10 +34,11 @@ import org.apache.activemq.util.ByteSequence
|
|||
import util.TimeMetric
|
||||
import scala.Some
|
||||
|
||||
case class EntryLocator(qid:Long, seq:Long)
|
||||
case class DataLocator(pos:Long, len:Int)
|
||||
case class MessageRecord(id:MessageId, data:Buffer, syncNeeded:Boolean) {
|
||||
var locator:(Long, Int) = _
|
||||
var locator:DataLocator = _
|
||||
}
|
||||
|
||||
case class QueueEntryRecord(id:MessageId, queueKey:Long, queueSeq:Long)
|
||||
case class QueueRecord(id:ActiveMQDestination, queue_key:Long)
|
||||
case class QueueEntryRange()
|
||||
|
@ -266,13 +267,13 @@ class DelayableUOW(val manager:DBManager) extends BaseRetained {
|
|||
record
|
||||
case record:MessageRecord =>
|
||||
record
|
||||
case x:(Long, Int) =>
|
||||
case x:DataLocator =>
|
||||
null
|
||||
}
|
||||
|
||||
val entry = QueueEntryRecord(id, queueKey, queueSeq)
|
||||
assert(id.getEntryLocator == null)
|
||||
id.setEntryLocator((queueKey, queueSeq))
|
||||
id.setEntryLocator(EntryLocator(queueKey, queueSeq))
|
||||
|
||||
val a = this.synchronized {
|
||||
if( !delay )
|
||||
|
@ -293,7 +294,7 @@ class DelayableUOW(val manager:DBManager) extends BaseRetained {
|
|||
}
|
||||
|
||||
def dequeue(expectedQueueKey:Long, id:MessageId) = {
|
||||
val (queueKey, queueSeq) = id.getEntryLocator.asInstanceOf[(Long, Long)];
|
||||
val EntryLocator(queueKey, queueSeq) = id.getEntryLocator.asInstanceOf[EntryLocator];
|
||||
assert(queueKey == expectedQueueKey)
|
||||
val entry = QueueEntryRecord(id, queueKey, queueSeq)
|
||||
this.synchronized {
|
||||
|
@ -670,7 +671,7 @@ class DBManager(val parent:LevelDBStore) {
|
|||
}
|
||||
|
||||
def queuePosition(id: MessageId):Long = {
|
||||
id.getEntryLocator.asInstanceOf[(Long, Long)]._2
|
||||
id.getEntryLocator.asInstanceOf[EntryLocator].seq
|
||||
}
|
||||
|
||||
def createQueueStore(dest:ActiveMQQueue):LevelDBStore#LevelDBMessageStore = {
|
||||
|
|
|
@ -1094,9 +1094,9 @@ class LevelDBClient(store: LevelDBStore) {
|
|||
def queueCursor(collectionKey: Long, seq:Long)(func: (Message)=>Boolean) = {
|
||||
collectionCursor(collectionKey, encodeLong(seq)) { (key, value) =>
|
||||
val seq = decodeLong(key)
|
||||
var locator = (value.getValueLocation, value.getValueLength)
|
||||
var locator = DataLocator(value.getValueLocation, value.getValueLength)
|
||||
val msg = getMessage(locator)
|
||||
msg.getMessageId().setEntryLocator((collectionKey, seq))
|
||||
msg.getMessageId().setEntryLocator(EntryLocator(collectionKey, seq))
|
||||
msg.getMessageId().setDataLocator(locator)
|
||||
func(msg)
|
||||
}
|
||||
|
@ -1114,14 +1114,14 @@ class LevelDBClient(store: LevelDBStore) {
|
|||
val seq = is.readLong()
|
||||
val sub = is.readLong()
|
||||
val ack = store.wireFormat.unmarshal(is).asInstanceOf[MessageAck]
|
||||
ack.getLastMessageId.setDataLocator((log, offset))
|
||||
ack.getLastMessageId.setEntryLocator((qid, seq))
|
||||
ack.getLastMessageId.setDataLocator(DataLocator(log, offset))
|
||||
ack.getLastMessageId.setEntryLocator(EntryLocator(qid, seq))
|
||||
|
||||
func(XaAckRecord(collectionKey, seq, ack, sub))
|
||||
} else {
|
||||
var locator = (value.getValueLocation, value.getValueLength)
|
||||
var locator = DataLocator(value.getValueLocation, value.getValueLength)
|
||||
val msg = getMessage(locator)
|
||||
msg.getMessageId().setEntryLocator((collectionKey, seq))
|
||||
msg.getMessageId().setEntryLocator(EntryLocator(collectionKey, seq))
|
||||
msg.getMessageId().setDataLocator(locator)
|
||||
func(msg)
|
||||
}
|
||||
|
@ -1143,7 +1143,7 @@ class LevelDBClient(store: LevelDBStore) {
|
|||
case x:MessageRecord =>
|
||||
// Encoded form is still in memory..
|
||||
Some(x.data)
|
||||
case (pos:Long, len:Int) =>
|
||||
case DataLocator(pos, len) =>
|
||||
// Load the encoded form from disk.
|
||||
log.read(pos, len).map(new Buffer(_))
|
||||
}
|
||||
|
@ -1214,26 +1214,26 @@ class LevelDBClient(store: LevelDBStore) {
|
|||
val messageRecord = action.messageRecord
|
||||
var log_info:LogInfo = null
|
||||
var pos = -1L
|
||||
var dataLocator:(Long, Int) = null
|
||||
var dataLocator:DataLocator = null
|
||||
|
||||
if (messageRecord != null && messageRecord.locator==null) {
|
||||
val start = System.nanoTime()
|
||||
val p = appender.append(LOG_DATA, messageRecord.data)
|
||||
pos = p._1
|
||||
log_info = p._2
|
||||
dataLocator = (pos, messageRecord.data.length)
|
||||
dataLocator = DataLocator(pos, messageRecord.data.length)
|
||||
messageRecord.locator = dataLocator
|
||||
write_message_total += System.nanoTime() - start
|
||||
}
|
||||
|
||||
|
||||
action.dequeues.foreach { entry =>
|
||||
val keyLocation = entry.id.getEntryLocator.asInstanceOf[(Long, Long)]
|
||||
val key = encodeEntryKey(ENTRY_PREFIX, keyLocation._1, keyLocation._2)
|
||||
val keyLocation = entry.id.getEntryLocator.asInstanceOf[EntryLocator]
|
||||
val key = encodeEntryKey(ENTRY_PREFIX, keyLocation.qid, keyLocation.seq)
|
||||
|
||||
if( dataLocator==null ) {
|
||||
dataLocator = entry.id.getDataLocator match {
|
||||
case x:(Long, Int) => x
|
||||
case x:DataLocator => x
|
||||
case x:MessageRecord => x.locator
|
||||
case _ => throw new RuntimeException("Unexpected locator type: "+dataLocator)
|
||||
}
|
||||
|
@ -1242,11 +1242,11 @@ class LevelDBClient(store: LevelDBStore) {
|
|||
val log_record = new EntryRecord.Bean()
|
||||
log_record.setCollectionKey(entry.queueKey)
|
||||
log_record.setEntryKey(new Buffer(key, 9, 8))
|
||||
log_record.setValueLocation(dataLocator._1)
|
||||
log_record.setValueLocation(dataLocator.pos)
|
||||
appender.append(LOG_REMOVE_ENTRY, encodeEntryRecord(log_record.freeze()))
|
||||
|
||||
batch.delete(key)
|
||||
logRefDecrement(dataLocator._1)
|
||||
logRefDecrement(dataLocator.pos)
|
||||
collectionDecrementSize(entry.queueKey)
|
||||
}
|
||||
|
||||
|
@ -1254,7 +1254,7 @@ class LevelDBClient(store: LevelDBStore) {
|
|||
|
||||
if(dataLocator ==null ) {
|
||||
dataLocator = entry.id.getDataLocator match {
|
||||
case x:(Long, Int) => x
|
||||
case x:DataLocator => x
|
||||
case x:MessageRecord => x.locator
|
||||
case _ =>
|
||||
throw new RuntimeException("Unexpected locator type")
|
||||
|
@ -1270,13 +1270,13 @@ class LevelDBClient(store: LevelDBStore) {
|
|||
val log_record = new EntryRecord.Bean()
|
||||
log_record.setCollectionKey(entry.queueKey)
|
||||
log_record.setEntryKey(new Buffer(key, 9, 8))
|
||||
log_record.setValueLocation(dataLocator._1)
|
||||
log_record.setValueLength(dataLocator._2)
|
||||
log_record.setValueLocation(dataLocator.pos)
|
||||
log_record.setValueLength(dataLocator.len)
|
||||
appender.append(LOG_ADD_ENTRY, encodeEntryRecord(log_record.freeze()))
|
||||
|
||||
val index_record = new EntryRecord.Bean()
|
||||
index_record.setValueLocation(dataLocator._1)
|
||||
index_record.setValueLength(dataLocator._2)
|
||||
index_record.setValueLocation(dataLocator.pos)
|
||||
index_record.setValueLength(dataLocator.len)
|
||||
batch.put(key, encodeEntryRecord(index_record.freeze()).toByteArray)
|
||||
|
||||
val log_data = encodeEntryRecord(log_record.freeze())
|
||||
|
@ -1297,7 +1297,7 @@ class LevelDBClient(store: LevelDBStore) {
|
|||
val ack = entry.ack
|
||||
if( dataLocator==null ) {
|
||||
dataLocator = ack.getLastMessageId.getDataLocator match {
|
||||
case x:(Long, Int) => x
|
||||
case x:DataLocator => x
|
||||
case x:MessageRecord => x.locator
|
||||
case _ =>
|
||||
throw new RuntimeException("Unexpected locator type")
|
||||
|
@ -1305,12 +1305,12 @@ class LevelDBClient(store: LevelDBStore) {
|
|||
}
|
||||
println(dataLocator)
|
||||
|
||||
val (qid, seq) = ack.getLastMessageId.getEntryLocator.asInstanceOf[(Long, Long)];
|
||||
val el = ack.getLastMessageId.getEntryLocator.asInstanceOf[EntryLocator];
|
||||
val os = new DataByteArrayOutputStream()
|
||||
os.writeLong(dataLocator._1)
|
||||
os.writeInt(dataLocator._2)
|
||||
os.writeLong(qid)
|
||||
os.writeLong(seq)
|
||||
os.writeLong(dataLocator.pos)
|
||||
os.writeInt(dataLocator.len)
|
||||
os.writeLong(el.qid)
|
||||
os.writeLong(el.seq)
|
||||
os.writeLong(entry.sub)
|
||||
store.wireFormat.marshal(ack, os)
|
||||
var ack_encoded = os.toBuffer
|
||||
|
|
|
@ -19,7 +19,6 @@ package org.apache.activemq.leveldb.test
|
|||
import org.apache.activemq.ActiveMQConnectionFactory
|
||||
import javax.jms.{Destination, ConnectionFactory}
|
||||
import org.apache.activemq.command.{ActiveMQTopic, ActiveMQQueue}
|
||||
import org.apache.activemq.leveldb.test.JMSClientScenario
|
||||
|
||||
/**
|
||||
* <p>
|
||||
|
|
|
@ -18,7 +18,6 @@ package org.apache.activemq.leveldb.test
|
|||
|
||||
import java.lang.Thread
|
||||
import javax.jms._
|
||||
import org.apache.activemq.leveldb.test.Scenario
|
||||
|
||||
/**
|
||||
* <p>
|
||||
|
|
Loading…
Reference in New Issue