The replicated leveldb store will now stash the last known good replica before starting to replicate with a new master. If the replication does not fully synchronize before a slave failure occurs, the store will revert back to the the stashed state.

If a slave connection encounters an error, try to reconnect again after 1 second.

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1478548 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Hiram R. Chirino 2013-05-02 21:07:04 +00:00
parent b9cc24bdb6
commit ede1705e34
6 changed files with 165 additions and 52 deletions

View File

@ -366,7 +366,7 @@ object LevelDBClient extends Log {
def create_sequence_file(directory:File, id:Long, suffix:String) = directory / ("%016x%s".format(id, suffix)) def create_sequence_file(directory:File, id:Long, suffix:String) = directory / ("%016x%s".format(id, suffix))
def find_sequence_files(directory:File, suffix:String):TreeMap[Long, File] = { def find_sequence_files(directory:File, suffix:String):TreeMap[Long, File] = {
TreeMap((directory.listFiles.flatMap { f=> TreeMap((directory.list_files.flatMap { f=>
if( f.getName.endsWith(suffix) ) { if( f.getName.endsWith(suffix) ) {
try { try {
val base = f.getName.stripSuffix(suffix) val base = f.getName.stripSuffix(suffix)
@ -385,6 +385,19 @@ object LevelDBClient extends Log {
var size = 0L var size = 0L
var last_key:Array[Byte] = _ var last_key:Array[Byte] = _
} }
def copy_index(from:File, to:File) = {
for( file <- from.list_files ) {
val name = file.getName
if( name == "CURRENT" || name.startsWith("MANIFEST-") ) {
/// These might not be append only files, so avoid hard linking just to be safe.
file.copyTo(to / file.getName)
} else {
// These are append only files, so they are safe to hard link.
file.linkTo(to / file.getName)
}
}
}
} }
@ -564,7 +577,7 @@ class LevelDBClient(store: LevelDBStore) {
lastSnapshotIndex.foreach { case (id, file) => lastSnapshotIndex.foreach { case (id, file) =>
// Resume log replay from a snapshot of the index.. // Resume log replay from a snapshot of the index..
try { try {
file.listFiles.foreach { file => for( file <- file.list_files) {
file.linkTo(dirtyIndexFile / file.getName) file.linkTo(dirtyIndexFile / file.getName)
} }
} catch { } catch {
@ -834,7 +847,7 @@ class LevelDBClient(store: LevelDBStore) {
try { try {
// Hard link all the index files. // Hard link all the index files.
dirtyIndexFile.listFiles.foreach { file => for( file <- dirtyIndexFile.list_files) {
file.linkTo(tmpDir / file.getName) file.linkTo(tmpDir / file.getName)
} }
@ -882,12 +895,12 @@ class LevelDBClient(store: LevelDBStore) {
} }
def locked_purge { def locked_purge {
logDirectory.listFiles.foreach {x => for( x <- logDirectory.list_files) {
if (x.getName.endsWith(".log")) { if (x.getName.endsWith(".log")) {
x.delete() x.delete()
} }
} }
directory.listFiles.foreach {x => for( x <- directory.list_files) {
if (x.getName.endsWith(".index")) { if (x.getName.endsWith(".index")) {
x.recursiveDelete x.recursiveDelete
} }

View File

@ -124,7 +124,7 @@ class LevelDBStore extends LockableServiceSupport with BrokerServiceAware with P
final val wireFormat = new OpenWireFormat final val wireFormat = new OpenWireFormat
final val db = new DBManager(this) final val db = new DBManager(this)
final val client = createClient final var client = createClient
@BeanProperty @BeanProperty
var directory = DEFAULT_DIRECTORY var directory = DEFAULT_DIRECTORY

View File

@ -26,7 +26,7 @@ import org.fusesource.hawtdispatch.transport._
import java.util.concurrent._ import java.util.concurrent._
import java.io.{IOException, File} import java.io.{IOException, File}
import java.net.{InetSocketAddress, URI} import java.net.{InetSocketAddress, URI}
import java.util.concurrent.atomic.AtomicLong import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong}
import scala.reflect.BeanProperty import scala.reflect.BeanProperty
class PositionSync(val position:Long, count:Int) extends CountDownLatch(count) class PositionSync(val position:Long, count:Int) extends CountDownLatch(count)
@ -51,6 +51,7 @@ class MasterLevelDBStore extends LevelDBStore with ReplicatedLevelDBStoreTrait {
val slaves = new ConcurrentHashMap[String,SlaveState]() val slaves = new ConcurrentHashMap[String,SlaveState]()
override def doStart = { override def doStart = {
unstash(directory)
super.doStart super.doStart
start_protocol_server start_protocol_server
} }
@ -214,6 +215,7 @@ class MasterLevelDBStore extends LevelDBStore with ReplicatedLevelDBStoreTrait {
var held_snapshot:Option[Long] = None var held_snapshot:Option[Long] = None
var session:Session = _ var session:Session = _
var position = new AtomicLong(0) var position = new AtomicLong(0)
var caughtUp = new AtomicBoolean(false)
def start(session:Session) = { def start(session:Session) = {
debug("SlaveState:start") debug("SlaveState:start")
@ -261,7 +263,7 @@ class MasterLevelDBStore extends LevelDBStore with ReplicatedLevelDBStoreTrait {
def position_update(position:Long) = { def position_update(position:Long) = {
val was = this.position.getAndSet(position) val was = this.position.getAndSet(position)
if( was == 0 ) { if( was == 0 ) {
info("Slave has finished synchronizing: "+slave_id) info("Slave has finished state transfer: "+slave_id)
this.synchronized { this.synchronized {
this.held_snapshot = None this.held_snapshot = None
} }
@ -275,6 +277,9 @@ class MasterLevelDBStore extends LevelDBStore with ReplicatedLevelDBStoreTrait {
val p = position_sync val p = position_sync
if( last_position_sync!=p ) { if( last_position_sync!=p ) {
if( position.get >= p.position ) { if( position.get >= p.position ) {
if( caughtUp.compareAndSet(false, true) ) {
info("Slave has now caught up: "+slave_id)
}
p.countDown p.countDown
last_position_sync = p last_position_sync = p
} }

View File

@ -24,6 +24,9 @@ import java.io.{RandomAccessFile, File}
import java.nio.channels.FileChannel import java.nio.channels.FileChannel
import java.util.concurrent.atomic.AtomicInteger import java.util.concurrent.atomic.AtomicInteger
import org.fusesource.hawtdispatch._ import org.fusesource.hawtdispatch._
import org.apache.activemq.leveldb.util.FileSupport._
import org.apache.activemq.leveldb.LevelDBClient
import scala.collection.immutable.TreeMap
object ReplicationSupport { object ReplicationSupport {
@ -54,24 +57,69 @@ object ReplicationSupport {
} }
} }
case class RetainedLatch() { def stash(directory:File) {
directory.mkdirs()
private val latch = new CountDownLatch(1) val tmp_stash = directory / "stash.tmp"
private val remaining = new AtomicInteger(1) val stash = directory / "stash"
private val release_task = ^{ release } stash.recursiveDelete
tmp_stash.recursiveDelete
def retain = { tmp_stash.mkdirs()
remaining.incrementAndGet() copy_store_dir(directory, tmp_stash)
release_task tmp_stash.renameTo(stash)
}
def release {
if (remaining.decrementAndGet() == 0) {
latch.countDown()
}
}
def await() = latch.await()
} }
def copy_store_dir(from:File, to:File) = {
val log_files = LevelDBClient.find_sequence_files(from, LevelDBClient.LOG_SUFFIX)
if( !log_files.isEmpty ) {
val append_file = log_files.last._2
for( file <- log_files.values ; if file != append_file) {
file.linkTo(to / file.getName)
val crc_file = file.getParentFile / (file.getName+".crc32" )
if( crc_file.exists() ) {
crc_file.linkTo(to / crc_file.getName)
}
}
append_file.copyTo(to / append_file.getName)
}
val index_dirs = LevelDBClient.find_sequence_files(from, LevelDBClient.INDEX_SUFFIX)
if( !index_dirs.isEmpty ) {
val index_file = index_dirs.last._2
var target = to / index_file.getName
target.mkdirs()
LevelDBClient.copy_index(index_file, target)
}
}
def stash_clear(directory:File) {
val stash = directory / "stash"
stash.recursiveDelete
}
def unstash(directory:File) {
val tmp_stash = directory / "stash.tmp"
tmp_stash.recursiveDelete
val stash = directory / "stash"
if( stash.exists() ) {
delete_store(directory)
copy_store_dir(stash, directory)
stash.recursiveDelete
}
}
def delete_store(directory: File) {
// Delete any existing files to make space for the stash we will be restoring..
var t: TreeMap[Long, File] = LevelDBClient.find_sequence_files(directory, LevelDBClient.LOG_SUFFIX)
for (entry <- t) {
val file = entry._2
file.delete()
val crc_file = directory / (file.getName+".crc32" )
if( crc_file.exists() ) {
crc_file.delete()
}
}
for (file <- LevelDBClient.find_sequence_files(directory, LevelDBClient.INDEX_SUFFIX)) {
file._2.recursiveDelete
}
}
} }

View File

@ -29,6 +29,7 @@ import org.apache.activemq.leveldb.util._
import FileSupport._ import FileSupport._
import java.io.{IOException, RandomAccessFile, File} import java.io.{IOException, RandomAccessFile, File}
import scala.reflect.BeanProperty import scala.reflect.BeanProperty
import java.util.concurrent.{CountDownLatch, TimeUnit}
object SlaveLevelDBStore extends Log object SlaveLevelDBStore extends Log
@ -52,13 +53,38 @@ class SlaveLevelDBStore extends LevelDBStore with ReplicatedLevelDBStoreTrait {
override def doStart() = { override def doStart() = {
client.init() client.init()
if (purgeOnStatup) { if (purgeOnStatup) {
purgeOnStatup = false purgeOnStatup = false
db.client.locked_purge db.client.locked_purge
info("Purged: "+this) info("Purged: "+this)
} }
db.client.dirtyIndexFile.recursiveDelete
db.client.plistIndexFile.recursiveDelete
start_slave_connections
}
var stopped = false
override def doStop(stopper: ServiceStopper) = {
val latch = new CountDownLatch(1)
stop_connections(^{
latch.countDown
})
// Make sure the sessions are stopped before we close the client.
latch.await()
client.stop()
}
def restart_slave_connections = {
stop_connections(^{
client.stop()
client = createClient
client.init()
start_slave_connections
})
}
def start_slave_connections = {
val transport = new TcpTransport() val transport = new TcpTransport()
transport.setBlockingExecutor(blocking_executor) transport.setBlockingExecutor(blocking_executor)
transport.setDispatchQueue(queue) transport.setDispatchQueue(queue)
@ -66,6 +92,11 @@ class SlaveLevelDBStore extends LevelDBStore with ReplicatedLevelDBStoreTrait {
info("Connecting to master...") info("Connecting to master...")
wal_session = new Session(transport, (session)=>{ wal_session = new Session(transport, (session)=>{
// lets stash away our current state so that we can unstash it
// in case we don't get caught up.. If the master dies,
// the stashed data might be the best option to become the master.
stash(directory)
delete_store(directory)
debug("Connected to master. Syncing") debug("Connected to master. Syncing")
session.request_then(SYNC_ACTION, null) { body => session.request_then(SYNC_ACTION, null) { body =>
val response = JsonCodec.decode(body, classOf[SyncResponse]) val response = JsonCodec.decode(body, classOf[SyncResponse])
@ -74,26 +105,30 @@ class SlaveLevelDBStore extends LevelDBStore with ReplicatedLevelDBStoreTrait {
} }
}) })
} }
var stopped = false
override def doStop(stopper: ServiceStopper) = { def stop_connections(cb:Task) = {
val latch = RetainedLatch() var then = ^{
unstash(directory)
cb.run()
}
if( wal_session !=null ) { if( wal_session !=null ) {
wal_session.disconnect(latch.retain) val next = then
wal_session = null then = ^{
wal_session.transport.stop(next)
wal_session = null
}
} }
if( transfer_session !=null ) { if( transfer_session !=null ) {
transfer_session.disconnect(latch.retain) val next = then
transfer_session = null then = ^{
transfer_session.transport.stop(next)
transfer_session = null
}
} }
queue { then.run();
stopped = true
latch.release
}
// Make sure the sessions are stopped before we close the client.
latch.await()
db.client.stop()
} }
var wal_append_position = 0L var wal_append_position = 0L
var wal_append_offset = 0L var wal_append_offset = 0L
@ -149,6 +184,11 @@ class SlaveLevelDBStore extends LevelDBStore with ReplicatedLevelDBStoreTrait {
override def onTransportFailure(error: IOException) { override def onTransportFailure(error: IOException) {
if( isStarted ) { if( isStarted ) {
warn("Unexpected session error: "+error) warn("Unexpected session error: "+error)
queue.after(1, TimeUnit.SECONDS) {
if( isStarted ) {
restart_slave_connections
}
}
} }
super.onTransportFailure(error) super.onTransportFailure(error)
} }
@ -210,9 +250,7 @@ class SlaveLevelDBStore extends LevelDBStore with ReplicatedLevelDBStoreTrait {
} }
def transfer_missing(state:SyncResponse) = { def transfer_missing(state:SyncResponse) = {
// Start up another connection to catch sync
// up the missing data
val log_dir = client.logDirectory
val dirty_index = client.dirtyIndexFile val dirty_index = client.dirtyIndexFile
dirty_index.recursiveDelete dirty_index.recursiveDelete
@ -230,27 +268,29 @@ class SlaveLevelDBStore extends LevelDBStore with ReplicatedLevelDBStoreTrait {
// Transfer the log files.. // Transfer the log files..
var append_offset = 0L var append_offset = 0L
for( x <- state.log_files ) { for( x <- state.log_files ) {
if( x.file == state.append_log ) { if( x.file == state.append_log ) {
append_offset = x.length append_offset = x.length
} }
val target_file: File = log_dir / x.file val stashed_file: File = directory / "stash" / x.file
val target_file: File = directory / x.file
def previously_downloaded:Boolean = { def previously_downloaded:Boolean = {
if( !target_file.exists() ) if( !stashed_file.exists() )
return false return false
if (target_file.length() < x.length ) if (stashed_file.length() < x.length )
return false return false
if (target_file.length() == x.length ) if (stashed_file.length() == x.length )
return target_file.cached_crc32 == x.crc32 return stashed_file.cached_crc32 == x.crc32
if ( target_file.crc32(x.length) == x.crc32 ) { if ( stashed_file.crc32(x.length) == x.crc32 ) {
// we don't want to truncate the log file currently being appended to. // we don't want to truncate the log file currently being appended to.
if( x.file != state.append_log ) { if( x.file != state.append_log ) {
// Our log file might be longer. lets truncate to match. // Our log file might be longer. lets truncate to match.
val raf = new RandomAccessFile(target_file, "rw") val raf = new RandomAccessFile(stashed_file, "rw")
try { try {
raf.setLength(x.length) raf.setLength(x.length)
} finally { } finally {
@ -264,7 +304,13 @@ class SlaveLevelDBStore extends LevelDBStore with ReplicatedLevelDBStoreTrait {
// We don't have to transfer log files that have been previously transferred. // We don't have to transfer log files that have been previously transferred.
if( previously_downloaded ) { if( previously_downloaded ) {
// lets link it from the stash directory..
info("Slave skipping download of: log/"+x.file) info("Slave skipping download of: log/"+x.file)
if( x.file == state.append_log ) {
stashed_file.copyTo(target_file) // let not link a file that's going to be modified..
} else {
stashed_file.linkTo(target_file)
}
} else { } else {
val transfer = new Transfer() val transfer = new Transfer()
transfer.file = "log/"+x.file transfer.file = "log/"+x.file
@ -303,6 +349,7 @@ class SlaveLevelDBStore extends LevelDBStore with ReplicatedLevelDBStoreTrait {
session.request_then(DISCONNECT_ACTION, null) { body => session.request_then(DISCONNECT_ACTION, null) { body =>
// Ok we are now caught up. // Ok we are now caught up.
info("Slave has now caught up") info("Slave has now caught up")
stash_clear(directory) // we don't need the stash anymore.
transport.stop(NOOP) transport.stop(NOOP)
transfer_session = null transfer_session = null
replay_from = state.snapshot_position replay_from = state.snapshot_position

View File

@ -136,7 +136,7 @@ object FileSupport {
} }
} }
def listFiles:Array[File] = { def list_files:Array[File] = {
Option(self.listFiles()).getOrElse(Array()) Option(self.listFiles()).getOrElse(Array())
} }