Support multiple local/remote syncing styles.

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1483810 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Hiram R. Chirino 2013-05-17 14:04:12 +00:00
parent d43d08aa1c
commit 1558cbbb55
5 changed files with 49 additions and 6 deletions

View File

@ -40,4 +40,7 @@ public class LogWrite {
@XmlAttribute(name="length")
public long length;
@XmlAttribute(name="sync")
public boolean sync=false;
}

View File

@ -77,6 +77,8 @@ class ElectingLevelDBStore extends ProxyLevelDBStore {
@BeanProperty
var replicas = 2
@BeanProperty
var sync="quorum_mem"
def clusterSizeQuorum = (replicas/2) + 1
@ -96,8 +98,6 @@ class ElectingLevelDBStore extends ProxyLevelDBStore {
@BeanProperty
var indexFactory: String = "org.fusesource.leveldbjni.JniDBFactory, org.iq80.leveldb.impl.Iq80DBFactory"
@BeanProperty
var sync: Boolean = true
@BeanProperty
var verifyChecksums: Boolean = false
@BeanProperty
var indexMaxOpenFiles: Int = 1000
@ -262,6 +262,7 @@ class ElectingLevelDBStore extends ProxyLevelDBStore {
configure(master)
master.replicas = replicas
master.bind = bind
master.syncTo = sync
master
}
@ -281,7 +282,6 @@ class ElectingLevelDBStore extends ProxyLevelDBStore {
def configure(store: ReplicatedLevelDBStoreTrait) {
store.directory = directory
store.indexFactory = indexFactory
store.sync = sync
store.verifyChecksums = verifyChecksums
store.indexMaxOpenFiles = indexMaxOpenFiles
store.indexBlockRestartInterval = indexBlockRestartInterval

View File

@ -138,8 +138,14 @@ class MasterLevelDBClient(val store:MasterLevelDBStore) extends LevelDBClient(st
}
override def force = {
flush
store.wal_sync_to(position+flushed_offset.get())
import MasterLevelDBStore._
if( (store.syncToMask & SYNC_TO_DISK) != 0) {
super.force
}
if( (store.syncToMask & SYNC_TO_REMOTE) != 0) {
flush
store.wal_sync_to(position+flushed_offset.get())
}
}
}

View File

@ -31,7 +31,14 @@ import scala.reflect.BeanProperty
class PositionSync(val position:Long, count:Int) extends CountDownLatch(count)
object MasterLevelDBStore extends Log
object MasterLevelDBStore extends Log {
val SYNC_TO_DISK = 0x01
val SYNC_TO_REMOTE = 0x02
val SYNC_TO_REMOTE_MEMORY = 0x04 | SYNC_TO_REMOTE
val SYNC_TO_REMOTE_DISK = 0x08 | SYNC_TO_REMOTE
}
/**
*/
@ -48,6 +55,29 @@ class MasterLevelDBStore extends LevelDBStore with ReplicatedLevelDBStoreTrait {
var replicas = 2
def minSlaveAcks = replicas/2
var _syncTo="quorum_mem"
var syncToMask=SYNC_TO_REMOTE_MEMORY
@BeanProperty
def syncTo = _syncTo
@BeanProperty
def syncTo_=(value:String) {
_syncTo = value
syncToMask = 0
for( v <- value.split(",").map(_.trim.toLowerCase) ) {
v match {
case "" =>
case "local_mem" =>
case "local_disk" => syncToMask |= SYNC_TO_DISK
case "remote_mem" => syncToMask |= SYNC_TO_REMOTE_MEMORY
case "remote_disk" => syncToMask |= SYNC_TO_REMOTE_DISK
case "quorum_mem" => syncToMask |= SYNC_TO_REMOTE_MEMORY
case "quorum_disk" => syncToMask |= SYNC_TO_REMOTE_DISK | SYNC_TO_DISK
case x => warn("Unknown syncTo value: [%s]", x)
}
}
}
val slaves = new ConcurrentHashMap[String,SlaveState]()
override def doStart = {
@ -316,6 +346,7 @@ class MasterLevelDBStore extends LevelDBStore with ReplicatedLevelDBStoreTrait {
value.file = position;
value.offset = offset;
value.length = length
value.sync = (syncToMask & SYNC_TO_REMOTE_DISK)!=0
val frame1 = ReplicationFrame(WAL_ACTION, JsonCodec.encode(value))
val frame2 = FileTransferFrame(file, offset, length)
for( slave <- slaves.values() ) {

View File

@ -161,6 +161,9 @@ class SlaveLevelDBStore extends LevelDBStore with ReplicatedLevelDBStoreTrait {
val file = client.log.next_log(value.file)
val buffer = map(file, value.offset, value.length, false)
session.codec.readData(buffer, ^{
if( value.sync ) {
buffer.force()
}
unmap(buffer)
// info("Slave WAL update: %s, (offset: %d, length: %d), sending ack:%s", file, value.offset, value.length, caughtUp)
wal_append_offset = value.offset+value.length