HBASE-2129 Simple Master/Slave replication

git-svn-id: https://svn.apache.org/repos/asf/hadoop/hbase/trunk@907011 13f79535-47bb-0310-9956-ffa450edef68
Jean-Daniel Cryans 2010-02-05 17:37:54 +00:00
parent fbc67df8b7
commit 10a4154a06
17 changed files with 2057 additions and 0 deletions


@ -362,6 +362,7 @@ Release 0.21.0 - Unreleased
hfiles direct) uploader
HBASE-1433 Update hbase build to match core, use ivy, publish jars to maven
repo, etc. (Kay Kay via Stack)
HBASE-2129 Simple Master/Slave replication
OPTIMIZATIONS
HBASE-410 [testing] Speed up the test suite


@ -0,0 +1,63 @@
# Script to add a peer to a cluster
# To see usage for this script, run:
#
# ${HBASE_HOME}/bin/hbase org.jruby.Main add_peer.rb
#
include Java
import org.apache.commons.logging.LogFactory
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.HConstants
import org.apache.hadoop.hbase.EmptyWatcher
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWrapper
# Name of this script
NAME = "add_peer"
# Print usage for this script
def usage
puts 'Usage: %s.rb master_zookeeper.quorum.peers:clientport:znode_parent slave_zookeeper.quorum.peers:clientport:znode_parent' % NAME
exit!
end
if ARGV.size != 2
usage
end
LOG = LogFactory.getLog(NAME)
parts1 = ARGV[0].split(":")
LOG.info("Master cluster located at " + parts1[0] + " port " + parts1[1] + " in folder " + parts1[2])
c2 = HBaseConfiguration.create()
parts2 = ARGV[1].split(":")
LOG.info("Slave cluster located at " + parts2[0] + " port " + parts2[1] + " in folder " + parts2[2])
LOG.info("The addresses must be exactly the same as those in hbase-site.xml of each cluster.")
print "Are those info correct? [Y/n] "
answer = $stdin.gets.chomp
if answer.length != 0 || answer == "n" || answer == "no"
exit!
end
c1 = HBaseConfiguration.create()
c1.set(HConstants.ZOOKEEPER_QUORUM, parts1[0])
c1.set("hbase.zookeeper.property.clientPort", parts1[1])
c1.set(HConstants.ZOOKEEPER_ZNODE_PARENT, parts1[2])
zkw1 = ZooKeeperWrapper.new(c1, EmptyWatcher.instance)
zkw1.writeZNode(parts1[2], "replication", "a")
zkw1.writeZNode(parts1[2] + "/replication", "master", ARGV[0]);
zkw1.writeZNode(parts1[2] + "/replication", "state", "true");
zkw1.writeZNode(parts1[2] + "/replication/peers", "test", ARGV[1]);
c2.set(HConstants.ZOOKEEPER_QUORUM, parts2[0])
c2.set("hbase.zookeeper.property.clientPort", parts2[1])
c2.set(HConstants.ZOOKEEPER_ZNODE_PARENT, parts2[2])
zkw2 = ZooKeeperWrapper.new(c2, EmptyWatcher.instance)
zkw2.writeZNode(parts2[2], "replication", "a")
zkw2.writeZNode(parts2[2] + "/replication", "master", ARGV[0]);


@ -0,0 +1,61 @@
# Script to recreate all tables from one cluster to another
# To see usage for this script, run:
#
# ${HBASE_HOME}/bin/hbase org.jruby.Main copy_tables_desc.rb
#
include Java
import org.apache.commons.logging.LogFactory
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.HConstants
import org.apache.hadoop.hbase.EmptyWatcher
import org.apache.hadoop.hbase.client.HBaseAdmin
import org.apache.hadoop.hbase.HTableDescriptor
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWrapper
# Name of this script
NAME = "copy_tables_desc"
# Print usage for this script
def usage
puts 'Usage: %s.rb master_zookeeper.quorum.peers:clientport:znode_parent slave_zookeeper.quorum.peers:clientport:znode_parent' % NAME
exit!
end
if ARGV.size != 2
usage
end
LOG = LogFactory.getLog(NAME)
parts1 = ARGV[0].split(":")
LOG.info("Master cluster located at " + parts1[0] + " port " + parts1[1] + " in folder " + parts1[2])
parts2 = ARGV[1].split(":")
LOG.info("Slave cluster located at " + parts2[0] + " port " + parts2[1] + " in folder " + parts2[2])
print "Are those info correct? [Y/n] "
answer = $stdin.gets.chomp
if answer.length != 0 || answer == "n" || answer == "no"
exit!
end
c1 = HBaseConfiguration.create()
c1.set(HConstants.ZOOKEEPER_QUORUM, parts1[0])
c1.set("hbase.zookeeper.property.clientPort", parts1[1])
c1.set(HConstants.ZOOKEEPER_ZNODE_PARENT, parts1[2])
admin1 = HBaseAdmin.new(c1)
c2 = HBaseConfiguration.create()
c2.set(HConstants.ZOOKEEPER_QUORUM, parts2[0])
c2.set("hbase.zookeeper.property.clientPort", parts2[1])
c2.set(HConstants.ZOOKEEPER_ZNODE_PARENT, parts2[2])
admin2 = HBaseAdmin.new(c2)
for t in admin1.listTables()
admin2.createTable(t)
end


@ -0,0 +1,52 @@
<?xml version="1.0"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<!--
Before you can run these subtargets directly, you need
to call at top-level: ant deploy-contrib compile-core-test
-->
<project name="mdc_replication" default="jar">
<property name="hbase.root" location="../../.." />
<import file="../build-contrib.xml"/>
<path id="classpath">
<path refid="contrib.classpath"/>
</path>
<path id="test.classpath">
<path refid="test.contrib.classpath"/>
<fileset dir="${hbase.root}/lib">
<include name="zookeeper*.jar" />
</fileset>
</path>
<target name="package" depends="jar, jar-examples" unless="skip.contrib">
<mkdir dir="${dist.dir}/contrib/${name}"/>
<copy todir="${dist.dir}/contrib/${name}" includeEmptyDirs="false" flatten="true">
<fileset dir="${build.dir}">
<include name="hbase-${version}-${name}.jar" />
</fileset>
<fileset dir="${basedir}/bin">
<include name="add_peer.rb" />
</fileset>
</copy>
</target>
</project>


@ -0,0 +1,107 @@
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<ivy-module version="1.0">
<info organisation="org.apache.hadoop" module="${ant.project.name}" revision="${version}">
<license name="Apache 2.0"/>
<ivyauthor name="Apache Hadoop Team" url="http://hadoop.apache.org"/>
<description>
Hadoop Core
</description>
</info>
<configurations defaultconfmapping="default">
<!--these match the Maven configurations-->
<conf name="default" extends="master,runtime"/>
<conf name="master" description="contains the artifact but no dependencies"/>
<conf name="runtime" description="runtime but not the artifact"
extends="client,server,s3-server,kfs,mandatory,jetty,ftp"/>
<conf name="mandatory" description="contains the critical dependencies"
extends="commons-logging,log4j"/>
<!--
These public configurations contain the core dependencies for running hadoop client or server.
The server is effectively a superset of the client.
-->
<conf name="client" description="client-side dependencies"
extends="mandatory,httpclient"/>
<conf name="server" description="server-side dependencies"
extends="client"/>
<conf name="s3-client" description="dependencies for working with S3/EC2 infrastructure"
extends="client"/>
<conf name="s3-server" description="dependencies for running on S3/EC2 infrastructure"
extends="s3-client,server"/>
<conf name="kfs" description="dependencies for KFS file system support"/>
<conf name="ftp" description="dependencies for workign with FTP filesytems"
extends="mandatory"/>
<conf name="jetty" description="Jetty provides the in-VM HTTP daemon" extends="commons-logging"/>
<conf name="common" extends="runtime,mandatory,httpclient,ftp,jetty"
description="common artifacts"/>
<!--Testing pulls in everything-->
<conf name="test" extends="master,common" description="the classpath needed to run tests"/>
<!--Private configurations. -->
<conf name="javadoc" visibility="private" description="artifacts required while performing doc generation"
extends="common,mandatory,jetty,lucene"/>
<conf name="releaseaudit" visibility="private"
description="Artifacts required for releaseaudit target"/>
<conf name="commons-logging" visibility="private"/>
<conf name="httpclient" visibility="private" extends="commons-logging"/>
<conf name="log4j" visibility="private"/>
<conf name="lucene" visibility="private"/>
<conf name="jdiff" visibility="private" extends="log4j,s3-client,jetty,server"/>
<conf name="checkstyle" visibility="private"/>
</configurations>
<publications>
<!--get the artifact from our module name-->
<artifact conf="master"/>
</publications>
<dependencies>
<!-- Common -->
<dependency org="org.apache.hadoop" name="hadoop-core"
rev="${hadoop-core.version}" conf="common->default" changing="true" >
<exclude conf="test"/>
</dependency>
<dependency org="org.apache.hadoop" name="hadoop-hdfs"
rev="${hadoop-hdfs.version}" conf="common->default" changing="true" >
<exclude conf="test"/>
</dependency>
<dependency org="org.jruby" name="jruby-complete"
rev="${jruby.version}" conf="common->default" />
<!-- Test -->
<!--
Test Zookeeper cluster
<dependency org="org.apache.hadoop" name="hadoop-mapred-test"
rev="${hadoop-mapred.version}" conf="test->default"/>
-->
<dependency org="org.apache.hadoop" name="hadoop-core-test"
rev="${hadoop-core.version}" conf="test->default" transitive="false" changing="true" />
<dependency org="org.apache.hadoop" name="hadoop-hdfs-test"
rev="${hadoop-hdfs.version}" conf="test->default" transitive="false" changing="true"/>
<dependency org="log4j" name="log4j"
rev="${log4j.version}" conf="test->master">
<exclude conf="jmx,mail,jms"/>
</dependency>
</dependencies>
</ivy-module>


@ -0,0 +1,42 @@
/**
* Copyright 2010 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.ipc;
/**
* Helper class to add RPC-related configs for replication
*/
public class ReplicationRPC {
private static final byte RPC_CODE = 110;
private static boolean initialized = false;
public synchronized static void initialize() {
if (initialized) {
return;
}
HBaseRPC.addToMap(ReplicationRegionInterface.class, RPC_CODE);
initialized = true;
}
private ReplicationRPC() {
// Static helper class;
}
}
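For reference, the ReplicationRegionServer added later in this patch wires this up with a static initializer; the fragment below is just a restatement of that call, not additional API:

static {
  // Registers ReplicationRegionInterface under RPC_CODE with HBaseRPC
  // before any RPC server or proxy is created.
  ReplicationRPC.initialize();
}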


@ -0,0 +1,40 @@
/*
* Copyright 2010 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.ipc;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import java.io.IOException;
/**
* Interface that defines replication
*/
public interface ReplicationRegionInterface extends HRegionInterface {
/**
* Replicates the given entries. The guarantee is that the given entries
* will be durable on the slave cluster if this method returns without
* an exception.
* @param entries entries to replicate
* @throws IOException
*/
public void replicateLogEntries(HLog.Entry[] entries) throws IOException;
}
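To make the contract concrete, here is a minimal sketch of how a source-side caller might ship a batch of entries, modelled on the ReplicationSource class later in this patch; the class and variable names below are illustrative and not part of the patch:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HServerAddress;
import org.apache.hadoop.hbase.ipc.ReplicationRegionInterface;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.replication.ReplicationConnectionManager;

public class ReplicationShipSketch {
  // Ships one batch of WAL entries to a single slave region server.
  public static void ship(HServerAddress slaveAddress, HLog.Entry[] entries)
      throws java.io.IOException {
    Configuration conf = HBaseConfiguration.create();
    ReplicationConnectionManager conn = new ReplicationConnectionManager(conf);
    ReplicationRegionInterface peer = conn.getHRegionConnection(slaveAddress);
    // Returns only once the entries are durable on the slave,
    // otherwise throws IOException.
    peer.replicateLogEntries(entries);
  }
}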


@ -0,0 +1,136 @@
<!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 3.2 Final//EN">
<html>
<!--
Copyright 2010 The Apache Software Foundation
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<head />
<body bgcolor="white">
<h1>Multi Data Center Replication</h1>
This package provides replication between HBase clusters.
<p>
<h2>Table Of Contents</h2>
<ol>
<li><a href="#status">Status</a></li>
<li><a href="#requirements">Requirements</a></li>
<li><a href="#deployment">Deployment</a></li>
</ol>
<p>
<a name="status">
<h2>Status</h2>
</a>
<p>
This package isn't even alpha quality software and is only meant to be a base
for future developments. The current implementation offers the following
features:
<ol>
<li>Master/Slave replication limited to 1 slave. </li>
<li>Replication of all user tables.</li>
<li>Start/stop replication stream.</li>
<li>Supports cluters of different sizes.</li>
<li>Re-replication of entries from failed region
servers on the master cluster.</li>
</ol>
Please report bugs on the project's Jira when found.
<p>
<a name="requirements">
<h2>Requirements</h2>
</a>
<p>
Before trying out replication, make sure to review the following requirements:
<ol>
<li>Zookeeper should be handled by yourself, not by HBase, and should
always be available during the deployment. Currently you can't use
zoo.cfg to hold your Zookeeper configurations.</li>
<li>All machines from both clusters should be able to reach every
other machine since replication goes from any region server to any
other one on the slave cluster. That also includes the
Zookeeper clusters.</li>
<li>Both clusters should have the same HBase and Hadoop major revision.
For example, having 0.21.1 on the master and 0.21.0 on the slave is
correct but not 0.21.1 and 0.22.0.</li>
<li>Every table should exist with the exact same name and column
family names on both clusters.</li>
</ol>
<p>
<a name="deployment">
<h2>Deployment</h2>
</a>
<p>
The following steps describe how to enable replication from one cluster
to another. This must be done with both clusters offline.
<ol>
<li>Copy the hbase-0.21.0-dev-mdc_replication.jar file from the
$HBASE_HOME/contrib/mdc_replication/ folder to $HBASE_HOME/lib on
both clusters.</li>
<li>Edit ${HBASE_HOME}/conf/hbase-site.xml on both cluster to add
the following configurations:
<pre>
&lt;property&gt;
&lt;name&gt;hbase.regionserver.class&lt;/name&gt;
&lt;value&gt;org.apache.hadoop.hbase.ipc.ReplicationRegionInterface&lt;/value&gt;
&lt;/property&gt;
&lt;property&gt;
&lt;name&gt;hbase.regionserver.impl&lt;/name&gt;
&lt;value&gt;org.apache.hadoop.hbase.regionserver.replication.ReplicationRegionServer&lt;/value&gt;
&lt;/property&gt;</pre>
</li>
<li>Run the following command on any cluster:
<pre>
$HBASE_HOME/bin/hbase org.jruby.Main $HBASE_HOME/src/contrib/mdc_replication/bin/add_peer.rb</pre>
This will show you the help needed to set up the replication stream between
both clusters; an example invocation is shown right after this list.
If both clusters use the same Zookeeper ensemble, each must use a
different <b>zookeeper.znode.parent</b> since they can't write in the
same folder.
</li>
<li>You can now start and stop the clusters with your preferred method.</li>
</ol>
You can confirm that your setup works by looking at any region server's log
on the master cluster and looking for lines like the following:
<pre>
Considering 1 rs, with ratio 0.1
Getting 1 rs from peer cluster # 0
Choosing peer 10.10.1.49:62020</pre>
In this case it indicates that 1 region server from the slave cluster
was chosen for replication.<br><br>
Should you want to stop the replication while the clusters are running, open
the shell on the master cluster and issue this command:
<pre>
hbase(main):001:0> zk 'set /zookeeper.znode.parent/replication/state false'</pre>
where you replace /zookeeper.znode.parent with the znode parent configured on your
master cluster. Edits that were already queued will still be replicated after you
issue that command, but new entries won't be. To start replication again, simply
replace "false" with "true" in the command.
<p>
</body>
</html>


@ -0,0 +1,103 @@
/**
* Copyright 2010 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver.replication;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.regionserver.FlushRequester;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.util.Progressable;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
/**
* Specialized version of HRegion to handle replication. In particular,
* edits replayed from the reconstruction log are also queued for
* replication to the slave cluster.
*/
public class ReplicationRegion extends HRegion {
static final Log LOG = LogFactory.getLog(ReplicationRegion.class);
private final ReplicationSource replicationSource;
public ReplicationRegion(Path basedir, HLog log, FileSystem fs, Configuration conf,
HRegionInfo regionInfo, FlushRequester flushListener,
ReplicationSource repSource) {
super(basedir, log, fs, conf, regionInfo, flushListener);
this.replicationSource = repSource;
}
protected void doReconstructionLog(final Path oldLogFile,
final long minSeqId, final long maxSeqId, final Progressable reporter)
throws UnsupportedEncodingException, IOException {
super.doReconstructionLog(oldLogFile, minSeqId, maxSeqId, reporter);
if(this.replicationSource == null) {
return;
}
if (oldLogFile == null || !getFilesystem().exists(oldLogFile)) {
return;
}
FileStatus[] stats = getFilesystem().listStatus(oldLogFile);
if (stats == null || stats.length == 0) {
LOG.warn("Passed reconstruction log " + oldLogFile
+ " is zero-length");
}
HLog.Reader reader = HLog.getReader(getFilesystem(), oldLogFile, getConf());
try {
HLog.Entry entry;
while ((entry = reader.next()) != null) {
HLogKey key = entry.getKey();
KeyValue val = entry.getEdit();
if (key.getLogSeqNum() < maxSeqId) {
continue;
}
// Don't replicate catalog entries and meta information like
// complete log flush.
if(!(Bytes.equals(key.getTablename(),ROOT_TABLE_NAME) ||
Bytes.equals(key.getTablename(),META_TABLE_NAME)) &&
!Bytes.equals(val.getFamily(), HLog.METAFAMILY)) {
this.replicationSource.enqueueLog(entry);
}
}
} finally {
reader.close();
}
}
}


@ -0,0 +1,170 @@
/*
* Copyright 2010 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver.replication;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.ipc.HBaseRPCProtocolVersion;
import org.apache.hadoop.hbase.ipc.ReplicationRPC;
import org.apache.hadoop.hbase.ipc.ReplicationRegionInterface;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.regionserver.wal.replication.ReplicationHLog;
import org.apache.hadoop.hbase.replication.ReplicationZookeeperHelper;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.util.Progressable;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;
public class ReplicationRegionServer extends HRegionServer
implements ReplicationRegionInterface {
static {
ReplicationRPC.initialize();
}
protected static final Log LOG =
LogFactory.getLog(ReplicationRegionServer.class);
private final ReplicationSource replicationSource;
private ReplicationSink replicationSink;
private final boolean isMaster;
private final AtomicBoolean isReplicating = new AtomicBoolean(true);
private final ReplicationZookeeperHelper zkHelper;
/**
* Starts a HRegionServer at the default location
*
* @param conf
* @throws java.io.IOException
*/
public ReplicationRegionServer(Configuration conf) throws IOException {
super(conf);
this.zkHelper = new ReplicationZookeeperHelper(
this.getZooKeeperWrapper(), this.conf, this.isReplicating);
this.isMaster = zkHelper.isMaster();
this.replicationSink = null;
this.replicationSource = this.isMaster ? new ReplicationSource(this,
super.stopRequested, this.isReplicating) : null;
}
@Override
protected HLog instantiateHLog(Path logdir) throws IOException {
HLog newlog = new ReplicationHLog(super.getFileSystem(),
logdir, conf, super.getLogRoller(),
this.replicationSource);
return newlog;
}
@Override
protected void init(final MapWritable c) throws IOException {
super.init(c);
String n = Thread.currentThread().getName();
String repLogPathStr =
ReplicationSink.getRepLogPath(getHServerInfo().getServerName());
Path repLogPath = new Path(getRootDir(), repLogPathStr);
Thread.UncaughtExceptionHandler handler =
new Thread.UncaughtExceptionHandler() {
public void uncaughtException(final Thread t, final Throwable e) {
abort();
LOG.fatal("Set stop flag in " + t.getName(), e);
}
};
if(this.isMaster) {
Threads.setDaemonThreadRunning(
this.replicationSource, n + ".replicationSource", handler);
} else {
this.replicationSink =
new ReplicationSink(conf,super.stopRequested,
repLogPath, getFileSystem(), getThreadWakeFrequency());
Threads.setDaemonThreadRunning(
this.replicationSink, n + ".replicationSink", handler);
}
}
@Override
protected HRegion instantiateRegion(final HRegionInfo regionInfo)
throws IOException {
HRegion r = new ReplicationRegion(HTableDescriptor.getTableDir(super
.getRootDir(), regionInfo.getTableDesc().getName()), super.hlog, super
.getFileSystem(), super.conf, regionInfo,
super.getFlushRequester(), this.replicationSource);
r.initialize(null, new Progressable() {
public void progress() {
addProcessingMessage(regionInfo);
}
});
return r;
}
@Override
public void replicateLogEntries(HLog.Entry[] entries) throws IOException {
this.replicationSink.replicateEntries(entries);
}
/**
*
* @param protocol
* @param clientVersion
* @return
* @throws IOException
*/
public long getProtocolVersion(final String protocol,
final long clientVersion)
throws IOException {
if (protocol.equals(ReplicationRegionInterface.class.getName())) {
return HBaseRPCProtocolVersion.versionID;
}
throw new IOException("Unknown protocol to name node: " + protocol);
}
/**
*
* @return
*/
public ReplicationZookeeperHelper getZkHelper() {
return zkHelper;
}
protected void join() {
super.join();
if(this.isMaster) {
Threads.shutdown(this.replicationSource);
} else {
Threads.shutdown(this.replicationSink);
}
}
}


@ -0,0 +1,283 @@
/*
* Copyright 2010 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver.replication;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.HTablePool;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.util.Bytes;
import java.io.EOFException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/**
* This class is responsible for replicating the edits coming
* from another cluster. All edits are first put into a log that will
* be read later by the main thread.
*
* This replication process currently waits for queued edits to be applied
* before any other entry can be appended to the log.
*
* The log is rolled but old ones aren't kept at the moment.
*/
public class ReplicationSink extends Thread {
public static final String REPLICATION_LOG_DIR = ".replogs";
static final Log LOG = LogFactory.getLog(ReplicationSink.class);
private final Configuration conf;
private final HTablePool pool;
private final AtomicBoolean stop;
private HLog.Reader reader;
private HLog.Writer writer;
private final FileSystem fs;
private Path path;
private long position = 0;
private final Lock lock = new ReentrantLock();
private final Condition newData = lock.newCondition();
private final AtomicLong editsSize = new AtomicLong(0);
private long lastEditSize = 0;
private final long logrollsize;
private final long threadWakeFrequency;
/**
* Create a sink for replication
* @param conf conf object
* @param stopper boolean to tell this thread to stop
* @param path the path to the log
* @param fs the filesystem to use
* @param threadWakeFrequency how long should the thread wait for edits
* @throws IOException thrown when HDFS goes bad or bad file name
*/
public ReplicationSink(final Configuration conf,
final AtomicBoolean stopper, Path path,
FileSystem fs, long threadWakeFrequency)
throws IOException {
this.conf = conf;
this.pool = new HTablePool(this.conf, 10);
this.stop = stopper;
this.fs = fs;
this.path = path;
long blocksize = conf.getLong("hbase.regionserver.hlog.blocksize",
this.fs.getDefaultBlockSize());
float multi = conf.getFloat("hbase.regionserver.logroll.multiplier", 0.95f);
this.logrollsize = (long)(blocksize * multi);
this.threadWakeFrequency = threadWakeFrequency;
rollLog();
}
/**
* Put this array of entries into a log that will be read later
* @param entries
* @throws IOException
*/
public void replicateEntries(HLog.Entry[] entries)
throws IOException {
try {
this.lock.lock();
if(!this.stop.get()) {
// add to WAL and defer actual inserts
try {
for(HLog.Entry entry : entries) {
this.writer.append(entry);
this.editsSize.addAndGet(entry.getKey().heapSize() +
entry.getEdit().heapSize());
}
this.writer.sync();
this.newData.signal();
} catch (IOException ioe) {
LOG.error("Unable to accept edit because", ioe);
throw ioe;
}
} else {
LOG.info("Won't be replicating data as we are shutting down");
}
} finally {
this.lock.unlock();
}
}
public void run() {
try {
HTableInterface table = null;
this.lock.lock();
while (!this.stop.get()) {
this.newData.await(this.threadWakeFrequency, TimeUnit.MILLISECONDS);
try {
if(this.lastEditSize == this.editsSize.get()) {
continue;
}
// There's no tailing in HDFS so we create a new reader
// and seek every time
this.reader = HLog.getReader(this.fs, this.path, this.conf);
if (position != 0) {
this.reader.seek(position);
}
byte[] lastTable = HConstants.EMPTY_BYTE_ARRAY;
List<Put> puts = new ArrayList<Put>();
// Very simple optimization where we batch sequences of rows going
// to the same table.
HLog.Entry entry = new HLog.Entry();
while (this.reader.next(entry) != null) {
KeyValue kv = entry.getEdit();
if (kv.isDelete()) {
Delete delete = new Delete(kv.getRow(), kv.getTimestamp(), null);
if (kv.isDeleteFamily()) {
delete.deleteFamily(kv.getFamily());
} else if (!kv.isEmptyColumn()) {
delete.deleteColumn(entry.getEdit().getFamily(),
kv.getQualifier());
}
table = pool.getTable(entry.getKey().getTablename());
table.delete(delete);
pool.putTable(table);
} else {
Put put = new Put(kv.getRow(), kv.getTimestamp(), null);
put.add(entry.getEdit().getFamily(),
kv.getQualifier(), kv.getValue());
// Switching table, flush
if (!Bytes.equals(lastTable, entry.getKey().getTablename())
&& !puts.isEmpty()) {
table = pool.getTable(lastTable);
table.put(puts);
pool.putTable(table);
puts.clear();
}
lastTable = entry.getKey().getTablename();
puts.add(put);
}
}
if (!puts.isEmpty()) {
table = pool.getTable(lastTable);
table.put(puts);
pool.putTable(table);
}
position = this.reader.getPosition();
if(this.editsSize.get() > this.logrollsize) {
rollLog();
}
this.lastEditSize = editsSize.get();
} catch (EOFException eof) {
LOG.warn("Got EOF while reading, will continue on next notify");
} catch (TableNotFoundException ex) {
LOG.warn("Losing edits because: " + ex);
} finally {
this.newData.signal();
if(this.reader != null) {
this.reader.close();
}
this.reader = null;
}
}
close();
} catch (Exception ex) {
// Should we log rejected edits in a file for replay?
LOG.error("Unable to accept edit because", ex);
this.stop.set(true);
} finally {
this.lock.unlock();
}
}
private void close() throws IOException {
this.writer.close();
if(reader != null) {
this.reader.close();
}
this.fs.delete(this.path,true);
}
// Delete the current log and start a new one with the same name
// TODO keep the old versions so that the writing thread isn't held up
// by the reading thread, which could then be reading older logs.
// At this point we are under the lock.
protected void rollLog() throws IOException {
if(! (this.editsSize.get() == 0)) {
this.writer.close();
if(this.reader != null) {
this.reader.close();
}
this.fs.delete(this.path,true);
}
this.writer = HLog.createWriter(this.fs, this.path, this.conf);
this.editsSize.set(0);
this.position = 0;
LOG.debug("New replication log");
}
/**
* Get the path of the file for this server
* @param serverName
* @return
*/
public static String getRepLogPath(String serverName) {
StringBuilder dirName = new StringBuilder(REPLICATION_LOG_DIR);
dirName.append("/");
dirName.append(serverName);
return dirName.toString();
}
}


@ -0,0 +1,163 @@
/*
* Copyright 2010 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver.replication;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Chore;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HServerAddress;
import org.apache.hadoop.hbase.ipc.ReplicationRegionInterface;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.replication.ReplicationConnectionManager;
import org.apache.hadoop.hbase.replication.ReplicationZookeeperHelper;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* Class that handles the source of a replication stream.
* Currently does not handle more than one slave.
* For each slave cluster it selects a random number of peers
* using a replication ratio. For example, if the replication ratio is 0.1
* and the slave cluster has 100 region servers, 10 will be selected.
*/
public class ReplicationSource extends Chore implements HConstants {
static final Log LOG = LogFactory.getLog(ReplicationSource.class);
private final LinkedBlockingQueue<HLog.Entry> queue =
new LinkedBlockingQueue<HLog.Entry>();
private final List<HLog.Entry> tempArray = new ArrayList<HLog.Entry>();
private final HLog.Entry[] dummyArray = new HLog.Entry[0];
private final ReplicationConnectionManager conn;
private final ReplicationZookeeperHelper zkHelper;
private final Configuration conf;
private final float ratio;
private final Random random;
private final AtomicBoolean isReplicating;
private List<HServerAddress> currentPeers;
/**
* Constructor used by region servers
* @param server the region server specialized in replication
* @param stopper the atomic boolean to use to stop the cluster
* @param isReplicating the atomic boolean that starts/stops replication
* @throws IOException
*/
public ReplicationSource(final ReplicationRegionServer server,
final AtomicBoolean stopper,
final AtomicBoolean isReplicating)
throws IOException {
super(server.getThreadWakeFrequency(), stopper);
this.conf = server.getConfiguration();
this.conn = new ReplicationConnectionManager(this.conf);
this.zkHelper = server.getZkHelper();
this.ratio = this.conf.getFloat("replication.ratio", 0.1f);
currentPeers = new ArrayList<HServerAddress>();
this.random = new Random();
this.isReplicating = isReplicating;
}
@Override
protected boolean initialChore() {
this.chooseSinksForPeer(0);
return currentPeers.size() > 0;
}
/**
* Select a number of peers at random using the ratio. Minimum 1.
* @param index
*/
private void chooseSinksForPeer(int index) {
this.currentPeers.clear();
List<HServerAddress> addresses = this.zkHelper.getPeersAddresses(index);
Map<String, HServerAddress> mapOfAdr =
new HashMap<String, HServerAddress>();
LOG.info("Considering " + addresses.size() +
" rs, with ratio " + ratio);
int nbPeers = (int)(Math.ceil (addresses.size()*ratio));
LOG.info("Getting " + nbPeers + " rs from peer cluster # " + index);
for(int i = 0; i < nbPeers; i++) {
HServerAddress adr =
addresses.get(this.random.nextInt(addresses.size()));
while(mapOfAdr.containsKey(adr.toString())) {
adr = addresses.get(this.random.nextInt(addresses.size()));
}
LOG.info("Choosing peer " + adr.toString());
mapOfAdr.put(adr.toString(), adr);
}
this.currentPeers.addAll(mapOfAdr.values());
}
/**
* Put a log entry in a replication queue if replication is enabled
* @param logEntry
*/
public void enqueueLog(HLog.Entry logEntry) {
if(this.isReplicating.get()) {
this.queue.add(logEntry);
}
}
@Override
protected void chore() {
while(!super.stop.get()) {
// Drain the edits accumulated in the queue, select a node at random
// and send the edits. If it fails, get a new set of nodes and choose
// a new one to replicate to.
try {
this.queue.drainTo(this.tempArray);
if(this.tempArray.size() > 0) {
HServerAddress adr =
currentPeers.get(random.nextInt(this.currentPeers.size()));
ReplicationRegionInterface rrs = this.conn.getHRegionConnection(adr);
LOG.debug("Replicating " + this.tempArray.size()
+ " to " + adr.toString());
rrs.replicateLogEntries(this.tempArray.toArray(dummyArray));
this.tempArray.clear();
}
return;
}
catch (IOException ioe) {
LOG.warn("Unable to replicate, retrying with a new node", ioe);
try{
Thread.sleep(1000);
} catch (InterruptedException e){
// continue
}
// Should wait in a backoff fashion?
// make sure we don't retry with the same node
chooseSinksForPeer(0);
}
}
}
}


@ -0,0 +1,83 @@
/*
* Copyright 2010 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver.wal.replication;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.regionserver.replication.ReplicationSource;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
import org.apache.hadoop.hbase.regionserver.wal.LogRollListener;
import java.io.IOException;
/**
* HLog specialized in replication. It replicates every entry from every
* user table at the moment.
*/
public class ReplicationHLog extends HLog {
static final Log LOG = LogFactory.getLog(ReplicationHLog.class);
private ReplicationSource replicationSource;
private final boolean isReplicator;
/**
* New constructor used for replication
* @param fs filesystem to use
* @param dir directory to store the wal
* @param conf conf to use
* @param listener log listener to pass to super class
* @param replicationSource where to put the entries
* @throws IOException
*/
public ReplicationHLog(final FileSystem fs, final Path dir,
final Configuration conf,
final LogRollListener listener,
ReplicationSource replicationSource)
throws IOException {
super(fs, dir, conf, listener);
this.replicationSource = replicationSource;
this.isReplicator = this.replicationSource != null;
}
@Override
protected void doWrite(HRegionInfo info, HLogKey logKey,
KeyValue logEdit, long now)
throws IOException {
super.doWrite(info, logKey, logEdit, now);
if(this.isReplicator && ! (info.isMetaRegion() || info.isRootRegion())) {
this.replicationSource.enqueueLog(new Entry(logKey, logEdit));
}
}
public ReplicationSource getReplicationSource() {
return this.replicationSource;
}
}


@ -0,0 +1,97 @@
/*
* Copyright 2010 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HServerAddress;
import org.apache.hadoop.hbase.RemoteExceptionHandler;
import org.apache.hadoop.hbase.ipc.HBaseRPC;
import org.apache.hadoop.hbase.ipc.HBaseRPCProtocolVersion;
import org.apache.hadoop.hbase.ipc.ReplicationRegionInterface;
import org.apache.hadoop.ipc.RemoteException;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
* Connection manager to communicate with the other clusters.
*/
public class ReplicationConnectionManager implements HConstants {
private final int numRetries;
private final int maxRPCAttempts;
private final long rpcTimeout;
private final Map<String, ReplicationRegionInterface> servers =
new ConcurrentHashMap<String, ReplicationRegionInterface>();
private final
Class<? extends ReplicationRegionInterface> serverInterfaceClass;
private final Configuration conf;
/**
* Constructor that sets up RPC to other clusters
* @param conf
*/
public ReplicationConnectionManager(Configuration conf) {
this.conf = conf;
String serverClassName =
conf.get(REGION_SERVER_CLASS, DEFAULT_REGION_SERVER_CLASS);
this.numRetries = conf.getInt("hbase.client.retries.number", 10);
this.maxRPCAttempts = conf.getInt("hbase.client.rpc.maxattempts", 1);
this.rpcTimeout = conf.getLong("hbase.regionserver.lease.period", 60000);
try {
this.serverInterfaceClass =
(Class<? extends ReplicationRegionInterface>)
Class.forName(serverClassName);
} catch (ClassNotFoundException e) {
throw new UnsupportedOperationException(
"Unable to find region server interface " + serverClassName, e);
}
}
/**
* Get a connection to a distant region server for replication
* @param regionServer the address to use
* @return the connection to the region server
* @throws IOException
*/
public ReplicationRegionInterface getHRegionConnection(
HServerAddress regionServer)
throws IOException {
ReplicationRegionInterface server;
synchronized (this.servers) {
// See if we already have a connection
server = this.servers.get(regionServer.toString());
if (server == null) { // Get a connection
try {
server = (ReplicationRegionInterface) HBaseRPC.waitForProxy(
serverInterfaceClass, HBaseRPCProtocolVersion.versionID,
regionServer.getInetSocketAddress(), this.conf,
this.maxRPCAttempts, this.rpcTimeout);
} catch (RemoteException e) {
throw RemoteExceptionHandler.decodeRemoteException(e);
}
this.servers.put(regionServer.toString(), server);
}
}
return server;
}
}


@ -0,0 +1,169 @@
/*
* Copyright 2010 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HServerAddress;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWrapper;
import org.apache.hadoop.util.StringUtils;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* This class serves as a helper for all things related to zookeeper
* in the replication contrib.
*/
public class ReplicationZookeeperHelper implements HConstants, Watcher {
static final Log LOG = LogFactory.getLog(ReplicationZookeeperHelper.class);
private final ZooKeeperWrapper zookeeperWrapper;
private final List<ZooKeeperWrapper> peerClusters;
private final String replicationZNode;
private final String peersZNode;
private final String replicationStateNodeName;
private final boolean isMaster;
private final Configuration conf;
private final AtomicBoolean isReplicating;
/**
* Constructor used by region servers
* @param zookeeperWrapper zkw to wrap
* @param conf conf to use
* @param isReplicating atomic boolean to start/stop replication
* @throws IOException
*/
public ReplicationZookeeperHelper(
ZooKeeperWrapper zookeeperWrapper, Configuration conf,
final AtomicBoolean isReplicating) throws IOException{
this.zookeeperWrapper = zookeeperWrapper;
this.conf = conf;
String replicationZNodeName =
conf.get("zookeeper.znode.replication", "replication");
String peersZNodeName =
conf.get("zookeeper.znode.peers", "peers");
String repMasterZNodeName =
conf.get("zookeeper.znode.master", "master");
this.replicationStateNodeName =
conf.get("zookeeper.znode.state", "state");
this.peerClusters = new ArrayList<ZooKeeperWrapper>();
this.replicationZNode = zookeeperWrapper.getZNode(
zookeeperWrapper.getParentZNode(),replicationZNodeName);
this.peersZNode =
zookeeperWrapper.getZNode(replicationZNode,peersZNodeName);
List<String> znodes =
this.zookeeperWrapper.listZnodes(this.peersZNode, this);
if(znodes != null) {
for(String znode : znodes) {
connectToPeer(znode);
}
}
String address = this.zookeeperWrapper.getData(this.replicationZNode,
repMasterZNodeName);
String thisCluster = this.conf.get(ZOOKEEPER_QUORUM)+":"+
this.conf.get("hbase.zookeeper.property.clientPort") +":" +
this.conf.get(ZOOKEEPER_ZNODE_PARENT);
this.isMaster = thisCluster.equals(address);
LOG.info("This cluster (" + thisCluster + ") is a "
+ (this.isMaster ? "master" : "slave") + " for replication" +
", compared with (" + address + ")");
this.isReplicating = isReplicating;
setIsReplicating();
}
/**
* Returns all region servers from given peer
* @param clusterIndex the cluster to interrogate
* @return addresses of all region servers
*/
public List<HServerAddress> getPeersAddresses(int clusterIndex) {
return this.peerClusters.size() == 0 ?
null : this.peerClusters.get(clusterIndex).scanRSDirectory();
}
// This method connects this cluster to another one and registers it
private void connectToPeer(String znode) throws IOException {
String[] quorum =
this.zookeeperWrapper.getData(this.peersZNode, znode).split(":");
if(quorum.length == 3) {
Configuration otherConf = new Configuration(this.conf);
otherConf.set(ZOOKEEPER_QUORUM, quorum[0]);
otherConf.set("hbase.zookeeper.property.clientPort", quorum[1]);
otherConf.set(ZOOKEEPER_ZNODE_PARENT, quorum[2]);
this.peerClusters.add(new ZooKeeperWrapper(otherConf, this));
LOG.info("Added new peer cluster " + StringUtils.arrayToString(quorum));
} else {
LOG.error("Wrong format of cluster address: " +
this.zookeeperWrapper.getData(this.peersZNode, znode));
}
}
/**
* Tells whether this cluster is the master (source) of the replication stream
* @return true if this cluster is the replication master
*/
public boolean isMaster() {
return isMaster;
}
@Override
public void process(WatchedEvent watchedEvent) {
Event.EventType type = watchedEvent.getType();
LOG.info(("Got event " + type + " with path " + watchedEvent.getPath()));
if (type.equals(Event.EventType.NodeDataChanged)) {
setIsReplicating();
}
}
/**
* This reads the state znode for replication and sets the atomic boolean
*/
private void setIsReplicating() {
String value = this.zookeeperWrapper.getDataAndWatch(
this.replicationZNode, this.replicationStateNodeName, this);
if(value != null) {
isReplicating.set(value.equals("true"));
LOG.info("Replication is now " + (isReplicating.get() ?
"started" : "stopped"));
}
}
}
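Tying this back to add_peer.rb: on the master cluster that script leaves behind the
following znodes, which this helper reads (shown for an example parent znode of /hbase;
the peer id "test" is the one hard-coded by the script):

  /hbase/replication/master       address of the master cluster (quorum:clientport:znode_parent)
  /hbase/replication/state        "true" or "false"; watched here to start/stop replication
  /hbase/replication/peers/test   address of the slave cluster (quorum:clientport:znode_parent)

On the slave cluster only the master znode is written, which is how each side determines
whether it is the master or a slave of the stream.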


@ -0,0 +1,219 @@
/*
* Copyright 2010 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver.replication;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import org.junit.*;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.fs.Path;
import java.util.concurrent.atomic.AtomicBoolean;
public class TestReplicationSink {
protected static final Log LOG =
LogFactory.getLog(TestReplicationSink.class);
private static final int BATCH_SIZE = 10;
private static final long SLEEP_TIME = 500;
private final static Configuration conf = HBaseConfiguration.create();
private final static HBaseTestingUtility TEST_UTIL =
new HBaseTestingUtility();
private static ReplicationSink SINK;
private static final byte[] TABLE_NAME1 =
Bytes.toBytes("table1");
private static final byte[] TABLE_NAME2 =
Bytes.toBytes("table2");
private static final byte[] FAM_NAME1 = Bytes.toBytes("info1");
private static final byte[] FAM_NAME2 = Bytes.toBytes("info2");
private static final AtomicBoolean STOPPER = new AtomicBoolean(false);
private static HTable table1;
private static HTable table2;
/**
* @throws java.lang.Exception
*/
@BeforeClass
public static void setUpBeforeClass() throws Exception {
TEST_UTIL.startMiniCluster(3);
Path repLogPath = new Path(TEST_UTIL.getTestDir(),
ReplicationSink.getRepLogPath("test_rep_sink"));
SINK = new ReplicationSink(conf, STOPPER, repLogPath,
TEST_UTIL.getDFSCluster().getFileSystem(), 1000);
SINK.start();
table1 = TEST_UTIL.createTable(TABLE_NAME1, FAM_NAME1);
table2 = TEST_UTIL.createTable(TABLE_NAME2, FAM_NAME2);
}
/**
* @throws java.lang.Exception
*/
@AfterClass
public static void tearDownAfterClass() throws Exception {
STOPPER.set(true);
TEST_UTIL.shutdownMiniCluster();
}
/**
* @throws java.lang.Exception
*/
@Before
public void setUp() throws Exception {
table1 = TEST_UTIL.truncateTable(TABLE_NAME1);
table2 = TEST_UTIL.truncateTable(TABLE_NAME2);
Thread.sleep(SLEEP_TIME);
}
/**
* @throws java.lang.Exception
*/
@After
public void tearDown() throws Exception {}
@Test
public void testBatchSink() throws Exception {
HLog.Entry[] entries = new HLog.Entry[BATCH_SIZE];
for(int i = 0; i < BATCH_SIZE; i++) {
entries[i] = createEntry(TABLE_NAME1, i, KeyValue.Type.Put);
}
SINK.replicateEntries(entries);
Thread.sleep(SLEEP_TIME);
Scan scan = new Scan();
ResultScanner scanRes = table1.getScanner(scan);
assertEquals(scanRes.next(BATCH_SIZE).length, BATCH_SIZE);
}
@Test
public void testMixedPutDelete() throws Exception {
HLog.Entry[] entries = new HLog.Entry[BATCH_SIZE/2];
for(int i = 0; i < BATCH_SIZE; i+=2) {
entries[i/2] = createEntry(TABLE_NAME1, i, KeyValue.Type.Put);
}
SINK.replicateEntries(entries);
Thread.sleep(SLEEP_TIME);
entries = new HLog.Entry[BATCH_SIZE];
for(int i = 0; i < BATCH_SIZE; i++) {
entries[i] = createEntry(TABLE_NAME1, i,
i % 2 != 0 ? KeyValue.Type.Put: KeyValue.Type.DeleteColumn);
}
SINK.replicateEntries(entries);
Thread.sleep(SLEEP_TIME);
Scan scan = new Scan();
ResultScanner scanRes = table1.getScanner(scan);
assertEquals(BATCH_SIZE/2,scanRes.next(BATCH_SIZE).length);
}
@Test
public void testMixedPutTables() throws Exception {
HLog.Entry[] entries = new HLog.Entry[BATCH_SIZE];
for(int i = 0; i < BATCH_SIZE; i++) {
entries[i] =
createEntry( i % 2 == 0 ? TABLE_NAME2 : TABLE_NAME1,
i, KeyValue.Type.Put);
}
SINK.replicateEntries(entries);
Thread.sleep(SLEEP_TIME);
Scan scan = new Scan();
ResultScanner scanRes = table2.getScanner(scan);
for(Result res : scanRes) {
assertTrue(Bytes.toInt(res.getRow()) % 2 == 0);
}
}
@Test
public void testMixedDeletes() throws Exception {
HLog.Entry[] entries = new HLog.Entry[3];
for(int i = 0; i < 3; i++) {
entries[i] = createEntry(TABLE_NAME1, i, KeyValue.Type.Put);
}
SINK.replicateEntries(entries);
Thread.sleep(SLEEP_TIME);
entries = new HLog.Entry[3];
entries[0] = createEntry(TABLE_NAME1, 0, KeyValue.Type.DeleteColumn);
entries[1] = createEntry(TABLE_NAME1, 1, KeyValue.Type.DeleteFamily);
entries[2] = createEntry(TABLE_NAME1, 2, KeyValue.Type.DeleteColumn);
SINK.replicateEntries(entries);
Thread.sleep(SLEEP_TIME);
Scan scan = new Scan();
ResultScanner scanRes = table1.getScanner(scan);
assertEquals(0, scanRes.next(3).length);
}
@Test
public void testRolling() throws Exception {
testMixedDeletes();
SINK.rollLog();
testMixedDeletes();
SINK.rollLog();
testMixedPutTables();
}
private HLog.Entry createEntry(byte [] table, int row, KeyValue.Type type) {
byte[] fam = Bytes.equals(table, TABLE_NAME1) ? FAM_NAME1 : FAM_NAME2;
byte[] rowBytes = Bytes.toBytes(row);
final long now = System.currentTimeMillis();
KeyValue kv = null;
if(type.getCode() == KeyValue.Type.Put.getCode()) {
kv = new KeyValue(rowBytes, fam, fam, now,
KeyValue.Type.Put, Bytes.toBytes(row));
} else if (type.getCode() == KeyValue.Type.DeleteColumn.getCode()) {
kv = new KeyValue(rowBytes, fam, fam,
now, KeyValue.Type.DeleteColumn);
} else if (type.getCode() == KeyValue.Type.DeleteFamily.getCode()) {
kv = new KeyValue(rowBytes, fam, null,
now, KeyValue.Type.DeleteFamily);
}
HLogKey key = new HLogKey(table, table, now, now);
return new HLog.Entry(key, kv);
}
}

View File

@@ -0,0 +1,268 @@
/*
* Copyright 2010 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.junit.Assert.assertArrayEquals;
import org.junit.*;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.regionserver.replication.ReplicationRegionServer;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWrapper;
import org.apache.hadoop.hbase.ipc.ReplicationRegionInterface;
import org.apache.hadoop.conf.Configuration;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import java.util.List;
public class TestReplication implements HConstants {
protected static final Log LOG = LogFactory.getLog(TestReplication.class);
private Configuration conf1;
private Configuration conf2;
private ZooKeeperWrapper zkw1;
private ZooKeeperWrapper zkw2;
private HBaseTestingUtility utility1;
private HBaseTestingUtility utility2;
private final int NB_ROWS_IN_BATCH = 100;
private final long SLEEP_TIME = 500;
private final int NB_RETRIES = 10;
/**
* @throws java.lang.Exception
*/
@BeforeClass
public static void setUpBeforeClass() throws Exception {
}
/**
* @throws java.lang.Exception
*/
@AfterClass
public static void tearDownAfterClass() throws Exception {
}
/**
* @throws java.lang.Exception
*/
@Before
public void setUp() throws Exception {
try {
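// Cluster 1 plays the master role: replication-aware region servers with ZooKeeper data rooted at /1.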
conf1 = HBaseConfiguration.create();
conf1.set(REGION_SERVER_CLASS, ReplicationRegionInterface.class
.getName());
conf1.set(REGION_SERVER_IMPL, ReplicationRegionServer.class
.getName());
conf1.set(ZOOKEEPER_ZNODE_PARENT, "/1");
utility1 = new HBaseTestingUtility(conf1);
utility1.startMiniZKCluster();
MiniZooKeeperCluster miniZK = utility1.getZkCluster();
zkw1 = new ZooKeeperWrapper(conf1, EmptyWatcher.instance);
zkw1.writeZNode("/1", "replication", "");
zkw1.writeZNode("/1/replication", "master",
conf1.get(ZOOKEEPER_QUORUM)+":" +
conf1.get("hbase.zookeeper.property.clientPort")+":/1");
setIsReplication("true");
LOG.info("Setup first Zk");
conf2 = HBaseConfiguration.create();
conf2.set(REGION_SERVER_CLASS, ReplicationRegionInterface.class
.getName());
conf2.set(REGION_SERVER_IMPL, ReplicationRegionServer.class
.getName());
conf2.set(ZOOKEEPER_ZNODE_PARENT, "/2");
utility2 = new HBaseTestingUtility(conf2);
utility2.setZkCluster(miniZK);
zkw2 = new ZooKeeperWrapper(conf2, EmptyWatcher.instance);
zkw2.writeZNode("/2", "replication", "");
zkw2.writeZNode("/2/replication", "master",
conf1.get(ZOOKEEPER_QUORUM)+":" +
conf1.get("hbase.zookeeper.property.clientPort")+":/1");
zkw1.writeZNode("/1/replication/peers", "test",
conf2.get(ZOOKEEPER_QUORUM)+":" +
conf2.get("hbase.zookeeper.property.clientPort")+":/2");
LOG.info("Setup second Zk");
} catch (Exception ex) { ex.printStackTrace(); throw ex; }
}
/**
* @throws java.lang.Exception
*/
@After
public void tearDown() throws Exception {}
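// End-to-end test: replicate a single put and delete, then a batch of puts, then verify replication can be stopped and restarted through the state znode.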
@Test
public void testReplication() throws Exception {
utility1.startMiniCluster();
utility2.startMiniCluster();
byte[] tableName = Bytes.toBytes("test");
byte[] famName = Bytes.toBytes("f");
byte[] row = Bytes.toBytes("row");
HTableDescriptor table = new HTableDescriptor(tableName);
HColumnDescriptor fam = new HColumnDescriptor(famName);
table.addFamily(fam);
HBaseAdmin admin1 = new HBaseAdmin(conf1);
HBaseAdmin admin2 = new HBaseAdmin(conf2);
admin1.createTable(table);
admin2.createTable(table);
Put put = new Put(row);
put.add(famName, row, row);
HTable table1 = new HTable(conf1, tableName);
table1.put(put);
HTable table2 = new HTable(conf2, tableName);
Get get = new Get(row);
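// Poll the slave cluster until the row appears or the retries run out.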
for(int i = 0; i < NB_RETRIES; i++) {
if(i==NB_RETRIES-1) {
fail("Waited too much time for put replication");
}
Result res = table2.get(get);
if(res.size() == 0) {
LOG.info("Row not available");
Thread.sleep(SLEEP_TIME);
} else {
assertArrayEquals(row, res.value());
break;
}
}
Delete del = new Delete(row);
table1.delete(del);
table2 = new HTable(conf2, tableName);
get = new Get(row);
for(int i = 0; i < NB_RETRIES; i++) {
if(i==NB_RETRIES-1) {
fail("Waited too much time for del replication");
}
Result res = table2.get(get);
if(res.size() >= 1) {
LOG.info("Row not deleted");
Thread.sleep(SLEEP_TIME);
} else {
break;
}
}
// normal Batch tests
table1.setAutoFlush(false);
for (int i = 0; i < NB_ROWS_IN_BATCH; i++) {
put = new Put(Bytes.toBytes(i));
put.add(famName, row, row);
table1.put(put);
}
table1.flushCommits();
Scan scan = new Scan();
for(int i = 0; i < NB_RETRIES; i++) {
if(i==NB_RETRIES-1) {
fail("Waited too much time for normal batch replication");
}
ResultScanner scanner = table2.getScanner(scan);
Result[] res = scanner.next(NB_ROWS_IN_BATCH);
scanner.close();
if(res.length != NB_ROWS_IN_BATCH) {
LOG.info("Only got " + res.length + " rows");
Thread.sleep(SLEEP_TIME);
} else {
break;
}
}
table1.setAutoFlush(true);
// Test stopping replication
setIsReplication("false");
// Takes some ms for ZK to fire the watcher
Thread.sleep(100);
put = new Put(Bytes.toBytes("stop start"));
put.add(famName, row, row);
table1.put(put);
get = new Get(Bytes.toBytes("stop start"));
for(int i = 0; i < NB_RETRIES; i++) {
if(i==NB_RETRIES-1) {
break;
}
Result res = table2.get(get);
if(res.size() >= 1) {
fail("Replication wasn't stopped");
} else {
LOG.info("Row not replicated, let's wait a bit more...");
Thread.sleep(SLEEP_TIME);
}
}
// Test restart replication
setIsReplication("true");
Thread.sleep(100);
table1.put(put);
for(int i = 0; i < NB_RETRIES; i++) {
if(i==NB_RETRIES-1) {
fail("Waited too much time for put replication");
}
Result res = table2.get(get);
if(res.size() == 0) {
LOG.info("Row not available");
Thread.sleep(SLEEP_TIME);
} else {
assertArrayEquals(row, res.value());
break;
}
}
}
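// Flip the replication "state" znode on cluster 1; region servers watch this value to start or stop replicating.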
private void setIsReplication(String bool) throws Exception {
zkw1.writeZNode("/1/replication", "state", bool);
}
}