diff --git a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala index 8f6561533a..3d6178bcea 100755 --- a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala +++ b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala @@ -880,7 +880,7 @@ class LevelDBClient(store: LevelDBStore) { index.put(key, baos.toByteArray) } catch { - case e => throw e + case e : Throwable => throw e } } def storeList[T <: AnyRef](key:Array[Byte], list:Array[Long]) { @@ -895,7 +895,7 @@ class LevelDBClient(store: LevelDBStore) { index.put(key, baos.toByteArray) } catch { - case e => throw e + case e : Throwable => throw e } } def storeObject(key:Array[Byte], o:Object) = { @@ -1657,7 +1657,6 @@ class LevelDBClient(store: LevelDBStore) { } import collection.JavaConversions._ - lastIndexSnapshotPos // drop the logs that are no longer referenced. for( (x,y) <- logRefs.toSeq ) { diff --git a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala index 01d517007f..04eaf16730 100644 --- a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala +++ b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala @@ -314,7 +314,7 @@ class LevelDBStore extends LockableServiceSupport with BrokerServiceAware with P return 0 } - def createTransactionStore = this + def createTransactionStore = new LevelDBTransactionStore(this) val transactions = new ConcurrentHashMap[TransactionId, Transaction]() @@ -1021,6 +1021,20 @@ class LevelDBStore extends LockableServiceSupport with BrokerServiceAware with P } + class LevelDBTransactionStore(val store:LevelDBStore) extends TransactionStore { + def start() = {} + + def stop() = {} + + def prepare(txid: TransactionId) = store.prepare(txid) + + def commit(txid: TransactionId, wasPrepared: Boolean, preCommit: Runnable, postCommit: Runnable) = store.commit(txid, wasPrepared, preCommit, postCommit) + + def rollback(txid: TransactionId) = store.rollback(txid) + + def recover(listener: TransactionRecoveryListener) = store.recover(listener) + } + /////////////////////////////////////////////////////////////////////////// // The following methods actually have nothing to do with JMS txs... It's more like // operation batch.. we handle that in the DBManager tho.. diff --git a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/SlaveLevelDBStore.scala b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/SlaveLevelDBStore.scala index 5d66abaa54..87209878ab 100644 --- a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/SlaveLevelDBStore.scala +++ b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/SlaveLevelDBStore.scala @@ -132,27 +132,27 @@ class SlaveLevelDBStore extends LevelDBStore with ReplicatedLevelDBStoreTrait { } def stop_connections(cb:Task) = { - var then = ^{ + var task = ^{ unstash(directory) cb.run() } val wal_session_copy = wal_session if( wal_session_copy !=null ) { wal_session = null - val next = then - then = ^{ + val next = task + task = ^{ wal_session_copy.transport.stop(next) } } val transfer_session_copy = transfer_session if( transfer_session_copy !=null ) { transfer_session = null - val next = then - then = ^{ + val next = task + task = ^{ transfer_session_copy.transport.stop(next) } } - then.run(); + task.run(); } diff --git a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/groups/ClusteredSingleton.scala b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/groups/ClusteredSingleton.scala index a1a5250d31..a66d7b3153 100644 --- a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/groups/ClusteredSingleton.scala +++ b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/groups/ClusteredSingleton.scala @@ -27,6 +27,7 @@ import java.lang.{IllegalStateException, String} import beans.BeanProperty import com.fasterxml.jackson.annotation.JsonProperty import org.apache.zookeeper.KeeperException.NoNodeException +import scala.reflect.ClassTag /** * @author Hiram Chirino @@ -162,7 +163,7 @@ class ClusteredSingletonWatcher[T <: NodeState](val stateClass:Class[T]) extends } def masters = this.synchronized { - _members.mapValues(_.head._2).toArray.map(_._2).toArray(new ClassManifest[T] { + _members.mapValues(_.head._2).toArray.map(_._2).toArray(new ClassTag[T] { def runtimeClass = stateClass override def erasure = stateClass }) diff --git a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/util/FileSupport.scala b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/util/FileSupport.scala index 80d0a25910..b004c8c994 100644 --- a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/util/FileSupport.scala +++ b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/util/FileSupport.scala @@ -252,7 +252,7 @@ object ProcessSupport { FileSupport.copy(in, out) } } catch { - case _ => + case _ : Throwable => } } } else { @@ -266,7 +266,7 @@ object ProcessSupport { FileSupport.copy(in, out) } } catch { - case _ => + case _ : Throwable => } } } else { @@ -280,7 +280,7 @@ object ProcessSupport { FileSupport.copy(in, err) } } catch { - case _ => + case _ : Throwable => } } } else { diff --git a/activemq-leveldb-store/src/test/scala/org/apache/activemq/leveldb/test/PListTest.java b/activemq-leveldb-store/src/test/scala/org/apache/activemq/leveldb/test/PListTest.java new file mode 100644 index 0000000000..e0c7609f43 --- /dev/null +++ b/activemq-leveldb-store/src/test/scala/org/apache/activemq/leveldb/test/PListTest.java @@ -0,0 +1,80 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.leveldb.test; + + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.region.policy.PolicyEntry; +import org.apache.activemq.broker.region.policy.PolicyMap; +import org.apache.activemq.leveldb.LevelDBStore; +import org.apache.activemq.store.PersistenceAdapter; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import javax.jms.Connection; +import javax.jms.DeliveryMode; +import javax.jms.MessageProducer; +import javax.jms.Session; +import java.io.File; + +public class PListTest { + + protected BrokerService brokerService; + + @Before + public void setUp() throws Exception { + brokerService = new BrokerService(); + brokerService.addConnector("tcp://localhost:0"); + + LevelDBStore store = new LevelDBStore(); + store.setDirectory(new File("target/activemq-data/haleveldb")); + store.deleteAllMessages(); + brokerService.setPersistenceAdapter(store); + + PolicyMap policyMap = new PolicyMap(); + PolicyEntry policy = new PolicyEntry(); + policy.setMemoryLimit(1); + policyMap.setDefaultEntry(policy); + brokerService.setDestinationPolicy(policyMap); + + brokerService.start(); + } + + @After + public void tearDown() throws Exception { + if (brokerService != null && brokerService.isStopped()) { + brokerService.stop(); + } + } + + @Test + public void testBrokerStop() throws Exception { + ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(brokerService.getTransportConnectors().get(0).getServer().getConnectURI().toString()); + Connection conn = factory.createConnection(); + Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageProducer producer = sess.createProducer(sess.createQueue("TEST")); + producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); + for (int i = 0; i < 10000; i++) { + producer.send(sess.createTextMessage(i + " message")); + } + brokerService.stop(); + brokerService.waitUntilStopped(); + } + +}