From e507461dbc96e471f2bbea51d9fecbe45a7fd36e Mon Sep 17 00:00:00 2001 From: "Hiram R. Chirino" Date: Tue, 30 Apr 2013 16:11:52 +0000 Subject: [PATCH] Get the replicatedLevelDB element working in the activemq standalone config. git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1477709 13f79535-47bb-0310-9956-ffa450edef68 --- .../ReplicatedLevelDBPersistenceAdapter.java | 30 +++++++++++++++ .../replicated/ElectingLevelDBStore.scala | 38 +++++++++++++++---- .../leveldb/replicated/MasterElector.scala | 2 +- .../replicated/MasterLevelDBStore.scala | 10 +++-- .../test/ElectingLevelDBStoreTest.java | 2 +- .../test/ReplicatedLevelDBStoreTest.java | 11 +----- assembly/src/main/descriptors/common-bin.xml | 14 +++++++ 7 files changed, 85 insertions(+), 22 deletions(-) create mode 100644 activemq-leveldb-store/src/main/java/org/apache/activemq/store/leveldb/ReplicatedLevelDBPersistenceAdapter.java diff --git a/activemq-leveldb-store/src/main/java/org/apache/activemq/store/leveldb/ReplicatedLevelDBPersistenceAdapter.java b/activemq-leveldb-store/src/main/java/org/apache/activemq/store/leveldb/ReplicatedLevelDBPersistenceAdapter.java new file mode 100644 index 0000000000..e87e3286b7 --- /dev/null +++ b/activemq-leveldb-store/src/main/java/org/apache/activemq/store/leveldb/ReplicatedLevelDBPersistenceAdapter.java @@ -0,0 +1,30 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.store.leveldb; + +import org.apache.activemq.leveldb.replicated.ElectingLevelDBStore; + + +/** + * An implementation of {@link org.apache.activemq.store.PersistenceAdapter} designed for use with + * LevelDB - Embedded Lightweight Non-Relational Database + * + * @org.apache.xbean.XBean element="replicatedLevelDB" + * + */ +public class ReplicatedLevelDBPersistenceAdapter extends ElectingLevelDBStore { +} diff --git a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/ElectingLevelDBStore.scala b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/ElectingLevelDBStore.scala index 261d5a9306..12eacb0442 100644 --- a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/ElectingLevelDBStore.scala +++ b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/ElectingLevelDBStore.scala @@ -30,6 +30,7 @@ import java.util.concurrent.CountDownLatch import java.util.concurrent.atomic.AtomicBoolean import org.apache.activemq.leveldb.util.Log import java.io.File +import org.apache.activemq.usage.SystemUsage object ElectingLevelDBStore extends Log { @@ -58,7 +59,7 @@ class ElectingLevelDBStore extends ProxyLevelDBStore { def proxy_target = master @BeanProperty - var zkAddress = "tcp://127.0.0.1:2888" + var zkAddress = "127.0.0.1:2181" @BeanProperty var zkPassword:String = _ @BeanProperty @@ -72,8 +73,12 @@ class ElectingLevelDBStore extends ProxyLevelDBStore { var hostname: String = _ @BeanProperty var bind = "tcp://0.0.0.0:61619" + @BeanProperty - var minReplica = 1 + var replicas = 2 + + def clusterSizeQuorum = (replicas/2) + 1 + @BeanProperty var securityToken = "" @@ -116,10 +121,6 @@ class ElectingLevelDBStore extends ProxyLevelDBStore { @BeanProperty var monitorStats = false - def cluster_size_quorum = minReplica + 1 - - def cluster_size_max = (minReplica << 2) + 1 - var master: MasterLevelDBStore = _ var slave: SlaveLevelDBStore = _ @@ -129,6 +130,11 @@ class ElectingLevelDBStore extends ProxyLevelDBStore { var position: Long = -1L + var usageManager: SystemUsage = _ + override def setUsageManager(usageManager: SystemUsage) { + this.usageManager = usageManager + } + def init() { // Figure out our position in the store. @@ -250,7 +256,7 @@ class ElectingLevelDBStore extends ProxyLevelDBStore { def create_master() = { val master = new MasterLevelDBStore configure(master) - master.minReplica = minReplica + master.replicas = replicas master.bind = bind master } @@ -278,6 +284,7 @@ class ElectingLevelDBStore extends ProxyLevelDBStore { store.securityToken = securityToken store.setBrokerName(brokerName) store.setBrokerService(brokerService) + store.setUsageManager(usageManager) } def address(port: Int) = { @@ -287,4 +294,21 @@ class ElectingLevelDBStore extends ProxyLevelDBStore { "tcp://" + hostname + ":" + port } + override def size: Long = { + if( master !=null ) { + master.size + } else if( slave !=null ) { + slave.size + } else { + var rc = 0L + if( directory.exists() ) { + for( f <- directory.list() ) { + if( f.endsWith(LevelDBClient.LOG_SUFFIX)) { + rc += f.length + } + } + } + rc + } + } } diff --git a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/MasterElector.scala b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/MasterElector.scala index d7552dcd45..a5cef1b4cb 100644 --- a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/MasterElector.scala +++ b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/MasterElector.scala @@ -90,7 +90,7 @@ class MasterElector(store: ElectingLevelDBStore) extends ClusteredSingleton[Leve info("Not enough cluster members connected to elect a new master.") case Some(members) => - if (members.size < store.cluster_size_quorum) { + if (members.size < store.clusterSizeQuorum) { info("Not enough cluster members connected to elect a new master.") } else { diff --git a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/MasterLevelDBStore.scala b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/MasterLevelDBStore.scala index f0aa89e9f8..fa97f23005 100644 --- a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/MasterLevelDBStore.scala +++ b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/MasterLevelDBStore.scala @@ -43,8 +43,10 @@ class MasterLevelDBStore extends LevelDBStore with ReplicatedLevelDBStoreTrait { @BeanProperty var bind = "tcp://0.0.0.0:61619" + @BeanProperty - var minReplica = 1 + var replicas = 2 + def minSlaveAcks = replicas/2 val slaves = new ConcurrentHashMap[String,SlaveState]() @@ -288,10 +290,10 @@ class MasterLevelDBStore extends LevelDBStore with ReplicatedLevelDBStoreTrait { var position_sync = new PositionSync(0L, 0) def wal_sync_to(position:Long):Unit = { - if( minReplica<1 ) { + if( minSlaveAcks<1 ) { return } - val position_sync = new PositionSync(position, minReplica) + val position_sync = new PositionSync(position, minSlaveAcks) this.position_sync = position_sync for( slave <- slaves.values() ) { slave.check_position_sync @@ -299,7 +301,7 @@ class MasterLevelDBStore extends LevelDBStore with ReplicatedLevelDBStoreTrait { while( !position_sync.await(1, TimeUnit.SECONDS) ) { val status = slaves.values().map(_.status).mkString(", ") - warn("Store update waiting on %d replica(s) to catch up to log position %d. Connected slaves: [%s]", minReplica, position, status) + warn("Store update waiting on %d replica(s) to catch up to log position %d. Connected slaves: [%s]", minSlaveAcks, position, status) } } diff --git a/activemq-leveldb-store/src/test/java/org/apache/activemq/leveldb/test/ElectingLevelDBStoreTest.java b/activemq-leveldb-store/src/test/java/org/apache/activemq/leveldb/test/ElectingLevelDBStoreTest.java index 73efb312c6..c8a7df49d9 100644 --- a/activemq-leveldb-store/src/test/java/org/apache/activemq/leveldb/test/ElectingLevelDBStoreTest.java +++ b/activemq-leveldb-store/src/test/java/org/apache/activemq/leveldb/test/ElectingLevelDBStoreTest.java @@ -215,7 +215,7 @@ public class ElectingLevelDBStoreTest extends TestCase { ElectingLevelDBStore store = new ElectingLevelDBStore(); store.setSecurityToken("foo"); store.setLogSize(1023 * 200); - store.setMinReplica(1); + store.setReplicas(2); store.setZkAddress("localhost:" + connector.getLocalPort()); store.setZkPath("/broker-stores"); store.setBrokerName("foo"); diff --git a/activemq-leveldb-store/src/test/java/org/apache/activemq/leveldb/test/ReplicatedLevelDBStoreTest.java b/activemq-leveldb-store/src/test/java/org/apache/activemq/leveldb/test/ReplicatedLevelDBStoreTest.java index be06bec31f..84ac7ba650 100644 --- a/activemq-leveldb-store/src/test/java/org/apache/activemq/leveldb/test/ReplicatedLevelDBStoreTest.java +++ b/activemq-leveldb-store/src/test/java/org/apache/activemq/leveldb/test/ReplicatedLevelDBStoreTest.java @@ -17,24 +17,17 @@ package org.apache.activemq.leveldb.test; import junit.framework.TestCase; -import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.command.ActiveMQQueue; -import org.apache.activemq.command.ActiveMQTextMessage; -import org.apache.activemq.command.Message; -import org.apache.activemq.command.MessageId; import org.apache.activemq.leveldb.CountDownFuture; import org.apache.activemq.leveldb.LevelDBStore; import org.apache.activemq.leveldb.replicated.MasterLevelDBStore; import org.apache.activemq.leveldb.replicated.SlaveLevelDBStore; import org.apache.activemq.leveldb.util.FileSupport; -import org.apache.activemq.store.MessageRecoveryListener; import org.apache.activemq.store.MessageStore; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.jms.JMSException; import java.io.File; -import java.io.IOException; import java.util.ArrayList; import java.util.LinkedList; import java.util.concurrent.TimeUnit; @@ -53,7 +46,7 @@ public class ReplicatedLevelDBStoreTest extends TestCase { FileSupport.toRichFile(slaveDir).recursiveDelete(); MasterLevelDBStore master = createMaster(masterDir); - master.setMinReplica(1); + master.setReplicas(2); master.start(); MessageStore ms = master.createQueueMessageStore(new ActiveMQQueue("TEST")); @@ -172,7 +165,7 @@ public class ReplicatedLevelDBStoreTest extends TestCase { master.setDirectory(directory); master.setBind("tcp://0.0.0.0:0"); master.setSecurityToken("foo"); - master.setMinReplica(1); + master.setReplicas(2); master.setLogSize(1023 * 200); return master; } diff --git a/assembly/src/main/descriptors/common-bin.xml b/assembly/src/main/descriptors/common-bin.xml index c4bf0ee6f6..8d525cf0fb 100644 --- a/assembly/src/main/descriptors/common-bin.xml +++ b/assembly/src/main/descriptors/common-bin.xml @@ -194,6 +194,20 @@ org.apache.qpid:proton org.apache.qpid:proton-api org.apache.qpid:proton-jms + + + org.scala-lang:scala-library + org.fusesource.hawtbuf:hawtbuf-proto + org.fusesource.hawtdispatch:* + org.iq80.leveldb:* + com.google.guava:guava + org.fusesource.leveldbjni:* + org.fusesource.hawtjni:hawtjni-runtime + org.xerial.snappy:* + org.iq80.snappy:* + org.codehaus.jackson:* + org.fusesource.fabric:* +