Get the replicatedLevelDB element working in the activemq standalone config.

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1477709 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Hiram R. Chirino 2013-04-30 16:11:52 +00:00
parent 5cbb4bfc12
commit e507461dbc
7 changed files with 85 additions and 22 deletions

View File

@ -0,0 +1,30 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.store.leveldb;
import org.apache.activemq.leveldb.replicated.ElectingLevelDBStore;
/**
* An implementation of {@link org.apache.activemq.store.PersistenceAdapter} designed for use with
* LevelDB - Embedded Lightweight Non-Relational Database
*
* @org.apache.xbean.XBean element="replicatedLevelDB"
*
*/
public class ReplicatedLevelDBPersistenceAdapter extends ElectingLevelDBStore {
}

View File

@ -30,6 +30,7 @@ import java.util.concurrent.CountDownLatch
import java.util.concurrent.atomic.AtomicBoolean
import org.apache.activemq.leveldb.util.Log
import java.io.File
import org.apache.activemq.usage.SystemUsage
object ElectingLevelDBStore extends Log {
@ -58,7 +59,7 @@ class ElectingLevelDBStore extends ProxyLevelDBStore {
def proxy_target = master
@BeanProperty
var zkAddress = "tcp://127.0.0.1:2888"
var zkAddress = "127.0.0.1:2181"
@BeanProperty
var zkPassword:String = _
@BeanProperty
@ -72,8 +73,12 @@ class ElectingLevelDBStore extends ProxyLevelDBStore {
var hostname: String = _
@BeanProperty
var bind = "tcp://0.0.0.0:61619"
@BeanProperty
var minReplica = 1
var replicas = 2
def clusterSizeQuorum = (replicas/2) + 1
@BeanProperty
var securityToken = ""
@ -116,10 +121,6 @@ class ElectingLevelDBStore extends ProxyLevelDBStore {
@BeanProperty
var monitorStats = false
def cluster_size_quorum = minReplica + 1
def cluster_size_max = (minReplica << 2) + 1
var master: MasterLevelDBStore = _
var slave: SlaveLevelDBStore = _
@ -129,6 +130,11 @@ class ElectingLevelDBStore extends ProxyLevelDBStore {
var position: Long = -1L
var usageManager: SystemUsage = _
override def setUsageManager(usageManager: SystemUsage) {
this.usageManager = usageManager
}
def init() {
// Figure out our position in the store.
@ -250,7 +256,7 @@ class ElectingLevelDBStore extends ProxyLevelDBStore {
def create_master() = {
val master = new MasterLevelDBStore
configure(master)
master.minReplica = minReplica
master.replicas = replicas
master.bind = bind
master
}
@ -278,6 +284,7 @@ class ElectingLevelDBStore extends ProxyLevelDBStore {
store.securityToken = securityToken
store.setBrokerName(brokerName)
store.setBrokerService(brokerService)
store.setUsageManager(usageManager)
}
def address(port: Int) = {
@ -287,4 +294,21 @@ class ElectingLevelDBStore extends ProxyLevelDBStore {
"tcp://" + hostname + ":" + port
}
override def size: Long = {
if( master !=null ) {
master.size
} else if( slave !=null ) {
slave.size
} else {
var rc = 0L
if( directory.exists() ) {
for( f <- directory.list() ) {
if( f.endsWith(LevelDBClient.LOG_SUFFIX)) {
rc += f.length
}
}
}
rc
}
}
}

View File

@ -90,7 +90,7 @@ class MasterElector(store: ElectingLevelDBStore) extends ClusteredSingleton[Leve
info("Not enough cluster members connected to elect a new master.")
case Some(members) =>
if (members.size < store.cluster_size_quorum) {
if (members.size < store.clusterSizeQuorum) {
info("Not enough cluster members connected to elect a new master.")
} else {

View File

@ -43,8 +43,10 @@ class MasterLevelDBStore extends LevelDBStore with ReplicatedLevelDBStoreTrait {
@BeanProperty
var bind = "tcp://0.0.0.0:61619"
@BeanProperty
var minReplica = 1
var replicas = 2
def minSlaveAcks = replicas/2
val slaves = new ConcurrentHashMap[String,SlaveState]()
@ -288,10 +290,10 @@ class MasterLevelDBStore extends LevelDBStore with ReplicatedLevelDBStoreTrait {
var position_sync = new PositionSync(0L, 0)
def wal_sync_to(position:Long):Unit = {
if( minReplica<1 ) {
if( minSlaveAcks<1 ) {
return
}
val position_sync = new PositionSync(position, minReplica)
val position_sync = new PositionSync(position, minSlaveAcks)
this.position_sync = position_sync
for( slave <- slaves.values() ) {
slave.check_position_sync
@ -299,7 +301,7 @@ class MasterLevelDBStore extends LevelDBStore with ReplicatedLevelDBStoreTrait {
while( !position_sync.await(1, TimeUnit.SECONDS) ) {
val status = slaves.values().map(_.status).mkString(", ")
warn("Store update waiting on %d replica(s) to catch up to log position %d. Connected slaves: [%s]", minReplica, position, status)
warn("Store update waiting on %d replica(s) to catch up to log position %d. Connected slaves: [%s]", minSlaveAcks, position, status)
}
}

View File

@ -215,7 +215,7 @@ public class ElectingLevelDBStoreTest extends TestCase {
ElectingLevelDBStore store = new ElectingLevelDBStore();
store.setSecurityToken("foo");
store.setLogSize(1023 * 200);
store.setMinReplica(1);
store.setReplicas(2);
store.setZkAddress("localhost:" + connector.getLocalPort());
store.setZkPath("/broker-stores");
store.setBrokerName("foo");

View File

@ -17,24 +17,17 @@
package org.apache.activemq.leveldb.test;
import junit.framework.TestCase;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTextMessage;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.leveldb.CountDownFuture;
import org.apache.activemq.leveldb.LevelDBStore;
import org.apache.activemq.leveldb.replicated.MasterLevelDBStore;
import org.apache.activemq.leveldb.replicated.SlaveLevelDBStore;
import org.apache.activemq.leveldb.util.FileSupport;
import org.apache.activemq.store.MessageRecoveryListener;
import org.apache.activemq.store.MessageStore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.jms.JMSException;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.concurrent.TimeUnit;
@ -53,7 +46,7 @@ public class ReplicatedLevelDBStoreTest extends TestCase {
FileSupport.toRichFile(slaveDir).recursiveDelete();
MasterLevelDBStore master = createMaster(masterDir);
master.setMinReplica(1);
master.setReplicas(2);
master.start();
MessageStore ms = master.createQueueMessageStore(new ActiveMQQueue("TEST"));
@ -172,7 +165,7 @@ public class ReplicatedLevelDBStoreTest extends TestCase {
master.setDirectory(directory);
master.setBind("tcp://0.0.0.0:0");
master.setSecurityToken("foo");
master.setMinReplica(1);
master.setReplicas(2);
master.setLogSize(1023 * 200);
return master;
}

View File

@ -194,6 +194,20 @@
<include>org.apache.qpid:proton</include>
<include>org.apache.qpid:proton-api</include>
<include>org.apache.qpid:proton-jms</include>
<!-- activemq-leveldb-store dependencies -->
<include>org.scala-lang:scala-library</include>
<include>org.fusesource.hawtbuf:hawtbuf-proto</include>
<include>org.fusesource.hawtdispatch:*</include>
<include>org.iq80.leveldb:*</include>
<include>com.google.guava:guava</include>
<include>org.fusesource.leveldbjni:*</include>
<include>org.fusesource.hawtjni:hawtjni-runtime</include>
<include>org.xerial.snappy:*</include>
<include>org.iq80.snappy:*</include>
<include>org.codehaus.jackson:*</include>
<include>org.fusesource.fabric:*</include>
</includes>
</dependencySet>
<dependencySet>