From ebafd5c19388ce183299c2c41c966c5580e87821 Mon Sep 17 00:00:00 2001 From: Hiram Chirino Date: Tue, 25 Nov 2014 11:40:48 -0500 Subject: [PATCH] Implements https://issues.apache.org/jira/browse/AMQ-5458 --- .../leveldb/LevelDBStoreTestMBean.java | 56 +++++++++ .../activemq/leveldb/LevelDBStore.scala | 75 +++++++++++++ .../apache/activemq/leveldb/RecordLog.scala | 106 +++++++++++++++++- 3 files changed, 232 insertions(+), 5 deletions(-) create mode 100644 activemq-leveldb-store/src/main/java/org/apache/activemq/leveldb/LevelDBStoreTestMBean.java diff --git a/activemq-leveldb-store/src/main/java/org/apache/activemq/leveldb/LevelDBStoreTestMBean.java b/activemq-leveldb-store/src/main/java/org/apache/activemq/leveldb/LevelDBStoreTestMBean.java new file mode 100644 index 0000000000..63338a8f2e --- /dev/null +++ b/activemq-leveldb-store/src/main/java/org/apache/activemq/leveldb/LevelDBStoreTestMBean.java @@ -0,0 +1,56 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.leveldb; + +import org.apache.activemq.broker.jmx.MBeanInfo; + +/** + *

+ *

+ * + * @author Hiram Chirino + */ +public interface LevelDBStoreTestMBean { + + @MBeanInfo("Used to set if the log force calls should be suspended") + void setSuspendForce(boolean value); + + @MBeanInfo("Gets if the log force calls should be suspended") + boolean getSuspendForce(); + + @MBeanInfo("Gets the number of threads waiting to do a log force call.") + long getForceCalls(); + + @MBeanInfo("Used to set if the log write calls should be suspended") + void setSuspendWrite(boolean value); + + @MBeanInfo("Gets if the log write calls should be suspended") + boolean getSuspendWrite(); + + @MBeanInfo("Gets the number of threads waiting to do a log write call.") + long getWriteCalls(); + + @MBeanInfo("Used to set if the log delete calls should be suspended") + void setSuspendDelete(boolean value); + + @MBeanInfo("Gets if the log delete calls should be suspended") + boolean getSuspendDelete(); + + @MBeanInfo("Gets the number of threads waiting to do a log delete call.") + long getDeleteCalls(); +} diff --git a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala index f86e05be01..b10cd3e533 100644 --- a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala +++ b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala @@ -37,6 +37,7 @@ import org.fusesource.hawtbuf.{UTF8Buffer, DataByteArrayOutputStream} import org.fusesource.hawtdispatch; import org.apache.activemq.broker.scheduler.JobSchedulerStore import org.apache.activemq.store.IndexListener.MessageContext +import javax.management.ObjectName object LevelDBStore extends Log { val DEFAULT_DIRECTORY = new File("LevelDB"); @@ -82,6 +83,74 @@ case class DurableSubscription(subKey:Long, topicKey:Long, info: SubscriptionInf var cursorPosition = 0L } +class LevelDBStoreTest(val store:LevelDBStore) extends LevelDBStoreTestMBean { + + import store._ + var suspendForce = false; + + override def setSuspendForce(value: Boolean): Unit = this.synchronized { + if( suspendForce!=value ) { + suspendForce = value; + if( suspendForce ) { + db.client.log.recordLogTestSupport.forceCall.suspend + } else { + db.client.log.recordLogTestSupport.forceCall.resume + } + } + } + + override def getSuspendForce: Boolean = this.synchronized { + suspendForce + } + + override def getForceCalls = this.synchronized { + db.client.log.recordLogTestSupport.forceCall.threads.get() + } + + var suspendWrite = false; + + override def setSuspendWrite(value: Boolean): Unit = this.synchronized { + if( suspendWrite!=value ) { + suspendWrite = value; + if( suspendWrite ) { + db.client.log.recordLogTestSupport.writeCall.suspend + } else { + db.client.log.recordLogTestSupport.writeCall.resume + } + } + } + + override def getSuspendWrite: Boolean = this.synchronized { + suspendWrite + } + + override def getWriteCalls = this.synchronized { + db.client.log.recordLogTestSupport.writeCall.threads.get() + } + + var suspendDelete = false; + + override def setSuspendDelete(value: Boolean): Unit = this.synchronized { + if( suspendDelete!=value ) { + suspendDelete = value; + if( suspendDelete ) { + db.client.log.recordLogTestSupport.deleteCall.suspend + } else { + db.client.log.recordLogTestSupport.deleteCall.resume + } + } + } + + override def getSuspendDelete: Boolean = this.synchronized { + suspendDelete + } + + override def getDeleteCalls = this.synchronized { + db.client.log.recordLogTestSupport.deleteCall.threads.get() + } + +} + class LevelDBStoreView(val store:LevelDBStore) extends LevelDBStoreViewMBean { import store._ @@ -223,6 +292,10 @@ class LevelDBStore extends LockableServiceSupport with BrokerServiceAware with P if(brokerService!=null && brokerService.isUseJmx){ try { AnnotatedMBean.registerMBean(brokerService.getManagementContext, new LevelDBStoreView(this), objectName) + if( java.lang.Boolean.getBoolean("org.apache.activemq.leveldb.test") ) { + val name = new ObjectName(objectName.toString + ",test=test") + AnnotatedMBean.registerMBean(brokerService.getManagementContext, new LevelDBStoreTest(this), name) + } } catch { case e: Throwable => { warn(e, "LevelDB Store could not be registered in JMX: " + e.getMessage) @@ -279,6 +352,8 @@ class LevelDBStore extends LockableServiceSupport with BrokerServiceAware with P db.stop if(brokerService!=null && brokerService.isUseJmx){ brokerService.getManagementContext().unregisterMBean(objectName); + if( java.lang.Boolean.getBoolean("org.apache.activemq.leveldb.test") ) + brokerService.getManagementContext().unregisterMBean(new ObjectName(objectName.toString+",test=test")); } info("Stopped "+this) } diff --git a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/RecordLog.scala b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/RecordLog.scala index daef103088..1ab66cee1f 100644 --- a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/RecordLog.scala +++ b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/RecordLog.scala @@ -22,7 +22,7 @@ import java.{util=>ju} import java.util.zip.CRC32 import java.util.Map.Entry -import java.util.concurrent.atomic.AtomicLong +import java.util.concurrent.atomic.{AtomicInteger, AtomicLong} import java.io._ import org.fusesource.hawtbuf.{DataByteArrayInputStream, DataByteArrayOutputStream, Buffer} import org.fusesource.hawtdispatch.BaseRetained @@ -31,6 +31,8 @@ import org.apache.activemq.util.LRUCache import util.TimeMetric._ import util.{TimeMetric, Log} import java.util.TreeMap +import java.util.concurrent.locks.{ReentrantReadWriteLock, ReadWriteLock} +import java.util.concurrent.CountDownLatch object RecordLog extends Log { @@ -68,6 +70,63 @@ object RecordLog extends Log { } +class SuspendCallSupport { + + val lock = new ReentrantReadWriteLock() + var resumeLatch:CountDownLatch = _ + var resumedLatch:CountDownLatch = _ + @volatile + var threads = new AtomicInteger() + + def suspend = this.synchronized { + val suspended = new CountDownLatch(1) + resumeLatch = new CountDownLatch(1) + resumedLatch = new CountDownLatch(1) + new Thread("Suspend Lock") { + override def run = { + try { + lock.writeLock().lock() + suspended.countDown() + resumeLatch.await() + } finally { + lock.writeLock().unlock(); + resumedLatch.countDown() + } + } + }.start() + suspended.await() + } + + def resume = this.synchronized { + if( resumedLatch != null ) { + resumeLatch.countDown() + resumedLatch.await(); + resumeLatch = null + resumedLatch = null + } + } + + def call[T](func: =>T):T= { + threads.incrementAndGet() + lock.readLock().lock() + try { + func + } finally { + threads.decrementAndGet() + lock.readLock().unlock() + } + } + +} + +class RecordLogTestSupport { + + val forceCall = new SuspendCallSupport() + val writeCall = new SuspendCallSupport() + val deleteCall = new SuspendCallSupport() + +} + case class RecordLog(directory: File, logSuffix:String) { import RecordLog._ @@ -78,6 +137,14 @@ case class RecordLog(directory: File, logSuffix:String) { var verify_checksums = false val log_infos = new TreeMap[Long, LogInfo]() + var recordLogTestSupport:RecordLogTestSupport = + if( java.lang.Boolean.getBoolean("org.apache.activemq.leveldb.test") ) { + new RecordLogTestSupport() + } else { + null + } + + object log_mutex def delete(id:Long) = { @@ -97,7 +164,13 @@ case class RecordLog(directory: File, logSuffix:String) { } protected def onDelete(file:File) = { - file.delete() + if( recordLogTestSupport!=null ) { + recordLogTestSupport.deleteCall.call { + file.delete() + } + } else { + file.delete() + } } def checksum(data: Buffer): Int = { @@ -137,10 +210,17 @@ case class RecordLog(directory: File, logSuffix:String) { flush max_log_flush_latency { // only need to update the file metadata if the file size changes.. - channel.force(append_offset > logSize) + if( recordLogTestSupport!=null ) { + recordLogTestSupport.forceCall.call { + channel.force(append_offset > logSize) + } + } else { + channel.force(append_offset > logSize) + } } } + def skip(length:Long) = this.synchronized { flush append_offset += length @@ -177,7 +257,15 @@ case class RecordLog(directory: File, logSuffix:String) { val buffer = data.toByteBuffer val pos = append_offset+LOG_HEADER_SIZE val remaining = buffer.remaining - channel.write(buffer, pos) + + if( recordLogTestSupport!=null ) { + recordLogTestSupport.writeCall.call { + channel.write(buffer, pos) + } + } else { + channel.write(buffer, pos) + } + flushed_offset.addAndGet(remaining) if( buffer.hasRemaining ) { throw new IOException("Short write") @@ -200,7 +288,15 @@ case class RecordLog(directory: File, logSuffix:String) { val buffer = write_buffer.toBuffer.toByteBuffer val remaining = buffer.remaining val pos = append_offset-remaining - channel.write(buffer, pos) + + if( recordLogTestSupport!=null ) { + recordLogTestSupport.writeCall.call { + channel.write(buffer, pos) + } + } else { + channel.write(buffer, pos) + } + flushed_offset.addAndGet(remaining) if( buffer.hasRemaining ) { throw new IOException("Short write")