This commit is contained in:
Hiram Chirino 2014-11-25 11:40:48 -05:00
parent 74f530a641
commit ebafd5c193
3 changed files with 232 additions and 5 deletions

View File

@ -0,0 +1,56 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.leveldb;
import org.apache.activemq.broker.jmx.MBeanInfo;
/**
* <p>
* </p>
*
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
public interface LevelDBStoreTestMBean {
@MBeanInfo("Used to set if the log force calls should be suspended")
void setSuspendForce(boolean value);
@MBeanInfo("Gets if the log force calls should be suspended")
boolean getSuspendForce();
@MBeanInfo("Gets the number of threads waiting to do a log force call.")
long getForceCalls();
@MBeanInfo("Used to set if the log write calls should be suspended")
void setSuspendWrite(boolean value);
@MBeanInfo("Gets if the log write calls should be suspended")
boolean getSuspendWrite();
@MBeanInfo("Gets the number of threads waiting to do a log write call.")
long getWriteCalls();
@MBeanInfo("Used to set if the log delete calls should be suspended")
void setSuspendDelete(boolean value);
@MBeanInfo("Gets if the log delete calls should be suspended")
boolean getSuspendDelete();
@MBeanInfo("Gets the number of threads waiting to do a log delete call.")
long getDeleteCalls();
}

View File

@ -37,6 +37,7 @@ import org.fusesource.hawtbuf.{UTF8Buffer, DataByteArrayOutputStream}
import org.fusesource.hawtdispatch;
import org.apache.activemq.broker.scheduler.JobSchedulerStore
import org.apache.activemq.store.IndexListener.MessageContext
import javax.management.ObjectName
object LevelDBStore extends Log {
val DEFAULT_DIRECTORY = new File("LevelDB");
@ -82,6 +83,74 @@ case class DurableSubscription(subKey:Long, topicKey:Long, info: SubscriptionInf
var cursorPosition = 0L
}
class LevelDBStoreTest(val store:LevelDBStore) extends LevelDBStoreTestMBean {
import store._
var suspendForce = false;
override def setSuspendForce(value: Boolean): Unit = this.synchronized {
if( suspendForce!=value ) {
suspendForce = value;
if( suspendForce ) {
db.client.log.recordLogTestSupport.forceCall.suspend
} else {
db.client.log.recordLogTestSupport.forceCall.resume
}
}
}
override def getSuspendForce: Boolean = this.synchronized {
suspendForce
}
override def getForceCalls = this.synchronized {
db.client.log.recordLogTestSupport.forceCall.threads.get()
}
var suspendWrite = false;
override def setSuspendWrite(value: Boolean): Unit = this.synchronized {
if( suspendWrite!=value ) {
suspendWrite = value;
if( suspendWrite ) {
db.client.log.recordLogTestSupport.writeCall.suspend
} else {
db.client.log.recordLogTestSupport.writeCall.resume
}
}
}
override def getSuspendWrite: Boolean = this.synchronized {
suspendWrite
}
override def getWriteCalls = this.synchronized {
db.client.log.recordLogTestSupport.writeCall.threads.get()
}
var suspendDelete = false;
override def setSuspendDelete(value: Boolean): Unit = this.synchronized {
if( suspendDelete!=value ) {
suspendDelete = value;
if( suspendDelete ) {
db.client.log.recordLogTestSupport.deleteCall.suspend
} else {
db.client.log.recordLogTestSupport.deleteCall.resume
}
}
}
override def getSuspendDelete: Boolean = this.synchronized {
suspendDelete
}
override def getDeleteCalls = this.synchronized {
db.client.log.recordLogTestSupport.deleteCall.threads.get()
}
}
class LevelDBStoreView(val store:LevelDBStore) extends LevelDBStoreViewMBean {
import store._
@ -223,6 +292,10 @@ class LevelDBStore extends LockableServiceSupport with BrokerServiceAware with P
if(brokerService!=null && brokerService.isUseJmx){
try {
AnnotatedMBean.registerMBean(brokerService.getManagementContext, new LevelDBStoreView(this), objectName)
if( java.lang.Boolean.getBoolean("org.apache.activemq.leveldb.test") ) {
val name = new ObjectName(objectName.toString + ",test=test")
AnnotatedMBean.registerMBean(brokerService.getManagementContext, new LevelDBStoreTest(this), name)
}
} catch {
case e: Throwable => {
warn(e, "LevelDB Store could not be registered in JMX: " + e.getMessage)
@ -279,6 +352,8 @@ class LevelDBStore extends LockableServiceSupport with BrokerServiceAware with P
db.stop
if(brokerService!=null && brokerService.isUseJmx){
brokerService.getManagementContext().unregisterMBean(objectName);
if( java.lang.Boolean.getBoolean("org.apache.activemq.leveldb.test") )
brokerService.getManagementContext().unregisterMBean(new ObjectName(objectName.toString+",test=test"));
}
info("Stopped "+this)
}

View File

@ -22,7 +22,7 @@ import java.{util=>ju}
import java.util.zip.CRC32
import java.util.Map.Entry
import java.util.concurrent.atomic.AtomicLong
import java.util.concurrent.atomic.{AtomicInteger, AtomicLong}
import java.io._
import org.fusesource.hawtbuf.{DataByteArrayInputStream, DataByteArrayOutputStream, Buffer}
import org.fusesource.hawtdispatch.BaseRetained
@ -31,6 +31,8 @@ import org.apache.activemq.util.LRUCache
import util.TimeMetric._
import util.{TimeMetric, Log}
import java.util.TreeMap
import java.util.concurrent.locks.{ReentrantReadWriteLock, ReadWriteLock}
import java.util.concurrent.CountDownLatch
object RecordLog extends Log {
@ -68,6 +70,63 @@ object RecordLog extends Log {
}
class SuspendCallSupport {
val lock = new ReentrantReadWriteLock()
var resumeLatch:CountDownLatch = _
var resumedLatch:CountDownLatch = _
@volatile
var threads = new AtomicInteger()
def suspend = this.synchronized {
val suspended = new CountDownLatch(1)
resumeLatch = new CountDownLatch(1)
resumedLatch = new CountDownLatch(1)
new Thread("Suspend Lock") {
override def run = {
try {
lock.writeLock().lock()
suspended.countDown()
resumeLatch.await()
} finally {
lock.writeLock().unlock();
resumedLatch.countDown()
}
}
}.start()
suspended.await()
}
def resume = this.synchronized {
if( resumedLatch != null ) {
resumeLatch.countDown()
resumedLatch.await();
resumeLatch = null
resumedLatch = null
}
}
def call[T](func: =>T):T= {
threads.incrementAndGet()
lock.readLock().lock()
try {
func
} finally {
threads.decrementAndGet()
lock.readLock().unlock()
}
}
}
class RecordLogTestSupport {
val forceCall = new SuspendCallSupport()
val writeCall = new SuspendCallSupport()
val deleteCall = new SuspendCallSupport()
}
case class RecordLog(directory: File, logSuffix:String) {
import RecordLog._
@ -78,6 +137,14 @@ case class RecordLog(directory: File, logSuffix:String) {
var verify_checksums = false
val log_infos = new TreeMap[Long, LogInfo]()
var recordLogTestSupport:RecordLogTestSupport =
if( java.lang.Boolean.getBoolean("org.apache.activemq.leveldb.test") ) {
new RecordLogTestSupport()
} else {
null
}
object log_mutex
def delete(id:Long) = {
@ -97,8 +164,14 @@ case class RecordLog(directory: File, logSuffix:String) {
}
protected def onDelete(file:File) = {
if( recordLogTestSupport!=null ) {
recordLogTestSupport.deleteCall.call {
file.delete()
}
} else {
file.delete()
}
}
def checksum(data: Buffer): Int = {
val checksum = new CRC32
@ -137,9 +210,16 @@ case class RecordLog(directory: File, logSuffix:String) {
flush
max_log_flush_latency {
// only need to update the file metadata if the file size changes..
if( recordLogTestSupport!=null ) {
recordLogTestSupport.forceCall.call {
channel.force(append_offset > logSize)
}
} else {
channel.force(append_offset > logSize)
}
}
}
def skip(length:Long) = this.synchronized {
flush
@ -177,7 +257,15 @@ case class RecordLog(directory: File, logSuffix:String) {
val buffer = data.toByteBuffer
val pos = append_offset+LOG_HEADER_SIZE
val remaining = buffer.remaining
if( recordLogTestSupport!=null ) {
recordLogTestSupport.writeCall.call {
channel.write(buffer, pos)
}
} else {
channel.write(buffer, pos)
}
flushed_offset.addAndGet(remaining)
if( buffer.hasRemaining ) {
throw new IOException("Short write")
@ -200,7 +288,15 @@ case class RecordLog(directory: File, logSuffix:String) {
val buffer = write_buffer.toBuffer.toByteBuffer
val remaining = buffer.remaining
val pos = append_offset-remaining
if( recordLogTestSupport!=null ) {
recordLogTestSupport.writeCall.call {
channel.write(buffer, pos)
}
} else {
channel.write(buffer, pos)
}
flushed_offset.addAndGet(remaining)
if( buffer.hasRemaining ) {
throw new IOException("Short write")