From 10a4154a0635cb98316fd60664c24ac503661ddc Mon Sep 17 00:00:00 2001
From: Jean-Daniel Cryans
Date: Fri, 5 Feb 2010 17:37:54 +0000
Subject: [PATCH] HBASE-2129 Simple Master/Slave replication
git-svn-id: https://svn.apache.org/repos/asf/hadoop/hbase/trunk@907011 13f79535-47bb-0310-9956-ffa450edef68
---
CHANGES.txt | 1 +
src/contrib/mdc_replication/bin/add_peer.rb | 63 ++++
.../mdc_replication/bin/copy_tables_desc.rb | 61 ++++
src/contrib/mdc_replication/build.xml | 52 ++++
src/contrib/mdc_replication/ivy.xml | 107 +++++++
.../hadoop/hbase/ipc/ReplicationRPC.java | 42 +++
.../hbase/ipc/ReplicationRegionInterface.java | 40 +++
.../java/org/apache/hadoop/hbase/package.html | 136 +++++++++
.../replication/ReplicationRegion.java | 103 +++++++
.../replication/ReplicationRegionServer.java | 170 +++++++++++
.../replication/ReplicationSink.java | 283 ++++++++++++++++++
.../replication/ReplicationSource.java | 163 ++++++++++
.../wal/replication/ReplicationHLog.java | 83 +++++
.../ReplicationConnectionManager.java | 97 ++++++
.../ReplicationZookeeperHelper.java | 169 +++++++++++
.../replication/TestReplicationSink.java | 219 ++++++++++++++
.../hbase/replication/TestReplication.java | 268 +++++++++++++++++
17 files changed, 2057 insertions(+)
create mode 100644 src/contrib/mdc_replication/bin/add_peer.rb
create mode 100644 src/contrib/mdc_replication/bin/copy_tables_desc.rb
create mode 100644 src/contrib/mdc_replication/build.xml
create mode 100644 src/contrib/mdc_replication/ivy.xml
create mode 100644 src/contrib/mdc_replication/src/java/org/apache/hadoop/hbase/ipc/ReplicationRPC.java
create mode 100644 src/contrib/mdc_replication/src/java/org/apache/hadoop/hbase/ipc/ReplicationRegionInterface.java
create mode 100644 src/contrib/mdc_replication/src/java/org/apache/hadoop/hbase/package.html
create mode 100644 src/contrib/mdc_replication/src/java/org/apache/hadoop/hbase/regionserver/replication/ReplicationRegion.java
create mode 100644 src/contrib/mdc_replication/src/java/org/apache/hadoop/hbase/regionserver/replication/ReplicationRegionServer.java
create mode 100644 src/contrib/mdc_replication/src/java/org/apache/hadoop/hbase/regionserver/replication/ReplicationSink.java
create mode 100644 src/contrib/mdc_replication/src/java/org/apache/hadoop/hbase/regionserver/replication/ReplicationSource.java
create mode 100644 src/contrib/mdc_replication/src/java/org/apache/hadoop/hbase/regionserver/wal/replication/ReplicationHLog.java
create mode 100644 src/contrib/mdc_replication/src/java/org/apache/hadoop/hbase/replication/ReplicationConnectionManager.java
create mode 100644 src/contrib/mdc_replication/src/java/org/apache/hadoop/hbase/replication/ReplicationZookeeperHelper.java
create mode 100644 src/contrib/mdc_replication/src/test/org/apache/hadoop/hbase/regionserver/replication/TestReplicationSink.java
create mode 100644 src/contrib/mdc_replication/src/test/org/apache/hadoop/hbase/replication/TestReplication.java
diff --git a/CHANGES.txt b/CHANGES.txt
index 9141e820415..0dad48cc473 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -362,6 +362,7 @@ Release 0.21.0 - Unreleased
hfiles direct) uploader
HBASE-1433 Update hbase build to match core, use ivy, publish jars to maven
repo, etc. (Kay Kay via Stack)
+ HBASE-2129 Simple Master/Slave replication
OPTIMIZATIONS
HBASE-410 [testing] Speed up the test suite
diff --git a/src/contrib/mdc_replication/bin/add_peer.rb b/src/contrib/mdc_replication/bin/add_peer.rb
new file mode 100644
index 00000000000..6d08d5337c6
--- /dev/null
+++ b/src/contrib/mdc_replication/bin/add_peer.rb
@@ -0,0 +1,63 @@
+# Script to add a peer to a cluster
+# To see usage for this script, run:
+#
+# ${HBASE_HOME}/bin/hbase org.jruby.Main add_peer.rb
+#
+
+include Java
+import org.apache.commons.logging.LogFactory
+import org.apache.hadoop.hbase.HBaseConfiguration
+import org.apache.hadoop.hbase.HConstants
+import org.apache.hadoop.hbase.EmptyWatcher
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWrapper
+
+# Name of this script
+NAME = "add_peer"
+
+# Print usage for this script
+def usage
+ puts 'Usage: %s.rb master_zookeeper.quorum.peers:clientport:znode_parent slave_zookeeper.quorum.peers:clientport:znode_parent' % NAME
+ exit!
+end
+
+if ARGV.size != 2
+ usage
+end
+
+LOG = LogFactory.getLog(NAME)
+
+parts1 = ARGV[0].split(":")
+LOG.info("Master cluster located at " + parts1[0] + " port " + parts1[1] + " in folder " + parts1[2])
+
+c2 = HBaseConfiguration.create()
+parts2 = ARGV[1].split(":")
+LOG.info("Slave cluster located at " + parts2[0] + " port " + parts2[1] + " in folder " + parts2[2])
+
+LOG.info("The addresses must be exactly the same as those in hbase-site.xml of each cluster.")
+print "Are those info correct? [Y/n] "
+answer = $stdin.gets.chomp
+
+if answer.length != 0 || answer == "n" || answer == "no"
+ exit!
+end
+
+c1 = HBaseConfiguration.create()
+c1.set(HConstants.ZOOKEEPER_QUORUM, parts1[0])
+c1.set("hbase.zookeeper.property.clientPort", parts1[1])
+c1.set(HConstants.ZOOKEEPER_ZNODE_PARENT, parts1[2])
+
+zkw1 = ZooKeeperWrapper.new(c1, EmptyWatcher.instance)
+zkw1.writeZNode(parts1[2], "replication", "a")
+zkw1.writeZNode(parts1[2] + "/replication", "master", ARGV[0]);
+zkw1.writeZNode(parts1[2] + "/replication", "state", "true");
+zkw1.writeZNode(parts1[2] + "/replication/peers", "test", ARGV[1]);
+
+
+c2.set(HConstants.ZOOKEEPER_QUORUM, parts2[0])
+c2.set("hbase.zookeeper.property.clientPort", parts2[1])
+c2.set(HConstants.ZOOKEEPER_ZNODE_PARENT, parts2[2])
+
+zkw2 = ZooKeeperWrapper.new(c2, EmptyWatcher.instance)
+zkw2.writeZNode(parts2[2], "replication", "a")
+zkw2.writeZNode(parts2[2] + "/replication", "master", ARGV[0]);
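+
+# For illustration, a hypothetical invocation with made-up quorum hosts,
+# client ports and znode parents (use the values from each cluster's
+# hbase-site.xml):
+#
+#   ${HBASE_HOME}/bin/hbase org.jruby.Main add_peer.rb \
+#     zk-master1:2181:/hbase zk-slave1:2181:/hbase-slave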
diff --git a/src/contrib/mdc_replication/bin/copy_tables_desc.rb b/src/contrib/mdc_replication/bin/copy_tables_desc.rb
new file mode 100644
index 00000000000..bbe1a9b5c57
--- /dev/null
+++ b/src/contrib/mdc_replication/bin/copy_tables_desc.rb
@@ -0,0 +1,61 @@
+# Script to recreate all tables from one cluster to another
+# To see usage for this script, run:
+#
+# ${HBASE_HOME}/bin/hbase org.jruby.Main copy_tables_desc.rb
+#
+
+include Java
+import org.apache.commons.logging.LogFactory
+import org.apache.hadoop.hbase.HBaseConfiguration
+import org.apache.hadoop.hbase.HConstants
+import org.apache.hadoop.hbase.EmptyWatcher
+import org.apache.hadoop.hbase.client.HBaseAdmin
+import org.apache.hadoop.hbase.HTableDescriptor
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWrapper
+
+# Name of this script
+NAME = "copy_tables_desc"
+
+# Print usage for this script
+def usage
+ puts 'Usage: %s.rb master_zookeeper.quorum.peers:clientport:znode_parent slave_zookeeper.quorum.peers:clientport:znode_parent' % NAME
+ exit!
+end
+
+if ARGV.size != 2
+ usage
+end
+
+LOG = LogFactory.getLog(NAME)
+
+parts1 = ARGV[0].split(":")
+LOG.info("Master cluster located at " + parts1[0] + " port " + parts1[1] + " in folder " + parts1[2])
+
+parts2 = ARGV[1].split(":")
+LOG.info("Slave cluster located at " + parts2[0] + " port " + parts2[1] + " in folder " + parts2[2])
+
+print "Are those info correct? [Y/n] "
+answer = $stdin.gets.chomp
+
+if answer.length != 0 || answer == "n" || answer == "no"
+ exit!
+end
+
+c1 = HBaseConfiguration.create()
+c1.set(HConstants.ZOOKEEPER_QUORUM, parts1[0])
+c1.set("hbase.zookeeper.property.clientPort", parts1[1])
+c1.set(HConstants.ZOOKEEPER_ZNODE_PARENT, parts1[2])
+
+admin1 = HBaseAdmin.new(c1)
+
+c2 = HBaseConfiguration.create()
+c2.set(HConstants.ZOOKEEPER_QUORUM, parts2[0])
+c2.set("hbase.zookeeper.property.clientPort", parts2[1])
+c2.set(HConstants.ZOOKEEPER_ZNODE_PARENT, parts2[2])
+
+admin2 = HBaseAdmin.new(c2)
+
+for t in admin1.listTables()
+ admin2.createTable(t)
+end
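+
+# Likewise, a hypothetical run with made-up addresses; every table descriptor
+# listed on the first cluster is recreated on the second:
+#
+#   ${HBASE_HOME}/bin/hbase org.jruby.Main copy_tables_desc.rb \
+#     zk-master1:2181:/hbase zk-slave1:2181:/hbase-slave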
diff --git a/src/contrib/mdc_replication/build.xml b/src/contrib/mdc_replication/build.xml
new file mode 100644
index 00000000000..a9a1aafa165
--- /dev/null
+++ b/src/contrib/mdc_replication/build.xml
@@ -0,0 +1,52 @@
[...]
diff --git a/src/contrib/mdc_replication/ivy.xml b/src/contrib/mdc_replication/ivy.xml
new file mode 100644
index 00000000000..7d85aa7d201
--- /dev/null
+++ b/src/contrib/mdc_replication/ivy.xml
@@ -0,0 +1,107 @@
[...]
diff --git a/src/contrib/mdc_replication/src/java/org/apache/hadoop/hbase/ipc/ReplicationRPC.java b/src/contrib/mdc_replication/src/java/org/apache/hadoop/hbase/ipc/ReplicationRPC.java
new file mode 100644
index 00000000000..57ab93c9e35
--- /dev/null
+++ b/src/contrib/mdc_replication/src/java/org/apache/hadoop/hbase/ipc/ReplicationRPC.java
@@ -0,0 +1,42 @@
+/**
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.ipc;
+
+/**
+ * Helper class that registers the replication RPC interface with HBaseRPC
+ */
+public class ReplicationRPC {
+
+ private static final byte RPC_CODE = 110;
+
+ private static boolean initialized = false;
+
+ public synchronized static void initialize() {
+ if (initialized) {
+ return;
+ }
+ HBaseRPC.addToMap(ReplicationRegionInterface.class, RPC_CODE);
+ initialized = true;
+ }
+
+ private ReplicationRPC() {
+ // Static helper class;
+ }
+}
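+
+// Usage sketch (assumed, mirroring the static initializer in
+// ReplicationRegionServer below): both ends of a replication connection
+// must register the protocol with HBaseRPC before any proxy is created:
+//
+//   static {
+//     ReplicationRPC.initialize();
+//   }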
diff --git a/src/contrib/mdc_replication/src/java/org/apache/hadoop/hbase/ipc/ReplicationRegionInterface.java b/src/contrib/mdc_replication/src/java/org/apache/hadoop/hbase/ipc/ReplicationRegionInterface.java
new file mode 100644
index 00000000000..5f65baaa5e8
--- /dev/null
+++ b/src/contrib/mdc_replication/src/java/org/apache/hadoop/hbase/ipc/ReplicationRegionInterface.java
@@ -0,0 +1,40 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.ipc;
+
+import org.apache.hadoop.hbase.regionserver.wal.HLog;
+
+import java.io.IOException;
+
+/**
+ * Interface that defines the replication RPC exposed by a region server
+ */
+public interface ReplicationRegionInterface extends HRegionInterface {
+
+ /**
+ * Replicates the given entries. The guarantee is that the given entries
+ * will be durable on the slave cluster if this method returns without
+   * an exception.
+ * @param entries entries to replicate
+ * @throws IOException
+ */
+ public void replicateLogEntries(HLog.Entry[] entries) throws IOException;
+
+}
diff --git a/src/contrib/mdc_replication/src/java/org/apache/hadoop/hbase/package.html b/src/contrib/mdc_replication/src/java/org/apache/hadoop/hbase/package.html
new file mode 100644
index 00000000000..6efc85d3ec6
--- /dev/null
+++ b/src/contrib/mdc_replication/src/java/org/apache/hadoop/hbase/package.html
@@ -0,0 +1,136 @@
+
+Multi Data Center Replication
+This package provides replication between HBase clusters.
+
+
+
+Table Of Contents
+
+ - Status
+ - Requirements
+ - Deployment
+
+
+
+
+Status
+
+
+This package isn't even alpha quality software and is only meant to be a base
+for future development. The current implementation offers the following
+features:
+
+
+ - Master/Slave replication limited to 1 slave.
+ - Replication of all user tables.
+ - Start/stop replication stream.
+ - Supports clusters of different sizes.
+ - Re-replication of entries from failed region
+ servers on the master cluster.
+
+Please report any bugs you find on the project's Jira.
+
+
+Requirements
+
+
+
+Before trying out replication, make sure to review the following requirements:
+
+
+ - ZooKeeper must be managed by you, not by HBase, and must
+ always be available during the deployment. Currently you can't use
+ zoo.cfg to hold your ZooKeeper configuration.
+ - All machines from both clusters should be able to reach every
+ other machine since replication goes from any region server to any
+ other one on the slave cluster. That also includes the
+ Zookeeper clusters.
+ - Both clusters should have the same HBase and Hadoop major revision.
+ For example, having 0.21.1 on the master and 0.21.0 on the slave is
+ correct but not 0.21.1 and 0.22.0.
+ - Every table should exist with the exact same name and column
+ family names on both clusters.
+
+
+
+
+Deployment
+
+
+
+The following steps describe how to enable replication from one cluster
+to another. This must be done with both clusters offline.
+
+ - Copy the hbase-0.21.0-dev-mdc_replication.jar file from the
+ $HBASE_HOME/contrib/mdc_replication/ folder to $HBASE_HOME/lib on
+ both clusters.
+ - Edit ${HBASE_HOME}/conf/hbase-site.xml on both clusters to add
+ the following configurations (the first property names the RPC interface
+ the region servers speak, the second the region server implementation;
+ both classes are provided by this contrib):
+
+<property>
+ <name>hbase.regionserver.class</name>
+ <value>org.apache.hadoop.hbase.ipc.ReplicationRegionInterface</value>
+</property>
+<property>
+ <name>hbase.regionserver.impl</name>
+ <value>org.apache.hadoop.hbase.regionserver.replication.ReplicationRegionServer</value>
+</property>
+
+ - Run the following command on any cluster:
+
+$HBASE_HOME/bin/hbase org.jruby.Main $HBASE_HOME/src/contrib/mdc_replication/bin/add_peer.rb
+ This will print the usage for setting up the replication stream between
+ both clusters. If both clusters use the same ZooKeeper cluster, you have
+ to use a different zookeeper.znode.parent since they can't
+ write in the same folder.
+
+ - You can now start and stop the clusters with your preferred method.
+
+
+You can confirm that your setup works by looking at any region server's log
+on the master cluster and looking for the following lines:
+
+
+Considering 1 rs, with ratio 0.1
+Getting 1 rs from peer cluster # 0
+Choosing peer 10.10.1.49:62020
+
+In this case it indicates that 1 region server from the slave cluster
+was chosen for replication.
+
+Should you want to stop the replication while the clusters are running, open
+the shell on the master cluster and issue this command:
+
+hbase(main):001:0> zk 'set /zookeeper.znode.parent/replication/state false'
+
+Replace the znode parent with the one configured on your master
+cluster. Edits already queued will still be replicated after you issue
+that command, but new entries won't be. To restart replication, simply
+replace "false" with "true" in the command.
+
diff --git a/src/contrib/mdc_replication/src/java/org/apache/hadoop/hbase/regionserver/replication/ReplicationRegion.java b/src/contrib/mdc_replication/src/java/org/apache/hadoop/hbase/regionserver/replication/ReplicationRegion.java
new file mode 100644
index 00000000000..3cdb8a6bc74
--- /dev/null
+++ b/src/contrib/mdc_replication/src/java/org/apache/hadoop/hbase/regionserver/replication/ReplicationRegion.java
@@ -0,0 +1,103 @@
+/**
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.regionserver.replication;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.regionserver.FlushRequester;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.wal.HLog;
+import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.util.Progressable;
+
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+
+/**
+ * Specialized version of HRegion to handle replication. In particular,
+ * it replays all edits from the reconstruction log.
+ */
+public class ReplicationRegion extends HRegion {
+
+ static final Log LOG = LogFactory.getLog(ReplicationRegion.class);
+
+ private final ReplicationSource replicationSource;
+
+ public ReplicationRegion(Path basedir, HLog log, FileSystem fs, Configuration conf,
+ HRegionInfo regionInfo, FlushRequester flushListener,
+ ReplicationSource repSource) {
+ super(basedir, log, fs, conf, regionInfo, flushListener);
+ this.replicationSource = repSource;
+ }
+
+
+ protected void doReconstructionLog(final Path oldLogFile,
+ final long minSeqId, final long maxSeqId, final Progressable reporter)
+ throws UnsupportedEncodingException, IOException {
+ super.doReconstructionLog(oldLogFile, minSeqId, maxSeqId, reporter);
+
+ if(this.replicationSource == null) {
+ return;
+ }
+
+ if (oldLogFile == null || !getFilesystem().exists(oldLogFile)) {
+ return;
+ }
+
+    FileStatus[] stats = getFilesystem().listStatus(oldLogFile);
+    if (stats == null || stats.length == 0) {
+      LOG.warn("Passed reconstruction log " + oldLogFile
+          + " is zero-length");
+      return;
+    }
+
+ HLog.Reader reader = HLog.getReader(getFilesystem(), oldLogFile, getConf());
+ try {
+ HLog.Entry entry;
+ while ((entry = reader.next()) != null) {
+ HLogKey key = entry.getKey();
+ KeyValue val = entry.getEdit();
+ if (key.getLogSeqNum() < maxSeqId) {
+ continue;
+ }
+
+ // Don't replicate catalog entries and meta information like
+ // complete log flush.
+ if(!(Bytes.equals(key.getTablename(),ROOT_TABLE_NAME) ||
+ Bytes.equals(key.getTablename(),META_TABLE_NAME)) &&
+ !Bytes.equals(val.getFamily(), HLog.METAFAMILY)) {
+ this.replicationSource.enqueueLog(entry);
+ }
+
+ }
+ } finally {
+ reader.close();
+ }
+
+
+ }
+}
diff --git a/src/contrib/mdc_replication/src/java/org/apache/hadoop/hbase/regionserver/replication/ReplicationRegionServer.java b/src/contrib/mdc_replication/src/java/org/apache/hadoop/hbase/regionserver/replication/ReplicationRegionServer.java
new file mode 100644
index 00000000000..93fdecf5bc4
--- /dev/null
+++ b/src/contrib/mdc_replication/src/java/org/apache/hadoop/hbase/regionserver/replication/ReplicationRegionServer.java
@@ -0,0 +1,170 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.replication;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.ipc.HBaseRPCProtocolVersion;
+import org.apache.hadoop.hbase.ipc.ReplicationRPC;
+import org.apache.hadoop.hbase.ipc.ReplicationRegionInterface;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.regionserver.wal.HLog;
+import org.apache.hadoop.hbase.regionserver.wal.replication.ReplicationHLog;
+import org.apache.hadoop.hbase.replication.ReplicationZookeeperHelper;
+import org.apache.hadoop.hbase.util.Threads;
+import org.apache.hadoop.io.MapWritable;
+import org.apache.hadoop.util.Progressable;
+
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+public class ReplicationRegionServer extends HRegionServer
+ implements ReplicationRegionInterface {
+
+ static {
+ ReplicationRPC.initialize();
+ }
+
+ protected static final Log LOG =
+ LogFactory.getLog(ReplicationRegionServer.class);
+
+ private final ReplicationSource replicationSource;
+ private ReplicationSink replicationSink;
+ private final boolean isMaster;
+ private final AtomicBoolean isReplicating = new AtomicBoolean(true);
+
+ private final ReplicationZookeeperHelper zkHelper;
+
+
+ /**
+ * Starts a HRegionServer at the default location
+ *
+   * @param conf configuration to use
+ * @throws java.io.IOException
+ */
+ public ReplicationRegionServer(Configuration conf) throws IOException {
+ super(conf);
+
+ this.zkHelper = new ReplicationZookeeperHelper(
+ this.getZooKeeperWrapper(), this.conf, this.isReplicating);
+ this.isMaster = zkHelper.isMaster();
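+    // A master cluster feeds a ReplicationSource thread; a slave cluster
+    // applies edits through a ReplicationSink thread (see init() below).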
+
+ this.replicationSink = null;
+ this.replicationSource = this.isMaster ? new ReplicationSource(this,
+ super.stopRequested, this.isReplicating) : null;
+ }
+
+ @Override
+ protected HLog instantiateHLog(Path logdir) throws IOException {
+ HLog newlog = new ReplicationHLog(super.getFileSystem(),
+ logdir, conf, super.getLogRoller(),
+ this.replicationSource);
+ return newlog;
+ }
+
+ @Override
+ protected void init(final MapWritable c) throws IOException {
+ super.init(c);
+ String n = Thread.currentThread().getName();
+
+ String repLogPathStr =
+ ReplicationSink.getRepLogPath(getHServerInfo().getServerName());
+ Path repLogPath = new Path(getRootDir(), repLogPathStr);
+
+
+ Thread.UncaughtExceptionHandler handler =
+ new Thread.UncaughtExceptionHandler() {
+ public void uncaughtException(final Thread t, final Throwable e) {
+ abort();
+ LOG.fatal("Set stop flag in " + t.getName(), e);
+ }
+ };
+ if(this.isMaster) {
+ Threads.setDaemonThreadRunning(
+ this.replicationSource, n + ".replicationSource", handler);
+ } else {
+ this.replicationSink =
+ new ReplicationSink(conf,super.stopRequested,
+ repLogPath, getFileSystem(), getThreadWakeFrequency());
+ Threads.setDaemonThreadRunning(
+ this.replicationSink, n + ".replicationSink", handler);
+ }
+ }
+
+ @Override
+ protected HRegion instantiateRegion(final HRegionInfo regionInfo)
+ throws IOException {
+ HRegion r = new ReplicationRegion(HTableDescriptor.getTableDir(super
+ .getRootDir(), regionInfo.getTableDesc().getName()), super.hlog, super
+ .getFileSystem(), super.conf, regionInfo,
+ super.getFlushRequester(), this.replicationSource);
+
+ r.initialize(null, new Progressable() {
+ public void progress() {
+ addProcessingMessage(regionInfo);
+ }
+ });
+ return r;
+ }
+
+
+ @Override
+ public void replicateLogEntries(HLog.Entry[] entries) throws IOException {
+ this.replicationSink.replicateEntries(entries);
+ }
+
+  /**
+   * @param protocol protocol class name requested by the client
+   * @param clientVersion version of the protocol the client speaks
+   * @return the version of the replication protocol spoken by this server
+   * @throws IOException if the protocol is not recognized
+   */
+ public long getProtocolVersion(final String protocol,
+ final long clientVersion)
+ throws IOException {
+ if (protocol.equals(ReplicationRegionInterface.class.getName())) {
+ return HBaseRPCProtocolVersion.versionID;
+ }
+ throw new IOException("Unknown protocol to name node: " + protocol);
+ }
+
+  /**
+   * @return the helper that manages replication state in ZooKeeper
+   */
+ public ReplicationZookeeperHelper getZkHelper() {
+ return zkHelper;
+ }
+
+ protected void join() {
+ super.join();
+ if(this.isMaster) {
+ Threads.shutdown(this.replicationSource);
+ } else {
+ Threads.shutdown(this.replicationSink);
+ }
+ }
+}
diff --git a/src/contrib/mdc_replication/src/java/org/apache/hadoop/hbase/regionserver/replication/ReplicationSink.java b/src/contrib/mdc_replication/src/java/org/apache/hadoop/hbase/regionserver/replication/ReplicationSink.java
new file mode 100644
index 00000000000..c1caf1173f0
--- /dev/null
+++ b/src/contrib/mdc_replication/src/java/org/apache/hadoop/hbase/regionserver/replication/ReplicationSink.java
@@ -0,0 +1,283 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.replication;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.TableNotFoundException;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.HTablePool;
+import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.regionserver.wal.HLog;
+import org.apache.hadoop.hbase.util.Bytes;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * This class is responsible for replicating the edits coming
+ * from another cluster. All edits are first put into a log that will
+ * be read later by the main thread.
+ *
+ * This replication process currently waits for the edits to be applied
+ * before any other entry can be appended to the log.
+ *
+ * The log is rolled but old ones aren't kept at the moment.
+ */
+public class ReplicationSink extends Thread {
+
+ public static final String REPLICATION_LOG_DIR = ".replogs";
+
+ static final Log LOG = LogFactory.getLog(ReplicationSink.class);
+ private final Configuration conf;
+
+ private final HTablePool pool;
+
+ private final AtomicBoolean stop;
+
+ private HLog.Reader reader;
+
+ private HLog.Writer writer;
+
+ private final FileSystem fs;
+
+ private Path path;
+
+ private long position = 0;
+
+ private final Lock lock = new ReentrantLock();
+
+ private final Condition newData = lock.newCondition();
+
+ private final AtomicLong editsSize = new AtomicLong(0);
+
+ private long lastEditSize = 0;
+
+ private final long logrollsize;
+
+ private final long threadWakeFrequency;
+
+ /**
+ * Create a sink for replication
+ * @param conf conf object
+ * @param stopper boolean to tell this thread to stop
+ * @param path the path to the log
+ * @param fs the filesystem to use
+ * @param threadWakeFrequency how long should the thread wait for edits
+ * @throws IOException thrown when HDFS goes bad or bad file name
+ */
+ public ReplicationSink(final Configuration conf,
+ final AtomicBoolean stopper, Path path,
+ FileSystem fs, long threadWakeFrequency)
+ throws IOException {
+ this.conf = conf;
+ this.pool = new HTablePool(this.conf, 10);
+ this.stop = stopper;
+ this.fs = fs;
+ this.path = path;
+ long blocksize = conf.getLong("hbase.regionserver.hlog.blocksize",
+ this.fs.getDefaultBlockSize());
+ float multi = conf.getFloat("hbase.regionserver.logroll.multiplier", 0.95f);
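+    // e.g. a 64 MB block size with the default 0.95 multiplier
+    // rolls the replication log at roughly 60.8 MB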
+ this.logrollsize = (long)(blocksize * multi);
+ this.threadWakeFrequency = threadWakeFrequency;
+ rollLog();
+
+ }
+
+ /**
+ * Put this array of entries into a log that will be read later
+ * @param entries
+ * @throws IOException
+ */
+ public void replicateEntries(HLog.Entry[] entries)
+ throws IOException {
+    this.lock.lock();
+    try {
+      if(!this.stop.get()) {
+ // add to WAL and defer actual inserts
+ try {
+ for(HLog.Entry entry : entries) {
+
+ this.writer.append(entry);
+ this.editsSize.addAndGet(entry.getKey().heapSize() +
+ entry.getEdit().heapSize());
+ }
+ this.writer.sync();
+ this.newData.signal();
+
+ } catch (IOException ioe) {
+ LOG.error("Unable to accept edit because", ioe);
+ throw ioe;
+ }
+ } else {
+ LOG.info("Won't be replicating data as we are shutting down");
+ }
+ } finally {
+ this.lock.unlock();
+
+ }
+ }
+
+ public void run() {
+
+    this.lock.lock();
+    try {
+      HTableInterface table = null;
+ while (!this.stop.get()) {
+ this.newData.await(this.threadWakeFrequency, TimeUnit.MILLISECONDS);
+ try {
+ if(this.lastEditSize == this.editsSize.get()) {
+ continue;
+ }
+ // There's no tailing in HDFS so we create a new reader
+ // and seek every time
+ this.reader = HLog.getReader(this.fs, this.path, this.conf);
+
+ if (position != 0) {
+ this.reader.seek(position);
+ }
+
+ byte[] lastTable = HConstants.EMPTY_BYTE_ARRAY;
+          List<Put> puts = new ArrayList<Put>();
+
+ // Very simple optimization where we batch sequences of rows going
+ // to the same table.
+ HLog.Entry entry = new HLog.Entry();
+ while (this.reader.next(entry) != null) {
+ KeyValue kv = entry.getEdit();
+
+ if (kv.isDelete()) {
+ Delete delete = new Delete(kv.getRow(), kv.getTimestamp(), null);
+ if (kv.isDeleteFamily()) {
+ delete.deleteFamily(kv.getFamily());
+ } else if (!kv.isEmptyColumn()) {
+ delete.deleteColumn(entry.getEdit().getFamily(),
+ kv.getQualifier());
+ }
+ table = pool.getTable(entry.getKey().getTablename());
+ table.delete(delete);
+ pool.putTable(table);
+
+ } else {
+ Put put = new Put(kv.getRow(), kv.getTimestamp(), null);
+ put.add(entry.getEdit().getFamily(),
+ kv.getQualifier(), kv.getValue());
+ // Switching table, flush
+ if (!Bytes.equals(lastTable, entry.getKey().getTablename())
+ && !puts.isEmpty()) {
+ table = pool.getTable(lastTable);
+ table.put(puts);
+ pool.putTable(table);
+ puts.clear();
+ }
+ lastTable = entry.getKey().getTablename();
+ puts.add(put);
+ }
+ }
+
+ if (!puts.isEmpty()) {
+ table = pool.getTable(lastTable);
+ table.put(puts);
+ pool.putTable(table);
+ }
+
+ position = this.reader.getPosition();
+
+ if(this.editsSize.get() > this.logrollsize) {
+ rollLog();
+ }
+ this.lastEditSize = editsSize.get();
+
+
+ } catch (EOFException eof) {
+ LOG.warn("Got EOF while reading, will continue on next notify");
+ } catch (TableNotFoundException ex) {
+ LOG.warn("Losing edits because: " + ex);
+ } finally {
+ this.newData.signal();
+ if(this.reader != null) {
+ this.reader.close();
+ }
+ this.reader = null;
+ }
+
+ }
+ close();
+ } catch (Exception ex) {
+ // Should we log rejected edits in a file for replay?
+ LOG.error("Unable to accept edit because", ex);
+ this.stop.set(true);
+ } finally {
+ this.lock.unlock();
+ }
+ }
+
+ private void close() throws IOException {
+ this.writer.close();
+ if(reader != null) {
+ this.reader.close();
+ }
+ this.fs.delete(this.path,true);
+ }
+
+ // Delete the current log and start a new one with the same name
+ // TODO keep the old versions so that the writing thread isn't help up
+ // by the reading thead and this latter one could be reading older logs.
+ // At this point we are under the lock.
+ protected void rollLog() throws IOException {
+    if (this.editsSize.get() != 0) {
+ this.writer.close();
+ if(this.reader != null) {
+ this.reader.close();
+ }
+ this.fs.delete(this.path,true);
+ }
+ this.writer = HLog.createWriter(this.fs, this.path, this.conf);
+ this.editsSize.set(0);
+ this.position = 0;
+ LOG.debug("New replication log");
+ }
+
+  /**
+   * Get the path of the replication log for this server
+   * @param serverName name of the server to build the path for
+   * @return the path of the replication log, relative to the root dir
+   */
+ public static String getRepLogPath(String serverName) {
+ StringBuilder dirName = new StringBuilder(REPLICATION_LOG_DIR);
+ dirName.append("/");
+ dirName.append(serverName);
+ return dirName.toString();
+ }
+}
diff --git a/src/contrib/mdc_replication/src/java/org/apache/hadoop/hbase/regionserver/replication/ReplicationSource.java b/src/contrib/mdc_replication/src/java/org/apache/hadoop/hbase/regionserver/replication/ReplicationSource.java
new file mode 100644
index 00000000000..5e3efe93481
--- /dev/null
+++ b/src/contrib/mdc_replication/src/java/org/apache/hadoop/hbase/regionserver/replication/ReplicationSource.java
@@ -0,0 +1,163 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.replication;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Chore;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HServerAddress;
+import org.apache.hadoop.hbase.ipc.ReplicationRegionInterface;
+import org.apache.hadoop.hbase.regionserver.wal.HLog;
+import org.apache.hadoop.hbase.replication.ReplicationConnectionManager;
+import org.apache.hadoop.hbase.replication.ReplicationZookeeperHelper;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Class that handles the source of a replication stream.
+ * Currently does not handle more than 1 slave.
+ * For each slave cluster it selects a random number of peers
+ * using a replication ratio. For example, if replication.ratio = 0.1
+ * and the slave cluster has 100 region servers, 10 will be selected.
+ */
+public class ReplicationSource extends Chore implements HConstants {
+
+ static final Log LOG = LogFactory.getLog(ReplicationSource.class);
+  private final LinkedBlockingQueue<HLog.Entry> queue =
+      new LinkedBlockingQueue<HLog.Entry>();
+  private final List<HLog.Entry> tempArray = new ArrayList<HLog.Entry>();
+ private final HLog.Entry[] dummyArray = new HLog.Entry[0];
+ private final ReplicationConnectionManager conn;
+ private final ReplicationZookeeperHelper zkHelper;
+ private final Configuration conf;
+ private final float ratio;
+ private final Random random;
+ private final AtomicBoolean isReplicating;
+
+  private List<HServerAddress> currentPeers;
+
+ /**
+ * Constructor used by region servers
+ * @param server the region server specialized in replication
+ * @param stopper the atomic boolean to use to stop the cluster
+ * @param isReplicating the atomic boolean that starts/stops replication
+ * @throws IOException
+ */
+ public ReplicationSource(final ReplicationRegionServer server,
+ final AtomicBoolean stopper,
+ final AtomicBoolean isReplicating)
+ throws IOException {
+ super(server.getThreadWakeFrequency(), stopper);
+ this.conf = server.getConfiguration();
+ this.conn = new ReplicationConnectionManager(this.conf);
+ this.zkHelper = server.getZkHelper();
+ this.ratio = this.conf.getFloat("replication.ratio", 0.1f);
+    currentPeers = new ArrayList<HServerAddress>();
+ this.random = new Random();
+ this.isReplicating = isReplicating;
+ }
+
+ @Override
+ protected boolean initialChore() {
+ this.chooseSinksForPeer(0);
+ return currentPeers.size() > 0;
+ }
+
+ /**
+   * Select a number of peers at random using the ratio. Minimum 1.
+   * @param index the index of the peer cluster
+ */
+ private void chooseSinksForPeer(int index) {
+ this.currentPeers.clear();
+    List<HServerAddress> addresses = this.zkHelper.getPeersAddresses(index);
+    Map<String, HServerAddress> mapOfAdr =
+        new HashMap<String, HServerAddress>();
+ LOG.info("Considering " + addresses.size() +
+ " rs, with ratio " + ratio);
+ int nbPeers = (int)(Math.ceil (addresses.size()*ratio));
+ LOG.info("Getting " + nbPeers + " rs from peer cluster # " + index);
+ for(int i = 0; i < nbPeers; i++) {
+ HServerAddress adr =
+ addresses.get(this.random.nextInt(addresses.size()));
+ while(mapOfAdr.containsKey(adr.toString())) {
+ adr = addresses.get(this.random.nextInt(addresses.size()));
+ }
+ LOG.info("Choosing peer " + adr.toString());
+ mapOfAdr.put(adr.toString(), adr);
+ }
+ this.currentPeers.addAll(mapOfAdr.values());
+ }
+
+ /**
+ * Put a log entry in a replication queue if replication is enabled
+ * @param logEntry
+ */
+ public void enqueueLog(HLog.Entry logEntry) {
+ if(this.isReplicating.get()) {
+ this.queue.add(logEntry);
+ }
+ }
+
+ @Override
+ protected void chore() {
+ while(!super.stop.get()) {
+ // Drain the edits accumulated in the queue, select a node at random
+      // and send the edits. If it fails, get a new set of nodes and choose
+      // a new one to replicate to.
+ try {
+ this.queue.drainTo(this.tempArray);
+ if(this.tempArray.size() > 0) {
+ HServerAddress adr =
+ currentPeers.get(random.nextInt(this.currentPeers.size()));
+ ReplicationRegionInterface rrs = this.conn.getHRegionConnection(adr);
+ LOG.debug("Replicating " + this.tempArray.size()
+ + " to " + adr.toString());
+ rrs.replicateLogEntries(this.tempArray.toArray(dummyArray));
+ this.tempArray.clear();
+ }
+ return;
+ }
+ catch (IOException ioe) {
+ LOG.warn("Unable to replicate, retrying with a new node", ioe);
+
+ try{
+ Thread.sleep(1000);
+ } catch (InterruptedException e){
+ // continue
+ }
+
+ // Should wait in a backoff fashion?
+ // make sure we don't retry with the same node
+ chooseSinksForPeer(0);
+ }
+ }
+ }
+
+
+}
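+
+// Back-of-envelope check of chooseSinksForPeer's selection math
+// (example numbers, not from this patch):
+//
+//   int regionServers = 100;                              // slave cluster size
+//   float ratio = 0.1f;                                   // replication.ratio
+//   int nbPeers = (int) Math.ceil(regionServers * ratio); // = 10 sinks sampled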
diff --git a/src/contrib/mdc_replication/src/java/org/apache/hadoop/hbase/regionserver/wal/replication/ReplicationHLog.java b/src/contrib/mdc_replication/src/java/org/apache/hadoop/hbase/regionserver/wal/replication/ReplicationHLog.java
new file mode 100644
index 00000000000..32bcb63eb38
--- /dev/null
+++ b/src/contrib/mdc_replication/src/java/org/apache/hadoop/hbase/regionserver/wal/replication/ReplicationHLog.java
@@ -0,0 +1,83 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.wal.replication;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.regionserver.replication.ReplicationSource;
+import org.apache.hadoop.hbase.regionserver.wal.HLog;
+import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
+import org.apache.hadoop.hbase.regionserver.wal.LogRollListener;
+
+import java.io.IOException;
+
+/**
+ * HLog specialized in replication. It replicates every entry from every
+ * user table at the moment.
+ */
+public class ReplicationHLog extends HLog {
+
+ static final Log LOG = LogFactory.getLog(ReplicationHLog.class);
+
+ private ReplicationSource replicationSource;
+
+ private final boolean isReplicator;
+
+ /**
+ * New constructor used for replication
+ * @param fs filesystem to use
+ * @param dir directory to store the wal
+ * @param conf conf ot use
+ * @param listener log listener to pass to super class
+ * @param replicationSource where to put the entries
+ * @throws IOException
+ */
+ public ReplicationHLog(final FileSystem fs, final Path dir,
+ final Configuration conf,
+ final LogRollListener listener,
+ ReplicationSource replicationSource)
+ throws IOException {
+ super(fs, dir, conf, listener);
+ this.replicationSource = replicationSource;
+ this.isReplicator = this.replicationSource != null;
+ }
+
+ @Override
+ protected void doWrite(HRegionInfo info, HLogKey logKey,
+ KeyValue logEdit, long now)
+ throws IOException {
+ super.doWrite(info, logKey, logEdit, now);
+ if(this.isReplicator && ! (info.isMetaRegion() || info.isRootRegion())) {
+ this.replicationSource.enqueueLog(new Entry(logKey, logEdit));
+ }
+
+ }
+
+ public ReplicationSource getReplicationSource() {
+ return this.replicationSource;
+ }
+
+
+}
diff --git a/src/contrib/mdc_replication/src/java/org/apache/hadoop/hbase/replication/ReplicationConnectionManager.java b/src/contrib/mdc_replication/src/java/org/apache/hadoop/hbase/replication/ReplicationConnectionManager.java
new file mode 100644
index 00000000000..0ed84a20355
--- /dev/null
+++ b/src/contrib/mdc_replication/src/java/org/apache/hadoop/hbase/replication/ReplicationConnectionManager.java
@@ -0,0 +1,97 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HServerAddress;
+import org.apache.hadoop.hbase.RemoteExceptionHandler;
+import org.apache.hadoop.hbase.ipc.HBaseRPC;
+import org.apache.hadoop.hbase.ipc.HBaseRPCProtocolVersion;
+import org.apache.hadoop.hbase.ipc.ReplicationRegionInterface;
+import org.apache.hadoop.ipc.RemoteException;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Connection manager to communicate with the other clusters.
+ */
+public class ReplicationConnectionManager implements HConstants {
+
+ private final int numRetries;
+ private final int maxRPCAttempts;
+ private final long rpcTimeout;
+  private final Map<String, ReplicationRegionInterface> servers =
+    new ConcurrentHashMap<String, ReplicationRegionInterface>();
+  private final
+  Class<? extends ReplicationRegionInterface> serverInterfaceClass;
+ private final Configuration conf;
+
+ /**
+ * Constructor that sets up RPC to other clusters
+   * @param conf configuration to use
+ */
+ public ReplicationConnectionManager(Configuration conf) {
+ this.conf = conf;
+ String serverClassName =
+ conf.get(REGION_SERVER_CLASS, DEFAULT_REGION_SERVER_CLASS);
+ this.numRetries = conf.getInt("hbase.client.retries.number", 10);
+ this.maxRPCAttempts = conf.getInt("hbase.client.rpc.maxattempts", 1);
+ this.rpcTimeout = conf.getLong("hbase.regionserver.lease.period", 60000);
+ try {
+ this.serverInterfaceClass =
+        (Class<? extends ReplicationRegionInterface>)
+ Class.forName(serverClassName);
+ } catch (ClassNotFoundException e) {
+ throw new UnsupportedOperationException(
+ "Unable to find region server interface " + serverClassName, e);
+ }
+ }
+
+ /**
+ * Get a connection to a distant region server for replication
+ * @param regionServer the address to use
+ * @return the connection to the region server
+ * @throws IOException
+ */
+ public ReplicationRegionInterface getHRegionConnection(
+ HServerAddress regionServer)
+ throws IOException {
+ ReplicationRegionInterface server;
+ synchronized (this.servers) {
+ // See if we already have a connection
+ server = this.servers.get(regionServer.toString());
+ if (server == null) { // Get a connection
+ try {
+ server = (ReplicationRegionInterface) HBaseRPC.waitForProxy(
+ serverInterfaceClass, HBaseRPCProtocolVersion.versionID,
+ regionServer.getInetSocketAddress(), this.conf,
+ this.maxRPCAttempts, this.rpcTimeout);
+ } catch (RemoteException e) {
+ throw RemoteExceptionHandler.decodeRemoteException(e);
+ }
+ this.servers.put(regionServer.toString(), server);
+ }
+ }
+ return server;
+ }
+}
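+
+// Source-side usage sketch, mirroring ReplicationSource.chore(); the address
+// is made up and "entries" stands for an HLog.Entry[] drained from the
+// replication queue:
+//
+//   ReplicationConnectionManager conn = new ReplicationConnectionManager(conf);
+//   HServerAddress addr = new HServerAddress("10.10.1.49:62020");
+//   ReplicationRegionInterface rrs = conn.getHRegionConnection(addr);
+//   rrs.replicateLogEntries(entries); // durable on the slave if no exception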
diff --git a/src/contrib/mdc_replication/src/java/org/apache/hadoop/hbase/replication/ReplicationZookeeperHelper.java b/src/contrib/mdc_replication/src/java/org/apache/hadoop/hbase/replication/ReplicationZookeeperHelper.java
new file mode 100644
index 00000000000..ab508d719ef
--- /dev/null
+++ b/src/contrib/mdc_replication/src/java/org/apache/hadoop/hbase/replication/ReplicationZookeeperHelper.java
@@ -0,0 +1,169 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HServerAddress;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWrapper;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * This class serves as a helper for all things related to ZooKeeper
+ * in the replication contrib.
+ */
+public class ReplicationZookeeperHelper implements HConstants, Watcher {
+
+ static final Log LOG = LogFactory.getLog(ReplicationZookeeperHelper.class);
+
+ private final ZooKeeperWrapper zookeeperWrapper;
+
+  private final List<ZooKeeperWrapper> peerClusters;
+
+ private final String replicationZNode;
+ private final String peersZNode;
+
+ private final String replicationStateNodeName;
+
+ private final boolean isMaster;
+
+ private final Configuration conf;
+
+ private final AtomicBoolean isReplicating;
+
+ /**
+ * Constructor used by region servers
+ * @param zookeeperWrapper zkw to wrap
+ * @param conf conf to use
+ * @param isReplicating atomic boolean to start/stop replication
+ * @throws IOException
+ */
+ public ReplicationZookeeperHelper(
+ ZooKeeperWrapper zookeeperWrapper, Configuration conf,
+ final AtomicBoolean isReplicating) throws IOException{
+ this.zookeeperWrapper = zookeeperWrapper;
+ this.conf = conf;
+ String replicationZNodeName =
+ conf.get("zookeeper.znode.replication", "replication");
+ String peersZNodeName =
+ conf.get("zookeeper.znode.peers", "peers");
+ String repMasterZNodeName =
+ conf.get("zookeeper.znode.master", "master");
+ this.replicationStateNodeName =
+ conf.get("zookeeper.znode.state", "state");
+
+
+    this.peerClusters = new ArrayList<ZooKeeperWrapper>();
+ this.replicationZNode = zookeeperWrapper.getZNode(
+ zookeeperWrapper.getParentZNode(),replicationZNodeName);
+ this.peersZNode =
+ zookeeperWrapper.getZNode(replicationZNode,peersZNodeName);
+
+    List<String> znodes =
+ this.zookeeperWrapper.listZnodes(this.peersZNode, this);
+ if(znodes != null) {
+ for(String znode : znodes) {
+ connectToPeer(znode);
+ }
+ }
+ String address = this.zookeeperWrapper.getData(this.replicationZNode,
+ repMasterZNodeName);
+
+ String thisCluster = this.conf.get(ZOOKEEPER_QUORUM)+":"+
+ this.conf.get("hbase.zookeeper.property.clientPort") +":" +
+ this.conf.get(ZOOKEEPER_ZNODE_PARENT);
+
+ this.isMaster = thisCluster.equals(address);
+
+ LOG.info("This cluster (" + thisCluster + ") is a "
+ + (this.isMaster ? "master" : "slave") + " for replication" +
+ ", compared with (" + address + ")");
+
+ this.isReplicating = isReplicating;
+
+ setIsReplicating();
+ }
+
+ /**
+ * Returns all region servers from given peer
+ * @param clusterIndex the cluster to interrogate
+ * @return addresses of all region servers
+ */
+  public List<HServerAddress> getPeersAddresses(int clusterIndex) {
+ return this.peerClusters.size() == 0 ?
+ null : this.peerClusters.get(clusterIndex).scanRSDirectory();
+ }
+
+ // This method connects this cluster to another one and registers it
+ private void connectToPeer(String znode) throws IOException {
+ String[] quorum =
+ this.zookeeperWrapper.getData(this.peersZNode, znode).split(":");
+ if(quorum.length == 3) {
+ Configuration otherConf = new Configuration(this.conf);
+ otherConf.set(ZOOKEEPER_QUORUM, quorum[0]);
+ otherConf.set("hbase.zookeeper.property.clientPort", quorum[1]);
+ otherConf.set(ZOOKEEPER_ZNODE_PARENT, quorum[2]);
+ this.peerClusters.add(new ZooKeeperWrapper(otherConf, this));
+ LOG.info("Added new peer cluster " + StringUtils.arrayToString(quorum));
+ } else {
+ LOG.error("Wrong format of cluster address: " +
+ this.zookeeperWrapper.getData(this.peersZNode, znode));
+ }
+ }
+
+ /**
+   * Tells if this cluster is the master of the replication stream
+   * @return true if this cluster is the replication master
+ */
+ public boolean isMaster() {
+ return isMaster;
+ }
+
+ @Override
+ public void process(WatchedEvent watchedEvent) {
+ Event.EventType type = watchedEvent.getType();
+ LOG.info(("Got event " + type + " with path " + watchedEvent.getPath()));
+ if (type.equals(Event.EventType.NodeDataChanged)) {
+ setIsReplicating();
+ }
+ }
+
+ /**
+ * This reads the state znode for replication and sets the atomic boolean
+ */
+ private void setIsReplicating() {
+ String value = this.zookeeperWrapper.getDataAndWatch(
+ this.replicationZNode, this.replicationStateNodeName, this);
+ if(value != null) {
+ isReplicating.set(value.equals("true"));
+ LOG.info("Replication is now " + (isReplicating.get() ?
+ "started" : "stopped"));
+ }
+ }
+}
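+
+// Znode layout this helper expects, pieced together from the constructor
+// above and bin/add_peer.rb (names shown with their defaults; all are
+// configurable, and the parent znode defaults to /hbase):
+//
+//   /hbase/replication         base replication znode
+//   /hbase/replication/master  address of the master cluster (quorum:port:parent)
+//   /hbase/replication/state   "true" while replicating, "false" when stopped
+//   /hbase/replication/peers   one child per slave cluster, each holding the
+//                              slave's address in the same quorum:port:parent form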
diff --git a/src/contrib/mdc_replication/src/test/org/apache/hadoop/hbase/regionserver/replication/TestReplicationSink.java b/src/contrib/mdc_replication/src/test/org/apache/hadoop/hbase/regionserver/replication/TestReplicationSink.java
new file mode 100644
index 00000000000..f1f0bbcccf1
--- /dev/null
+++ b/src/contrib/mdc_replication/src/test/org/apache/hadoop/hbase/regionserver/replication/TestReplicationSink.java
@@ -0,0 +1,219 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.replication;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import org.junit.*;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.regionserver.wal.HLog;
+import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
+import org.apache.hadoop.hbase.client.*;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.fs.Path;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+
+public class TestReplicationSink {
+
+ protected static final Log LOG =
+ LogFactory.getLog(TestReplicationSink.class);
+
+ private static final int BATCH_SIZE = 10;
+
+ private static final long SLEEP_TIME = 500;
+
+ private final static Configuration conf = HBaseConfiguration.create();
+
+ private final static HBaseTestingUtility TEST_UTIL =
+ new HBaseTestingUtility();
+
+ private static ReplicationSink SINK;
+
+ private static final byte[] TABLE_NAME1 =
+ Bytes.toBytes("table1");
+ private static final byte[] TABLE_NAME2 =
+ Bytes.toBytes("table2");
+
+ private static final byte[] FAM_NAME1 = Bytes.toBytes("info1");
+ private static final byte[] FAM_NAME2 = Bytes.toBytes("info2");
+
+ private static final AtomicBoolean STOPPER = new AtomicBoolean(false);
+
+ private static HTable table1;
+
+ private static HTable table2;
+
+ /**
+ * @throws java.lang.Exception
+ */
+ @BeforeClass
+ public static void setUpBeforeClass() throws Exception {
+
+ TEST_UTIL.startMiniCluster(3);
+    Path repLogPath = new Path(TEST_UTIL.getTestDir(),
+        ReplicationSink.getRepLogPath("test_rep_sink"));
+    SINK = new ReplicationSink(conf, STOPPER, repLogPath,
+        TEST_UTIL.getDFSCluster().getFileSystem(), 1000);
+ SINK.start();
+ table1 = TEST_UTIL.createTable(TABLE_NAME1, FAM_NAME1);
+ table2 = TEST_UTIL.createTable(TABLE_NAME2, FAM_NAME2);
+ }
+
+ /**
+ * @throws java.lang.Exception
+ */
+ @AfterClass
+ public static void tearDownAfterClass() throws Exception {
+ STOPPER.set(true);
+ TEST_UTIL.shutdownMiniCluster();
+ }
+
+  /**
+   * Truncates both tables before each test so every test starts clean.
+   * @throws java.lang.Exception on any failure
+   */
+ @Before
+ public void setUp() throws Exception {
+ table1 = TEST_UTIL.truncateTable(TABLE_NAME1);
+ table2 = TEST_UTIL.truncateTable(TABLE_NAME2);
+ Thread.sleep(SLEEP_TIME);
+ }
+
+  /**
+   * Nothing to tear down per test; {@link #setUp()} does the cleanup.
+   * @throws java.lang.Exception on any failure
+   */
+ @After
+ public void tearDown() throws Exception {}
+
+ @Test
+ public void testBatchSink() throws Exception {
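+    // Apply a full batch of Puts in one call and expect every row to land.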
+ HLog.Entry[] entries = new HLog.Entry[BATCH_SIZE];
+ for(int i = 0; i < BATCH_SIZE; i++) {
+ entries[i] = createEntry(TABLE_NAME1, i, KeyValue.Type.Put);
+ }
+ SINK.replicateEntries(entries);
+ Thread.sleep(SLEEP_TIME);
+ Scan scan = new Scan();
+ ResultScanner scanRes = table1.getScanner(scan);
+    assertEquals(BATCH_SIZE, scanRes.next(BATCH_SIZE).length);
+ }
+
+ @Test
+ public void testMixedPutDelete() throws Exception {
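+    // First put the even-numbered rows, then apply a batch that deletes
+    // them and puts the odd-numbered ones, leaving half the rows behind.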
+ HLog.Entry[] entries = new HLog.Entry[BATCH_SIZE/2];
+ for(int i = 0; i < BATCH_SIZE; i+=2) {
+ entries[i/2] = createEntry(TABLE_NAME1, i, KeyValue.Type.Put);
+ }
+ SINK.replicateEntries(entries);
+ Thread.sleep(SLEEP_TIME);
+
+ entries = new HLog.Entry[BATCH_SIZE];
+ for(int i = 0; i < BATCH_SIZE; i++) {
+ entries[i] = createEntry(TABLE_NAME1, i,
+ i % 2 != 0 ? KeyValue.Type.Put: KeyValue.Type.DeleteColumn);
+ }
+
+ SINK.replicateEntries(entries);
+ Thread.sleep(SLEEP_TIME);
+ Scan scan = new Scan();
+ ResultScanner scanRes = table1.getScanner(scan);
+    assertEquals(BATCH_SIZE/2, scanRes.next(BATCH_SIZE).length);
+ }
+
+ @Test
+ public void testMixedPutTables() throws Exception {
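+    // Spread one batch across two tables; even rows should end up in table2.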
+ HLog.Entry[] entries = new HLog.Entry[BATCH_SIZE];
+ for(int i = 0; i < BATCH_SIZE; i++) {
+ entries[i] =
+ createEntry( i % 2 == 0 ? TABLE_NAME2 : TABLE_NAME1,
+ i, KeyValue.Type.Put);
+ }
+
+ SINK.replicateEntries(entries);
+ Thread.sleep(SLEEP_TIME);
+ Scan scan = new Scan();
+ ResultScanner scanRes = table2.getScanner(scan);
+ for(Result res : scanRes) {
+ assertTrue(Bytes.toInt(res.getRow()) % 2 == 0);
+ }
+ }
+
+ @Test
+ public void testMixedDeletes() throws Exception {
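+    // Put three rows, then delete them with a mix of DeleteColumn and
+    // DeleteFamily; the table should end up empty.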
+ HLog.Entry[] entries = new HLog.Entry[3];
+ for(int i = 0; i < 3; i++) {
+ entries[i] = createEntry(TABLE_NAME1, i, KeyValue.Type.Put);
+ }
+ SINK.replicateEntries(entries);
+ Thread.sleep(SLEEP_TIME);
+ entries = new HLog.Entry[3];
+
+ entries[0] = createEntry(TABLE_NAME1, 0, KeyValue.Type.DeleteColumn);
+ entries[1] = createEntry(TABLE_NAME1, 1, KeyValue.Type.DeleteFamily);
+ entries[2] = createEntry(TABLE_NAME1, 2, KeyValue.Type.DeleteColumn);
+
+ SINK.replicateEntries(entries);
+ Thread.sleep(SLEEP_TIME);
+
+ Scan scan = new Scan();
+ ResultScanner scanRes = table1.getScanner(scan);
+ assertEquals(0, scanRes.next(3).length);
+ }
+
+ @Test
+ public void testRolling() throws Exception {
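+    // Rolling the sink's log between batches must not lose or duplicate edits.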
+ testMixedDeletes();
+ SINK.rollLog();
+ testMixedDeletes();
+ SINK.rollLog();
+ testMixedPutTables();
+ }
+
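+  /**
+   * Fabricates a WAL entry for the given table, row and edit type. The key
+   * reuses the table name as the region name and the timestamp as the
+   * sequence number, which is enough for what the sink looks at.
+   */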
+ private HLog.Entry createEntry(byte [] table, int row, KeyValue.Type type) {
+ byte[] fam = Bytes.equals(table, TABLE_NAME1) ? FAM_NAME1 : FAM_NAME2;
+ byte[] rowBytes = Bytes.toBytes(row);
+ final long now = System.currentTimeMillis();
+ KeyValue kv = null;
+ if(type.getCode() == KeyValue.Type.Put.getCode()) {
+ kv = new KeyValue(rowBytes, fam, fam, now,
+ KeyValue.Type.Put, Bytes.toBytes(row));
+ } else if (type.getCode() == KeyValue.Type.DeleteColumn.getCode()) {
+ kv = new KeyValue(rowBytes, fam, fam,
+ now, KeyValue.Type.DeleteColumn);
+ } else if (type.getCode() == KeyValue.Type.DeleteFamily.getCode()) {
+ kv = new KeyValue(rowBytes, fam, null,
+ now, KeyValue.Type.DeleteFamily);
+ }
+
+ HLogKey key = new HLogKey(table, table, now, now);
+
+ return new HLog.Entry(key, kv);
+ }
+}
diff --git a/src/contrib/mdc_replication/src/test/org/apache/hadoop/hbase/replication/TestReplication.java b/src/contrib/mdc_replication/src/test/org/apache/hadoop/hbase/replication/TestReplication.java
new file mode 100644
index 00000000000..affbf7a4f40
--- /dev/null
+++ b/src/contrib/mdc_replication/src/test/org/apache/hadoop/hbase/replication/TestReplication.java
@@ -0,0 +1,268 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.fail;
+
+import org.junit.*;
+import org.apache.hadoop.hbase.*;
+import org.apache.hadoop.hbase.client.*;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.regionserver.replication.ReplicationRegionServer;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWrapper;
+import org.apache.hadoop.hbase.ipc.ReplicationRegionInterface;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+
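+/**
+ * End-to-end replication test: two mini clusters share one ZooKeeper
+ * ensemble under the /1 (master) and /2 (slave) chroots, and edits made
+ * on the master are expected to show up on the slave.
+ */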
+public class TestReplication implements HConstants {
+
+ protected static final Log LOG = LogFactory.getLog(TestReplication.class);
+
+ private Configuration conf1;
+ private Configuration conf2;
+
+ private ZooKeeperWrapper zkw1;
+ private ZooKeeperWrapper zkw2;
+
+ private HBaseTestingUtility utility1;
+ private HBaseTestingUtility utility2;
+
+ private final int NB_ROWS_IN_BATCH = 100;
+ private final long SLEEP_TIME = 500;
+ private final int NB_RETRIES = 10;
+
+  /**
+   * Nothing to do once per class; {@link #setUp()} builds the clusters.
+   * @throws java.lang.Exception on any failure
+   */
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+  }
+
+  /**
+   * Nothing to do once per class.
+   * @throws java.lang.Exception on any failure
+   */
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+  }
+
+  /**
+   * Prepares two cluster configurations sharing one mini ZooKeeper
+   * ensemble, /1 as master and /2 as slave, and registers /2 as a
+   * replication peer of /1.
+   * @throws java.lang.Exception on any setup failure
+   */
+ @Before
+ public void setUp() throws Exception {
+ try {
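+      // Cluster 1 acts as the master; both clusters' "master" znodes
+      // point at its quorum address under the /1 chroot.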
+ conf1 = HBaseConfiguration.create();
+ conf1.set(REGION_SERVER_CLASS, ReplicationRegionInterface.class
+ .getName());
+ conf1.set(REGION_SERVER_IMPL, ReplicationRegionServer.class
+ .getName());
+ conf1.set(ZOOKEEPER_ZNODE_PARENT, "/1");
+
+ utility1 = new HBaseTestingUtility(conf1);
+ utility1.startMiniZKCluster();
+ MiniZooKeeperCluster miniZK = utility1.getZkCluster();
+ zkw1 = new ZooKeeperWrapper(conf1, EmptyWatcher.instance);
+ zkw1.writeZNode("/1", "replication", "");
+ zkw1.writeZNode("/1/replication", "master",
+ conf1.get(ZOOKEEPER_QUORUM)+":" +
+ conf1.get("hbase.zookeeper.property.clientPort")+":/1");
+ setIsReplication("true");
+
+      LOG.info("Setup first Zk");
+
+ conf2 = HBaseConfiguration.create();
+ conf2.set(REGION_SERVER_CLASS, ReplicationRegionInterface.class
+ .getName());
+ conf2.set(REGION_SERVER_IMPL, ReplicationRegionServer.class
+ .getName());
+ conf2.set(ZOOKEEPER_ZNODE_PARENT, "/2");
+
+ utility2 = new HBaseTestingUtility(conf2);
+ utility2.setZkCluster(miniZK);
+ zkw2 = new ZooKeeperWrapper(conf2, EmptyWatcher.instance);
+ zkw2.writeZNode("/2", "replication", "");
+ zkw2.writeZNode("/2/replication", "master",
+ conf1.get(ZOOKEEPER_QUORUM)+":" +
+ conf1.get("hbase.zookeeper.property.clientPort")+":/1");
+
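+      // Register cluster 2 as peer "test" of cluster 1, on the same
+      // quorum but under the /2 chroot.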
+ zkw1.writeZNode("/1/replication/peers", "test",
+ conf2.get(ZOOKEEPER_QUORUM)+":" +
+ conf2.get("hbase.zookeeper.property.clientPort")+":/2");
+
+ LOG.info("Setup second Zk");
+    } catch (Exception ex) {
+      LOG.error("Failed setting up the replication ZooKeeper state", ex);
+      throw ex;
+    }
+ }
+
+  /**
+   * Shuts both mini clusters down after each test.
+   * @throws java.lang.Exception on any shutdown failure
+   */
+  @After
+  public void tearDown() throws Exception {
+    utility2.shutdownMiniCluster();
+    utility1.shutdownMiniCluster();
+  }
+
+ @Test
+ public void testReplication() throws Exception {
+ utility1.startMiniCluster();
+ utility2.startMiniCluster();
+
+ byte[] tableName = Bytes.toBytes("test");
+ byte[] famName = Bytes.toBytes("f");
+ byte[] row = Bytes.toBytes("row");
+
+ HTableDescriptor table = new HTableDescriptor(tableName);
+ HColumnDescriptor fam = new HColumnDescriptor(famName);
+ table.addFamily(fam);
+
+ HBaseAdmin admin1 = new HBaseAdmin(conf1);
+ HBaseAdmin admin2 = new HBaseAdmin(conf2);
+ admin1.createTable(table);
+ admin2.createTable(table);
+
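+    // Put a single row on the master and wait for it to appear on the slave.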
+ Put put = new Put(row);
+ put.add(famName, row, row);
+
+ HTable table1 = new HTable(conf1, tableName);
+ table1.put(put);
+
+ HTable table2 = new HTable(conf2, tableName);
+ Get get = new Get(row);
+ for(int i = 0; i < NB_RETRIES; i++) {
+ if(i==NB_RETRIES-1) {
+ fail("Waited too much time for put replication");
+ }
+ Result res = table2.get(get);
+ if(res.size() == 0) {
+ LOG.info("Row not available");
+ Thread.sleep(SLEEP_TIME);
+ } else {
+        assertArrayEquals(row, res.value());
+ break;
+ }
+ }
+
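+    // Delete the row on the master and wait for the delete to replicate.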
+ Delete del = new Delete(row);
+ table1.delete(del);
+
+ table2 = new HTable(conf2, tableName);
+ get = new Get(row);
+ for(int i = 0; i < NB_RETRIES; i++) {
+ if(i==NB_RETRIES-1) {
+ fail("Waited too much time for del replication");
+ }
+ Result res = table2.get(get);
+ if(res.size() >= 1) {
+ LOG.info("Row not deleted");
+ Thread.sleep(SLEEP_TIME);
+ } else {
+ break;
+ }
+ }
+
+    // Batch test: insert many rows with auto-flush off and expect the
+    // whole batch to replicate.
+ table1.setAutoFlush(false);
+ for (int i = 0; i < NB_ROWS_IN_BATCH; i++) {
+ put = new Put(Bytes.toBytes(i));
+ put.add(famName, row, row);
+ table1.put(put);
+ }
+ table1.flushCommits();
+
+ Scan scan = new Scan();
+
+ for(int i = 0; i < NB_RETRIES; i++) {
+ if(i==NB_RETRIES-1) {
+ fail("Waited too much time for normal batch replication");
+ }
+ ResultScanner scanner = table2.getScanner(scan);
+ Result[] res = scanner.next(NB_ROWS_IN_BATCH);
+ scanner.close();
+ if(res.length != NB_ROWS_IN_BATCH) {
+ LOG.info("Only got " + res.length + " rows");
+ Thread.sleep(SLEEP_TIME);
+ } else {
+ break;
+ }
+ }
+
+ table1.setAutoFlush(true);
+
+    // Flip the replication state znode off; new edits should stop flowing
+    setIsReplication("false");
+
+    // Takes a few ms for ZK to fire the watcher
+    Thread.sleep(100);
+
+ put = new Put(Bytes.toBytes("stop start"));
+ put.add(famName, row, row);
+ table1.put(put);
+
+ get = new Get(Bytes.toBytes("stop start"));
+ for(int i = 0; i < NB_RETRIES; i++) {
+ if(i==NB_RETRIES-1) {
+ break;
+ }
+ Result res = table2.get(get);
+ if(res.size() >= 1) {
+ fail("Replication wasn't stopped");
+
+ } else {
+ LOG.info("Row not replicated, let's wait a bit more...");
+ Thread.sleep(SLEEP_TIME);
+ }
+ }
+
+    // Turn replication back on; the same edit should now go through
+    setIsReplication("true");
+
+ Thread.sleep(100);
+
+ table1.put(put);
+
+ for(int i = 0; i < NB_RETRIES; i++) {
+ if(i==NB_RETRIES-1) {
+ fail("Waited too much time for put replication");
+ }
+ Result res = table2.get(get);
+ if(res.size() == 0) {
+ LOG.info("Row not available");
+ Thread.sleep(SLEEP_TIME);
+ } else {
+        assertArrayEquals(row, res.value());
+ break;
+ }
+ }
+  }
+
+  private void setIsReplication(String state) throws Exception {
+    zkw1.writeZNode("/1/replication", "state", state);
+  }
+}