diff --git a/activemq-leveldb-store/src/main/java/org/apache/activemq/leveldb/replicated/ReplicatedLevelDBStoreViewMBean.java b/activemq-leveldb-store/src/main/java/org/apache/activemq/leveldb/replicated/ReplicatedLevelDBStoreViewMBean.java index 901524262c..19b2192925 100644 --- a/activemq-leveldb-store/src/main/java/org/apache/activemq/leveldb/replicated/ReplicatedLevelDBStoreViewMBean.java +++ b/activemq-leveldb-store/src/main/java/org/apache/activemq/leveldb/replicated/ReplicatedLevelDBStoreViewMBean.java @@ -52,6 +52,9 @@ public interface ReplicatedLevelDBStoreViewMBean { @MBeanInfo("The current position of the replication log.") Long getPosition(); + @MBeanInfo("When the last entry was added to the replication log.") + Long getPositionDate(); + @MBeanInfo("The directory holding the data.") String getDirectory(); diff --git a/activemq-leveldb-store/src/main/java/org/apache/activemq/leveldb/replicated/dto/LogWrite.java b/activemq-leveldb-store/src/main/java/org/apache/activemq/leveldb/replicated/dto/LogWrite.java index 52e126f95d..b3749bbd49 100644 --- a/activemq-leveldb-store/src/main/java/org/apache/activemq/leveldb/replicated/dto/LogWrite.java +++ b/activemq-leveldb-store/src/main/java/org/apache/activemq/leveldb/replicated/dto/LogWrite.java @@ -43,4 +43,6 @@ public class LogWrite { @XmlAttribute(name="sync") public boolean sync=false; + @XmlAttribute(name="date") + public long date; } diff --git a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/ElectingLevelDBStore.scala b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/ElectingLevelDBStore.scala index 53e9971165..0399bd98ca 100644 --- a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/ElectingLevelDBStore.scala +++ b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/ElectingLevelDBStore.scala @@ -463,6 +463,21 @@ class ReplicatedLevelDBStoreView(val store:ElectingLevelDBStore) extends Replica null } + def getPositionDate:java.lang.Long = { + val rc = if( slave!=null ) { + slave.wal_date + } else if( master!=null ) { + master.wal_date + } else { + 0 + } + if( rc != 0 ) { + return new java.lang.Long(rc) + } else { + return null + } + } + def getDirectory = directory.getCanonicalPath def getSync = sync diff --git a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/MasterLevelDBStore.scala b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/MasterLevelDBStore.scala index 6572aa9ab4..b8289fc721 100644 --- a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/MasterLevelDBStore.scala +++ b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/MasterLevelDBStore.scala @@ -386,6 +386,7 @@ class MasterLevelDBStore extends LevelDBStore with ReplicatedLevelDBStoreTrait { false } + def date = System.currentTimeMillis() def replicate_wal(file:File, position:Long, offset:Long, length:Long):Unit = { if( length > 0 ) { @@ -393,6 +394,8 @@ class MasterLevelDBStore extends LevelDBStore with ReplicatedLevelDBStoreTrait { value.file = position; value.offset = offset; value.length = length + value.date = date + wal_date = value.date; value.sync = (syncToMask & SYNC_TO_REMOTE_DISK)!=0 val frame1 = ReplicationFrame(WAL_ACTION, JsonCodec.encode(value)) val frame2 = FileTransferFrame(file, offset, length) @@ -412,5 +415,6 @@ class MasterLevelDBStore extends LevelDBStore with ReplicatedLevelDBStoreTrait { } def wal_append_position = client.wal_append_position - + @volatile + var wal_date = 0L } diff --git a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/SlaveLevelDBStore.scala b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/SlaveLevelDBStore.scala index 8c832a7327..ce76230e6d 100644 --- a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/SlaveLevelDBStore.scala +++ b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/SlaveLevelDBStore.scala @@ -134,6 +134,8 @@ class SlaveLevelDBStore extends LevelDBStore with ReplicatedLevelDBStoreTrait { var wal_append_position = 0L var wal_append_offset = 0L + @volatile + var wal_date = 0L def send_wal_ack = { queue.assertExecuting() @@ -173,6 +175,7 @@ class SlaveLevelDBStore extends LevelDBStore with ReplicatedLevelDBStoreTrait { // info("Slave WAL update: %s, (offset: %d, length: %d), sending ack:%s", file, value.offset, value.length, caughtUp) wal_append_offset = value.offset+value.length wal_append_position = value.file + wal_append_offset + wal_date = value.date if( !stopped ) { if( caughtUp ) { client.log.current_appender.skip(value.length)