Moving things around so that activemq-core can depend on kahadb instead of kahadb depending on activemq-core. This step allows kahadb to be used as the default store implementation if desired; that will be done next.
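
For illustration only, a hypothetical usage sketch (not part of this commit) of what the change enables: once activemq-core depends on kahadb, a broker can be wired to the KahaDB store directly, since KahaDBStore implements PersistenceAdapter (see the diff below). The class name and data directory here are assumptions.

import java.io.File;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.store.kahadb.KahaDBStore;

public class KahaDBBrokerSketch {
    public static void main(String[] args) throws Exception {
        BrokerService broker = new BrokerService();
        // KahaDBStore implements PersistenceAdapter, so it can be
        // plugged in as the broker's message store.
        KahaDBStore store = new KahaDBStore();
        store.setDirectory(new File("target/kahadb-data")); // hypothetical data directory
        broker.setPersistenceAdapter(store);
        broker.start();
        // ... produce and consume messages ...
        broker.stop();
    }
}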



git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@731704 13f79535-47bb-0310-9956-ffa450edef68
Hiram R. Chirino 2009-01-05 20:48:38 +00:00
parent 4199c95098
commit f62737be7c
42 changed files with 110 additions and 4352 deletions

View File

@@ -77,7 +77,17 @@
<optional>false</optional>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>kahadb</artifactId>
<optional>false</optional>
</dependency>
<dependency>
<groupId>org.apache.activemq.protobuf</groupId>
<artifactId>activemq-protobuf</artifactId>
<optional>false</optional>
</dependency>
<!-- =============================== -->
<!-- Optional Dependencies -->
<!-- =============================== -->
@@ -457,6 +467,18 @@
</configuration>
</plugin>
<plugin>
<groupId>org.apache.activemq.protobuf</groupId>
<artifactId>activemq-protobuf</artifactId>
<executions>
<execution>
<goals>
<goal>compile</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-clean-plugin</artifactId>

View File

@@ -14,11 +14,11 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kahadb.store;
package org.apache.activemq.store.kahadb;
import java.io.IOException;
import org.apache.kahadb.store.data.KahaEntryType;
import org.apache.activemq.store.kahadb.data.KahaEntryType;
public interface JournalCommand<T> extends org.apache.activemq.protobuf.Message<T> {

View File

@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kahadb.store;
package org.apache.activemq.store.kahadb;
import java.io.DataInputStream;
import java.io.IOException;
@@ -47,24 +47,24 @@ import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.store.TopicMessageStore;
import org.apache.activemq.store.TransactionRecoveryListener;
import org.apache.activemq.store.TransactionStore;
import org.apache.activemq.store.kahadb.data.KahaAddMessageCommand;
import org.apache.activemq.store.kahadb.data.KahaCommitCommand;
import org.apache.activemq.store.kahadb.data.KahaDestination;
import org.apache.activemq.store.kahadb.data.KahaLocalTransactionId;
import org.apache.activemq.store.kahadb.data.KahaLocation;
import org.apache.activemq.store.kahadb.data.KahaPrepareCommand;
import org.apache.activemq.store.kahadb.data.KahaRemoveDestinationCommand;
import org.apache.activemq.store.kahadb.data.KahaRemoveMessageCommand;
import org.apache.activemq.store.kahadb.data.KahaRollbackCommand;
import org.apache.activemq.store.kahadb.data.KahaSubscriptionCommand;
import org.apache.activemq.store.kahadb.data.KahaTransactionInfo;
import org.apache.activemq.store.kahadb.data.KahaXATransactionId;
import org.apache.activemq.store.kahadb.data.KahaDestination.DestinationType;
import org.apache.activemq.usage.MemoryUsage;
import org.apache.activemq.usage.SystemUsage;
import org.apache.activemq.wireformat.WireFormat;
import org.apache.kahadb.journal.Location;
import org.apache.kahadb.page.Transaction;
import org.apache.kahadb.store.data.KahaAddMessageCommand;
import org.apache.kahadb.store.data.KahaCommitCommand;
import org.apache.kahadb.store.data.KahaDestination;
import org.apache.kahadb.store.data.KahaLocalTransactionId;
import org.apache.kahadb.store.data.KahaLocation;
import org.apache.kahadb.store.data.KahaPrepareCommand;
import org.apache.kahadb.store.data.KahaRemoveDestinationCommand;
import org.apache.kahadb.store.data.KahaRemoveMessageCommand;
import org.apache.kahadb.store.data.KahaRollbackCommand;
import org.apache.kahadb.store.data.KahaSubscriptionCommand;
import org.apache.kahadb.store.data.KahaTransactionInfo;
import org.apache.kahadb.store.data.KahaXATransactionId;
import org.apache.kahadb.store.data.KahaDestination.DestinationType;
public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {

View File

@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kahadb.store;
package org.apache.activemq.store.kahadb;
import java.io.DataInput;
import java.io.DataOutput;
@@ -38,6 +38,19 @@ import org.apache.activemq.command.LocalTransactionId;
import org.apache.activemq.command.SubscriptionInfo;
import org.apache.activemq.command.TransactionId;
import org.apache.activemq.command.XATransactionId;
import org.apache.activemq.store.kahadb.data.KahaAddMessageCommand;
import org.apache.activemq.store.kahadb.data.KahaCommitCommand;
import org.apache.activemq.store.kahadb.data.KahaDestination;
import org.apache.activemq.store.kahadb.data.KahaEntryType;
import org.apache.activemq.store.kahadb.data.KahaLocalTransactionId;
import org.apache.activemq.store.kahadb.data.KahaPrepareCommand;
import org.apache.activemq.store.kahadb.data.KahaRemoveDestinationCommand;
import org.apache.activemq.store.kahadb.data.KahaRemoveMessageCommand;
import org.apache.activemq.store.kahadb.data.KahaRollbackCommand;
import org.apache.activemq.store.kahadb.data.KahaSubscriptionCommand;
import org.apache.activemq.store.kahadb.data.KahaTraceCommand;
import org.apache.activemq.store.kahadb.data.KahaTransactionInfo;
import org.apache.activemq.store.kahadb.data.KahaXATransactionId;
import org.apache.activemq.util.Callback;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -48,19 +61,6 @@ import org.apache.kahadb.journal.Location;
import org.apache.kahadb.page.Page;
import org.apache.kahadb.page.PageFile;
import org.apache.kahadb.page.Transaction;
import org.apache.kahadb.store.data.KahaAddMessageCommand;
import org.apache.kahadb.store.data.KahaCommitCommand;
import org.apache.kahadb.store.data.KahaDestination;
import org.apache.kahadb.store.data.KahaEntryType;
import org.apache.kahadb.store.data.KahaLocalTransactionId;
import org.apache.kahadb.store.data.KahaPrepareCommand;
import org.apache.kahadb.store.data.KahaRemoveDestinationCommand;
import org.apache.kahadb.store.data.KahaRemoveMessageCommand;
import org.apache.kahadb.store.data.KahaRollbackCommand;
import org.apache.kahadb.store.data.KahaSubscriptionCommand;
import org.apache.kahadb.store.data.KahaTraceCommand;
import org.apache.kahadb.store.data.KahaTransactionInfo;
import org.apache.kahadb.store.data.KahaXATransactionId;
import org.apache.kahadb.util.ByteSequence;
import org.apache.kahadb.util.DataByteArrayInputStream;
import org.apache.kahadb.util.DataByteArrayOutputStream;

View File

@@ -14,18 +14,18 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kahadb.store;
package org.apache.activemq.store.kahadb;
import java.io.IOException;
import org.apache.kahadb.store.data.KahaAddMessageCommand;
import org.apache.kahadb.store.data.KahaCommitCommand;
import org.apache.kahadb.store.data.KahaPrepareCommand;
import org.apache.kahadb.store.data.KahaRemoveDestinationCommand;
import org.apache.kahadb.store.data.KahaRemoveMessageCommand;
import org.apache.kahadb.store.data.KahaRollbackCommand;
import org.apache.kahadb.store.data.KahaTraceCommand;
import org.apache.kahadb.store.data.KahaSubscriptionCommand;
import org.apache.activemq.store.kahadb.data.KahaAddMessageCommand;
import org.apache.activemq.store.kahadb.data.KahaCommitCommand;
import org.apache.activemq.store.kahadb.data.KahaPrepareCommand;
import org.apache.activemq.store.kahadb.data.KahaRemoveDestinationCommand;
import org.apache.activemq.store.kahadb.data.KahaRemoveMessageCommand;
import org.apache.activemq.store.kahadb.data.KahaRollbackCommand;
import org.apache.activemq.store.kahadb.data.KahaSubscriptionCommand;
import org.apache.activemq.store.kahadb.data.KahaTraceCommand;
public class Visitor {

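For orientation, a minimal sketch of how JournalCommand and Visitor cooperate; the visit(Visitor) double-dispatch shown here is an assumption based on the //| option comments in the .proto file below, not code from this commit.

import java.io.IOException;
import org.apache.activemq.store.kahadb.JournalCommand;
import org.apache.activemq.store.kahadb.Visitor;

public class JournalReplaySketch {
    // Hypothetical replay loop: each command decoded from the journal
    // routes itself into the matching Visitor callback.
    void replay(Iterable<JournalCommand<?>> commands, Visitor visitor) throws IOException {
        for (JournalCommand<?> command : commands) {
            command.visit(visitor); // assumed generated method, e.g. a
                                    // KahaAddMessageCommand invokes the add-message callback
        }
    }
}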
View File

@@ -14,7 +14,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
//
package org.apache.kahadb.store.data;
package org.apache.activemq.store.kahadb.data;
option java_multiple_files = true;
option java_outer_classname = "JournalData";
@@ -36,16 +36,16 @@ message KahaTraceCommand {
// are not valid for protoc, but they are valid for the ActiveMQ proto compiler.
// In the ActiveMQ proto compiler, comments terminate with the pipe character: |
//| option java_implments = "org.apache.kahadb.store.JournalCommand<KahaTraceCommand>";
//| option java_visitor = "org.apache.kahadb.store.Visitor:void:java.io.IOException";
//| option java_implments = "org.apache.activemq.store.kahadb.JournalCommand<KahaTraceCommand>";
//| option java_visitor = "org.apache.activemq.store.kahadb.Visitor:void:java.io.IOException";
//| option java_type_method = "KahaEntryType";
required string message = 1;
}
message KahaAddMessageCommand {
//| option java_implments = "org.apache.kahadb.store.JournalCommand<KahaAddMessageCommand>";
//| option java_visitor = "org.apache.kahadb.store.Visitor:void:java.io.IOException";
//| option java_implments = "org.apache.activemq.store.kahadb.JournalCommand<KahaAddMessageCommand>";
//| option java_visitor = "org.apache.activemq.store.kahadb.Visitor:void:java.io.IOException";
//| option java_type_method = "KahaEntryType";
optional KahaTransactionInfo transaction_info=1;
@@ -55,8 +55,8 @@ message KahaAddMessageCommand {
}
message KahaRemoveMessageCommand {
//| option java_implments = "org.apache.kahadb.store.JournalCommand<KahaRemoveMessageCommand>";
//| option java_visitor = "org.apache.kahadb.store.Visitor:void:java.io.IOException";
//| option java_implments = "org.apache.activemq.store.kahadb.JournalCommand<KahaRemoveMessageCommand>";
//| option java_visitor = "org.apache.activemq.store.kahadb.Visitor:void:java.io.IOException";
//| option java_type_method = "KahaEntryType";
optional KahaTransactionInfo transaction_info=1;
@@ -67,40 +67,40 @@ message KahaRemoveMessageCommand {
}
message KahaPrepareCommand {
//| option java_implments = "org.apache.kahadb.store.JournalCommand<KahaPrepareCommand>";
//| option java_visitor = "org.apache.kahadb.store.Visitor:void:java.io.IOException";
//| option java_implments = "org.apache.activemq.store.kahadb.JournalCommand<KahaPrepareCommand>";
//| option java_visitor = "org.apache.activemq.store.kahadb.Visitor:void:java.io.IOException";
//| option java_type_method = "KahaEntryType";
required KahaTransactionInfo transaction_info=1;
}
message KahaCommitCommand {
//| option java_implments = "org.apache.kahadb.store.JournalCommand<KahaCommitCommand>";
//| option java_visitor = "org.apache.kahadb.store.Visitor:void:java.io.IOException";
//| option java_implments = "org.apache.activemq.store.kahadb.JournalCommand<KahaCommitCommand>";
//| option java_visitor = "org.apache.activemq.store.kahadb.Visitor:void:java.io.IOException";
//| option java_type_method = "KahaEntryType";
required KahaTransactionInfo transaction_info=1;
}
message KahaRollbackCommand {
//| option java_implments = "org.apache.kahadb.store.JournalCommand<KahaRollbackCommand>";
//| option java_visitor = "org.apache.kahadb.store.Visitor:void:java.io.IOException";
//| option java_implments = "org.apache.activemq.store.kahadb.JournalCommand<KahaRollbackCommand>";
//| option java_visitor = "org.apache.activemq.store.kahadb.Visitor:void:java.io.IOException";
//| option java_type_method = "KahaEntryType";
required KahaTransactionInfo transaction_info=1;
}
message KahaRemoveDestinationCommand {
//| option java_implments = "org.apache.kahadb.store.JournalCommand<KahaRemoveDestinationCommand>";
//| option java_visitor = "org.apache.kahadb.store.Visitor:void:java.io.IOException";
//| option java_implments = "org.apache.activemq.store.kahadb.JournalCommand<KahaRemoveDestinationCommand>";
//| option java_visitor = "org.apache.activemq.store.kahadb.Visitor:void:java.io.IOException";
//| option java_type_method = "KahaEntryType";
required KahaDestination destination = 1;
}
message KahaSubscriptionCommand {
//| option java_implments = "org.apache.kahadb.store.JournalCommand<KahaSubscriptionCommand>";
//| option java_visitor = "org.apache.kahadb.store.Visitor:void:java.io.IOException";
//| option java_implments = "org.apache.activemq.store.kahadb.JournalCommand<KahaSubscriptionCommand>";
//| option java_visitor = "org.apache.activemq.store.kahadb.Visitor:void:java.io.IOException";
//| option java_type_method = "KahaEntryType";
required KahaDestination destination = 1;

View File

@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kahadb.store;
package org.apache.activemq.store.kahadb;
import java.io.File;
import java.net.URI;
@@ -28,7 +28,7 @@ import org.apache.activemq.broker.BrokerTest;
/**
* Once the wire format is completed we can test against real persistence storage.
*
* @version $Revision$
* @version $Revision: 712224 $
*/
public class KahaDBStoreBrokerTest extends BrokerTest {

View File

@@ -14,13 +14,15 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kahadb.store;
package org.apache.activemq.store.kahadb;
import java.io.File;
import java.net.URI;
import java.util.ArrayList;
import junit.framework.Test;
import org.apache.activemq.broker.BrokerFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.RecoveryBrokerTest;
import org.apache.activemq.broker.StubConnection;
@@ -37,7 +39,7 @@ import org.apache.activemq.command.SessionInfo;
/**
* Used to verify that recovery works correctly against the KahaDB store.
*
* @version $Revision$
* @version $Revision: 712224 $
*/
public class KahaDBStoreRecoveryBrokerTest extends RecoveryBrokerTest {

View File

@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kahadb.store;
package org.apache.activemq.store.kahadb;
import java.io.File;
import java.net.URI;
@@ -28,7 +28,7 @@ import org.apache.activemq.broker.XARecoveryBrokerTest;
/**
* Used to verify that recovery works correctly against the KahaDB store.
*
* @version $Revision$
* @version $Revision: 712224 $
*/
public class KahaDBStoreXARecoveryBrokerTest extends XARecoveryBrokerTest {

View File

@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kahadb.store;
package org.apache.activemq.store.kahadb;
import java.io.IOException;
@@ -22,10 +22,10 @@ import junit.framework.TestCase;
import org.apache.activemq.protobuf.Buffer;
import org.apache.kahadb.journal.Location;
import org.apache.kahadb.store.data.KahaAddMessageCommand;
import org.apache.kahadb.store.data.KahaDestination;
import org.apache.kahadb.store.data.KahaEntryType;
import org.apache.kahadb.store.data.KahaDestination.DestinationType;
import org.apache.activemq.store.kahadb.data.KahaAddMessageCommand;
import org.apache.activemq.store.kahadb.data.KahaDestination;
import org.apache.activemq.store.kahadb.data.KahaEntryType;
import org.apache.activemq.store.kahadb.data.KahaDestination.DestinationType;
import org.apache.kahadb.util.ByteSequence;
import org.apache.kahadb.util.DataByteArrayInputStream;
import org.apache.kahadb.util.DataByteArrayOutputStream;

View File

@@ -14,16 +14,18 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kahadb.store.perf;
package org.apache.activemq.store.kahadb.perf;
import java.io.File;
import java.io.IOException;
import java.net.URISyntaxException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import javax.jms.BytesMessage;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
@@ -33,17 +35,18 @@ import junit.framework.Test;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.JmsTestSupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.ProgressPrinter;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.kahadb.store.KahaDBStore;
import org.apache.activemq.store.kahadb.KahaDBStore;
/**
* This tests bulk loading and unloading of messages to a Queue.
*
* @version $Revision$
* @version $Revision: 712224 $
*/
public class KahaBulkLoadingTest extends JmsTestSupport {

View File

@@ -14,15 +14,15 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kahadb.store.perf;
package org.apache.activemq.store.kahadb.perf;
import java.io.File;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.perf.SimpleDurableTopicTest;
import org.apache.kahadb.store.KahaDBStore;
import org.apache.activemq.store.kahadb.KahaDBStore;
/**
* @version $Revision$
* @version $Revision: 712224 $
*/
public class KahaStoreDurableTopicTest extends SimpleDurableTopicTest {

View File

@@ -14,15 +14,15 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kahadb.store.perf;
package org.apache.activemq.store.kahadb.perf;
import java.io.File;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.perf.SimpleQueueTest;
import org.apache.kahadb.store.KahaDBStore;
import org.apache.activemq.store.kahadb.KahaDBStore;
/**
* @version $Revision$
* @version $Revision: 712224 $
*/
public class KahaStoreQueueTest extends SimpleQueueTest {

View File

@@ -1,153 +0,0 @@
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<!-- START SNIPPET: example -->
<beans
xmlns="http://www.springframework.org/schema/beans"
xmlns:amq="http://activemq.apache.org/schema/core"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd
http://activemq.apache.org/camel/schema/spring http://activemq.apache.org/camel/schema/spring/camel-spring.xsd">
<!-- Allows us to use system properties as variables in this configuration file -->
<bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
<property name="locations">
<value>file:${activemq.base}/conf/credentials.properties</value>
</property>
</bean>
<broker xmlns="http://activemq.apache.org/schema/core"
start="false"
brokerName="localhost" dataDirectory="${activemq.base}/data">
<!-- Destination specific policies using destination names or wildcards -->
<destinationPolicy>
<policyMap>
<policyEntries>
<policyEntry queue=">" memoryLimit="5mb"/>
<policyEntry topic=">" memoryLimit="5mb">
<!-- you can add other policies too such as these
<dispatchPolicy>
<strictOrderDispatchPolicy/>
</dispatchPolicy>
<subscriptionRecoveryPolicy>
<lastImageSubscriptionRecoveryPolicy/>
</subscriptionRecoveryPolicy>
-->
</policyEntry>
</policyEntries>
</policyMap>
</destinationPolicy>
<!-- Use the following to configure how ActiveMQ is exposed in JMX -->
<managementContext>
<managementContext createConnector="false"/>
</managementContext>
<!-- The store and forward broker networks ActiveMQ will listen to -->
<networkConnectors>
<!-- by default just auto discover the other brokers -->
<networkConnector name="default-nc" uri="multicast://default"/>
<!-- Example of a static configuration:
<networkConnector name="host1 and host2" uri="static://(tcp://host1:61616,tcp://host2:61616)"/>
-->
</networkConnectors>
<sslContext>
<sslContext keyStore="file:${activemq.base}/conf/broker.ks" keyStorePassword="password" trustStore="file:${activemq.base}/conf/broker.ts" trustStorePassword="password"/>
</sslContext>
<!-- The maximum amount of space the broker will use before slowing down producers -->
<systemUsage>
<systemUsage>
<memoryUsage>
<memoryUsage limit="20 mb"/>
</memoryUsage>
<storeUsage>
<storeUsage limit="1 gb" name="foo"/>
</storeUsage>
<tempUsage>
<tempUsage limit="100 mb"/>
</tempUsage>
</systemUsage>
</systemUsage>
<!-- The transport connectors ActiveMQ will listen to -->
<transportConnectors>
<transportConnector name="openwire" uri="tcp://localhost:61616" discoveryUri="multicast://default"/>
<transportConnector name="ssl" uri="ssl://localhost:61617"/>
<transportConnector name="stomp" uri="stomp://localhost:61613"/>
<transportConnector name="xmpp" uri="xmpp://localhost:61222"/>
</transportConnectors>
</broker>
<!--
** Let's deploy some Enterprise Integration Patterns inside the ActiveMQ Message Broker
** For more details see
**
** http://activemq.apache.org/enterprise-integration-patterns.html
-->
<camelContext id="camel" xmlns="http://activemq.apache.org/camel/schema/spring">
<!-- You can use a <package> element for each root package to search for Java routes -->
<package>org.foo.bar</package>
<!-- You can use Spring XML syntax to define the routes here using the <route> element -->
<route>
<from uri="activemq:example.A"/>
<to uri="activemq:example.B"/>
</route>
</camelContext>
<!--
** Let's configure some Camel endpoints
**
** http://activemq.apache.org/camel/components.html
-->
<!-- configure the camel activemq component to use the current broker -->
<bean id="activemq" class="org.apache.activemq.camel.component.ActiveMQComponent" >
<property name="connectionFactory">
<bean class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="vm://localhost?create=false&amp;waitForStart=10000" />
<property name="userName" value="${activemq.username}"/>
<property name="password" value="${activemq.password}"/>
</bean>
</property>
</bean>
<!-- Uncomment to create a command agent to respond to message based admin commands on the ActiveMQ.Agent topic -->
<!--
<commandAgent xmlns="http://activemq.apache.org/schema/core" brokerUrl="vm://localhost" username="${activemq.username}" password="${activemq.password}"/>
-->
<!-- An embedded servlet engine for serving up the Admin console -->
<jetty xmlns="http://mortbay.com/schemas/jetty/1.0">
<connectors>
<nioConnector port="8161"/>
</connectors>
<handlers>
<webAppContext contextPath="/admin" resourceBase="${activemq.base}/webapps/admin" logUrlOnStart="true"/>
<webAppContext contextPath="/demo" resourceBase="${activemq.base}/webapps/demo" logUrlOnStart="true"/>
<webAppContext contextPath="/fileserver" resourceBase="${activemq.base}/webapps/fileserver" logUrlOnStart="true"/>
</handlers>
</jetty>
</beans>
<!-- END SNIPPET: example -->

View File

@@ -1,53 +0,0 @@
<?xml version="1.0"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<!-- START SNIPPET: example -->
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:amq="http://activemq.apache.org/schema/core"
xmlns:kdb="http://activemq.apache.org/schema/kahadb"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd
http://activemq.apache.org/camel/schema/spring http://activemq.apache.org/camel/schema/spring/camel-spring.xsd
http://activemq.apache.org/schema/kahadb http://activemq.apache.org/schema/kahadb/kahadb.xsd">
<!-- Allows us to use system properties as variables in this configuration file -->
<bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
<property name="locations">
<value>file:${activemq.base}/conf/credentials.properties</value>
</property>
</bean>
<kahadbReplicationBroker xmlns="http://activemq.apache.org/schema/kahadb">
<replicationService>
<kahadbReplication
directory="${activemq.base}/data/kahadb"
brokerURI="xbean:ha-broker.xml"
uri="kdbr://localhost:60001"
minimumReplicas="1">
<cluster>
<zookeeperCluster uri="zk://localhost:2181/activemq/default-ha-group" userid="activemq" password=""/>
</cluster>
</kahadbReplication>
</replicationService>
</kahadbReplicationBroker>
</beans>
<!-- END SNIPPET: example -->
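
For reference, a hypothetical programmatic equivalent of the replication configuration shown above (which this commit removes), using the setters on ReplicationService that appear later in this diff; this is a sketch only, and the ZooKeeper cluster wiring is elided.

import java.io.File;
import org.apache.kahadb.replication.ReplicationService;

public class ReplicationSetupSketch {
    public static void main(String[] args) throws Exception {
        ReplicationService service = new ReplicationService();
        service.setDirectory(new File("data/kahadb"));   // KahaDB store directory
        service.setBrokerURI("xbean:ha-broker.xml");     // broker config started once elected master
        service.setUri("kdbr://localhost:60001");        // this node's replication endpoint
        service.setMinimumReplicas(1);                   // sync writes wait for one slave ack
        // A ClusterStateManager (e.g. the ZooKeeper-based one) must be set
        // via service.setCluster(...) before service.start(), which then
        // brings the node up as a slave or master.
    }
}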

View File

@@ -52,78 +52,27 @@
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.14</version>
<scope>compile</scope>
<scope>test</scope>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.apache.activemq.protobuf</groupId>
<artifactId>activemq-protobuf</artifactId>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-core</artifactId>
</dependency>
<dependency>
<groupId>org.apache.xbean</groupId>
<artifactId>xbean-spring</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-core</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-beans</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.apache.hadoop.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-core</artifactId>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activeblaze</artifactId>
<version>1.0-SNAPSHOT</version>
<optional>true</optional>
</dependency>
</dependencies>
<repositories>
<repository>
<id>chirino-zk-repo</id>
<name>Private ZooKeeper Repo</name>
<url>http://people.apache.org/~chirino/zk-repo/</url>
</repository>
</repositories>
<build>
<plugins>
<!--
<plugin>
<groupId>org.apache.xbean</groupId>
<artifactId>maven-xbean-plugin</artifactId>
@@ -139,6 +88,7 @@
</execution>
</executions>
</plugin>
-->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
@@ -147,17 +97,6 @@
<target>1.5</target>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.activemq.protobuf</groupId>
<artifactId>activemq-protobuf</artifactId>
<executions>
<execution>
<goals>
<goal>compile</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<artifactId>maven-surefire-plugin</artifactId>
<configuration>

View File

@@ -1,23 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kahadb.replication;
public interface ClusterListener {
public void onClusterChange(ClusterState config);
}

View File

@@ -1,40 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kahadb.replication;
import java.util.ArrayList;
import java.util.List;
public class ClusterState {
private List<String> slaves = new ArrayList<String>();
private String master;
public List<String> getSlaves() {
return slaves;
}
public void setSlaves(List<String> slaves) {
this.slaves = slaves;
}
public String getMaster() {
return master;
}
public void setMaster(String master) {
this.master = master;
}
}
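These replication classes are removed by this commit; for reference, a minimal, hypothetical implementation of the listener interface above would have looked like this.

import org.apache.kahadb.replication.ClusterListener;
import org.apache.kahadb.replication.ClusterState;

public class LoggingClusterListener implements ClusterListener {
    // Hypothetical listener: logs the elected master and the slave list
    // whenever the cluster membership changes.
    public void onClusterChange(ClusterState config) {
        System.out.println("master: " + config.getMaster()
                + ", slaves: " + config.getSlaves());
    }
}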

View File

@@ -1,66 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kahadb.replication;
import org.apache.activemq.Service;
import org.apache.kahadb.replication.pb.PBClusterNodeStatus;
/**
* This interface is used by the ReplicationService to know when
* it should switch between Slave and Master mode.
*
* @author chirino
*/
public interface ClusterStateManager extends Service {
/**
* Adds a ClusterListener which is used to get notifications
* of changes in the cluster state.
* @param listener
*/
void addListener(ClusterListener listener);
/**
* Removes a previously added ClusterListener
* @param listener
*/
void removeListener(ClusterListener listener);
/**
* Adds a member to the cluster. Adding a member does not mean it is online.
* Some ClusterStateManager implementations may keep track of a persistent membership list
* so that they can determine if there are enough nodes online to form a quorum
* for the purposes of electing a master.
*
* @param node
*/
public void addMember(final String node);
/**
* Removes a previously added member.
*
* @param node
*/
public void removeMember(final String node);
/**
* Updates the status of the local node.
*
* @param status
*/
public void setMemberStatus(final PBClusterNodeStatus status);
}

View File

@@ -1,57 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kahadb.replication;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.activemq.broker.BrokerService;
/**
* This broker service does not actually do anything. It allows you to create an activemq.xml file
* which does not start a broker itself. It is used in conjunction with the ReplicationService,
* since that service will create the actual BrokerService.
*
* @author chirino
* @org.apache.xbean.XBean element="kahadbReplicationBroker"
*/
public class ReplicationBrokerService extends BrokerService {
ReplicationService replicationService;
AtomicBoolean started = new AtomicBoolean();
public ReplicationService getReplicationService() {
return replicationService;
}
public void setReplicationService(ReplicationService replicationService) {
this.replicationService = replicationService;
}
@Override
public void start() throws Exception {
if( started.compareAndSet(false, true) ) {
replicationService.start();
}
}
@Override
public void stop() throws Exception {
if( started.compareAndSet(true, false) ) {
replicationService.stop();
}
}
}

View File

@@ -1,40 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kahadb.replication;
import org.apache.kahadb.replication.pb.PBHeader;
public class ReplicationFrame {
PBHeader header;
Object payload;
public PBHeader getHeader() {
return header;
}
public void setHeader(PBHeader header) {
this.header = header;
}
public Object getPayload() {
return payload;
}
public void setPayload(Object payload) {
this.payload = payload;
}
}

View File

@@ -1,487 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kahadb.replication;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.Service;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportAcceptListener;
import org.apache.activemq.transport.TransportFactory;
import org.apache.activemq.transport.TransportListener;
import org.apache.activemq.transport.TransportServer;
import org.apache.activemq.util.Callback;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.kahadb.journal.DataFile;
import org.apache.kahadb.journal.Location;
import org.apache.kahadb.journal.ReplicationTarget;
import org.apache.kahadb.replication.pb.PBFileInfo;
import org.apache.kahadb.replication.pb.PBHeader;
import org.apache.kahadb.replication.pb.PBJournalLocation;
import org.apache.kahadb.replication.pb.PBJournalUpdate;
import org.apache.kahadb.replication.pb.PBSlaveInit;
import org.apache.kahadb.replication.pb.PBSlaveInitResponse;
import org.apache.kahadb.replication.pb.PBType;
import org.apache.kahadb.store.KahaDBStore;
import org.apache.kahadb.util.ByteSequence;
public class ReplicationMaster implements Service, ClusterListener, ReplicationTarget {
private static final Log LOG = LogFactory.getLog(ReplicationService.class);
private final ReplicationService replicationService;
private Object serverMutex = new Object() {
};
private TransportServer server;
private ArrayList<ReplicationSession> sessions = new ArrayList<ReplicationSession>();
private final AtomicInteger nextSnapshotId = new AtomicInteger();
private final Object requestMutex = new Object(){};
private Location requestLocation;
private CountDownLatch requestLatch;
private int minimumReplicas;
public ReplicationMaster(ReplicationService replicationService) {
this.replicationService = replicationService;
minimumReplicas = replicationService.getMinimumReplicas();
}
public void start() throws Exception {
synchronized (serverMutex) {
server = TransportFactory.bind(new URI(replicationService.getUri()));
server.setAcceptListener(new TransportAcceptListener() {
public void onAccept(Transport transport) {
try {
synchronized (serverMutex) {
ReplicationSession session = new ReplicationSession(transport);
session.start();
addSession(session);
}
} catch (Exception e) {
LOG.info("Could not accept replication connection from slave at " + transport.getRemoteAddress() + ", due to: " + e, e);
}
}
public void onAcceptError(Exception e) {
LOG.info("Could not accept replication connection: " + e, e);
}
});
server.start();
}
replicationService.getStore().getJournal().setReplicationTarget(this);
}
boolean isStarted() {
synchronized (serverMutex) {
return server != null;
}
}
public void stop() throws Exception {
replicationService.getStore().getJournal().setReplicationTarget(null);
synchronized (serverMutex) {
if (server != null) {
server.stop();
server = null;
}
}
ArrayList<ReplicationSession> sessionsSnapshot;
synchronized (this.sessions) {
sessionsSnapshot = this.sessions;
}
for (ReplicationSession session : sessionsSnapshot) {
session.stop();
}
}
protected void addSession(ReplicationSession session) {
synchronized (sessions) {
sessions = new ArrayList<ReplicationSession>(sessions);
sessions.add(session);
}
}
protected void removeSession(ReplicationSession session) {
synchronized (sessions) {
sessions = new ArrayList<ReplicationSession>(sessions);
sessions.remove(session);
}
}
public void onClusterChange(ClusterState config) {
// For now, we don't really care about changes in the slave config..
}
/**
* This is called by the Journal so that we can replicate the update to the
* slaves.
*/
public void replicate(Location location, ByteSequence sequence, boolean sync) {
ArrayList<ReplicationSession> sessionsSnapshot;
synchronized (this.sessions) {
// Hurrah for copy on write..
sessionsSnapshot = this.sessions;
}
// We may be able to always async replicate...
if (minimumReplicas==0) {
sync = false;
}
CountDownLatch latch = null;
if (sync) {
latch = new CountDownLatch(minimumReplicas);
synchronized (requestMutex) {
requestLatch = latch;
requestLocation = location;
}
}
ReplicationFrame frame = null;
for (ReplicationSession session : sessionsSnapshot) {
if (session.subscribedToJournalUpdates.get()) {
// Lazily create the frame since we may have no available sessions
// to send to.
if (frame == null) {
frame = new ReplicationFrame();
frame.setHeader(new PBHeader().setType(PBType.JOURNAL_UPDATE));
PBJournalUpdate payload = new PBJournalUpdate();
payload.setLocation(ReplicationSupport.convert(location));
payload.setData(new org.apache.activemq.protobuf.Buffer(sequence.getData(), sequence.getOffset(), sequence.getLength()));
payload.setSendAck(sync);
frame.setPayload(payload);
}
// TODO: use async send threads so that the frames can be pushed
// out in parallel.
try {
session.setLastUpdateLocation(location);
session.transport.oneway(frame);
} catch (IOException e) {
session.onException(e);
}
}
}
if (sync) {
try {
int timeout = 500;
int counter = 0;
while (true) {
if (latch.await(timeout, TimeUnit.MILLISECONDS)) {
return;
}
if (!isStarted()) {
return;
}
counter++;
if ((counter % 10) == 0) {
LOG.warn("KahaDB is waiting for slave to come online. " + (timeout * counter / 1000.f) + " seconds have elapsed.");
}
}
} catch (InterruptedException ignore) {
}
}
}
private void ackAllFromTo(Location lastAck, Location newAck) {
Location l;
java.util.concurrent.CountDownLatch latch;
synchronized (requestMutex) {
latch = requestLatch;
l = requestLocation;
}
if( l == null ) {
return;
}
if (lastAck == null || lastAck.compareTo(l) < 0) {
if (newAck != null && l.compareTo(newAck) <= 0) {
latch.countDown();
return;
}
}
}
class ReplicationSession implements Service, TransportListener {
private final Transport transport;
private final AtomicBoolean subscribedToJournalUpdates = new AtomicBoolean();
private boolean stopped;
private File snapshotFile;
private HashSet<Integer> journalReplicatedFiles;
private Location lastAckLocation;
private Location lastUpdateLocation;
private boolean online;
public ReplicationSession(Transport transport) {
this.transport = transport;
}
synchronized public void setLastUpdateLocation(Location lastUpdateLocation) {
this.lastUpdateLocation = lastUpdateLocation;
}
public void start() throws Exception {
transport.setTransportListener(this);
transport.start();
}
synchronized public void stop() throws Exception {
if (!stopped) {
stopped = true;
deleteReplicationData();
transport.stop();
}
}
synchronized private void onJournalUpdateAck(ReplicationFrame frame, PBJournalLocation location) {
Location ack = ReplicationSupport.convert(location);
if (online) {
ackAllFromTo(lastAckLocation, ack);
}
lastAckLocation = ack;
}
synchronized private void onSlaveOnline(ReplicationFrame frame) {
deleteReplicationData();
online = true;
if (lastAckLocation != null) {
ackAllFromTo(null, lastAckLocation);
}
}
public void onCommand(Object command) {
try {
ReplicationFrame frame = (ReplicationFrame)command;
switch (frame.getHeader().getType()) {
case SLAVE_INIT:
onSlaveInit(frame, (PBSlaveInit)frame.getPayload());
break;
case SLAVE_ONLINE:
onSlaveOnline(frame);
break;
case FILE_TRANSFER:
onFileTransfer(frame, (PBFileInfo)frame.getPayload());
break;
case JOURNAL_UPDATE_ACK:
onJournalUpdateAck(frame, (PBJournalLocation)frame.getPayload());
break;
}
} catch (Exception e) {
LOG.warn("Slave request failed: " + e, e);
failed(e);
}
}
public void onException(IOException error) {
failed(error);
}
public void failed(Exception error) {
try {
stop();
} catch (Exception ignore) {
}
}
public void transportInterupted() {
}
public void transportResumed() {
}
private void deleteReplicationData() {
if (snapshotFile != null) {
snapshotFile.delete();
snapshotFile = null;
}
if (journalReplicatedFiles != null) {
journalReplicatedFiles = null;
updateJournalReplicatedFiles();
}
}
private void onSlaveInit(ReplicationFrame frame, PBSlaveInit slaveInit) throws Exception {
// Start sending journal updates to the slave.
subscribedToJournalUpdates.set(true);
// We could look at the slave state sent in the slaveInit and decide
// that a full sync is not needed..
// but for now we will do a full sync every time.
ReplicationFrame rc = new ReplicationFrame();
final PBSlaveInitResponse rcPayload = new PBSlaveInitResponse();
rc.setHeader(new PBHeader().setType(PBType.SLAVE_INIT_RESPONSE));
rc.setPayload(rcPayload);
// Setup a map of all the files that the slave has
final HashMap<String, PBFileInfo> slaveFiles = new HashMap<String, PBFileInfo>();
for (PBFileInfo info : slaveInit.getCurrentFilesList()) {
slaveFiles.put(info.getName(), info);
}
final KahaDBStore store = replicationService.getStore();
store.checkpoint(new Callback() {
public void execute() throws Exception {
// This callback is executed once the checkpoint is
// completed and all data has been synced to disk,
// but while a lock is still held on the store so
// that no updates are done while we are in this
// method.
KahaDBStore store = replicationService.getStore();
if (lastAckLocation == null) {
lastAckLocation = store.getLastUpdatePosition();
}
int snapshotId = nextSnapshotId.incrementAndGet();
File file = store.getPageFile().getFile();
File dir = replicationService.getTempReplicationDir();
dir.mkdirs();
snapshotFile = new File(dir, "snapshot-" + snapshotId);
journalReplicatedFiles = new HashSet<Integer>();
// Store the list of files associated with the snapshot.
ArrayList<PBFileInfo> snapshotInfos = new ArrayList<PBFileInfo>();
Map<Integer, DataFile> journalFiles = store.getJournal().getFileMap();
for (DataFile df : journalFiles.values()) {
// Look at what the slave has so that only the missing
// bits are transferred.
String name = "journal-" + df.getDataFileId();
PBFileInfo slaveInfo = slaveFiles.remove(name);
// Use the checksum info to see if the slave has the
// file already.. Checksums are less accurate for
// small amounts of data.. so ignore small files.
if (slaveInfo != null && slaveInfo.getEnd() > 1024 * 512) {
// If the slave's file checksum matches what we
// have..
if (ReplicationSupport.checksum(df.getFile(), 0, slaveInfo.getEnd()) == slaveInfo.getChecksum()) {
// Is our file longer? Then we need to continue
// transferring the rest of the file.
if (df.getLength() > slaveInfo.getEnd()) {
snapshotInfos.add(ReplicationSupport.createInfo(name, df.getFile(), slaveInfo.getEnd(), df.getLength()));
journalReplicatedFiles.add(df.getDataFileId());
continue;
} else {
// No need to replicate this file.
continue;
}
}
}
// If we got here then it means we need to transfer the
// whole file.
snapshotInfos.add(ReplicationSupport.createInfo(name, df.getFile(), 0, df.getLength()));
journalReplicatedFiles.add(df.getDataFileId());
}
PBFileInfo info = new PBFileInfo();
info.setName("database");
info.setSnapshotId(snapshotId);
info.setStart(0);
info.setEnd(file.length());
info.setChecksum(ReplicationSupport.copyAndChecksum(file, snapshotFile));
snapshotInfos.add(info);
rcPayload.setCopyFilesList(snapshotInfos);
ArrayList<String> deleteFiles = new ArrayList<String>();
slaveFiles.remove("database");
for (PBFileInfo unused : slaveFiles.values()) {
deleteFiles.add(unused.getName());
}
rcPayload.setDeleteFilesList(deleteFiles);
updateJournalReplicatedFiles();
}
});
transport.oneway(rc);
}
private void onFileTransfer(ReplicationFrame frame, PBFileInfo fileInfo) throws IOException {
File file = replicationService.getReplicationFile(fileInfo.getName());
long payloadSize = fileInfo.getEnd() - fileInfo.getStart();
if (file.length() < fileInfo.getStart() + payloadSize) {
throw new IOException("Requested replication file dose not have enough data.");
}
ReplicationFrame rc = new ReplicationFrame();
rc.setHeader(new PBHeader().setType(PBType.FILE_TRANSFER_RESPONSE).setPayloadSize(payloadSize));
FileInputStream is = new FileInputStream(file);
rc.setPayload(is);
try {
is.skip(fileInfo.getStart());
transport.oneway(rc);
} finally {
try {
is.close();
} catch (Throwable e) {
}
}
}
}
/**
* Looks at all the journal files currently being replicated and informs
* KahaDB so that it does not delete them while the replication is occurring.
*/
private void updateJournalReplicatedFiles() {
HashSet<Integer> files = replicationService.getStore().getJournalFilesBeingReplicated();
files.clear();
ArrayList<ReplicationSession> sessionsSnapshot;
synchronized (this.sessions) {
// Hurrah for copy on write..
sessionsSnapshot = this.sessions;
}
for (ReplicationSession session : sessionsSnapshot) {
if (session.journalReplicatedFiles != null) {
files.addAll(session.journalReplicatedFiles);
}
}
}
}

View File

@@ -1,292 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kahadb.replication;
import java.io.File;
import java.io.IOException;
import org.apache.activemq.Service;
import org.apache.activemq.broker.BrokerFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.util.IOHelper;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.kahadb.replication.pb.PBClusterNodeStatus;
import org.apache.kahadb.replication.pb.PBClusterNodeStatus.State;
import org.apache.kahadb.store.KahaDBStore;
/**
* Handles interfacing with the ClusterStateManager and handles activating the
* slave or master facets of the broker.
*
* @author chirino
* @org.apache.xbean.XBean element="kahadbReplication"
*/
public class ReplicationService implements Service, ClusterListener {
private static final String JOURNAL_PREFIX = "journal-";
private static final Log LOG = LogFactory.getLog(ReplicationService.class);
private String brokerURI = "xbean:broker.xml";
private File directory = new File(IOHelper.getDefaultDataDirectory());
private File tempReplicationDir;
private String uri;
private ClusterStateManager cluster;
private int minimumReplicas=1;
private KahaDBStore store;
private ClusterState clusterState;
private BrokerService brokerService;
private ReplicationMaster master;
private ReplicationSlave slave;
public void start() throws Exception {
if( cluster==null ) {
throw new IllegalArgumentException("The cluster field has not been set.");
}
// The cluster will let us know about the cluster configuration,
// which lets us decide if we are going to be a slave or a master.
getStore().open();
cluster.addListener(this);
cluster.start();
cluster.addMember(getUri());
cluster.setMemberStatus(createStatus(State.SLAVE_UNCONNECTED));
}
public PBClusterNodeStatus createStatus(State state) throws IOException {
final PBClusterNodeStatus status = new PBClusterNodeStatus();
status.setConnectUri(getUri());
status.setLastUpdate(ReplicationSupport.convert(getStore().getLastUpdatePosition()));
status.setState(state);
return status;
}
public void stop() throws Exception {
cluster.removeListener(this);
cluster.stop();
stopMaster();
stopSlave();
getStore().close();
}
public void onClusterChange(ClusterState clusterState) {
this.clusterState = clusterState;
try {
synchronized (cluster) {
if (areWeTheSlave(clusterState)) {
// If we were the master we need to stop the master
// service..
stopMaster();
// If the slave service was not yet started.. start it up.
if( clusterState.getMaster()==null ) {
stopSlave();
} else {
startSlave();
slave.onClusterChange(clusterState);
}
} else if (areWeTheMaster(clusterState)) {
// If we were the slave we need to stop the slave service..
stopSlave();
// If the master service was not yet started.. start it up.
startMaster();
master.onClusterChange(clusterState);
} else {
// We were not part of the configuration (not master nor
// slave).
// So we have to shutdown any running master or slave
// services that may
// have been running.
stopMaster();
stopSlave();
getCluster().setMemberStatus(createStatus(State.SLAVE_UNCONNECTED));
}
}
} catch (Exception e) {
LOG.warn("Unexpected Error: " + e, e);
}
}
private void startMaster() throws IOException, Exception {
if (master == null) {
LOG.info("Starting replication master.");
getCluster().setMemberStatus(createStatus(State.MASTER));
brokerService = createBrokerService();
brokerService.start();
master = new ReplicationMaster(this);
master.start();
}
}
private void stopSlave() throws Exception {
if (slave != null) {
LOG.info("Stopping replication slave.");
slave.stop();
slave = null;
}
}
private void startSlave() throws Exception {
if (slave == null) {
LOG.info("Starting replication slave.");
slave = new ReplicationSlave(this);
slave.start();
}
}
private void stopMaster() throws Exception, IOException {
if (master != null) {
LOG.info("Stopping replication master.");
master.stop();
master = null;
brokerService.stop();
brokerService = null;
// Stopping the broker service actually stops the store
// too..
// so we need to open it back up.
getStore().open();
}
}
public BrokerService getBrokerService() {
return brokerService;
}
private BrokerService createBrokerService() throws Exception {
BrokerService rc = BrokerFactory.createBroker(brokerURI);
rc.setPersistenceAdapter(getStore());
return rc;
}
public ClusterState getClusterState() {
return clusterState;
}
private boolean areWeTheSlave(ClusterState config) {
return config.getSlaves().contains(uri);
}
private boolean areWeTheMaster(ClusterState config) {
return uri.equals(config.getMaster());
}
///////////////////////////////////////////////////////////////////
// Accessors
///////////////////////////////////////////////////////////////////
public File getReplicationFile(String fn) throws IOException {
if (fn.equals("database")) {
return getStore().getPageFile().getFile();
}
if (fn.startsWith(JOURNAL_PREFIX)) {
int id;
try {
id = Integer.parseInt(fn.substring(JOURNAL_PREFIX.length()));
} catch (NumberFormatException e) {
throw new IOException("Unknown replication file name: " + fn);
}
return getStore().getJournal().getFile(id);
} else {
throw new IOException("Unknown replication file name: " + fn);
}
}
public File getTempReplicationFile(String fn, int snapshotId) throws IOException {
if (fn.equals("database")) {
return new File(getTempReplicationDir(), "database-" + snapshotId);
}
if (fn.startsWith(JOURNAL_PREFIX)) {
int id;
try {
id = Integer.parseInt(fn.substring(JOURNAL_PREFIX.length()));
} catch (NumberFormatException e) {
throw new IOException("Unknown replication file name: " + fn);
}
return new File(getTempReplicationDir(), fn);
} else {
throw new IOException("Unknown replication file name: " + fn);
}
}
public boolean isMaster() {
return master != null;
}
public File getTempReplicationDir() {
if (tempReplicationDir == null) {
tempReplicationDir = new File(getStore().getDirectory(), "replication");
}
return tempReplicationDir;
}
public void setTempReplicationDir(File tempReplicationDir) {
this.tempReplicationDir = tempReplicationDir;
}
public KahaDBStore getStore() {
if (store == null) {
store = new KahaDBStore();
store.setDirectory(directory);
}
return store;
}
public void setStore(KahaDBStore store) {
this.store = store;
}
public File getDirectory() {
return directory;
}
public void setDirectory(File directory) {
this.directory = directory;
}
public String getBrokerURI() {
return brokerURI;
}
public void setBrokerURI(String brokerURI) {
this.brokerURI = brokerURI;
}
public String getUri() {
return uri;
}
public void setUri(String nodeId) {
this.uri = nodeId;
}
public ClusterStateManager getCluster() {
return cluster;
}
public void setCluster(ClusterStateManager cluster) {
this.cluster = cluster;
}
public int getMinimumReplicas() {
return minimumReplicas;
}
public void setMinimumReplicas(int minimumReplicas) {
this.minimumReplicas = minimumReplicas;
}
}
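// Illustrative wiring sketch for the service above, modeled on the
// ReplicationTest further down; the URIs and directory are placeholders.
ReplicationService rs = new ReplicationService();
rs.setMinimumReplicas(0);                          // allow a lone node to act as master
rs.setUri("kdbr://localhost:60001");               // this node's replication endpoint
rs.setCluster(new StaticClusterStateManager());    // any ClusterStateManager implementation
rs.setDirectory(new File("target/replication/broker1"));
rs.setBrokerURI("broker://(tcp://localhost:61001)/broker1");
rs.start();                                        // becomes master or slave as the cluster decides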


@ -1,588 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kahadb.replication;
import java.io.DataOutput;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.RandomAccessFile;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.activemq.Service;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportFactory;
import org.apache.activemq.transport.TransportListener;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.kahadb.journal.DataFile;
import org.apache.kahadb.page.PageFile;
import org.apache.kahadb.replication.pb.PBFileInfo;
import org.apache.kahadb.replication.pb.PBHeader;
import org.apache.kahadb.replication.pb.PBJournalLocation;
import org.apache.kahadb.replication.pb.PBJournalUpdate;
import org.apache.kahadb.replication.pb.PBSlaveInit;
import org.apache.kahadb.replication.pb.PBSlaveInitResponse;
import org.apache.kahadb.replication.pb.PBType;
import org.apache.kahadb.replication.pb.PBClusterNodeStatus.State;
import org.apache.kahadb.store.KahaDBStore;
public class ReplicationSlave implements Service, ClusterListener, TransportListener {
private static final int MAX_TRANSFER_SESSIONS = 1;
private static final Log LOG = LogFactory.getLog(ReplicationSlave.class);
private final ReplicationService replicationServer;
private Transport transport;
// Used to bulk transfer the master state over to the slave..
private final Object transferMutex = new Object();
private final LinkedList<PBFileInfo> transferQueue = new LinkedList<PBFileInfo>();
private final LinkedList<TransferSession> transferSessions = new LinkedList<TransferSession>();
private final HashMap<String, PBFileInfo> bulkFiles = new HashMap<String, PBFileInfo>();
private PBSlaveInitResponse initResponse;
private boolean online;
private final AtomicBoolean started = new AtomicBoolean();
// Used to do real time journal updates..
int journalUpdateFileId;
RandomAccessFile journalUpateFile;
private String master;
public ReplicationSlave(ReplicationService replicationServer) {
this.replicationServer = replicationServer;
}
public void start() throws Exception {
if( started.compareAndSet(false, true)) {
onClusterChange(replicationServer.getClusterState());
}
}
public void stop() throws Exception {
if( started.compareAndSet(true, false)) {
doStop();
}
}
private void doStart() throws Exception {
synchronized (transferMutex) {
// Failure recovery might be trying to start us back up, but the
// replication server may have already stopped us, so there is no need to start up.
if( !started.get() ) {
return;
}
replicationServer.getCluster().setMemberStatus(replicationServer.createStatus(State.SLAVE_SYNCRONIZING));
transport = TransportFactory.connect(new URI(master));
transport.setTransportListener(this);
transport.start();
// Make sure the replication directory exists.
replicationServer.getTempReplicationDir().mkdirs();
ReplicationFrame frame = new ReplicationFrame();
frame.setHeader(new PBHeader().setType(PBType.SLAVE_INIT));
PBSlaveInit payload = new PBSlaveInit();
payload.setNodeId(replicationServer.getUri());
// This callback is executed once the checkpoint is completed and all
// data has been synced to disk, but while a lock is still held on the
// store so that no updates are allowed.
HashMap<String, PBFileInfo> infosMap = new HashMap<String, PBFileInfo>();
// Add all the files that were being transferred..
File tempReplicationDir = replicationServer.getTempReplicationDir();
File[] list = tempReplicationDir.listFiles();
if( list!=null ) {
for (File file : list) {
String name = file.getName();
if( name.startsWith("database-") ) {
int snapshot;
try {
snapshot = Integer.parseInt(name.substring("database-".length()));
} catch (NumberFormatException e) {
continue;
}
PBFileInfo info = ReplicationSupport.createInfo("database", file, 0, file.length());
info.setSnapshotId(snapshot);
infosMap.put("database", info);
} else if( name.startsWith("journal-") ) {
PBFileInfo info = ReplicationSupport.createInfo(name, file, 0, file.length());
infosMap.put(name, info);
}
}
}
// Add all the db files that were not getting transferred..
KahaDBStore store = replicationServer.getStore();
Map<Integer, DataFile> journalFiles = store.getJournal().getFileMap();
for (DataFile df : journalFiles.values()) {
String name = "journal-" + df.getDataFileId();
// Did we have a transfer in progress for that file already?
if( infosMap.containsKey(name) ) {
continue;
}
infosMap.put(name, ReplicationSupport.createInfo(name, df.getFile(), 0, df.getLength()));
}
if( !infosMap.containsKey("database") ) {
File pageFile = store.getPageFile().getFile();
if( pageFile.exists() ) {
infosMap.put("database", ReplicationSupport.createInfo("database", pageFile, 0, pageFile.length()));
}
}
ArrayList<PBFileInfo> infos = new ArrayList<PBFileInfo>(infosMap.size());
for (PBFileInfo info : infosMap.values()) {
infos.add(info);
}
payload.setCurrentFilesList(infos);
frame.setPayload(payload);
LOG.info("Sending master slave init command: " + payload);
online = false;
transport.oneway(frame);
}
}
private void doStop() throws Exception {
synchronized (transferMutex) {
if( this.transport!=null ) {
this.transport.stop();
this.transport=null;
}
// Stop any current transfer sessions.
for (TransferSession session : this.transferSessions) {
session.stop();
}
this.transferQueue.clear();
this.initResponse=null;
this.bulkFiles.clear();
this.online=false;
if( journalUpateFile !=null ) {
journalUpateFile.close();
journalUpateFile=null;
}
journalUpdateFileId=0;
}
}
public void onClusterChange(ClusterState config) {
synchronized (transferMutex) {
try {
if( master==null || !master.equals(config.getMaster()) ) {
master = config.getMaster();
doStop();
doStart();
}
} catch (Exception e) {
LOG.error("Could not restart syncing with new master: "+config.getMaster()+", due to: "+e,e);
}
}
}
public void onCommand(Object command) {
try {
ReplicationFrame frame = (ReplicationFrame) command;
switch (frame.getHeader().getType()) {
case SLAVE_INIT_RESPONSE:
onSlaveInitResponse(frame, (PBSlaveInitResponse) frame.getPayload());
break;
case JOURNAL_UPDATE:
onJournalUpdate(frame, (PBJournalUpdate) frame.getPayload());
}
} catch (Exception e) {
failed(e);
}
}
public void onException(IOException error) {
failed(error);
}
public void failed(Throwable error) {
try {
if( started.get() ) {
LOG.warn("Replication session fail to master: "+transport.getRemoteAddress(), error);
doStop();
// Wait a little an try to establish the session again..
Thread.sleep(1000);
doStart();
}
} catch (Exception ignore) {
}
}
public void transportInterupted() {
}
public void transportResumed() {
}
private void onJournalUpdate(ReplicationFrame frame, PBJournalUpdate update) throws IOException {
// Send an ack back as soon as we receive the update.. yeah, it's a little dirty to ack before it's on disk,
// but chances are low that both machines are going to lose power at the same time and this way,
// we reduce the latency the master sees from us.
if( update.getSendAck() ) {
ReplicationFrame ack = new ReplicationFrame();
ack.setHeader(new PBHeader().setType(PBType.JOURNAL_UPDATE_ACK));
ack.setPayload(update.getLocation());
transport.oneway(ack);
}
// TODO: actually do the disk write in an async thread so that this thread
// can start reading in the next journal update.
boolean onlineRecovery=false;
PBJournalLocation location = update.getLocation();
byte[] data = update.getData().toByteArray();
synchronized (transferMutex) {
if( journalUpateFile==null || journalUpdateFileId!=location.getFileId() ) {
if( journalUpateFile!=null) {
journalUpateFile.close();
}
File file;
String name = "journal-"+location.getFileId();
if( !online ) {
file = replicationServer.getTempReplicationFile(name, 0);
if( !bulkFiles.containsKey(name) ) {
bulkFiles.put(name, new PBFileInfo().setName(name));
}
} else {
// Once the data has been synced.. we are going to
// go into an online recovery mode...
file = replicationServer.getReplicationFile(name);
}
journalUpateFile = new RandomAccessFile(file, "rw");
journalUpdateFileId = location.getFileId();
}
// System.out.println("Writing: "+location.getFileId()+":"+location.getOffset()+" with "+data.length);
journalUpateFile.seek(location.getOffset());
journalUpateFile.write(data);
if( online ) {
onlineRecovery=true;
}
}
if( onlineRecovery ) {
KahaDBStore store = replicationServer.getStore();
// Let the journal know that we appended to one of its files..
store.getJournal().appendedExternally(ReplicationSupport.convert(location), data.length);
// Now incrementally recover those records.
store.incrementalRecover();
}
}
private void commitBulkTransfer() {
try {
synchronized (transferMutex) {
LOG.info("Slave synhcronization complete, going online...");
replicationServer.getStore().close();
if( journalUpateFile!=null ) {
journalUpateFile.close();
journalUpateFile=null;
}
// If we got a new snapshot of the database, then we need to
// delete its assisting files too.
if( bulkFiles.containsKey("database") ) {
PageFile pageFile = replicationServer.getStore().getPageFile();
pageFile.getRecoveryFile().delete();
pageFile.getFreeFile().delete();
}
for (PBFileInfo info : bulkFiles.values()) {
File from = replicationServer.getTempReplicationFile(info.getName(), info.getSnapshotId());
File to = replicationServer.getReplicationFile(info.getName());
to.getParentFile().mkdirs();
move(from, to);
}
delete(initResponse.getDeleteFilesList());
online=true;
replicationServer.getStore().open();
replicationServer.getCluster().setMemberStatus(replicationServer.createStatus(State.SLAVE_ONLINE));
LOG.info("Slave is now online. We are now eligible to become the master.");
}
// Let the master know we are now online.
ReplicationFrame frame = new ReplicationFrame();
frame.setHeader(new PBHeader().setType(PBType.SLAVE_ONLINE));
transport.oneway(frame);
} catch (Throwable e) {
e.printStackTrace();
failed(e);
}
}
private void onSlaveInitResponse(ReplicationFrame frame, PBSlaveInitResponse response) throws Exception {
LOG.info("Got init response: " + response);
initResponse = response;
synchronized (transferMutex) {
bulkFiles.clear();
List<PBFileInfo> infos = response.getCopyFilesList();
for (PBFileInfo info : infos) {
bulkFiles.put(info.getName(), info);
File target = replicationServer.getReplicationFile(info.getName());
// Are we just appending to an existing journal file?
if( info.getName().startsWith("journal-") && info.getStart() > 0 && target.exists() ) {
// Then copy across the first bits..
File tempFile = replicationServer.getTempReplicationFile(info.getName(), info.getSnapshotId());
FileInputStream is = new FileInputStream(target);
FileOutputStream os = new FileOutputStream(tempFile);
try {
copy(is, os, info.getStart());
} finally {
try { is.close(); } catch (Throwable e){}
try { os.close(); } catch (Throwable e){}
}
}
}
transferQueue.clear();
transferQueue.addAll(infos);
}
addTransferSession();
}
private PBFileInfo dequeueTransferQueue() throws Exception {
synchronized (transferMutex) {
if (transferQueue.isEmpty()) {
return null;
}
return transferQueue.removeFirst();
}
}
private void addTransferSession() {
synchronized (transferMutex) {
while (transport!=null && !transferQueue.isEmpty() && transferSessions.size() < MAX_TRANSFER_SESSIONS) {
TransferSession transferSession = new TransferSession();
transferSessions.add(transferSession);
try {
transferSession.start();
} catch (Exception e) {
transferSessions.remove(transferSession);
}
}
// Once we are done processing all the transfers..
if (transferQueue.isEmpty() && transferSessions.isEmpty()) {
commitBulkTransfer();
}
}
}
private void move(File from, File to) throws IOException {
// If a simple rename/mv does not work..
to.delete();
if (!from.renameTo(to)) {
// Copy and Delete.
FileInputStream is = null;
FileOutputStream os = null;
try {
is = new FileInputStream(from);
os = new FileOutputStream(to);
os.getChannel().transferFrom(is.getChannel(), 0, is.getChannel().size());
} finally {
try {
is.close();
} catch(Throwable e) {
}
try {
os.close();
} catch(Throwable e) {
}
}
from.delete();
}
}
class TransferSession implements Service, TransportListener {
Transport transport;
private PBFileInfo info;
private File toFile;
private AtomicBoolean stopped = new AtomicBoolean();
private long transferStart;
public void start() throws Exception {
LOG.info("File transfer session started.");
transport = TransportFactory.connect(new URI(replicationServer.getClusterState().getMaster()));
transport.setTransportListener(this);
transport.start();
sendNextRequestOrStop();
}
private void sendNextRequestOrStop() {
try {
PBFileInfo info = dequeueTransferQueue();
if (info != null) {
toFile = replicationServer.getTempReplicationFile(info.getName(), info.getSnapshotId());
this.info = info;
ReplicationFrame frame = new ReplicationFrame();
frame.setHeader(new PBHeader().setType(PBType.FILE_TRANSFER));
frame.setPayload(info);
LOG.info("Requesting file: " + info.getName());
transferStart = System.currentTimeMillis();
transport.oneway(frame);
} else {
stop();
}
} catch (Exception e) {
failed(e);
}
}
public void stop() throws Exception {
if (stopped.compareAndSet(false, true)) {
LOG.info("File transfer session stopped.");
synchronized (transferMutex) {
if (info != null) {
transferQueue.addLast(info);
}
info = null;
}
transport.stop();
synchronized (transferMutex) {
transferSessions.remove(TransferSession.this);
addTransferSession();
}
}
}
public void onCommand(Object command) {
try {
ReplicationFrame frame = (ReplicationFrame) command;
InputStream is = (InputStream) frame.getPayload();
toFile.getParentFile().mkdirs();
RandomAccessFile os = new RandomAccessFile(toFile, "rw");
os.seek(info.getStart());
try {
copy(is, os, frame.getHeader().getPayloadSize());
long transferTime = System.currentTimeMillis() - this.transferStart;
// Bytes per millisecond is (within ~2.4%) KB per second; guard against a zero duration.
float rate = frame.getHeader().getPayloadSize() / (float) Math.max(transferTime, 1);
LOG.info("File " + info.getName() + " transferred in " + transferTime + " (ms) at " + rate + " KB/sec");
} finally {
os.close();
}
this.info = null;
this.toFile = null;
sendNextRequestOrStop();
} catch (Exception e) {
failed(e);
}
}
public void onException(IOException error) {
failed(error);
}
public void failed(Exception error) {
try {
if (!stopped.get()) {
LOG.warn("Replication session failure: " + transport.getRemoteAddress());
}
stop();
} catch (Exception ignore) {
}
}
public void transportInterupted() {
}
public void transportResumed() {
}
}
private void copy(InputStream is, OutputStream os, long length) throws IOException {
byte buffer[] = new byte[1024 * 4];
int c = 0;
long pos = 0;
while (pos < length && ((c = is.read(buffer, 0, (int) Math.min(buffer.length, length - pos))) >= 0)) {
os.write(buffer, 0, c);
pos += c;
}
}
private void copy(InputStream is, DataOutput os, long length) throws IOException {
byte buffer[] = new byte[1024 * 4];
int c = 0;
long pos = 0;
while (pos < length && ((c = is.read(buffer, 0, (int) Math.min(buffer.length, length - pos))) >= 0)) {
os.write(buffer, 0, c);
pos += c;
}
}
private void delete(List<String> files) {
for (String fn : files) {
try {
replicationServer.getReplicationFile(fn).delete();
} catch (IOException e) {
}
}
}
}
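// Condensed lifecycle sketch for the slave above, assuming an already
// configured ReplicationService named "service"; the endpoint is a placeholder.
ReplicationSlave slave = new ReplicationSlave(service);
slave.start();                              // connects to the master and sends SLAVE_INIT

ClusterState state = new ClusterState();
state.setMaster("kdbr://localhost:60002");  // placeholder master endpoint
slave.onClusterChange(state);               // doStop() then doStart() against the new master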


@ -1,109 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kahadb.replication;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.util.zip.Adler32;
import java.util.zip.Checksum;
import org.apache.kahadb.journal.Location;
import org.apache.kahadb.replication.pb.PBFileInfo;
import org.apache.kahadb.replication.pb.PBJournalLocation;
public class ReplicationSupport {
static public PBJournalLocation convert(Location loc) {
if( loc==null ) {
return null;
}
return new PBJournalLocation().setFileId(loc.getDataFileId()).setOffset(loc.getOffset());
}
static public Location convert(PBJournalLocation location) {
Location rc = new Location();
rc.setDataFileId(location.getFileId());
rc.setOffset(location.getOffset());
return rc;
}
static public long copyAndChecksum(File input, File output) throws IOException {
FileInputStream is = null;
FileOutputStream os = null;
try {
is = new FileInputStream(input);
os = new FileOutputStream(output);
byte buffer[] = new byte[1024 * 4];
int c;
Checksum checksum = new Adler32();
while ((c = is.read(buffer)) >= 0) {
os.write(buffer, 0, c);
checksum.update(buffer, 0, c);
}
return checksum.getValue();
} finally {
try {
is.close();
} catch(Throwable e) {
}
try {
os.close();
} catch(Throwable e) {
}
}
}
public static PBFileInfo createInfo(String name, File file, long start, long length) throws IOException {
PBFileInfo rc = new PBFileInfo();
rc.setName(name);
rc.setChecksum(checksum(file, start, length));
rc.setStart(start);
rc.setEnd(length);
return rc;
}
public static long checksum(File file, long start, long end) throws IOException {
RandomAccessFile raf = new RandomAccessFile(file, "r");
try {
Checksum checksum = new Adler32();
byte buffer[] = new byte[1024 * 4];
int c;
long pos = start;
raf.seek(start);
while (pos < end && (c = raf.read(buffer, 0, (int) Math.min(end - pos, buffer.length))) >= 0) {
checksum.update(buffer, 0, c);
pos += c;
}
return checksum.getValue();
} finally {
try {
raf.close();
} catch (Throwable e) {
}
}
}
}
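// Worked example of how the helpers above compose; the file path is
// hypothetical. createInfo() records an Adler32 checksum over a byte range
// that checksum() can later re-verify; getChecksum() is the generated
// accessor paired with setChecksum() above.
File journal = new File("target/data/journal-1");
PBFileInfo info = ReplicationSupport.createInfo("journal-1", journal, 0, 4096);
assert info.getChecksum() == ReplicationSupport.checksum(journal, 0, 4096);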


@ -1,217 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kahadb.replication.blaze;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.activeblaze.cluster.BlazeClusterGroupChannel;
import org.apache.activeblaze.cluster.BlazeClusterGroupChannelFactory;
import org.apache.activeblaze.cluster.BlazeClusterGroupConfiguration;
import org.apache.activeblaze.cluster.MasterChangedListener;
import org.apache.activeblaze.group.Member;
import org.apache.activeblaze.group.MemberChangedListener;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.kahadb.replication.ClusterListener;
import org.apache.kahadb.replication.ClusterState;
import org.apache.kahadb.replication.ClusterStateManager;
import org.apache.kahadb.replication.pb.PBClusterNodeStatus;
/**
*
* @author rajdavies
* @org.apache.xbean.XBean element="blazeCluster"
*/
public class BlazeClusterStateManager extends BlazeClusterGroupConfiguration implements ClusterStateManager,
MemberChangedListener, MasterChangedListener {
private static final Log LOG = LogFactory.getLog(BlazeClusterStateManager.class);
private BlazeClusterGroupChannel channel;
private AtomicBoolean started = new AtomicBoolean();
private ClusterState clusterState;
private String localMemberName;
private PBClusterNodeStatus status;
final private List<ClusterListener> listeners = new CopyOnWriteArrayList<ClusterListener>();
/**
* @param listener
* @see org.apache.kahadb.replication.ClusterStateManager#addListener(org.apache.kahadb.replication.ClusterListener)
*/
public void addListener(ClusterListener listener) {
this.listeners.add(listener);
initializeChannel();
fireClusterChange();
}
/**
* @param node
* @see org.apache.kahadb.replication.ClusterStateManager#addMember(java.lang.String)
*/
public void addMember(String node) {
this.localMemberName = node;
initializeChannel();
}
/**
* @param listener
* @see org.apache.kahadb.replication.ClusterStateManager#removeListener(org.apache.kahadb.replication.ClusterListener)
*/
public void removeListener(ClusterListener listener) {
this.listeners.remove(listener);
}
/**
* @param node
* @see org.apache.kahadb.replication.ClusterStateManager#removeMember(java.lang.String)
*/
public void removeMember(String node) {
}
/**
* @param status
* @see org.apache.kahadb.replication.ClusterStateManager#setMemberStatus(org.apache.kahadb.replication.pb.PBClusterNodeStatus)
*/
public void setMemberStatus(PBClusterNodeStatus status) {
if (status != null) {
this.status = status;
setMasterWeight(status.getState().getNumber());
if (this.channel != null) {
this.channel.getConfiguration().setMasterWeight(getMasterWeight());
try {
this.channel.waitForElection(getAwaitGroupTimeout());
} catch (Exception e) {
LOG.error("Wait for Election Failed");
}
}
processClusterStateChange();
}
}
/**
* @param arg0
* @see org.apache.activeblaze.group.MemberChangedListener#memberStarted(org.apache.activeblaze.group.Member)
*/
public void memberStarted(Member arg0) {
processClusterStateChange();
}
/**
* @param arg0
* @see org.apache.activeblaze.group.MemberChangedListener#memberStopped(org.apache.activeblaze.group.Member)
*/
public void memberStopped(Member arg0) {
processClusterStateChange();
}
/**
* @param arg0
* @see org.apache.activeblaze.cluster.MasterChangedListener#masterChanged(org.apache.activeblaze.group.Member)
*/
public void masterChanged(Member arg0) {
processClusterStateChange();
}
/**
* @throws Exception
* @see org.apache.activemq.Service#start()
*/
public void start() throws Exception {
if (this.started.compareAndSet(false, true)) {
initializeChannel();
}
this.started.set(true);
}
/**
* @throws Exception
* @see org.apache.activemq.Service#stop()
*/
public void stop() throws Exception {
if (this.started.compareAndSet(true, false)) {
if (this.channel != null) {
this.channel.removeMemberChangedListener(this);
this.channel.removeMasterChangedListener(this);
this.channel.shutDown();
this.channel = null;
}
}
this.started.set(false);
}
private boolean isStarted() {
return this.started.get();
}
synchronized private void updateClusterState(ClusterState clusterState) {
this.clusterState = clusterState;
fireClusterChange();
}
private void fireClusterChange() {
if (isStarted() && !this.listeners.isEmpty() && this.clusterState != null) {
for (ClusterListener listener : this.listeners) {
listener.onClusterChange(this.clusterState);
}
}
}
private void processClusterStateChange() {
if (isStarted()) {
try {
ClusterState state = new ClusterState();
this.channel.waitForElection(getAwaitGroupTimeout());
Set<Member> members = this.channel.getMembers();
Member master = this.channel.getMaster();
if (master != null) {
// check we can be the master
if (!this.channel.isMaster() || (this.status != null)) {
state.setMaster(master.getName());
members.remove(master);
}
}
List<String> slaves = new ArrayList<String>();
for (Member slave : members) {
slaves.add(slave.getName());
}
state.setSlaves(slaves);
updateClusterState(state);
} catch (Exception e) {
LOG.error("Failed to process Cluster State Changed", e);
}
}
}
private synchronized void initializeChannel() {
if (this.localMemberName != null && this.channel == null) {
try {
BlazeClusterGroupChannelFactory factory = new BlazeClusterGroupChannelFactory(this);
this.channel = factory.createChannel(this.localMemberName);
this.channel.addMemberChangedListener(this);
this.channel.addMasterChangedListener(this);
if (isStarted()) {
this.channel.start();
this.channel.waitForElection(getAwaitGroupTimeout());
}
processClusterStateChange();
} catch (Exception e) {
LOG.error("Failed to create channel", e);
}
}
}
}
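// Hypothetical wiring of the ActiveBlaze-backed manager above; the member
// name is a placeholder, and the listener just logs the elected master.
BlazeClusterStateManager cluster = new BlazeClusterStateManager();
cluster.addMember("kdbr://localhost:60001");   // also fixes the local member name
cluster.addListener(new ClusterListener() {
    public void onClusterChange(ClusterState config) {
        System.out.println("Master is now: " + config.getMaster());
    }
});
cluster.start();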


@ -1,57 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kahadb.replication.transport;
import java.util.Map;
import org.apache.activemq.transport.MutexTransport;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.tcp.TcpTransportFactory;
import org.apache.activemq.util.IntrospectionSupport;
import org.apache.activemq.wireformat.WireFormat;
/**
* A KDBR (KahaDB replication) transport factory
*
* @version $Revision$
*/
public class KDBRTransportFactory extends TcpTransportFactory {
protected String getDefaultWireFormatType() {
return "kdbr";
}
public Transport compositeConfigure(Transport transport, WireFormat format, Map options) {
IntrospectionSupport.setProperties(transport, options);
return super.compositeConfigure(transport, format, options);
}
protected boolean isUseInactivityMonitor(Transport transport) {
return false;
}
/**
* Override to remove the correlation transport filter since that relies on Command to
* multiplex multiple requests and this protocol does not support that.
*/
public Transport configure(Transport transport, WireFormat wf, Map options) throws Exception {
transport = compositeConfigure(transport, wf, options);
transport = new MutexTransport(transport);
return transport;
}
}
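// Connection sketch over the kdbr scheme handled by this factory, mirroring
// how ReplicationSlave.doStart() connects above; "listener" and "frame" are
// assumed to exist (a TransportListener and a ReplicationFrame).
Transport transport = TransportFactory.connect(new URI("kdbr://localhost:60001"));
transport.setTransportListener(listener);   // e.g. the ReplicationSlave itself
transport.start();
transport.oneway(frame);                    // fire-and-forget, as in the replication code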


@ -1,125 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kahadb.replication.transport;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import org.apache.activemq.protobuf.InvalidProtocolBufferException;
import org.apache.activemq.protobuf.Message;
import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.wireformat.WireFormat;
import org.apache.kahadb.replication.ReplicationFrame;
import org.apache.kahadb.replication.pb.PBFileInfo;
import org.apache.kahadb.replication.pb.PBHeader;
import org.apache.kahadb.replication.pb.PBJournalLocation;
import org.apache.kahadb.replication.pb.PBJournalUpdate;
import org.apache.kahadb.replication.pb.PBSlaveInit;
import org.apache.kahadb.replication.pb.PBSlaveInitResponse;
public class KDBRWireFormat implements WireFormat {
private int version;
public int getVersion() {
return version;
}
public void setVersion(int version) {
this.version = version;
}
public ByteSequence marshal(Object command) throws IOException {
throw new RuntimeException("Not implemented.");
}
public Object unmarshal(ByteSequence packet) throws IOException {
throw new RuntimeException("Not implemented.");
}
public void marshal(Object command, DataOutput out) throws IOException {
OutputStream os = (OutputStream) out;
ReplicationFrame frame = (ReplicationFrame) command;
PBHeader header = frame.getHeader();
switch (frame.getHeader().getType()) {
case FILE_TRANSFER_RESPONSE: {
// Write the header..
header.writeFramed(os);
// Stream the Payload.
InputStream is = (InputStream) frame.getPayload();
byte data[] = new byte[1024 * 4];
int c;
long remaining = frame.getHeader().getPayloadSize();
while (remaining > 0 && (c = is.read(data, 0, (int) Math.min(remaining, data.length))) >= 0) {
os.write(data, 0, c);
remaining -= c;
}
break;
}
default:
if (frame.getPayload() == null) {
header.clearPayloadSize();
header.writeFramed(os);
} else {
// All other payload types are PB messages
Message message = (Message) frame.getPayload();
header.setPayloadSize(message.serializedSizeUnframed());
header.writeFramed(os);
message.writeUnframed(os);
}
}
}
public Object unmarshal(DataInput in) throws IOException {
InputStream is = (InputStream) in;
ReplicationFrame frame = new ReplicationFrame();
frame.setHeader(PBHeader.parseFramed(is));
switch (frame.getHeader().getType()) {
case FILE_TRANSFER_RESPONSE:
frame.setPayload(is);
break;
case FILE_TRANSFER:
readPBPayload(frame, in, new PBFileInfo());
break;
case JOURNAL_UPDATE:
readPBPayload(frame, in, new PBJournalUpdate());
break;
case JOURNAL_UPDATE_ACK:
readPBPayload(frame, in, new PBJournalLocation());
break;
case SLAVE_INIT:
readPBPayload(frame, in, new PBSlaveInit());
break;
case SLAVE_INIT_RESPONSE:
readPBPayload(frame, in, new PBSlaveInitResponse());
break;
}
return frame;
}
private void readPBPayload(ReplicationFrame frame, DataInput in, Message pb) throws IOException, InvalidProtocolBufferException {
long payloadSize = frame.getHeader().getPayloadSize();
byte[] payload;
payload = new byte[(int)payloadSize];
in.readFully(payload);
frame.setPayload(pb.mergeUnframed(payload));
}
}
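// Minimal in-memory round trip under the framing rules above: a PB payload
// goes out as a framed PBHeader followed by the unframed message bytes.
// Assumes the generated PB setters chain, as PBHeader's do above.
KDBRWireFormat wf = new KDBRWireFormat();
ByteArrayOutputStream buffer = new ByteArrayOutputStream();

ReplicationFrame out = new ReplicationFrame();
out.setHeader(new PBHeader().setType(PBType.SLAVE_INIT));
out.setPayload(new PBSlaveInit().setNodeId("kdbr://localhost:60001"));
wf.marshal(out, new DataOutputStream(buffer));

ReplicationFrame in = (ReplicationFrame) wf.unmarshal(
        new DataInputStream(new ByteArrayInputStream(buffer.toByteArray())));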


@ -1,30 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kahadb.replication.transport;
import org.apache.activemq.wireformat.WireFormat;
import org.apache.activemq.wireformat.WireFormatFactory;
/**
* @version $Revision$
*/
public class KDBRWireFormatFactory implements WireFormatFactory {
public WireFormat createWireFormat() {
return new KDBRWireFormat();
}
}


@ -1,520 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kahadb.replication.zk;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import org.apache.activemq.protobuf.InvalidProtocolBufferException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.kahadb.journal.Location;
import org.apache.kahadb.replication.ClusterListener;
import org.apache.kahadb.replication.ClusterState;
import org.apache.kahadb.replication.ClusterStateManager;
import org.apache.kahadb.replication.ReplicationSupport;
import org.apache.kahadb.replication.pb.PBClusterConfiguration;
import org.apache.kahadb.replication.pb.PBClusterNodeStatus;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.KeeperException.NoNodeException;
import org.apache.zookeeper.KeeperException.NodeExistsException;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.data.Stat;
/**
*
* @author chirino
* @org.apache.xbean.XBean element="zookeeperCluster"
*/
public class ZooKeeperClusterStateManager implements ClusterStateManager, Watcher {
private static final Log LOG = LogFactory.getLog(ZooKeeperClusterStateManager.class);
final private ArrayList<ClusterListener> listeners = new ArrayList<ClusterListener>();
private int startCounter;
private String uri = "zk://localhost:2181/activemq/ha-cluster/default";
String userid = "activemq";
String password = "";
private ZooKeeper zk;
private String path;
private ClusterState clusterState;
private String statusPath;
private PBClusterNodeStatus memberStatus;
private Thread takoverTask;
private boolean areWeTheBestMaster;
synchronized public void addListener(ClusterListener listener) {
listeners.add(listener);
fireClusterChange();
}
synchronized public void removeListener(ClusterListener listener) {
listeners.remove(listener);
}
synchronized private void updateClusterState(ClusterState clusterState) {
this.clusterState = clusterState;
fireClusterChange();
}
synchronized private void fireClusterChange() {
if (startCounter > 0 && !listeners.isEmpty()) {
for (ClusterListener listener : listeners) {
listener.onClusterChange(clusterState);
}
}
}
synchronized public void start() throws Exception {
startCounter++;
if (startCounter == 1) {
// Make sure the path is set..
String path = getPath();
// Create a ZooKeeper connection..
zk = createZooKeeperConnection();
while( isStarted() ) {
try {
mkParentDirs(path);
try {
zk.create(path, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
} catch (NodeExistsException ignore) {
}
processClusterStateChange();
return;
} catch (Exception e) {
handleZKError(e);
}
}
}
}
synchronized private boolean isStarted() {
return startCounter > 0;
}
synchronized public void stop() throws Exception {
startCounter--;
if (startCounter == 0) {
zk.close();
zk = null;
path=null;
clusterState=null;
statusPath=null;
memberStatus=null;
takoverTask=null;
areWeTheBestMaster=false;
}
}
public String getPath() {
if( path == null ) {
try {
URI uri = new URI(this.uri);
path = uri.getPath();
if (path == null) {
throw new IllegalArgumentException("Invalid uri '" + uri + "', path to cluster configuration not specified");
}
} catch (URISyntaxException e) {
throw new IllegalArgumentException("Invalid uri '" + uri + "': "+e);
}
}
return path;
}
ZooKeeper createZooKeeperConnection() throws URISyntaxException, IOException {
// Parse out the configuration URI.
URI uri = new URI(this.uri);
if (!uri.getScheme().equals("zk")) {
throw new IllegalArgumentException("Invalid uri '" + uri + "', expected it to start with zk://");
}
String host = uri.getHost();
if (host == null) {
throw new IllegalArgumentException("Invalid uri '" + uri + "', host not specified");
}
int port = uri.getPort();
if (port == -1) {
port = 2181;
}
ZooKeeper zk = new ZooKeeper(host, port, this);
zk.addAuthInfo("digest", (userid+":"+password).getBytes());
return zk;
}
private void processClusterStateChange() {
try {
if( zk==null ) {
return;
}
byte[] data = zk.getData(path, new Watcher() {
public void process(WatchedEvent event) {
processClusterStateChange();
}
}, new Stat());
PBClusterConfiguration config = new PBClusterConfiguration();
config.mergeUnframed(data);
ClusterState state = new ClusterState();
HashSet<String> slaves = new HashSet<String>(config.getMembersList());
if( config.hasMaster() ) {
state.setMaster(config.getMaster());
slaves.remove(config.getMaster());
}
state.setSlaves(new ArrayList<String>(slaves));
updateClusterState(state);
} catch (Exception e) {
e.printStackTrace();
}
}
public void process(WatchedEvent event) {
System.out.println("Got: " + event);
}
public void setMemberStatus(final PBClusterNodeStatus status) {
while( isStarted() ) {
try {
this.memberStatus = status;
if (statusPath == null) {
mkdirs(path + "/election");
statusPath = zk.create(path + "/election/n_", status.toUnframedByteArray(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
} else {
Stat stat = zk.exists(statusPath, false);
if (status == null) {
zk.delete(statusPath, stat.getVersion());
statusPath = null;
} else {
zk.setData(statusPath, status.toUnframedByteArray(), stat.getVersion());
}
}
processElectionChange();
return;
} catch (Exception e) {
e.printStackTrace();
handleZKError(e);
}
}
}
synchronized private void processElectionChange() {
try {
if( zk==null ) {
return;
}
List<String> zkNodes = zk.getChildren(path + "/election", new Watcher() {
public void process(WatchedEvent event) {
processElectionChange();
}
});
Map<String, PBClusterNodeStatus> children = processNodeStatus(zkNodes);
if( children.isEmpty() ) {
return;
}
String firstNodeId = children.keySet().iterator().next();
// Are we the first child?
if( firstNodeId.equals(statusPath) ) {
// If we are already the master, no need to do anything else
if ( memberStatus.getConnectUri().equals(clusterState.getMaster()) ) {
return;
}
// We may need to wait a little to figure out if we are
// actually the best pick to be the master.
switch (memberStatus.getState()) {
case MASTER:
case SLAVE_ONLINE:
// Can transition to master immediately
LOG.info("Online salve taking over as master.");
setMaster(memberStatus.getConnectUri());
return;
case SLAVE_SYNCRONIZING:
case SLAVE_UNCONNECTED:
// If it looks like we are the best master.. let's wait 5 secs to
// let other slaves join the cluster and get a chance to take over..
if (areWeTheBestMaster(children)) {
areWeTheBestMaster = true;
if( takoverTask==null ) {
LOG.info(memberStatus.getConnectUri()+" looks like the best offline slave that can take over as master.. waiting 5 secs to allow another slave to take over.");
takoverTask = new Thread("Slave takeover..") {
public void run() {
takoverAttempt();
}
};
takoverTask.setDaemon(true);
takoverTask.start();
}
return;
} else {
if( areWeTheBestMaster ) {
LOG.info(memberStatus.getConnectUri()+" no longer looks like the best offline slave that can take over as master.");
}
areWeTheBestMaster = false;
// If we get here we need to yield our top position in the node
// sequence list so that the better slave can become the master.
Stat stat = zk.exists(statusPath, false);
zk.delete(statusPath, stat.getVersion());
statusPath = zk.create(path + "/election/n_", memberStatus.toUnframedByteArray(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
}
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
protected void takoverAttempt() {
try {
for( int i=0; i < 10; i++ ) {
Thread.sleep(500);
if( !isStarted() )
return;
}
synchronized(this) {
try {
if( areWeTheBestMaster ) {
LOG.info(memberStatus.getConnectUri()+" is taking over as master.");
setMaster(memberStatus.getConnectUri());
}
} finally {
// We want to make sure we set takoverTask to null in the same mutex as we set the master.
takoverTask=null;
}
}
} catch (Exception e) {
// sleep might error out..
} finally {
synchronized(this) {
takoverTask=null;
}
}
}
private boolean areWeTheBestMaster(Map<String, PBClusterNodeStatus> children) {
Location ourLocation = ReplicationSupport.convert(memberStatus.getLastUpdate());
for (Entry<String, PBClusterNodeStatus> entry : children.entrySet()) {
PBClusterNodeStatus status = entry.getValue();
switch (status.getState()) {
case MASTER:
case SLAVE_ONLINE:
return false;
case SLAVE_SYNCRONIZING:
case SLAVE_UNCONNECTED:
if (ourLocation.compareTo(ReplicationSupport.convert(status.getLastUpdate())) < 0) {
return false;
}
}
}
return true;
}
private Map<String, PBClusterNodeStatus> processNodeStatus(List<String> children) throws KeeperException, InterruptedException, InvalidProtocolBufferException {
java.util.TreeMap<String, PBClusterNodeStatus> rc = new java.util.TreeMap<String, PBClusterNodeStatus>();
for (String nodeId : children) {
try {
Stat stat = new Stat();
byte[] data = zk.getData(path + "/election/" + nodeId, false, stat);
PBClusterNodeStatus status = new PBClusterNodeStatus();
status.mergeUnframed(data);
rc.put(path + "/election/" + nodeId, status);
} catch (NoNodeException ignore) {
}
}
return rc;
}
public void addMember(final String node) {
while( isStarted() ) {
try {
mkParentDirs(path);
update(path, CreateMode.PERSISTENT, new Updater<InvalidProtocolBufferException>() {
public byte[] update(byte[] data) throws InvalidProtocolBufferException {
PBClusterConfiguration config = new PBClusterConfiguration();
if (data != null) {
config.mergeUnframed(data);
}
if (!config.getMembersList().contains(node)) {
config.addMembers(node);
}
return config.toFramedByteArray();
}
});
return;
} catch (Exception e) {
handleZKError(e);
}
}
}
public void removeMember(final String node) {
while( isStarted() ) {
try {
mkParentDirs(path);
update(path, CreateMode.PERSISTENT, new Updater<InvalidProtocolBufferException>() {
public byte[] update(byte[] data) throws InvalidProtocolBufferException {
PBClusterConfiguration config = new PBClusterConfiguration();
if (data != null) {
config.mergeUnframed(data);
}
config.getMembersList().remove(node);
return config.toFramedByteArray();
}
});
return;
} catch (Exception e) {
handleZKError(e);
}
}
}
public void setMaster(final String node) throws InvalidProtocolBufferException, KeeperException, InterruptedException {
mkParentDirs(path);
update(path, CreateMode.PERSISTENT, new Updater<InvalidProtocolBufferException>() {
public byte[] update(byte[] data) throws InvalidProtocolBufferException {
PBClusterConfiguration config = new PBClusterConfiguration();
if (data != null) {
config.mergeUnframed(data);
}
config.setMaster(node);
return config.toFramedByteArray();
}
});
}
interface Updater<T extends Throwable> {
byte[] update(byte[] data) throws T;
}
private <T extends Throwable> void update(String path, CreateMode persistent, Updater<T> updater) throws InvalidProtocolBufferException, KeeperException, InterruptedException, T {
Stat stat = zk.exists(path, false);
if (stat != null) {
byte[] data = zk.getData(path, false, stat);
data = updater.update(data);
zk.setData(path, data, stat.getVersion());
} else {
byte[] update = updater.update(null);
try {
zk.create(path, update, Ids.OPEN_ACL_UNSAFE, persistent);
} catch (NodeExistsException ignore) {
stat = zk.exists(path, false);
byte[] data = zk.getData(path, false, stat);
data = updater.update(data);
zk.setData(path, data, stat.getVersion());
}
}
}
private void mkParentDirs(String path) throws KeeperException, InterruptedException {
int lastIndexOf = path.lastIndexOf("/");
if (lastIndexOf >= 0) {
mkdirs(path.substring(0, lastIndexOf));
}
}
private void mkdirs(String path) throws KeeperException, InterruptedException {
if (zk.exists(path, false) != null) {
return;
}
// Remove the leading /
if (path.startsWith("/")) {
path = path.substring(1);
}
String[] split = path.split("/");
String cur = "";
for (String node : split) {
cur += "/" + node;
try {
zk.create(cur, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
} catch (NodeExistsException ignore) {
}
}
}
public String getUri() {
return uri;
}
public void setUri(String uri) {
this.uri = uri;
}
public String getUserid() {
return userid;
}
public void setUserid(String userid) {
this.userid = userid;
}
public String getPassword() {
return password;
}
public void setPassword(String password) {
this.password = password;
}
private void handleZKError(Exception e) {
LOG.warn("ZooKeeper error. Will retry operation in 1 seconds");
LOG.debug("The error was: "+e, e);
for( int i=0; i < 10; i ++) {
try {
if( !isStarted() )
return;
Thread.sleep(100);
} catch (InterruptedException e1) {
return;
}
}
}
}
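// Hypothetical setup for the manager above; the URI follows the
// zk://host:port/path form parsed by createZooKeeperConnection()/getPath().
// Note that addMember() is a no-op until the manager has been started.
ZooKeeperClusterStateManager cluster = new ZooKeeperClusterStateManager();
cluster.setUri("zk://localhost:2181/activemq/ha-cluster/default");
cluster.setUserid("activemq");
cluster.setPassword("");
cluster.start();                              // connects and creates the config node
cluster.addMember("kdbr://localhost:60001");  // records this node in the cluster config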


@ -1,147 +0,0 @@
//
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
package org.apache.kahadb.replication.pb;
option java_multiple_files = true;
option java_outer_classname = "PB";
//
//
//
message PBHeader {
required PBType type=1;
optional int64 payload_size=2;
}
enum PBType {
// Sent from the slave to the master when the slave first starts. It lets the master
// know about the slave's synchronization state. This allows the master to decide
// how best to synchronize the slave.
//
// @followed-by PBSlaveInit
SLAVE_INIT = 0;
// The Master will send this response back to the slave, letting it know what it needs to do to get
// its data files synchronized with the master.
//
// @followed-by PBSlaveInitResponse
SLAVE_INIT_RESPONSE = 1;
// The slave will send this command to the master once it has completed
// all of its bulk synchronization and is ready to take over as master.
//
// @followed-by null
SLAVE_ONLINE=2;
// Sent from the Master to the slave to replicate a Journal update.
//
// @followed-by PBJournalUpdate
JOURNAL_UPDATE=3;
// An ack sent back to the master in response to a received
// JOURNAL_UPDATE
//
// @followed-by PBJournalLocation
JOURNAL_UPDATE_ACK=4;
// A Request for a bulk file transfer. Sent from a slave to a Master
//
// @followed-by PBFileInfo
FILE_TRANSFER=5;
// A bulk file transfer response
//
// @followed-by the bytes of the requested file.
FILE_TRANSFER_RESPONSE=6;
}
message PBFileInfo {
required string name=1;
optional int32 snapshot_id=2;
optional sfixed64 checksum=3;
optional int64 start=4;
optional int64 end=5;
}
message PBJournalLocation {
required int32 file_id=1;
required int32 offset=2;
}
message PBSlaveInit {
// The id of the slave node that is being initialized
required string node_id=1;
// The files that the slave node currently has
repeated PBFileInfo current_files=2;
}
message PBSlaveInitResponse {
// The files that the slave should bulk copy from the master..
repeated PBFileInfo copy_files=1;
// The files that the slave should delete
repeated string delete_files=2;
}
message PBJournalUpdate {
// Journal location of the update.
required PBJournalLocation location=1;
// The data that will be written at that location.
required bytes data=2;
// Should the slave send back an ack for this update.
optional bool send_ack=3;
// If true, then the slave should do a disk sync before returning a
// JOURNAL_UPDATE_ACK
optional bool disk_sync=4;
}
//
// This holds the cluster-wide configuration.
//
message PBClusterConfiguration {
// Would be nice if the configuration of the broker was set up cluster-wide. We could
// stuff the spring config in here.. That way pushing out changes to the rest of the
// cluster would be very easy.
optional bytes broker_configuration=1;
// Who are the nodes that have joined the cluster. They may not all be online.
// Comes in handy to see if there are enough online members to form a quorum.
repeated string members=2;
// Who was the last elected master.
optional string master=3;
}
message PBClusterNodeStatus {
enum State {
// When the slave initially starts up it
// is not connected to a master.
SLAVE_UNCONNECTED = 0;
// When the slave first attaches to a master, it must first
// synchronize with the master to get any data updates
// that were missed while it was offline.
SLAVE_SYNCRONIZING = 1;
// The slave is caught up and is only actively applying
// real time updates from the master.
SLAVE_ONLINE = 3;
// This node is the master.
MASTER = 4;
}
required State state=1;
optional string connect_uri=2;
optional PBJournalLocation last_update=3;
}
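// Java-side sketch of the opening handshake the enum above describes,
// mirroring ReplicationSlave.doStart(); "transport" and the node id are
// placeholders. The master answers with SLAVE_INIT_RESPONSE, after which
// JOURNAL_UPDATE and JOURNAL_UPDATE_ACK frames flow.
ReplicationFrame frame = new ReplicationFrame();
frame.setHeader(new PBHeader().setType(PBType.SLAVE_INIT));
PBSlaveInit payload = new PBSlaveInit();
payload.setNodeId("kdbr://localhost:60001");              // placeholder node id
payload.setCurrentFilesList(new ArrayList<PBFileInfo>()); // nothing on disk yet
frame.setPayload(payload);
transport.oneway(frame);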


@ -1,150 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kahadb.replication;
import java.io.File;
import java.util.Arrays;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import junit.framework.TestCase;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQQueue;
public class ReplicationTest extends TestCase {
private static final String BROKER1_URI = "tcp://localhost:61001";
private static final String BROKER2_URI = "tcp://localhost:61002";
private static final String BROKER1_REPLICATION_ID = "kdbr://localhost:60001";
private static final String BROKER2_REPLICATION_ID = "kdbr://localhost:60002";
private Destination destination = new ActiveMQQueue("TEST_QUEUE");
public void testReplication() throws Exception {
// This cluster object will control who becomes the master.
StaticClusterStateManager cluster = new StaticClusterStateManager();
ReplicationService rs1 = new ReplicationService();
rs1.setMinimumReplicas(0);
rs1.setUri(BROKER1_REPLICATION_ID);
rs1.setCluster(cluster);
rs1.setDirectory(new File("target/replication-test/broker1"));
rs1.setBrokerURI("broker://("+BROKER1_URI+")/broker1");
rs1.start();
ReplicationService rs2 = new ReplicationService();
rs2.setMinimumReplicas(0);
rs2.setUri(BROKER2_REPLICATION_ID);
rs2.setCluster(cluster);
rs2.setDirectory(new File("target/replication-test/broker2"));
rs2.setBrokerURI("broker://(" + BROKER2_URI + ")/broker2");
rs2.start();
// // None of the brokers should be accepting connections since they are not masters.
// try {
// sendMesagesTo(1, BROKER1_URI);
// fail("Connection failure expected.");
// } catch( JMSException e ) {
// }
// Make b1 the master.
ClusterState clusterState = new ClusterState();
clusterState.setMaster(BROKER1_REPLICATION_ID);
cluster.setClusterState(clusterState);
try {
sendMesagesTo(BROKER1_URI, 100, "Pass 1: ");
} catch( JMSException e ) {
fail("b1 did not become a master.");
}
// Make broker 2 a slave.
clusterState = new ClusterState();
clusterState.setMaster(BROKER1_REPLICATION_ID);
String[] slaves = {BROKER2_REPLICATION_ID};
clusterState.setSlaves(Arrays.asList(slaves));
cluster.setClusterState(clusterState);
Thread.sleep(1000);
try {
sendMesagesTo(BROKER1_URI, 100, "Pass 2: ");
} catch( JMSException e ) {
fail("Failed to send more messages...");
}
Thread.sleep(2000);
// Make broker 2 the master.
clusterState = new ClusterState();
clusterState.setMaster(BROKER2_REPLICATION_ID);
cluster.setClusterState(clusterState);
Thread.sleep(1000);
assertReceived(200, BROKER2_URI);
rs2.stop();
rs1.stop();
}
private void assertReceived(int count, String brokerUri) throws JMSException {
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(brokerUri);
Connection con = cf.createConnection();
con.start();
try {
Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer consumer = session.createConsumer(destination);
for (int i = 0; i < count; i++) {
TextMessage m = (TextMessage) consumer.receive(1000);
if( m==null ) {
fail("Failed to receive message: "+i);
}
System.out.println("Got: "+m.getText());
}
} finally {
try { con.close(); } catch (Throwable e) {}
}
}
private void sendMesagesTo(String brokerUri, int count, String msg) throws JMSException {
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(brokerUri);
Connection con = cf.createConnection();
try {
Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer producer = session.createProducer(destination);
for (int i = 0; i < count; i++) {
producer.send(session.createTextMessage(msg+i));
}
} finally {
try { con.close(); } catch (Throwable e) {}
}
}
}


@ -1,73 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kahadb.replication;
import java.util.ArrayList;
import org.apache.kahadb.replication.pb.PBClusterNodeStatus;
public class StaticClusterStateManager implements ClusterStateManager {
final private ArrayList<ClusterListener> listeners = new ArrayList<ClusterListener>();
private ClusterState clusterState;
private int startCounter;
synchronized public ClusterState getClusterState() {
return clusterState;
}
synchronized public void setClusterState(ClusterState clusterState) {
this.clusterState = clusterState;
fireClusterChange();
}
synchronized public void addListener(ClusterListener listener) {
listeners.add(listener);
fireClusterChange();
}
synchronized public void removeListener(ClusterListener listener) {
listeners.remove(listener);
}
synchronized public void start() throws Exception {
startCounter++;
fireClusterChange();
}
synchronized private void fireClusterChange() {
if( startCounter>0 && !listeners.isEmpty() && clusterState!=null ) {
for (ClusterListener listener : listeners) {
listener.onClusterChange(clusterState);
}
}
}
synchronized public void stop() throws Exception {
startCounter--;
}
public void addMember(String node) {
}
public void removeMember(String node) {
}
public void setMemberStatus(PBClusterNodeStatus status) {
}
}

View File

@ -1,151 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kahadb.replication;
import java.io.File;
import java.io.IOException;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import junit.framework.TestCase;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.zookeeper.server.NIOServerCnxn;
import org.apache.zookeeper.server.ServerStats;
import org.apache.zookeeper.server.ZooKeeperServer;
import org.apache.zookeeper.server.NIOServerCnxn.Factory;
import org.apache.zookeeper.server.persistence.FileTxnLog;
public class XBeanReplicationTest extends TestCase {
private static final String BROKER1_URI = "tcp://localhost:61616";
private static final String BROKER2_URI = "tcp://localhost:61617";
private Destination destination = new ActiveMQQueue("TEST_QUEUE");
private static final int PORT = 2181;
private Factory serverFactory;
public void testReplication() throws Exception {
startZooKeeper();
BrokerService broker1 = BrokerFactory.createBroker("xbean:broker1/ha.xml");
broker1.start();
// Wait for the broker to get set up..
Thread.sleep(7000);
sendMessagesTo(BROKER1_URI, 100, "Pass 1: ");
// Create broker 2 which will join in and sync up with the existing master.
BrokerService broker2 = BrokerFactory.createBroker("xbean:broker2/ha.xml");
broker2.start();
// Give it some time to sync up..
Thread.sleep(1000);
// Stopping broker1 should make broker2 the master.
broker1.stop();
Thread.sleep(1000);
// Did all the messages get synced up?
assertReceived(100, BROKER2_URI);
// Send some more messages...
sendMessagesTo(BROKER2_URI, 50, "Pass 2: ");
// Start broker1 up again.. it should re-sync with the new master (broker2)
broker1.start();
// Give it some time to sync up..
Thread.sleep(1000);
// Stopping the master should make broker1 take over..
broker2.stop();
// Did the new state get synced right?
assertReceived(50, BROKER1_URI);
broker1.stop();
stopZooKeeper();
}
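// Shuts down the embedded ZooKeeper server that the brokers use for master election.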
private void stopZooKeeper() {
serverFactory.shutdown();
ServerStats.unregister();
}
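// Starts an embedded ZooKeeper server for the brokers to coordinate through.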
private void startZooKeeper() throws IOException, InterruptedException {
ServerStats.registerAsConcrete();
File zooKeeperData = new File("target/test-data/zookeeper-"+System.currentTimeMillis());
zooKeeperData.mkdirs();
// Reduces startup time..
System.setProperty("zookeeper.preAllocSize", "100");
FileTxnLog.setPreallocSize(100);
ZooKeeperServer zs = new ZooKeeperServer(zooKeeperData, zooKeeperData, 3000);
serverFactory = new NIOServerCnxn.Factory(PORT);
serverFactory.startup(zs);
}
private void assertReceived(int count, String brokerUri) throws JMSException {
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(brokerUri);
Connection con = cf.createConnection();
con.start();
try {
Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer consumer = session.createConsumer(destination);
for (int i = 0; i < count; i++) {
TextMessage m = (TextMessage) consumer.receive(1000);
if( m==null ) {
fail("Failed to receive message: "+i);
}
System.out.println("Got: "+m.getText());
}
} finally {
try { con.close(); } catch (Throwable e) {}
}
}
private void sendMessagesTo(String brokerUri, int count, String msg) throws JMSException {
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(brokerUri);
Connection con = cf.createConnection();
try {
Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer producer = session.createProducer(destination);
for (int i = 0; i < count; i++) {
producer.send(session.createTextMessage(msg+i));
}
} finally {
try { con.close(); } catch (Throwable e) {}
}
}
}

View File

@ -1,137 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kahadb.replication.blaze;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import junit.framework.TestCase;
import org.apache.activemq.util.Callback;
import org.apache.kahadb.replication.ClusterListener;
import org.apache.kahadb.replication.ClusterState;
import org.apache.kahadb.replication.pb.PBClusterNodeStatus;
import org.apache.kahadb.replication.pb.PBJournalLocation;
import org.apache.kahadb.replication.pb.PBClusterNodeStatus.State;
/**
* @author rajdavies
*
*/
public class BlazeClusterStateManagerTest extends TestCase {
public void testTwoNodesGoingOnline() throws Exception {
final LinkedBlockingQueue<ClusterState> stateEvents1 = new LinkedBlockingQueue<ClusterState>();
final LinkedBlockingQueue<ClusterState> stateEvents2 = new LinkedBlockingQueue<ClusterState>();
final BlazeClusterStateManager bcsm1 = new BlazeClusterStateManager();
bcsm1.addListener(new ClusterListener() {
public void onClusterChange(ClusterState config) {
stateEvents1.add(config);
}
});
bcsm1.start();
bcsm1.addMember("kdbr://localhost:60001");
final BlazeClusterStateManager bcsm2 = new BlazeClusterStateManager();
bcsm2.addListener(new ClusterListener() {
public void onClusterChange(ClusterState config) {
stateEvents2.add(config);
}
});
bcsm2.start();
bcsm2.addMember("kdbr://localhost:60002");
// Drain the events..
while (stateEvents1.poll(100, TimeUnit.MILLISECONDS) != null) {
}
while (stateEvents2.poll(100, TimeUnit.MILLISECONDS) != null) {
}
// Bring node 1 online
final PBClusterNodeStatus status1 = new PBClusterNodeStatus();
status1.setConnectUri("kdbr://localhost:60001");
status1.setLastUpdate(new PBJournalLocation().setFileId(1).setOffset(50));
status1.setState(State.SLAVE_UNCONNECTED);
executeAsync(new Callback() {
public void execute() throws Exception {
bcsm1.setMemberStatus(status1);
}
});
// Bring node 2 online
final PBClusterNodeStatus status2 = new PBClusterNodeStatus();
status2.setConnectUri("kdbr://localhost:60002");
status2.setLastUpdate(new PBJournalLocation().setFileId(2).setOffset(20));
status2.setState(State.SLAVE_UNCONNECTED);
executeAsync(new Callback() {
public void execute() throws Exception {
Thread.sleep(1000);
bcsm2.setMemberStatus(status2);
}
});
ClusterState state = stateEvents1.poll(10, TimeUnit.SECONDS);
assertNotNull(state);
assertNotNull(state.getMaster());
assertEquals("kdbr://localhost:60002", state.getMaster());
assertTrue(state.getSlaves().size() == 1);
state = stateEvents2.poll(2, TimeUnit.SECONDS);
assertNotNull(state);
assertNotNull(state.getMaster());
assertEquals("kdbr://localhost:60002", state.getMaster());
assertTrue(state.getSlaves().size() == 1);
bcsm2.stop();
bcsm1.stop();
}
public void testOneNodeGoingOnline() throws Exception {
final LinkedBlockingQueue<ClusterState> stateEvents1 = new LinkedBlockingQueue<ClusterState>();
final BlazeClusterStateManager bcsm1 = new BlazeClusterStateManager();
bcsm1.addListener(new ClusterListener() {
public void onClusterChange(ClusterState config) {
stateEvents1.add(config);
}
});
bcsm1.start();
// Drain the events..
while (stateEvents1.poll(100, TimeUnit.MILLISECONDS) != null) {
}
// Let node1 join the cluster.
bcsm1.addMember("kdbr://localhost:60001");
ClusterState state = stateEvents1.poll(1, TimeUnit.SECONDS);
assertNotNull(state);
assertNull(state.getMaster());
assertTrue(state.getSlaves().size() == 1);
// Let the cluster know that node1 is online..
PBClusterNodeStatus status = new PBClusterNodeStatus();
status.setConnectUri("kdbr://localhost:60001");
status.setLastUpdate(new PBJournalLocation().setFileId(0).setOffset(0));
status.setState(State.SLAVE_UNCONNECTED);
bcsm1.setMemberStatus(status);
state = stateEvents1.poll(10, TimeUnit.SECONDS);
assertNotNull(state);
assertNotNull(state.getMaster());
assertEquals("kdbr://localhost:60001", state.getMaster());
assertTrue(state.getSlaves().isEmpty());
bcsm1.stop();
}
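// Runs the callback off the test thread, since setMemberStatus() may block during master election.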
private void executeAsync(final Callback callback) {
new Thread("Async Test Task") {
@Override
public void run() {
try {
callback.execute();
} catch (Exception e) {
e.printStackTrace();
}
}
}.start();
}
}

View File

@ -1,225 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kahadb.replication.transport;
import java.io.ByteArrayInputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import junit.framework.TestCase;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportAcceptListener;
import org.apache.activemq.transport.TransportFactory;
import org.apache.activemq.transport.TransportListener;
import org.apache.activemq.transport.TransportServer;
import org.apache.kahadb.replication.ReplicationFrame;
import org.apache.kahadb.replication.pb.PBHeader;
import org.apache.kahadb.replication.pb.PBJournalLocation;
import org.apache.kahadb.replication.pb.PBSlaveInit;
import org.apache.kahadb.replication.pb.PBType;
public class KDBRTransportTest extends TestCase {
private static final String KDBR_URI = "kdbr://localhost:61618";
private List<Object> serverQueue;
private List<Object> clientQueue;
private List<Transport> serverTransports;
private TransportServer server;
private Transport client;
private Object commandLatchMutex = new Object();
private CountDownLatch commandLatch;
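// The command latch lets the test thread block until the next command (or error) reaches a transport listener.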
protected void releaseCommandLatch() {
synchronized( commandLatchMutex ) {
if( commandLatch == null ) {
return;
}
commandLatch.countDown();
commandLatch=null;
}
}
protected CountDownLatch getCommandLatch() {
synchronized( commandLatchMutex ) {
if( commandLatch == null ) {
commandLatch = new CountDownLatch(1);
}
return commandLatch;
}
}
@Override
protected void setUp() throws Exception {
serverQueue = Collections.synchronizedList(new ArrayList<Object>());
clientQueue = Collections.synchronizedList(new ArrayList<Object>());
serverTransports = Collections.synchronizedList(new ArrayList<Transport>());
// Setup a server
server = TransportFactory.bind(new URI(KDBR_URI));
server.setAcceptListener(new TransportAcceptListener() {
public void onAccept(Transport transport) {
try {
transport.setTransportListener(new TransportListener() {
public void onCommand(Object command) {
try {
serverQueue.add(command);
process(command);
releaseCommandLatch();
} catch (IOException e) {
onException(e);
}
}
public void onException(IOException error) {
serverQueue.add(error);
serverTransports.remove(this);
releaseCommandLatch();
}
public void transportInterupted() {
}
public void transportResumed() {
}
});
transport.start();
serverTransports.add(transport);
} catch (Exception e) {
onAcceptError(e);
}
}
public void onAcceptError(Exception error) {
error.printStackTrace();
}
});
server.start();
// Connect a client.
client = TransportFactory.connect(new URI(KDBR_URI));
client.setTransportListener(new TransportListener() {
public void onCommand(Object command) {
clientQueue.add(command);
releaseCommandLatch();
}
public void onException(IOException error) {
clientQueue.add(error);
releaseCommandLatch();
}
public void transportInterupted() {
}
public void transportResumed() {
}
});
client.start();
}
@Override
protected void tearDown() throws Exception {
client.stop();
server.stop();
}
private void process(Object command) throws IOException {
ReplicationFrame frame = (ReplicationFrame) command;
// Since we are processing the commands asynchronously in this test case, we need to fully read the
// stream before returning, since it will be used to read the next command once we return.
if( frame.getHeader().getType() == PBType.FILE_TRANSFER_RESPONSE ) {
InputStream ais = (InputStream) frame.getPayload();
byte actualPayload[] = new byte[(int)frame.getHeader().getPayloadSize()];
readFully(ais, actualPayload);
frame.setPayload(actualPayload);
}
}
/**
* Test a frame that has a streaming payload.
*
* @throws Exception
*/
public void testFileTransferResponse() throws Exception {
byte expectedPayload[] = {1,2,3,4,5,6,7,8,9,10};
ReplicationFrame expected = new ReplicationFrame();
expected.setHeader(new PBHeader().setType(PBType.FILE_TRANSFER_RESPONSE).setPayloadSize(expectedPayload.length));
ByteArrayInputStream is = new ByteArrayInputStream(expectedPayload);
expected.setPayload(is);
CountDownLatch latch = getCommandLatch();
client.oneway(expected);
is.close();
latch.await(2, TimeUnit.SECONDS);
assertEquals(1, serverQueue.size());
ReplicationFrame actual = (ReplicationFrame) serverQueue.remove(0);
assertEquals(expected.getHeader(), actual.getHeader());
assertTrue(Arrays.equals(expectedPayload, (byte[])actual.getPayload()));
}
/**
* Test out sending a frame that has a PB payload.
*
* @throws Exception
*/
public void testPBSlaveInitFrame() throws Exception {
ReplicationFrame expected = new ReplicationFrame();
expected.setHeader(new PBHeader().setType(PBType.SLAVE_INIT));
expected.setPayload(new PBSlaveInit().setNodeId("foo"));
CountDownLatch latch = getCommandLatch();
client.oneway(expected);
latch.await(2, TimeUnit.SECONDS);
assertEquals(1, serverQueue.size());
ReplicationFrame actual = (ReplicationFrame) serverQueue.remove(0);
assertEquals(expected.getHeader(), actual.getHeader());
assertEquals(expected.getPayload(), actual.getPayload());
}
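// Reads exactly actualPayload.length bytes from the stream, throwing EOFException if it ends early.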
private void readFully(InputStream ais, byte[] actualPayload) throws IOException {
int pos = 0;
int c;
while( pos < actualPayload.length && (c=ais.read(actualPayload, pos, actualPayload.length-pos))>=0 ) {
pos += c;
}
if( pos < actualPayload.length ) {
throw new EOFException();
}
}
}

View File

@ -1,212 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kahadb.replication.zk;
import java.io.BufferedReader;
import java.io.File;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.net.Socket;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import junit.framework.TestCase;
import org.apache.activemq.util.Callback;
import org.apache.kahadb.replication.ClusterListener;
import org.apache.kahadb.replication.ClusterState;
import org.apache.kahadb.replication.pb.PBClusterNodeStatus;
import org.apache.kahadb.replication.pb.PBJournalLocation;
import org.apache.kahadb.replication.pb.PBClusterNodeStatus.State;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
import org.apache.zookeeper.server.NIOServerCnxn;
import org.apache.zookeeper.server.ServerStats;
import org.apache.zookeeper.server.ZooKeeperServer;
import org.apache.zookeeper.server.NIOServerCnxn.Factory;
import org.apache.zookeeper.server.persistence.FileTxnLog;
public class ZooKeeperClusterStateManagerTest extends TestCase {
private static final int PORT = 2181;
private ZooKeeperClusterStateManager zkcsm1;
private ZooKeeper zk;
private Factory serverFactory;
@Override
protected void setUp() throws Exception {
ServerStats.registerAsConcrete();
File tmpDir = new File("target/test-data/zookeeper");
tmpDir.mkdirs();
// Reduces startup time..
System.setProperty("zookeeper.preAllocSize", "100");
FileTxnLog.setPreallocSize(100);
ZooKeeperServer zs = new ZooKeeperServer(tmpDir, tmpDir, 3000);
serverFactory = new NIOServerCnxn.Factory(PORT);
serverFactory.startup(zs);
zkcsm1 = new ZooKeeperClusterStateManager();
zk = zkcsm1.createZooKeeperConnection();
// Clean up after a previous run...
zkRecursiveDelete(zkcsm1.getPath());
}
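// Depth-first delete of the given ZooKeeper path and all of its children.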
private void zkRecursiveDelete(String path) throws KeeperException, InterruptedException {
Stat stat = zk.exists(path, false);
if( stat!=null ) {
if( stat.getNumChildren() > 0 ) {
List<String> children = zk.getChildren(path, false);
for (String node : children) {
zkRecursiveDelete(path+"/"+node);
}
}
zk.delete(path, stat.getVersion());
}
}
@Override
protected void tearDown() throws Exception {
zk.close();
serverFactory.shutdown();
ServerStats.unregister();
}
public void testTwoNodesGoingOnline() throws Exception {
final LinkedBlockingQueue<ClusterState> stateEvents1 = new LinkedBlockingQueue<ClusterState>();
final LinkedBlockingQueue<ClusterState> stateEvents2 = new LinkedBlockingQueue<ClusterState>();
zkcsm1.addListener(new ClusterListener() {
public void onClusterChange(ClusterState config) {
stateEvents1.add(config);
}
});
zkcsm1.start();
zkcsm1.addMember("kdbr://localhost:60001");
final ZooKeeperClusterStateManager zkcsm2 = new ZooKeeperClusterStateManager();
zkcsm2.addListener(new ClusterListener() {
public void onClusterChange(ClusterState config) {
stateEvents2.add(config);
}
});
zkcsm2.start();
zkcsm2.addMember("kdbr://localhost:60002");
// Drain the events..
while( stateEvents1.poll(100, TimeUnit.MILLISECONDS)!=null ) {
}
while( stateEvents2.poll(100, TimeUnit.MILLISECONDS)!=null ) {
}
// Bring node 1 online
final PBClusterNodeStatus status1 = new PBClusterNodeStatus();
status1.setConnectUri("kdbr://localhost:60001");
status1.setLastUpdate(new PBJournalLocation().setFileId(1).setOffset(50));
status1.setState(State.SLAVE_UNCONNECTED);
executeAsync(new Callback() {
public void execute() throws Exception {
zkcsm1.setMemberStatus(status1);
}
});
// Bring node 2 online
final PBClusterNodeStatus status2 = new PBClusterNodeStatus();
status2.setConnectUri("kdbr://localhost:60002");
status2.setLastUpdate(new PBJournalLocation().setFileId(2).setOffset(20));
status2.setState(State.SLAVE_UNCONNECTED);
executeAsync(new Callback() {
public void execute() throws Exception {
Thread.sleep(1000);
zkcsm2.setMemberStatus(status2);
}
});
ClusterState state = stateEvents1.poll(10, TimeUnit.SECONDS);
assertNotNull(state);
assertNotNull(state.getMaster());
assertEquals("kdbr://localhost:60002", state.getMaster());
assertTrue(state.getSlaves().size()==1);
state = stateEvents2.poll(1, TimeUnit.SECONDS);
assertNotNull(state);
assertNotNull(state.getMaster());
assertEquals("kdbr://localhost:60002", state.getMaster());
assertTrue(state.getSlaves().size()==1);
zkcsm2.stop();
zkcsm1.stop();
}
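// Runs the callback on a background thread so a blocking status update does not stall the test.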
private void executeAsync(final Callback callback) {
new Thread("Async Test Task") {
@Override
public void run() {
try {
callback.execute();
} catch (Exception e) {
e.printStackTrace();
}
}
}.start();
}
public void testOneNodeGoingOnline() throws Exception {
final LinkedBlockingQueue<ClusterState> stateEvents1 = new LinkedBlockingQueue<ClusterState>();
zkcsm1.addListener(new ClusterListener() {
public void onClusterChange(ClusterState config) {
stateEvents1.add(config);
}
});
zkcsm1.start();
// Drain the events..
while( stateEvents1.poll(100, TimeUnit.MILLISECONDS)!=null ) {
}
// Let node1 join the cluster.
zkcsm1.addMember("kdbr://localhost:60001");
ClusterState state = stateEvents1.poll(1, TimeUnit.SECONDS);
assertNotNull(state);
assertNull(state.getMaster());
assertTrue(state.getSlaves().size()==1);
// Let the cluster know that node1 is online..
PBClusterNodeStatus status = new PBClusterNodeStatus();
status.setConnectUri("kdbr://localhost:60001");
status.setLastUpdate(new PBJournalLocation().setFileId(0).setOffset(0));
status.setState(State.SLAVE_UNCONNECTED);
zkcsm1.setMemberStatus(status);
state = stateEvents1.poll(10, TimeUnit.SECONDS);
assertNotNull(state);
assertNotNull(state.getMaster());
assertEquals("kdbr://localhost:60001", state.getMaster());
assertTrue(state.getSlaves().isEmpty());
zkcsm1.stop();
}
}

View File

@ -1,90 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kahadb.store.perf;
import java.io.File;
import java.util.Arrays;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.perf.SimpleQueueTest;
import org.apache.kahadb.replication.ClusterState;
import org.apache.kahadb.replication.ReplicationService;
import org.apache.kahadb.replication.StaticClusterStateManager;
/**
* @version $Revision: 712224 $
*/
public class ReplicatedKahaStoreQueueTest extends SimpleQueueTest {
private StaticClusterStateManager cluster;
private static final String BROKER1_REPLICATION_ID = "kdbr://localhost:60001";
private static final String BROKER2_REPLICATION_ID = "kdbr://localhost:60002";
protected String broker2BindAddress="tcp://localhost:61617";
private ReplicationService rs1;
private ReplicationService rs2;
@Override
protected BrokerService createBroker(String uri) throws Exception {
clientURI="failover:(" +
"tcp://localhost:61616?wireFormat.cacheEnabled=true&wireFormat.tightEncodingEnabled=true&wireFormat.maxInactivityDuration=50000" +
"," +
"tcp://localhost:61617?wireFormat.cacheEnabled=true&wireFormat.tightEncodingEnabled=true&wireFormat.maxInactivityDuration=50000" +
")?jms.useAsyncSend=true";
// This cluster object will control who becomes the master.
cluster = new StaticClusterStateManager();
ClusterState clusterState = new ClusterState();
clusterState.setMaster(BROKER1_REPLICATION_ID);
String[] slaves = {BROKER2_REPLICATION_ID};
clusterState.setSlaves(Arrays.asList(slaves));
cluster.setClusterState(clusterState);
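// Each broker runs under a ReplicationService that keeps its store in sync per the cluster state above.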
rs1 = new ReplicationService();
rs1.setUri(BROKER1_REPLICATION_ID);
rs1.setCluster(cluster);
rs1.setDirectory(new File("target/replication-test/broker1"));
rs1.setBrokerURI("broker://("+uri+")/broker1");
rs1.start();
rs2 = new ReplicationService();
rs2.setUri(BROKER2_REPLICATION_ID);
rs2.setCluster(cluster);
rs2.setDirectory(new File("target/replication-test/broker2"));
rs2.setBrokerURI("broker://(" + broker2BindAddress + ")/broker2");
rs2.start();
return rs1.getBrokerService();
}
@Override
protected void tearDown() throws Exception {
if( rs1!=null ) {
rs1.stop();
rs1 = null;
}
if( rs2!=null ) {
rs2.stop();
rs2 = null;
}
}
}

View File

@ -1,36 +0,0 @@
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<!-- START SNIPPET: example -->
<beans
xmlns="http://www.springframework.org/schema/beans"
xmlns:amq="http://activemq.apache.org/schema/core"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd
http://activemq.apache.org/camel/schema/spring http://activemq.apache.org/camel/schema/spring/camel-spring.xsd">
<broker xmlns="http://activemq.apache.org/schema/core" start="false" brokerName="localhost" dataDirectory="target/data" useJmx="false">
<!-- The transport connectors ActiveMQ will listen to -->
<transportConnectors>
<transportConnector name="openwire" uri="tcp://localhost:61616"/>
</transportConnectors>
</broker>
</beans>
<!-- END SNIPPET: example -->

View File

@ -1,47 +0,0 @@
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<!-- START SNIPPET: example -->
<beans
xmlns="http://www.springframework.org/schema/beans"
xmlns:amq="http://activemq.apache.org/schema/core"
xmlns:kdb="http://activemq.apache.org/schema/kahadb"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd
http://activemq.apache.org/camel/schema/spring http://activemq.apache.org/camel/schema/spring/camel-spring.xsd
http://activemq.apache.org/schema/kahadb http://activemq.apache.org/schema/kahadb/kahadb.xsd">
<bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"/>
<kahadbReplicationBroker xmlns="http://activemq.apache.org/schema/kahadb">
<replicationService>
<kahadbReplication
directory="target/kaha-data/broker1"
brokerURI="xbean:broker1/ha-broker.xml"
uri="kdbr://localhost:6001"
minimumReplicas="0">
<cluster>
<zookeeperCluster uri="zk://localhost:2181/activemq/ha-cluster/mygroup" userid="activemq" password=""/>
</cluster>
</kahadbReplication>
</replicationService>
</kahadbReplicationBroker>
</beans>
<!-- END SNIPPET: example -->

View File

@ -1,36 +0,0 @@
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<!-- START SNIPPET: example -->
<beans
xmlns="http://www.springframework.org/schema/beans"
xmlns:amq="http://activemq.apache.org/schema/core"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd
http://activemq.apache.org/camel/schema/spring http://activemq.apache.org/camel/schema/spring/camel-spring.xsd">
<broker xmlns="http://activemq.apache.org/schema/core" start="false" brokerName="localhost" dataDirectory="target/data" useJmx="false">
<!-- The transport connectors ActiveMQ will listen to -->
<transportConnectors>
<transportConnector name="openwire" uri="tcp://localhost:61617"/>
</transportConnectors>
</broker>
</beans>
<!-- END SNIPPET: example -->

View File

@ -1,47 +0,0 @@
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<!-- START SNIPPET: example -->
<beans
xmlns="http://www.springframework.org/schema/beans"
xmlns:amq="http://activemq.apache.org/schema/core"
xmlns:kdb="http://activemq.apache.org/schema/kahadb"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd
http://activemq.apache.org/camel/schema/spring http://activemq.apache.org/camel/schema/spring/camel-spring.xsd
http://activemq.apache.org/schema/kahadb http://activemq.apache.org/schema/kahadb/kahadb.xsd">
<bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"/>
<kahadbReplicationBroker xmlns="http://activemq.apache.org/schema/kahadb">
<replicationService>
<kahadbReplication
directory="target/kaha-data-broker2"
brokerURI="xbean:broker2/ha-broker.xml"
uri="kdbr://localhost:6002"
minimumReplicas="0">
<cluster>
<zookeeperCluster uri="zk://localhost:2181/activemq/ha-cluster/mygroup" userid="activemq" password=""/>
</cluster>
</kahadbReplication>
</replicationService>
</kahadbReplicationBroker>
</beans>
<!-- END SNIPPET: example -->