HBASE-11280 Document distributed log replay and distributed log splitting (Misty Stanley-Jones)

This commit is contained in:
Michael Stack 2014-06-17 18:18:00 -05:00
parent 1324a3cb18
commit 1b92422597
1 changed files with 350 additions and 32 deletions


@ -2200,16 +2200,21 @@ rs.close();
<section
xml:id="purpose.wal">
<title>Purpose</title>
<para>The <firstterm>Write Ahead Log (WAL)</firstterm> records all changes to data in
HBase, to file-based storage. Under normal operations, the WAL is not needed because
data changes move from the MemStore to StoreFiles. However, if a RegionServer crashes or
becomes unavailable before the MemStore is flushed, the WAL ensures that the changes to
the data can be replayed. If writing to the WAL fails, the entire operation to modify the
data fails.</para>
<para>HBase uses an implementation of the <link
xlink:href="http://hbase.apache.org/apidocs/org/apache/hadoop/hbase/regionserver/wal/HLog.html">HLog</link>
interface for the WAL.
Usually, there is only one instance of a WAL per RegionServer. The RegionServer records Puts and Deletes to
it, before recording them to the <xref
linkend="store.memstore" /> for the affected <xref
linkend="store" />.</para>
<para>The WAL resides in HDFS in the <filename>/hbase/WALs/</filename> directory (prior to
HBase 0.94, they were stored in <filename>/hbase/.logs/</filename>), with subdirectories per
RegionServer.</para>
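<para>As an illustration only (the host names, ports, and start codes below are hypothetical),
you can list the per-RegionServer WAL directories with the HDFS command-line client; the output
is abbreviated to show only the paths:</para>
<screen>$ hdfs dfs -ls /hbase/WALs
/hbase/WALs/srv1.example.com,60020,1254173957298
/hbase/WALs/srv2.example.com,60020,1254173957332</screen>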
<para> For more general information about the concept of write ahead logs, see the
Wikipedia <link
@ -2226,39 +2231,352 @@ rs.close();
xml:id="wal_splitting">
<title>WAL Splitting</title>
<para>A RegionServer serves many regions. All of the regions in a region server share the
same active WAL file. Each edit in the WAL file includes information about which region
it belongs to. When a region is opened, the edits in the WAL file which belong to that
region need to be replayed. Therefore, edits in the WAL file must be grouped by region
so that particular sets can be replayed to regenerate the data in a particular region.
The process of grouping the WAL edits by region is called <firstterm>log
splitting</firstterm>. It is a critical process for recovering data if a region server
fails.</para>
<para>Log splitting is done by the HMaster during cluster start-up or by the ServerShutdownHandler
as a region server shuts down. So that consistency is guaranteed, affected regions
are unavailable until data is restored. All WAL edits need to be recovered and replayed
before a given region can become available again. As a result, regions affected by
log splitting are unavailable until the process completes.</para>
<procedure xml:id="log.splitting.step.by.step">
<title>Log Splitting, Step by Step</title>
<step>
<title>The <filename>/hbase/WALs/&lt;host>,&lt;port>,&lt;startcode></filename> directory is renamed.</title>
<para>Renaming the directory is important because a RegionServer may still be up and
accepting requests even if the HMaster thinks it is down. If the RegionServer does
not respond immediately and does not heartbeat its ZooKeeper session, the HMaster
may interpret this as a RegionServer failure. Renaming the logs directory ensures
that existing, valid WAL files which are still in use by an active but busy
RegionServer are not written to by accident.</para>
<para>The new directory is named according to the following pattern:</para>
<screen><![CDATA[/hbase/WALs/<host>,<port>,<startcode>-splitting]]></screen>
<para>An example of such a renamed directory might look like the following:</para>
<screen>/hbase/WALs/srv.example.com,60020,1254173957298-splitting</screen>
</step>
<step>
<title>Each log file is split, one at a time.</title>
<para>The log splitter reads the log file one edit entry at a time and puts each edit
entry into the buffer corresponding to the edit's region. At the same time, the
splitter starts several writer threads. Writer threads pick up a corresponding
buffer and write the edit entries in the buffer to a temporary recovered edit
file. The temporary edit file is stored to disk with the following naming pattern:</para>
<screen><![CDATA[/hbase/<table_name>/<region_id>/recovered.edits/.temp]]></screen>
<para>This file is used to store all the edits in the WAL log for this region. After
log splitting completes, the <filename>.temp</filename> file is renamed to the
sequence ID of the first log written to the file.</para>
<para>To determine whether all edits have been written, the sequence ID is compared to
the sequence of the last edit that was written to the HFile. If the sequence of the
last edit is greater than or equal to the sequence ID included in the file name, it
is clear that all writes from the edit file have been completed.</para>
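<para>As a hypothetical illustration of the naming described above, a region might end up with a
single recovered-edits file whose name is the sequence ID of the first edit it holds (the
zero-padded value shown here is invented for the example):</para>
<screen><![CDATA[/hbase/<table_name>/<region_id>/recovered.edits/0000000000000001234]]></screen>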
</step>
<step>
<title>After log splitting is complete, each affected region is assigned to a
RegionServer.</title>
<para> When the region is opened, the <filename>recovered.edits</filename> folder is checked for recovered
edits files. If any such files are present, they are replayed by reading the edits
and saving them to the MemStore. After all edit files are replayed, the contents of
the MemStore are written to disk (HFile) and the edit files are deleted.</para>
</step>
</procedure>
<section>
<title>Handling of Errors During Log Splitting</title>
<para>If you set the <varname>hbase.hlog.split.skip.errors</varname> option to
<constant>true</constant>, errors are treated as follows:</para>
<itemizedlist>
<listitem>
<para>Any error encountered during splitting will be logged.</para>
</listitem>
<listitem>
<para>The problematic WAL log will be moved into the <filename>.corrupt</filename>
directory under the hbase <varname>rootdir</varname>.</para>
</listitem>
<listitem>
<para>Processing of the WAL will continue.</para>
</listitem>
</itemizedlist>
<para>If the <varname>hbase.hlog.split.skip.errors</varname> option is set to
<literal>false</literal>, the default, the exception will be propagated and the
split will be logged as failed.<footnote>
<para>See <link
xlink:href="https://issues.apache.org/jira/browse/HBASE-2958">HBASE-2958 When
hbase.hlog.split.skip.errors is set to false, we fail the split but thats
it</link>. We need to do more than just fail split if this flag is set.</para>
</footnote></para>
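<para>If, after weighing the risk of data loss described above, you decide to skip errors during
splitting, the property can be set in <filename>hbase-site.xml</filename>. The following is only
a minimal sketch:</para>
<programlisting><![CDATA[<!-- hbase-site.xml: skip (rather than fail on) problematic WAL files during splitting -->
<property>
  <name>hbase.hlog.split.skip.errors</name>
  <value>true</value>
</property>]]></programlisting>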
<section>
<title>How EOFExceptions are treated when splitting a crashed RegionServer's
WALs</title>
<para>If an EOFException occurs while splitting logs, the split proceeds even when
<varname>hbase.hlog.split.skip.errors</varname> is set to
<literal>false</literal>. An EOFException while reading the last log in the set of
files to split is likely, because the RegionServer was probably in the process of
writing a record at the time of the crash. <footnote>
<para>For background, see <link
xlink:href="https://issues.apache.org/jira/browse/HBASE-2643">HBASE-2643
Figure how to deal with eof splitting logs</link></para>
</footnote></para>
</section>
</section>
<section>
<title>Performance Improvements during Log Splitting</title>
<para>
WAL log splitting and recovery can be resource intensive and take a long time,
depending on the number of RegionServers involved in the crash and the size of the
regions. <xref linkend="distributed.log.splitting" /> and <xref
linkend="distributed.log.replay" /> were developed to improve
performance during log splitting.
</para>
<section xml:id="distributed.log.splitting">
<title>Distributed Log Splitting</title>
<para><firstterm>Distributed Log Splitting</firstterm> was added in HBase version 0.92
(<link xlink:href="https://issues.apache.org/jira/browse/HBASE-1364">HBASE-1364</link>)
by Prakash Khemani from Facebook. It reduces the time to complete log splitting
dramatically, improving the availability of regions and tables. For
example, recovering a crashed cluster took around nine hours with single-threaded log
splitting, but only about six minutes with distributed log splitting.</para>
<para>The information in this section is sourced from Jimmy Xiang's blog post at <link
xlink:href="http://blog.cloudera.com/blog/2012/07/hbase-log-splitting/" />.</para>
<formalpara>
<title>Enabling or Disabling Distributed Log Splitting</title>
<para>Distributed log processing is enabled by default since HBase 0.92. The setting
is controlled by the <property>hbase.master.distributed.log.splitting</property>
property, which can be set to <literal>true</literal> or <literal>false</literal>,
but defaults to <literal>true</literal>. </para>
</formalpara>
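<para>Because distributed log splitting defaults to <literal>true</literal>, no configuration is
required to use it. If you need to fall back to single-threaded splitting, a minimal
<filename>hbase-site.xml</filename> sketch for disabling it looks like this:</para>
<programlisting><![CDATA[<!-- hbase-site.xml: disable distributed log splitting (defaults to true) -->
<property>
  <name>hbase.master.distributed.log.splitting</name>
  <value>false</value>
</property>]]></programlisting>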
<procedure>
<title>Distributed Log Splitting, Step by Step</title>
<para>After configuring distributed log splitting, the HMaster controls the process.
The HMaster enrolls each RegionServer in the log splitting process, and the actual
work of splitting the logs is done by the RegionServers. The general process for
log splitting, as described in <xref
linkend="log.splitting.step.by.step" /> still applies here.</para>
<step>
<para>If distributed log processing is enabled, the HMaster creates a
<firstterm>split log manager</firstterm> instance when the cluster is started.
The split log manager manages all log files which need
to be scanned and split. The split log manager places all the logs into the
ZooKeeper splitlog node (<filename>/hbase/splitlog</filename>) as tasks. You can
view the contents of the splitlog by issuing the following
<command>zkcli</command> command. Example output is shown.</para>
<screen>ls /hbase/splitlog
[hdfs%3A%2F%2Fhost2.sample.com%3A56020%2Fhbase%2F.logs%2Fhost8.sample.com%2C57020%2C1340474893275-splitting%2Fhost8.sample.com%253A57020.1340474893900,
hdfs%3A%2F%2Fhost2.sample.com%3A56020%2Fhbase%2F.logs%2Fhost3.sample.com%2C57020%2C1340474893299-splitting%2Fhost3.sample.com%253A57020.1340474893931,
hdfs%3A%2F%2Fhost2.sample.com%3A56020%2Fhbase%2F.logs%2Fhost4.sample.com%2C57020%2C1340474893287-splitting%2Fhost4.sample.com%253A57020.1340474893946]
</screen>
<para>The output contains some non-ASCII characters. When decoded, it looks much
simpler:</para>
<screen>
[hdfs://host2.sample.com:56020/hbase/.logs
/host8.sample.com,57020,1340474893275-splitting
/host8.sample.com%3A57020.1340474893900,
hdfs://host2.sample.com:56020/hbase/.logs
/host3.sample.com,57020,1340474893299-splitting
/host3.sample.com%3A57020.1340474893931,
hdfs://host2.sample.com:56020/hbase/.logs
/host4.sample.com,57020,1340474893287-splitting
/host4.sample.com%3A57020.1340474893946]
</screen>
<para>The listing represents WAL file names to be scanned and split, which is a
list of log splitting tasks.</para>
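<para>If you do not already have a ZooKeeper shell open, the <command>zkcli</command> used above
can be started through the wrapper that ships with HBase (assuming a standard installation
layout), and the same listing issued from its prompt:</para>
<screen>$ ./bin/hbase zkcli
ls /hbase/splitlog</screen>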
</step>
<step>
<title>The split log manager monitors the log-splitting tasks and workers.</title>
<para>The split log manager is responsible for the following ongoing tasks:</para>
<itemizedlist>
<listitem>
<para>Once the split log manager publishes all the tasks to the splitlog
znode, it monitors these task nodes and waits for them to be
processed.</para>
</listitem>
<listitem>
<para>Checks to see if there are any dead split log
workers queued up. If it finds tasks claimed by unresponsive workers, it
will resubmit those tasks. If the resubmit fails due to some ZooKeeper
exception, the dead worker is queued up again for retry.</para>
</listitem>
<listitem>
<para>Checks to see if there are any unassigned
tasks. If it finds any, it creates an ephemeral rescan node so that each
split log worker is notified to re-scan unassigned tasks via the
<code>nodeChildrenChanged</code> ZooKeeper event.</para>
</listitem>
<listitem>
<para>Checks for tasks which are assigned but expired. If any are found, they
are moved back to <code>TASK_UNASSIGNED</code> state again so that they can
be retried. It is possible that these tasks are assigned to slow workers, or
they may already be finished. This is not a problem, because log splitting
tasks have the property of idempotence. In other words, the same log
splitting task can be processed many times without causing any
problem.</para>
</listitem>
<listitem>
<para>The split log manager watches the HBase split log znodes constantly. If
any split log task node data is changed, the split log manager retrieves the
node data. The
node data contains the current state of the task. You can use the
<command>zkcli</command> <command>get</command> command to retrieve the
current state of a task. In the example output below, the first line of the
output shows that the task is currently unassigned.</para>
<screen>
<userinput>get /hbase/splitlog/hdfs%3A%2F%2Fhost2.sample.com%3A56020%2Fhbase%2F.logs%2Fhost6.sample.com%2C57020%2C1340474893287-splitting%2Fhost6.sample.com%253A57020.1340474893945
</userinput>
<computeroutput>unassigned host2.sample.com:57000
cZxid = 0x7115
ctime = Sat Jun 23 11:13:40 PDT 2012
...</computeroutput>
</screen>
<para>Based on the state of the task whose data is changed, the split log
manager does one of the following:</para>
<itemizedlist>
<listitem>
<para>Resubmit the task if it is unassigned</para>
</listitem>
<listitem>
<para>Heartbeat the task if it is assigned</para>
</listitem>
<listitem>
<para>Resubmit or fail the task if it is resigned (see <xref
linkend="distributed.log.replay.failure.reasons" />)</para>
</listitem>
<listitem>
<para>Resubmit or fail the task if it is completed with errors (see <xref
linkend="distributed.log.replay.failure.reasons" />)</para>
</listitem>
<listitem>
<para>Resubmit or fail the task if it could not complete due to
errors (see <xref
linkend="distributed.log.replay.failure.reasons" />)</para>
</listitem>
<listitem>
<para>Delete the task if it is successfully completed or failed</para>
</listitem>
</itemizedlist>
<itemizedlist xml:id="distributed.log.replay.failure.reasons">
<title>Reasons a Task Will Fail</title>
<listitem><para>The task has been deleted.</para></listitem>
<listitem><para>The node no longer exists.</para></listitem>
<listitem><para>The log status manager failed to move the state of the task
to <literal>TASK_UNASSIGNED</literal>.</para></listitem>
<listitem><para>The number of resubmits is over the resubmit
threshold.</para></listitem>
</itemizedlist>
</listitem>
</itemizedlist>
</step>
<step>
<title>Each RegionServer's split log worker performs the log-splitting tasks.</title>
<para>Each RegionServer runs a daemon thread called the <firstterm>split log
worker</firstterm>, which does the work to split the logs. The daemon thread
starts when the RegionServer starts, and registers itself to watch HBase znodes.
If any splitlog znode children change, it notifies a sleeping worker thread to
wake up and grab more tasks. If a worker's current task's node data is
changed, the worker checks to see if the task has been taken by another worker.
If so, the worker thread stops work on the current task.</para>
<para>The worker monitors
the splitlog znode constantly. When a new task appears, the split log worker
retrieves the task paths and checks each one until it finds an unclaimed task,
which it attempts to claim. If the claim was successful, it attempts to perform
the task and updates the task's <property>state</property> property based on the
splitting outcome. At this point, the split log worker scans for another
unclaimed task.</para>
<itemizedlist>
<title>How the Split Log Worker Approaches a Task</title>
<listitem>
<para>It queries the task state and only takes action if the task is in
<literal>TASK_UNASSIGNED</literal> state.</para>
</listitem>
<listitem>
<para>If the task is in <literal>TASK_UNASSIGNED</literal> state, the
worker attempts to set the state to <literal>TASK_OWNED</literal> by itself.
If it fails to set the state, another worker will try to grab it. The split
log manager will also ask all workers to rescan later if the task remains
unassigned.</para>
</listitem>
<listitem>
<para>If the worker succeeds in taking ownership of the task, it tries to get
the task state again to make sure it really got it, since the claim is
asynchronous. In the meantime, it starts a split task executor to do the actual work: </para>
<itemizedlist>
<listitem>
<para>Get the HBase root folder, create a temp folder under the root, and
split the log file to the temp folder.</para>
</listitem>
<listitem>
<para>If the split was successful, the task executor sets the task to
state <literal>TASK_DONE</literal>.</para>
</listitem>
<listitem>
<para>If the worker catches an unexpected IOException, the task is set to
state <literal>TASK_ERR</literal>.</para>
</listitem>
<listitem>
<para>If the worker is shutting down, the task is set to state
<literal>TASK_RESIGNED</literal>.</para>
</listitem>
<listitem>
<para>If the task is taken by another worker, just log it.</para>
</listitem>
</itemizedlist>
</listitem>
</itemizedlist>
</step>
<step>
<title>The split log manager monitors for uncompleted tasks.</title>
<para>The split log manager returns when all tasks are completed successfully. If
all tasks are completed with some failures, the split log manager throws an
exception so that the log splitting can be retried. Due to an asynchronous
implementation, in very rare cases, the split log manager loses track of some
completed tasks. For that reason, it periodically checks for remaining
uncompleted tasks in its task map or ZooKeeper. If none are found, it throws an
exception so that the log splitting can be retried right away instead of hanging
there waiting for something that won't happen.</para>
</step>
</procedure>
</section>
<section xml:id="distributed.log.replay">
<title>Distributed Log Replay</title>
<para>After a RegionServer fails, its failed regions are assigned to another
RegionServer, and are marked as "recovering" in ZooKeeper. A split log worker directly
replays edits from the WAL of the failed RegionServer to the regions at their new
location. When a region is in "recovering" state, it can accept writes but no reads
(including Append and Increment), region splits, or merges. </para>
<para>Distributed Log Replay extends the <xref linkend="distributed.log.splitting" /> framework. It works by
directly replaying WAL edits to another RegionServer instead of creating
<filename>recovered.edits</filename> files. It provides the following advantages
over distributed log splitting alone:</para>
<itemizedlist>
<listitem><para>It eliminates the overhead of writing and reading a large number of
<filename>recovered.edits</filename> files. It is not unusual for thousands of
<filename>recovered.edits</filename> files to be created and written concurrently
during a RegionServer recovery. Many small random writes can degrade overall
system performance.</para></listitem>
<listitem><para>It allows writes even when a region is in recovering state. It only takes seconds for a recovering region to accept writes again.
</para></listitem>
</itemizedlist>
<formalpara>
<title>Enabling Distributed Log Replay</title>
<para>To enable distributed log replay, set <varname>hbase.master.distributed.log.replay</varname> to
true. This will be the default for HBase 0.99 (<link
xlink:href="https://issues.apache.org/jira/browse/HBASE-10888">HBASE-10888</link>).</para>
</formalpara>
<para>You must also enable HFile version 3, which is the default HFile format starting
in HBase 0.99 (see <link
xlink:href="https://issues.apache.org/jira/browse/HBASE-10855">HBASE-10855</link>).
Distributed log replay is unsafe for rolling upgrades.</para>
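<para>A minimal <filename>hbase-site.xml</filename> sketch for enabling the feature on a version
where it is not yet the default is shown below. The <varname>hfile.format.version</varname>
property is included on the assumption that your cluster is not already writing version 3 HFiles;
omit it if it already is.</para>
<programlisting><![CDATA[<!-- hbase-site.xml: enable distributed log replay (default in 0.99+) -->
<property>
  <name>hbase.master.distributed.log.replay</name>
  <value>true</value>
</property>
<!-- Assumption: HFile v3 is required; this property is the standard HFile format switch -->
<property>
  <name>hfile.format.version</name>
  <value>3</value>
</property>]]></programlisting>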
</section>
</section>
</section>
</section>