diff --git a/activemq-leveldb-store/src/main/java/org/apache/activemq/leveldb/replicated/dto/LogDelete.java b/activemq-leveldb-store/src/main/java/org/apache/activemq/leveldb/replicated/dto/LogDelete.java new file mode 100644 index 0000000000..6c44e691d3 --- /dev/null +++ b/activemq-leveldb-store/src/main/java/org/apache/activemq/leveldb/replicated/dto/LogDelete.java @@ -0,0 +1,36 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.leveldb.replicated.dto; + +import org.codehaus.jackson.annotate.JsonIgnoreProperties; + +import javax.xml.bind.annotation.XmlAccessType; +import javax.xml.bind.annotation.XmlAccessorType; +import javax.xml.bind.annotation.XmlAttribute; +import javax.xml.bind.annotation.XmlRootElement; + +/** + * @author Hiram Chirino + */ +@XmlRootElement(name="remove_request") +@XmlAccessorType(XmlAccessType.FIELD) +@JsonIgnoreProperties(ignoreUnknown = true) +public class LogDelete { + @XmlAttribute(name="log") + public long log; +} diff --git a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/RecordLog.scala b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/RecordLog.scala index 1cbf95b34e..5a4629e422 100644 --- a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/RecordLog.scala +++ b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/RecordLog.scala @@ -88,12 +88,16 @@ case class RecordLog(directory: File, logSuffix:String) { if( current_appender.position != id ) { Option(log_infos.get(id)).foreach { info => onDelete(info.file) + onDelete(id) log_infos.remove(id) } } } } + protected def onDelete(file:Long) = { + } + protected def onDelete(file:File) = { file.delete() } diff --git a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/MasterLevelDBClient.scala b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/MasterLevelDBClient.scala index 1d347f4a36..95a7e73353 100644 --- a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/MasterLevelDBClient.scala +++ b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/MasterLevelDBClient.scala @@ -150,5 +150,10 @@ class MasterLevelDBClient(val store:MasterLevelDBStore) extends LevelDBClient(st } } + + override protected def onDelete(file: Long) = { + super.onDelete(file) + store.replicate_log_delete(file) + } } } diff --git a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/MasterLevelDBStore.scala b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/MasterLevelDBStore.scala index 0318eff361..6572aa9ab4 100644 --- a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/MasterLevelDBStore.scala +++ b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/MasterLevelDBStore.scala @@ -309,14 +309,16 @@ class MasterLevelDBStore extends LevelDBStore with ReplicatedLevelDBStoreTrait { } } - def replicate_wal(frame1:ReplicationFrame, frame2:FileTransferFrame ) = { + def replicate_wal(frame1:ReplicationFrame, frame2:FileTransferFrame=null ) = { val h = this.synchronized { session } if( h !=null ) { h.queue { h.send(frame1) - h.send(frame2) + if( frame2!=null ) { + h.send(frame2) + } } } } @@ -400,6 +402,15 @@ class MasterLevelDBStore extends LevelDBStore with ReplicatedLevelDBStoreTrait { } } + def replicate_log_delete(log:Long):Unit = { + val value = new LogDelete + value.log = log + val frame = ReplicationFrame(LOG_DELETE_ACTION, JsonCodec.encode(value)) + for( slave <- slaves.values() ) { + slave.replicate_wal(frame) + } + } + def wal_append_position = client.wal_append_position } diff --git a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/ReplicationSupport.scala b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/ReplicationSupport.scala index 0b45bf67f2..4e41a2e722 100644 --- a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/ReplicationSupport.scala +++ b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/ReplicationSupport.scala @@ -38,6 +38,7 @@ object ReplicationSupport { val OK_ACTION = ascii("ok") val DISCONNECT_ACTION = ascii("disconnect") val ERROR_ACTION = ascii("error") + val LOG_DELETE_ACTION = ascii("rm") def unmap(buffer:MappedByteBuffer ) { try { diff --git a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/SlaveLevelDBStore.scala b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/SlaveLevelDBStore.scala index 0d14e6da3b..cf22cf9cda 100644 --- a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/SlaveLevelDBStore.scala +++ b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/SlaveLevelDBStore.scala @@ -152,6 +152,8 @@ class SlaveLevelDBStore extends LevelDBStore with ReplicatedLevelDBStoreTrait { } } + val pending_log_removes = new util.ArrayList[Long]() + def wal_handler(session:Session): (AnyRef)=>Unit = (command)=>{ command match { case command:ReplicationFrame => @@ -178,6 +180,15 @@ class SlaveLevelDBStore extends LevelDBStore with ReplicatedLevelDBStoreTrait { send_wal_ack } }) + case LOG_DELETE_ACTION => + + val value = JsonCodec.decode(command.body, classOf[LogDelete]) + if( !caughtUp ) { + pending_log_removes.add(value.log) + } else { + client.log.delete(value.log) + } + case OK_ACTION => // This comes in as response to a disconnect we send. case _ => session.fail("Unexpected command action: "+command.action) @@ -394,6 +405,10 @@ class SlaveLevelDBStore extends LevelDBStore with ReplicatedLevelDBStoreTrait { caughtUp = true client.log.open(wal_append_offset) send_wal_ack + for( i <- pending_log_removes ) { + client.log.delete(i); + } + pending_log_removes.clear() } }) state.snapshot_position