Implement some tracking of producer positions in the LevelDB store to help the broker to filter out dups.

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1482788 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Hiram R. Chirino 2013-05-15 12:30:34 +00:00
parent 1399cf9522
commit 842630c8c7
3 changed files with 42 additions and 3 deletions

View File

@ -33,6 +33,8 @@ import collection.mutable.{HashSet, ListBuffer}
import org.apache.activemq.util.ByteSequence
import util.TimeMetric
import scala.Some
import org.apache.activemq.ActiveMQMessageAuditNoSync
import org.fusesource.hawtdispatch
case class EntryLocator(qid:Long, seq:Long)
case class DataLocator(pos:Long, len:Int)
@ -254,7 +256,6 @@ class DelayableUOW(val manager:DBManager) extends BaseRetained {
val id = message.getMessageId
val messageRecord = id.getDataLocator match {
case null =>
// encodes body and release object bodies, in case message was sent from
@ -414,10 +415,23 @@ class DBManager(val parent:LevelDBStore) {
val lastUowId = new AtomicInteger(1)
val producerSequenceIdTracker = new ActiveMQMessageAuditNoSync
def getLastProducerSequenceId(id: ProducerId): Long = dispatchQueue.sync {
producerSequenceIdTracker.getLastSeqId(id)
}
def processClosed(uow:DelayableUOW) = {
dispatchQueue.assertExecuting()
uowClosedCounter += 1
// track the producer seq positions.
for( (_, action) <- uow.actions ) {
if( action.messageRecord!=null ) {
producerSequenceIdTracker.isDuplicate(action.messageRecord.id)
}
}
// Broker could issue a flush_message call before
// this stage runs.. which make the stage jump over UowDelayed
if( uow.state.stage < UowDelayed.stage ) {
@ -426,6 +440,10 @@ class DBManager(val parent:LevelDBStore) {
if( uow.state.stage < UowFlushing.stage ) {
uow.actions.foreach { case (id, action) =>
if( action.messageRecord!=null ) {
producerSequenceIdTracker.isDuplicate(action.messageRecord.id)
}
// The UoW may have been canceled.
if( action.messageRecord!=null && action.enqueues.isEmpty ) {
action.removeFromPendingStore()

View File

@ -548,8 +548,22 @@ class LevelDBStore extends LockableServiceSupport with BrokerServiceAware with P
return rc
}
def getLastProducerSequenceId(id: ProducerId): Long = {
return -1
def getLastProducerSequenceId(id: ProducerId) = db.getLastProducerSequenceId(id)
def setMaxFailoverProducersToTrack(maxFailoverProducersToTrack:Int ) = {
db.producerSequenceIdTracker.setMaximumNumberOfProducersToTrack(maxFailoverProducersToTrack);
}
def getMaxFailoverProducersToTrack() = {
db.producerSequenceIdTracker.getMaximumNumberOfProducersToTrack()
}
def setFailoverProducersAuditDepth(failoverProducersAuditDepth:Int) = {
db.producerSequenceIdTracker.setAuditDepth(failoverProducersAuditDepth);
}
def getFailoverProducersAuditDepth() = {
db.producerSequenceIdTracker.getAuditDepth();
}
def size: Long = {

View File

@ -31,6 +31,7 @@ import java.util.concurrent.atomic.AtomicBoolean
import org.apache.activemq.leveldb.util.Log
import java.io.File
import org.apache.activemq.usage.SystemUsage
import org.apache.activemq.ActiveMQMessageAuditNoSync
object ElectingLevelDBStore extends Log {
@ -120,6 +121,10 @@ class ElectingLevelDBStore extends ProxyLevelDBStore {
var asyncBufferSize = 1024 * 1024 * 4
@BeanProperty
var monitorStats = false
@BeanProperty
var failoverProducersAuditDepth = ActiveMQMessageAuditNoSync.DEFAULT_WINDOW_SIZE;
@BeanProperty
var maxFailoverProducersToTrack = ActiveMQMessageAuditNoSync.MAXIMUM_PRODUCER_COUNT;
var master: MasterLevelDBStore = _
var slave: SlaveLevelDBStore = _
@ -290,6 +295,8 @@ class ElectingLevelDBStore extends ProxyLevelDBStore {
store.asyncBufferSize = asyncBufferSize
store.monitorStats = monitorStats
store.securityToken = securityToken
store.setFailoverProducersAuditDepth(failoverProducersAuditDepth)
store.setMaxFailoverProducersToTrack(maxFailoverProducersToTrack)
store.setBrokerName(brokerName)
store.setBrokerService(brokerService)
store.setUsageManager(usageManager)