HBASE-11522 Move Replication information into the Ref Guide (Misty Stanley-Jones)

This commit is contained in:
Jean-Daniel Cryans 2014-07-28 17:26:43 -07:00
parent ff655e04d1
commit fe54e7d7ae
2 changed files with 484 additions and 526 deletions

View File

@ -1107,18 +1107,490 @@ false
<section
xml:id="cluster_replication">
<title>Cluster Replication</title>
<para>See <link
xlink:href="http://hbase.apache.org/replication.html">Cluster Replication</link>. </para>
<note xml:id="cluster.replication.preserving.tags">
<title>Preserving Tags During Replication</title>
<para>By default, the codec used for replication between clusters strips tags, such as
cell-level ACLs, from cells. To prevent the tags from being stripped, you can use a
different codec which does not strip them. Configure
<code>hbase.replication.rpc.codec</code> to use
<literal>org.apache.hadoop.hbase.codec.KeyValueCodecWithTags</literal>, on both the source
and sink RegionServers involved in the replication. This option was introduced in <link
xlink:href="https://issues.apache.org/jira/browse/HBASE-10322">HBASE-10322</link>.</para>
<note>
<para>This information was previously available at <link
xlink:href="http://hbase.apache.org/replication.html">Cluster Replication</link>. </para>
</note>
<para>HBase provides a replication mechanism to copy data between HBase
clusters. Replication can be used as a disaster recovery solution and as a mechanism for high
availability. You can also use replication to separate web-facing operations from back-end
jobs such as MapReduce.</para>
<para>In terms of architecture, HBase replication is master-push. This takes advantage of the
fact that each region server has its own write-ahead log (WAL). One master cluster can
replicate to any number of slave clusters, and each region server replicates its own stream of
edits. For more information on the different properties of master/slave replication and other
types of replication, see the article <link
xlink:href="http://highscalability.com/blog/2009/8/24/how-google-serves-data-from-multiple-datacenters.html">How
Google Serves Data From Multiple Datacenters</link>.</para>
<para>Replication is asynchronous, allowing clusters to be geographically distant or to have
some gaps in availability. This also means that data between master and slave clusters will
not be instantly consistent. Rows inserted on the master are not immediately available or
consistent with rows on the slave clusters. rows inserted on the master cluster wont be
available at the same time on the slave clusters. The goal is eventual consistency. </para>
<para>The replication format used in this design is conceptually the same as the <firstterm><link
xlink:href="http://dev.mysql.com/doc/refman/5.1/en/replication-formats.html">statement-based
replication</link></firstterm> design used by MySQL. Instead of SQL statements, entire
WALEdits (consisting of multiple cell inserts coming from Put and Delete operations on the
clients) are replicated in order to maintain atomicity. </para>
<para>The WALs for each region server must be kept in HDFS as long as they are needed to
replicate data to any slave cluster. Each region server reads from the oldest log it needs to
replicate and keeps track of the current position inside ZooKeeper to simplify failure
recovery. That position, as well as the queue of WALs to process, may be different for every
slave cluster.</para>
<para>The clusters participating in replication can be of different sizes. The master
cluster relies on randomization to attempt to balance the stream of replication on the slave clusters</para>
<para>HBase supports master/master and cyclic replication as well as replication to multiple
slaves.</para>
<figure>
<title>Replication Architecture Overview</title>
<mediaobject>
<imageobject>
<imagedata fileref="replication_overview.png" />
</imageobject>
<textobject>
<para>Illustration of the replication architecture in HBase, as described in the prior
text.</para>
</textobject>
</mediaobject>
</figure>
<formalpara>
<title>Enabling and Configuring Replication</title>
<para>See the <link
xlink:href="http://hbase.apache.org/apidocs/org/apache/hadoop/hbase/replication/package-summary.html#requirements">
API documentation for replication</link> for information on enabling and configuring
replication.</para>
</formalpara>
<section>
<title>Life of a WAL Edit</title>
<para>A single WAL edit goes through several steps in order to be replicated to a slave
cluster.</para>
<orderedlist>
<title>When the slave responds correctly:</title>
<listitem>
<para>A HBase client uses a Put or Delete operation to manipulate data in HBase.</para>
</listitem>
<listitem>
<para>The region server writes the request to the WAL in a way that would allow it to be
replayed if it were not written successfully.</para>
</listitem>
<listitem>
<para>If the changed cell corresponds to a column family that is scoped for replication,
the edit is added to the queue for replication.</para>
</listitem>
<listitem>
<para>In a separate thread, the edit is read from the log, as part of a batch process.
Only the KeyValues that are eligible for replication are kept. Replicable KeyValues are
part of a column family whose schema is scoped GLOBAL, are not part of a catalog such as
<code>hbase:meta</code>, and did not originate from the target slave cluster, in the
case of cyclic replication.</para>
</listitem>
<listitem>
<para>The edit is tagged with the master's UUID and added to a buffer. When the buffer is
filled, or the reader reaches the end of the file, the buffer is sent to a random region
server on the slave cluster.</para>
</listitem>
<listitem>
<para>The region server reads the edits sequentially and separates them into buffers, one
buffer per table. After all edits are read, each buffer is flushed using <link
xlink:href="http://hbase.apache.org/apidocs/org/apache/hadoop/hbase/client/HTable.html"
>HTable</link>, HBase's normal client. The master's UUID is preserved in the edits
they are applied, in order to allow for cyclic replication.</para>
</listitem>
<listitem>
<para>In the master, the offset for the WAL that is currently being replicated is
registered in ZooKeeper.</para>
</listitem>
</orderedlist>
<orderedlist>
<title>When the slave does not respond:</title>
<listitem>
<para>The first three steps, where the edit is inserted, are identical.</para>
</listitem>
<listitem>
<para>Again in a separate thread, the region server reads, filters, and edits the log
edits in the same way as above. The slave region server does not answer the RPC
call.</para>
</listitem>
<listitem>
<para>The master sleeps and tries again a configurable number of times.</para>
</listitem>
<listitem>
<para>If the slave region server is still not available, the master selects a new subset
of region server to replicate to, and tries again to send the buffer of edits.</para>
</listitem>
<listitem>
<para>Meanwhile, the WALs are rolled and stored in a queue in ZooKeeper. Logs that are
<firstterm>archived</firstterm> by their region server, by moving them from the region
server's log directory to a central log directory, will update their paths in the
in-memory queue of the replicating thread.</para>
</listitem>
<listitem>
<para>When the slave cluster is finally available, the buffer is applied in the same way
as during normal processing. The master region server will then replicate the backlog of
logs that accumulated during the outage.</para>
</listitem>
</orderedlist>
<note xml:id="cluster.replication.preserving.tags">
<title>Preserving Tags During Replication</title>
<para>By default, the codec used for replication between clusters strips tags, such as
cell-level ACLs, from cells. To prevent the tags from being stripped, you can use a
different codec which does not strip them. Configure
<code>hbase.replication.rpc.codec</code> to use
<literal>org.apache.hadoop.hbase.codec.KeyValueCodecWithTags</literal>, on both the
source and sink RegionServers involved in the replication. This option was introduced in
<link xlink:href="https://issues.apache.org/jira/browse/HBASE-10322"
>HBASE-10322</link>.</para>
</note>
</section>
<section>
<title>Replication Internals</title>
<variablelist>
<varlistentry>
<term>Replication State in ZooKeeper</term>
<listitem>
<para>HBase replication maintains its state in ZooKeeper. By default, the state is
contained in the base node <filename>/hbase/replication</filename>. This node contains
two child nodes, the <code>Peers</code> znode and the <code>RS</code> znode.</para>
</listitem>
</varlistentry>
<varlistentry>
<term>The <code>Peers</code> Znode</term>
<listitem>
<para>The <code>peers</code> znode is stored in
<filename>/hbase/replication/peers</filename> by default. It consists of a list of
all peer replication clusters, along with the status of each of them. The value of
each peer is its cluster key, which is provided in the HBase Shell. The cluster key
contains a list of ZooKeeper nodes in the cluster's quorum, the client port for the
ZooKeeper quorum, and the base znode for HBase in HDFS on that cluster.</para>
<screen>
/hbase/replication/peers
/1 [Value: zk1.host.com,zk2.host.com,zk3.host.com:2181:/hbase]
/2 [Value: zk5.host.com,zk6.host.com,zk7.host.com:2181:/hbase]
</screen>
<para>Each peer has a child znode which indicates whether or not replication is enabled
on that cluster. These peer-state znodes do not contain any child znodes, but only
contain a Boolean value. This value is read and maintained by the
R<code>eplicationPeer.PeerStateTracker</code> class.</para>
<screen>
/hbase/replication/peers
/1/peer-state [Value: ENABLED]
/2/peer-state [Value: DISABLED]
</screen>
</listitem>
</varlistentry>
<varlistentry>
<term>The <code>RS</code> Znode</term>
<listitem>
<para>The <code>rs</code> znode contains a list of WAL logs which need to be replicated.
This list is divided into a set of queues organized by region server and the peer
cluster the region server is shipping the logs to. The rs znode has one child znode
for each region server in the cluster. The child znode name is the region server's
hostname, client port, and start code. This list includes both live and dead region
servers.</para>
<screen>
/hbase/replication/rs
/hostname.example.org,6020,1234
/hostname2.example.org,6020,2856
</screen>
<para>Each <code>rs</code> znode contains a list of WAL replication queues, one queue
for each peer cluster it replicates to. These queues are represented by child znodes
named by the cluster ID of the peer cluster they represent.</para>
<screen>
/hbase/replication/rs
/hostname.example.org,6020,1234
/1
/2
</screen>
<para>Each queue has one child znode for each WAL log that still needs to be replicated.
the value of these child znodes is the last position that was replicated. This
position is updated each time a WAL log is replicated.</para>
<screen>
/hbase/replication/rs
/hostname.example.org,6020,1234
/1
23522342.23422 [VALUE: 254]
12340993.22342 [VALUE: 0]
</screen>
</listitem>
</varlistentry>
</variablelist>
</section>
<section>
<title>Replication Configuration Options</title>
<informaltable>
<tgroup cols="3">
<thead>
<row>
<entry>Option</entry>
<entry>Description</entry>
<entry>Default</entry>
</row>
</thead>
<tbody>
<row>
<entry><para><code>zookeeper.znode.parent</code></para></entry>
<entry><para>The name of the base ZooKeeper znode used for HBase</para></entry>
<entry><para><literal>/hbase</literal></para></entry>
</row>
<row>
<entry><para><code>zookeeper.znode.replication</code></para></entry>
<entry><para>The name of the base znode used for replication</para></entry>
<entry><para><literal>replication</literal></para></entry>
</row>
<row>
<entry><para><code>zookeeper.znode.replication.peers</code></para></entry>
<entry><para>The name of the <code>peer</code> znode</para></entry>
<entry><para><literal>peers</literal></para></entry>
</row>
<row>
<entry><para><code>zookeeper.znode.replication.peers.state</code></para></entry>
<entry><para>The name of <code>peer-state</code> znode</para></entry>
<entry><para><literal>peer-state</literal></para></entry>
</row>
<row>
<entry><para><code>zookeeper.znode.replication.rs</code></para></entry>
<entry><para>The name of the <code>rs</code> znode</para></entry>
<entry><para><literal>rs</literal></para></entry>
</row>
<row>
<entry><para><code>hbase.replication</code></para></entry>
<entry><para>Whether replication is enabled or disabled on a given cluster</para></entry>
<entry><para><literal>false</literal></para></entry>
</row>
<row>
<entry><para><code>eplication.sleep.before.failover</code></para></entry>
<entry><para>How many milliseconds a worker should sleep before attempting to replicate
a dead region server's WAL queues.</para></entry>
<entry><para><literal></literal></para></entry>
</row>
<row>
<entry><para><code>replication.executor.workers</code></para></entry>
<entry><para>The number of region servers a given region server should attempt to
failover simultaneously.</para></entry>
<entry><para><literal>1</literal></para></entry>
</row>
</tbody>
</tgroup>
</informaltable>
</section>
<section>
<title>Replication Implementation Details</title>
<formalpara>
<title>Choosing Region Servers to Replicate To</title>
<para>When a master cluster region server initiates a replication source to a slave cluster,
it first connects to the slave's ZooKeeper ensemble using the provided cluster key . It
then scans the <filename>rs/</filename> directory to discover all the available sinks
(region servers that are accepting incoming streams of edits to replicate) and randomly
chooses a subset of them using a configured ratio which has a default value of 10%. For
example, if a slave cluster has 150 machines, 15 will be chosen as potential recipient for
edits that this master cluster region server sends. Because this selection is performed by
each master region server, the probability that all slave region servers are used is very
high, and this method works for clusters of any size. For example, a master cluster of 10
machines replicating to a slave cluster of 5 machines with a ratio of 10% causes the
master cluster region servers to choose one machine each at random.</para>
</formalpara>
<para>A ZooKeeper watcher is placed on the
<filename>${<replaceable>zookeeper.znode.parent</replaceable>}/rs</filename> node of the
slave cluster by each of the master cluster's region servers. This watch is used to monitor
changes in the composition of the slave cluster. When nodes are removed from the slave
cluster, or if nodes go down or come back up, the master cluster's region servers will
respond by selecting a new pool of slave region servers to replicate to.</para>
<formalpara>
<title>Keeping Track of Logs</title>
<para>Each master cluster region server has its own znode in the replication znodes
hierarchy. It contains one znode per peer cluster (if 5 slave clusters, 5 znodes are
created), and each of these contain a queue of WALs to process. Each of these queues will
track the WALs created by that region server, but they can differ in size. For example, if
one slave cluster becomes unavailable for some time, the WALs should not be deleted, so
they need to stay in the queue while the others are processed. See <xref
linkend="rs.failover.details"/> for an example.</para>
</formalpara>
<para>When a source is instantiated, it contains the current WAL that the region server is
writing to. During log rolling, the new file is added to the queue of each slave cluster's
znode just before it is made available. This ensures that all the sources are aware that a
new log exists before the region server is able to append edits into it, but this operations
is now more expensive. The queue items are discarded when the replication thread cannot read
more entries from a file (because it reached the end of the last block) and there are other
files in the queue. This means that if a source is up to date and replicates from the log
that the region server writes to, reading up to the "end" of the current file will not
delete the item in the queue.</para>
<para>A log can be archived if it is no longer used or if the number of logs exceeds
<code>hbase.regionserver.maxlogs</code> because the insertion rate is faster than regions
are flushed. When a log is archived, the source threads are notified that the path for that
log changed. If a particular source has already finished with an archived log, it will just
ignore the message. If the log is in the queue, the path will be updated in memory. If the
log is currently being replicated, the change will be done atomically so that the reader
doesn't attempt to open the file when has already been moved. Because moving a file is a
NameNode operation , if the reader is currently reading the log, it won't generate any
exception.</para>
<formalpara>
<title>Reading, Filtering and Sending Edits</title>
<para>By default, a source attempts to read from a WAL and ship log entries to a sink as
quickly as possible. Speed is limited by the filtering of log entries Only KeyValues that
are scoped GLOBAL and that do not belong to catalog tables will be retained. Speed is also
limited by total size of the list of edits to replicate per slave, which is limited to 64
MB by default. With this configuration, a master cluster region server with three slaves
would use at most 192 MB to store data to replicate. This does not account for the data
which was filtered but not garbage collected.</para>
</formalpara>
<para>Once the maximum size of edits has been buffered or the reader reaces the end of the
WAL, the source thread stops reading and chooses at random a sink to replicate to (from the
list that was generated by keeping only a subset of slave region servers). It directly
issues a RPC to the chosen region server and waits for the method to return. If the RPC was
successful, the source determines whether the current file has been emptied or it contains
more data which needs to be read. If the file has been emptied, the source deletes the znode
in the queue. Otherwise, it registers the new offset in the log's znode. If the RPC threw an
exception, the source will retry 10 times before trying to find a different sink.</para>
<formalpara>
<title>Cleaning Logs</title>
<para>If replication is not enabled, the master's log-cleaning thread deletes old logs using
a configured TTL. This TTL-based method does not work well with replication, because
archived logs which have exceeded their TTL may still be in a queue. The default behavior
is augmented so that if a log is past its TTL, the cleaning thread looks up every queue
until it finds the log, while caching queues it has found. If the log is not found in any
queues, the log will be deleted. The next time the cleaning process needs to look for a
log, it starts by using its cached list.</para>
</formalpara>
<formalpara xml:id="rs.failover.details">
<title>Region Server Failover</title>
<para>When no region servers are failing, keeping track of the logs in ZooKeeper adds no
value. Unfortunately, region servers do fail, and since ZooKeeper is highly available, it
is useful for managing the transfer of the queues in the event of a failure.</para>
</formalpara>
<para>Each of the master cluster region servers keeps a watcher on every other region server,
in order to be notified when one dies (just as the master does). When a failure happens,
they all race to create a znode called <literal>lock</literal> inside the dead region
server's znode that contains its queues. The region server that creates it successfully then
transfers all the queues to its own znode, one at a time since ZooKeeper does not support
renaming queues. After queues are all transferred, they are deleted from the old location.
The znodes that were recovered are renamed with the ID of the slave cluster appended with
the name of the dead server.</para>
<para>Next, the master cluster region server creates one new source thread per copied queue,
and each of the source threads follows the read/filter/ship pattern. The main difference is
that those queues will never receive new data, since they do not belong to their new region
server. When the reader hits the end of the last log, the queue's znode is deleted and the
master cluster region server closes that replication source.</para>
<para>Given a master cluster with 3 region servers replicating to a single slave with id
<literal>2</literal>, the following hierarchy represents what the znodes layout could be
at some point in time. The region servers' znodes all contain a <literal>peers</literal>
znode which contains a single queue. The znode names in the queues represent the actual file
names on HDFS in the form
<literal><replaceable>address</replaceable>,<replaceable>port</replaceable>.<replaceable>timestamp</replaceable></literal>.</para>
<screen>
/hbase/replication/rs/
1.1.1.1,60020,123456780/
2/
1.1.1.1,60020.1234 (Contains a position)
1.1.1.1,60020.1265
1.1.1.2,60020,123456790/
2/
1.1.1.2,60020.1214 (Contains a position)
1.1.1.2,60020.1248
1.1.1.2,60020.1312
1.1.1.3,60020, 123456630/
2/
1.1.1.3,60020.1280 (Contains a position)
</screen>
<para>Assume that 1.1.1.2 loses its ZooKeeper session. The survivors will race to create a
lock, and, arbitrarily, 1.1.1.3 wins. It will then start transferring all the queues to its
local peers znode by appending the name of the dead server. Right before 1.1.1.3 is able to
clean up the old znodes, the layout will look like the following:</para>
<screen>
/hbase/replication/rs/
1.1.1.1,60020,123456780/
2/
1.1.1.1,60020.1234 (Contains a position)
1.1.1.1,60020.1265
1.1.1.2,60020,123456790/
lock
2/
1.1.1.2,60020.1214 (Contains a position)
1.1.1.2,60020.1248
1.1.1.2,60020.1312
1.1.1.3,60020,123456630/
2/
1.1.1.3,60020.1280 (Contains a position)
2-1.1.1.2,60020,123456790/
1.1.1.2,60020.1214 (Contains a position)
1.1.1.2,60020.1248
1.1.1.2,60020.1312
</screen>
<para>Some time later, but before 1.1.1.3 is able to finish replicating the last WAL from
1.1.1.2, it dies too. Some new logs were also created in the normal queues. The last region
server will then try to lock 1.1.1.3's znode and will begin transferring all the queues. The
new layout will be:</para>
<screen>
/hbase/replication/rs/
1.1.1.1,60020,123456780/
2/
1.1.1.1,60020.1378 (Contains a position)
2-1.1.1.3,60020,123456630/
1.1.1.3,60020.1325 (Contains a position)
1.1.1.3,60020.1401
2-1.1.1.2,60020,123456790-1.1.1.3,60020,123456630/
1.1.1.2,60020.1312 (Contains a position)
1.1.1.3,60020,123456630/
lock
2/
1.1.1.3,60020.1325 (Contains a position)
1.1.1.3,60020.1401
2-1.1.1.2,60020,123456790/
1.1.1.2,60020.1312 (Contains a position)
</screen>
<formalpara>
<title>Replication Metrics</title>
<para>The following metrics are exposed at the global region server level and (since HBase
0.95) at the peer level:</para>
</formalpara>
<variablelist>
<varlistentry>
<term><code>source.sizeOfLogQueue</code></term>
<listitem>
<para>number of WALs to process (excludes the one which is being processed) at the
Replication source</para>
</listitem>
</varlistentry>
<varlistentry>
<term><code>source.shippedOps</code></term>
<listitem>
<para>number of mutations shipped</para>
</listitem>
</varlistentry>
<varlistentry>
<term><code>source.logEditsRead</code></term>
<listitem>
<para>number of mutations read from HLogs at the replication source</para>
</listitem>
</varlistentry>
<varlistentry>
<term><code>source.ageOfLastShippedOp</code></term>
<listitem>
<para>age of last batch that was shipped by the replication source</para>
</listitem>
</varlistentry>
</variablelist>
</section>
</section>
<section
xml:id="ops.backup">

View File

@ -26,520 +26,6 @@
</title>
</properties>
<body>
<section name="Overview">
<p>
The replication feature of Apache HBase (TM) provides a way to copy data between HBase deployments. It
can serve as a disaster recovery solution and can contribute to provide
higher availability at the HBase layer. It can also serve more practically;
for example, as a way to easily copy edits from a web-facing cluster to a "MapReduce"
cluster which will process old and new data and ship back the results
automatically.
</p>
<p>
The basic architecture pattern used for Apache HBase replication is (HBase cluster) master-push;
it is much easier to keep track of whats currently being replicated since
each region server has its own write-ahead-log (aka WAL or HLog), just like
other well known solutions like MySQL master/slave replication where
theres only one bin log to keep track of. One master cluster can
replicate to any number of slave clusters, and each region server will
participate to replicate their own stream of edits. For more information
on the different properties of master/slave replication and other types
of replication, please consult <a href="http://highscalability.com/blog/2009/8/24/how-google-serves-data-from-multiple-datacenters.html">
How Google Serves Data From Multiple Datacenters</a>.
</p>
<p>
The replication is done asynchronously, meaning that the clusters can
be geographically distant, the links between them can be offline for
some time, and rows inserted on the master cluster wont be
available at the same time on the slave clusters (eventual consistency).
</p>
<p>
The replication format used in this design is conceptually the same as
<a href="http://dev.mysql.com/doc/refman/5.1/en/replication-formats.html">
MySQLs statement-based replication </a>. Instead of SQL statements, whole
WALEdits (consisting of multiple cell inserts coming from the clients'
Put and Delete) are replicated in order to maintain atomicity.
</p>
<p>
The HLogs from each region server are the basis of HBase replication,
and must be kept in HDFS as long as they are needed to replicate data
to any slave cluster. Each RS reads from the oldest log it needs to
replicate and keeps the current position inside ZooKeeper to simplify
failure recovery. That position can be different for every slave
cluster, same for the queue of HLogs to process.
</p>
<p>
The clusters participating in replication can be of asymmetric sizes
and the master cluster will do its “best effort” to balance the stream
of replication on the slave clusters by relying on randomization.
</p>
<p>
As of version 0.92, Apache HBase supports master/master and cyclic
replication as well as replication to multiple slaves.
</p>
<img src="images/replication_overview.png"/>
</section>
<section name="Enabling replication">
<p>
The guide on enabling and using cluster replication is contained
in the API documentation shipped with your Apache HBase distribution.
</p>
<p>
The most up-to-date documentation is
<a href="apidocs/org/apache/hadoop/hbase/replication/package-summary.html#requirements">
available at this address</a>.
</p>
</section>
<section name="Life of a log edit">
<p>
The following sections describe the life of a single edit going from a
client that communicates with a master cluster all the way to a single
slave cluster.
</p>
<section name="Normal processing">
<p>
The client uses an API that sends a Put, Delete or ICV to a region
server. The key values are transformed into a WALEdit by the region
server and is inspected by the replication code that, for each family
that is scoped for replication, adds the scope to the edit. The edit
is appended to the current WAL and is then applied to its MemStore.
</p>
<p>
In a separate thread, the edit is read from the log (as part of a batch)
and only the KVs that are replicable are kept (that is, that they are part
of a family scoped GLOBAL in the family's schema, non-catalog so not
hbase:meta or -ROOT-, and did not originate in the target slave cluster - in
case of cyclic replication).
</p>
<p>
The edit is then tagged with the master's cluster UUID.
When the buffer is filled, or the reader hits the end of the file,
the buffer is sent to a random region server on the slave cluster.
</p>
<p>
Synchronously, the region server that receives the edits reads them
sequentially and separates each of them into buffers, one per table.
Once all edits are read, each buffer is flushed using HTable, the normal
HBase client.The master's cluster UUID is retained in the edits applied at
the slave cluster in order to allow cyclic replication.
</p>
<p>
Back in the master cluster's region server, the offset for the current
WAL that's being replicated is registered in ZooKeeper.
</p>
</section>
<section name="Non-responding slave clusters">
<p>
The edit is inserted in the same way.
</p>
<p>
In the separate thread, the region server reads, filters and buffers
the log edits the same way as during normal processing. The slave
region server that's contacted doesn't answer to the RPC, so the master
region server will sleep and retry up to a configured number of times.
If the slave RS still isn't available, the master cluster RS will select a
new subset of RS to replicate to and will retry sending the buffer of
edits.
</p>
<p>
In the mean time, the WALs will be rolled and stored in a queue in
ZooKeeper. Logs that are archived by their region server (archiving is
basically moving a log from the region server's logs directory to a
central logs archive directory) will update their paths in the in-memory
queue of the replicating thread.
</p>
<p>
When the slave cluster is finally available, the buffer will be applied
the same way as during normal processing. The master cluster RS will then
replicate the backlog of logs.
</p>
</section>
</section>
<section name="Internals">
<p>
This section describes in depth how each of replication's internal
features operate.
</p>
<section name="Replication Zookeeper State">
<p>
HBase replication maintains all of its state in Zookeeper. By default, this state is
contained in the base znode:
</p>
<pre>
/hbase/replication
</pre>
<p>
There are two major child znodes in the base replication znode:
<ul>
<li><b>Peers znode:</b> /hbase/replication/peers</li>
<li><b>RS znode:</b> /hbase/replication/rs</li>
</ul>
</p>
<section name="The Peers znode">
<p>
The <b>peers znode</b> contains a list of all peer replication clusters and the
current replication state of those clusters. It has one child <i>peer znode</i>
for each peer cluster. The <i>peer znode</i> is named with the cluster id provided
by the user in the HBase shell. The value of the <i>peer znode</i> contains
the peers cluster key provided by the user in the HBase Shell. The cluster key
contains a list of zookeeper nodes in the clusters quorum, the client port for the
zookeeper quorum, and the base znode for HBase
(i.e. “zk1.host.com,zk2.host.com,zk3.host.com:2181:/hbase”).
</p>
<pre>
/hbase/replication/peers
/1 [Value: zk1.host.com,zk2.host.com,zk3.host.com:2181:/hbase]
/2 [Value: zk5.host.com,zk6.host.com,zk7.host.com:2181:/hbase]
</pre>
<p>
Each of these <i>peer znodes</i> has a child znode that indicates whether or not
replication is enabled on that peer cluster. These <i>peer-state znodes</i> do not
have child znodes and simply contain a boolean value (i.e. ENABLED or DISABLED).
This value is read/maintained by the <i>ReplicationPeer.PeerStateTracker</i> class.
</p>
<pre>
/hbase/replication/peers
/1/peer-state [Value: ENABLED]
/2/peer-state [Value: DISABLED]
</pre>
</section>
<section name="The RS znode">
<p>
The <b>rs znode</b> contains a list of all outstanding HLog files in the cluster
that need to be replicated. The list is divided into a set of queues organized by
region server and the peer cluster the region server is shipping the HLogs to. The
<b>rs znode</b> has one child znode for each region server in the cluster. The child
znode name is simply the regionserver name (a concatenation of the region servers
hostname, client port and start code). These region servers could either be dead or alive.
</p>
<pre>
/hbase/replication/rs
/hostname.example.org,6020,1234
/hostname2.example.org,6020,2856
</pre>
<p>
Within each region server znode, the region server maintains a set of HLog replication
queues. Each region server has one queue for every peer cluster it replicates to.
These queues are represented by child znodes named using the cluster id of the peer
cluster they represent (see the peer znode section).
</p>
<pre>
/hbase/replication/rs
/hostname.example.org,6020,1234
/1
/2
</pre>
<p>
Each queue has one child znode for every HLog that still needs to be replicated.
The value of these HLog child znodes is the latest position that has been replicated.
This position is updated every time a HLog entry is replicated.
</p>
<pre>
/hbase/replication/rs
/hostname.example.org,6020,1234
/1
23522342.23422 [VALUE: 254]
12340993.22342 [VALUE: 0]
</pre>
</section>
</section>
<section name="Configuration Parameters">
<section name="Zookeeper znode paths">
<p>
All of the base znode names are configurable through parameters:
</p>
<table border="1">
<tr>
<td><b>Parameter</b></td>
<td><b>Default Value</b></td>
</tr>
<tr>
<td>zookeeper.znode.parent</td>
<td>/hbase</td>
</tr>
<tr>
<td>zookeeper.znode.replication</td>
<td>replication</td>
</tr>
<tr>
<td>zookeeper.znode.replication.peers</td>
<td>peers</td>
</tr>
<tr>
<td>zookeeper.znode.replication.peers.state</td>
<td>peer-state</td>
</tr>
<tr>
<td>zookeeper.znode.replication.rs</td>
<td>rs</td>
</tr>
</table>
<p>
The default replication znode structure looks like the following:
</p>
<pre>
/hbase/replication/peers/{peerId}/peer-state
/hbase/replication/rs
</pre>
</section>
<section name="Other parameters">
<ul>
<li><b>hbase.replication</b> (Default: false) - Controls whether replication is enabled
or disabled for the cluster.</li>
<li><b>replication.sleep.before.failover</b> (Default: 2000) - The amount of time a failover
worker waits before attempting to replicate a dead region servers HLog queues.</li>
<li><b>replication.executor.workers</b> (Default: 1) - The number of dead region servers
one region server should attempt to failover simultaneously.</li>
</ul>
</section>
</section>
<section name="Choosing region servers to replicate to">
<p>
When a master cluster RS initiates a replication source to a slave cluster,
it first connects to the slave's ZooKeeper ensemble using the provided
cluster key (that key is composed of the value of hbase.zookeeper.quorum,
zookeeper.znode.parent and hbase.zookeeper.property.clientPort). It
then scans the "rs" directory to discover all the available sinks
(region servers that are accepting incoming streams of edits to replicate)
and will randomly choose a subset of them using a configured
ratio (which has a default value of 10%). For example, if a slave
cluster has 150 machines, 15 will be chosen as potential recipient for
edits that this master cluster RS will be sending. Since this is done by all
master cluster RSs, the probability that all slave RSs are used is very high,
and this method works for clusters of any size. For example, a master cluster
of 10 machines replicating to a slave cluster of 5 machines with a ratio
of 10% means that the master cluster RSs will choose one machine each
at random, thus the chance of overlapping and full usage of the slave
cluster is higher.
</p>
<p>
A ZK watcher is placed on the ${zookeeper.znode.parent}/rs node of
the slave cluster by each of the master cluster's region servers.
This watch is used to monitor changes in the composition of the
slave cluster. When nodes are removed from the slave cluster (or
if nodes go down and/or come back up), the master cluster's region
servers will respond by selecting a new pool of slave region servers
to replicate to.
</p>
</section>
<section name="Keeping track of logs">
<p>
Every master cluster RS has its own znode in the replication znodes hierarchy.
It contains one znode per peer cluster (if 5 slave clusters, 5 znodes
are created), and each of these contain a queue
of HLogs to process. Each of these queues will track the HLogs created
by that RS, but they can differ in size. For example, if one slave
cluster becomes unavailable for some time then the HLogs should not be deleted,
thus they need to stay in the queue (while the others are processed).
See the section named "Region server failover" for an example.
</p>
<p>
When a source is instantiated, it contains the current HLog that the
region server is writing to. During log rolling, the new file is added
to the queue of each slave cluster's znode just before it's made available.
This ensures that all the sources are aware that a new log exists
before HLog is able to append edits into it, but this operations is
now more expensive.
The queue items are discarded when the replication thread cannot read
more entries from a file (because it reached the end of the last block)
and that there are other files in the queue.
This means that if a source is up-to-date and replicates from the log
that the region server writes to, reading up to the "end" of the
current file won't delete the item in the queue.
</p>
<p>
When a log is archived (because it's not used anymore or because there's
too many of them per hbase.regionserver.maxlogs typically because insertion
rate is faster than region flushing), it will notify the source threads that the path
for that log changed. If the a particular source was already done with
it, it will just ignore the message. If it's in the queue, the path
will be updated in memory. If the log is currently being replicated,
the change will be done atomically so that the reader doesn't try to
open the file when it's already moved. Also, moving a file is a NameNode
operation so, if the reader is currently reading the log, it won't
generate any exception.
</p>
</section>
<section name="Reading, filtering and sending edits">
<p>
By default, a source will try to read from a log file and ship log
entries as fast as possible to a sink. This is first limited by the
filtering of log entries; only KeyValues that are scoped GLOBAL and
that don't belong to catalog tables will be retained. A second limit
is imposed on the total size of the list of edits to replicate per slave,
which by default is 64MB. This means that a master cluster RS with 3 slaves
will use at most 192MB to store data to replicate. This doesn't account
the data filtered that wasn't garbage collected.
</p>
<p>
Once the maximum size of edits was buffered or the reader hits the end
of the log file, the source thread will stop reading and will choose
at random a sink to replicate to (from the list that was generated by
keeping only a subset of slave RSs). It will directly issue a RPC to
the chosen machine and will wait for the method to return. If it's
successful, the source will determine if the current file is emptied
or if it should continue to read from it. If the former, it will delete
the znode in the queue. If the latter, it will register the new offset
in the log's znode. If the RPC threw an exception, the source will retry
10 times until trying to find a different sink.
</p>
</section>
<section name="Cleaning logs">
<p>
If replication isn't enabled, the master's logs cleaning thread will
delete old logs using a configured TTL. This doesn't work well with
replication since archived logs passed their TTL may still be in a
queue. Thus, the default behavior is augmented so that if a log is
passed its TTL, the cleaning thread will lookup every queue until it
finds the log (while caching the ones it finds). If it's not found,
the log will be deleted. The next time it has to look for a log,
it will first use its cache.
</p>
</section>
<section name="Region server failover">
<p>
As long as region servers don't fail, keeping track of the logs in ZK
doesn't add any value. Unfortunately, they do fail, so since ZooKeeper
is highly available we can count on it and its semantics to help us
managing the transfer of the queues.
</p>
<p>
All the master cluster RSs keep a watcher on every other one of them to be
notified when one dies (just like the master does). When it happens,
they all race to create a znode called "lock" inside the dead RS' znode
that contains its queues. The one that creates it successfully will
proceed by transferring all the queues to its own znode (one by one
since ZK doesn't support the rename operation) and will delete all the
old ones when it's done. The recovered queues' znodes will be named
with the id of the slave cluster appended with the name of the dead
server.
</p>
<p>
Once that is done, the master cluster RS will create one new source thread per
copied queue, and each of them will follow the read/filter/ship pattern.
The main difference is that those queues will never have new data since
they don't belong to their new region server, which means that when
the reader hits the end of the last log, the queue's znode will be
deleted and the master cluster RS will close that replication source.
</p>
<p>
For example, consider a master cluster with 3 region servers that's
replicating to a single slave with id '2'. The following hierarchy
represents what the znodes layout could be at some point in time. We
can see the RSs' znodes all contain a "peers" znode that contains a
single queue. The znode names in the queues represent the actual file
names on HDFS in the form "address,port.timestamp".
</p>
<pre>
/hbase/replication/rs/
1.1.1.1,60020,123456780/
2/
1.1.1.1,60020.1234 (Contains a position)
1.1.1.1,60020.1265
1.1.1.2,60020,123456790/
2/
1.1.1.2,60020.1214 (Contains a position)
1.1.1.2,60020.1248
1.1.1.2,60020.1312
1.1.1.3,60020, 123456630/
2/
1.1.1.3,60020.1280 (Contains a position)
</pre>
<p>
Now let's say that 1.1.1.2 loses its ZK session. The survivors will race
to create a lock, and for some reasons 1.1.1.3 wins. It will then start
transferring all the queues to its local peers znode by appending the
name of the dead server. Right before 1.1.1.3 is able to clean up the
old znodes, the layout will look like the following:
</p>
<pre>
/hbase/replication/rs/
1.1.1.1,60020,123456780/
2/
1.1.1.1,60020.1234 (Contains a position)
1.1.1.1,60020.1265
1.1.1.2,60020,123456790/
lock
2/
1.1.1.2,60020.1214 (Contains a position)
1.1.1.2,60020.1248
1.1.1.2,60020.1312
1.1.1.3,60020,123456630/
2/
1.1.1.3,60020.1280 (Contains a position)
2-1.1.1.2,60020,123456790/
1.1.1.2,60020.1214 (Contains a position)
1.1.1.2,60020.1248
1.1.1.2,60020.1312
</pre>
<p>
Some time later, but before 1.1.1.3 is able to finish replicating the
last HLog from 1.1.1.2, let's say that it dies too (also some new logs
were created in the normal queues). The last RS will then try to lock
1.1.1.3's znode and will begin transferring all the queues. The new
layout will be:
</p>
<pre>
/hbase/replication/rs/
1.1.1.1,60020,123456780/
2/
1.1.1.1,60020.1378 (Contains a position)
2-1.1.1.3,60020,123456630/
1.1.1.3,60020.1325 (Contains a position)
1.1.1.3,60020.1401
2-1.1.1.2,60020,123456790-1.1.1.3,60020,123456630/
1.1.1.2,60020.1312 (Contains a position)
1.1.1.3,60020,123456630/
lock
2/
1.1.1.3,60020.1325 (Contains a position)
1.1.1.3,60020.1401
2-1.1.1.2,60020,123456790/
1.1.1.2,60020.1312 (Contains a position)
</pre>
</section>
</section>
<section name="Replication Metrics">
Following the some useful metrics which can be used to check the replication progress:
<ul>
<li><b>source.sizeOfLogQueue:</b> number of HLogs to process (excludes the one which is being
processed) at the Replication source</li>
<li><b>source.shippedOps:</b> number of mutations shipped</li>
<li><b>source.logEditsRead:</b> number of mutations read from HLogs at the replication source</li>
<li><b>source.ageOfLastShippedOp:</b> age of last batch that was shipped by the replication source</li>
</ul>
Please note that the above metrics are at the global level at this regionserver. In 0.95.0 and onwards, these
metrics are also exposed per peer level.
</section>
<section name="FAQ">
<section name="GLOBAL means replicate? Any provision to replicate only to cluster X and not to cluster Y? or is that for later?">
<p>
Yes, this is for much later.
</p>
</section>
<section name="You need a bulk edit shipper? Something that allows you transfer 64MB of edits in one go?">
<p>
You can use the HBase-provided utility called CopyTable from the package
org.apache.hadoop.hbase.mapreduce in order to have a discp-like tool to
bulk copy data.
</p>
</section>
<section name="Is it a mistake that WALEdit doesn't carry Put and Delete objects, that we have to reinstantiate not only when replicating but when replaying edits also?">
<p>
Yes, this behavior would help a lot but it's not currently available
in HBase (BatchUpdate had that, but it was lost in the new API).
</p>
</section>
<section name="Is there an issue replicating on Hadoop 1.0/1.1 when short-circuit reads are enabled?">
<p>
Yes. See <a href="https://issues.apache.org/jira/browse/HDFS-2757">HDFS-2757</a>.
</p>
</section>
</section>
<p>This information has been moved to <a href="http://hbase.apache.org/book.html#cluster_replication">the Cluster Replication</a> section of the <a href="http://hbase.apache.org/book.html">Apache HBase Reference Guide</a>.</p>
</body>
</document>