Fixes bug in replicated leveldb where log files on slaves were not getting GCed.

This commit is contained in:
Hiram Chirino 2013-09-16 11:58:03 -04:00
parent a69379d5fb
commit d771ebb97e
6 changed files with 74 additions and 2 deletions

View File

@ -0,0 +1,36 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.leveldb.replicated.dto;
import org.codehaus.jackson.annotate.JsonIgnoreProperties;
import javax.xml.bind.annotation.XmlAccessType;
import javax.xml.bind.annotation.XmlAccessorType;
import javax.xml.bind.annotation.XmlAttribute;
import javax.xml.bind.annotation.XmlRootElement;
/**
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
@XmlRootElement(name="remove_request")
@XmlAccessorType(XmlAccessType.FIELD)
@JsonIgnoreProperties(ignoreUnknown = true)
public class LogDelete {
@XmlAttribute(name="log")
public long log;
}

View File

@ -88,12 +88,16 @@ case class RecordLog(directory: File, logSuffix:String) {
if( current_appender.position != id ) {
Option(log_infos.get(id)).foreach { info =>
onDelete(info.file)
onDelete(id)
log_infos.remove(id)
}
}
}
}
protected def onDelete(file:Long) = {
}
protected def onDelete(file:File) = {
file.delete()
}

View File

@ -150,5 +150,10 @@ class MasterLevelDBClient(val store:MasterLevelDBStore) extends LevelDBClient(st
}
}
override protected def onDelete(file: Long) = {
super.onDelete(file)
store.replicate_log_delete(file)
}
}
}

View File

@ -309,17 +309,19 @@ class MasterLevelDBStore extends LevelDBStore with ReplicatedLevelDBStoreTrait {
}
}
def replicate_wal(frame1:ReplicationFrame, frame2:FileTransferFrame ) = {
def replicate_wal(frame1:ReplicationFrame, frame2:FileTransferFrame=null ) = {
val h = this.synchronized {
session
}
if( h !=null ) {
h.queue {
h.send(frame1)
if( frame2!=null ) {
h.send(frame2)
}
}
}
}
def position_update(position:Long) = {
this.position.getAndSet(position)
@ -400,6 +402,15 @@ class MasterLevelDBStore extends LevelDBStore with ReplicatedLevelDBStoreTrait {
}
}
def replicate_log_delete(log:Long):Unit = {
val value = new LogDelete
value.log = log
val frame = ReplicationFrame(LOG_DELETE_ACTION, JsonCodec.encode(value))
for( slave <- slaves.values() ) {
slave.replicate_wal(frame)
}
}
def wal_append_position = client.wal_append_position
}

View File

@ -38,6 +38,7 @@ object ReplicationSupport {
val OK_ACTION = ascii("ok")
val DISCONNECT_ACTION = ascii("disconnect")
val ERROR_ACTION = ascii("error")
val LOG_DELETE_ACTION = ascii("rm")
def unmap(buffer:MappedByteBuffer ) {
try {

View File

@ -152,6 +152,8 @@ class SlaveLevelDBStore extends LevelDBStore with ReplicatedLevelDBStoreTrait {
}
}
val pending_log_removes = new util.ArrayList[Long]()
def wal_handler(session:Session): (AnyRef)=>Unit = (command)=>{
command match {
case command:ReplicationFrame =>
@ -178,6 +180,15 @@ class SlaveLevelDBStore extends LevelDBStore with ReplicatedLevelDBStoreTrait {
send_wal_ack
}
})
case LOG_DELETE_ACTION =>
val value = JsonCodec.decode(command.body, classOf[LogDelete])
if( !caughtUp ) {
pending_log_removes.add(value.log)
} else {
client.log.delete(value.log)
}
case OK_ACTION =>
// This comes in as response to a disconnect we send.
case _ => session.fail("Unexpected command action: "+command.action)
@ -394,6 +405,10 @@ class SlaveLevelDBStore extends LevelDBStore with ReplicatedLevelDBStoreTrait {
caughtUp = true
client.log.open(wal_append_offset)
send_wal_ack
for( i <- pending_log_removes ) {
client.log.delete(i);
}
pending_log_removes.clear()
}
})
state.snapshot_position