From d7540b9af40229f3b23f2842974345efaeda821e Mon Sep 17 00:00:00 2001
From: Jim Kellerman
Date: Thu, 11 Dec 2008 21:40:02 +0000
Subject: [PATCH] HBASE-1052 Stopping a HRegionServer with unflushed cache causes data loss from org.apache.hadoop.hbase.DroppedSnapshotException

git-svn-id: https://svn.apache.org/repos/asf/hadoop/hbase/branches/0.18@725825 13f79535-47bb-0310-9956-ffa450edef68
---
 CHANGES.txt                                   |  2 +
 .../hbase/regionserver/HRegionServer.java     | 75 +++++++++++++++++--
 2 files changed, 69 insertions(+), 8 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index 0cf60d3da11..c3e7b40d66a 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -6,6 +6,8 @@ Release 0.18.2 - Unreleased
   HBASE-602   HBase Crash when network card has a IPv6 address
   HBASE-927   We don't recover if HRS hosting -ROOT-/.META. goes down -
               (back port from trunk)
+  HBASE-1052  Stopping a HRegionServer with unflushed cache causes data loss
+              from org.apache.hadoop.hbase.DroppedSnapshotException
 
   IMPROVEMENTS
   HBASE-1046  Narrow getClosestRowBefore by passing column family (backport)
diff --git a/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index bc377ea1584..be27aab8838 100644
--- a/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ b/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -22,6 +22,7 @@ package org.apache.hadoop.hbase.regionserver;
 import java.io.IOException;
 import java.lang.Thread.UncaughtExceptionHandler;
 import java.lang.reflect.Constructor;
+import java.lang.reflect.Field;
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -172,25 +173,34 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
    */
   class ShutdownThread extends Thread {
     private final HRegionServer instance;
+    private final Thread mainThread;
 
     /**
      * @param instance
+     * @param mainThread
      */
-    public ShutdownThread(HRegionServer instance) {
+    public ShutdownThread(HRegionServer instance, Thread mainThread) {
       this.instance = instance;
+      this.mainThread = mainThread;
     }
 
     @Override
     public void run() {
       LOG.info("Starting shutdown thread.");
 
-      // tell the region server to stop and wait for it to complete
+      // tell the region server to stop
       instance.stop();
-      instance.join();
+
+      // Wait for main thread to exit.
+      instance.join(mainThread);
+
       LOG.info("Shutdown thread complete");
     }
   }
 
+  // We need to call HDFS shutdown when we are done shutting down
+  private Thread hdfsShutdownThread;
+
   // Compactions
   final CompactSplitThread compactSplitThread;
 
@@ -270,10 +280,6 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
     for(int i = 0; i < nbBlocks; i++) {
       reservedSpace.add(new byte[DEFAULT_SIZE_RESERVATION_BLOCK]);
     }
-
-    // Register shutdown hook for HRegionServer, runs an orderly shutdown
-    // when a kill signal is recieved
-    Runtime.getRuntime().addShutdownHook(new ShutdownThread(this));
   }
 
   /**
@@ -484,6 +490,15 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
       this.hbaseMaster = null;
     }
     join();
+
+    LOG.info("Running hdfs shutdown thread");
+    hdfsShutdownThread.start();
+    try {
+      hdfsShutdownThread.join();
+      LOG.info("Hdfs shutdown thread completed.");
+    } catch (InterruptedException e) {
+      LOG.warn("hdfsShutdownThread.join() was interrupted", e);
+    }
     LOG.info(Thread.currentThread().getName() + " exiting");
   }
 
@@ -514,6 +529,13 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
       // to defaults).
       this.conf.set("fs.default.name", this.conf.get("hbase.rootdir"));
       this.fs = FileSystem.get(this.conf);
+
+      // Register shutdown hook for HRegionServer, runs an orderly shutdown
+      // when a kill signal is recieved
+      Runtime.getRuntime().addShutdownHook(new ShutdownThread(this,
+        Thread.currentThread()));
+      this.hdfsShutdownThread = suppressHdfsShutdownHook();
+
       this.rootDir = new Path(this.conf.get(HConstants.HBASE_DIR));
       this.log = setupHLog();
       startServiceThreads();
@@ -529,6 +551,43 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
     }
   }
 
+  /**
+   * So, HDFS caches FileSystems so when you call FileSystem.get it's fast. In
+   * order to make sure things are cleaned up, it also creates a shutdown hook
+   * so that all filesystems can be closed when the process is terminated. This
+   * conveniently runs concurrently with our own shutdown handler, and
+   * therefore causes all the filesystems to be closed before the server can do
+   * all its necessary cleanup.
+   *
+   * The crazy dirty reflection in this method sneaks into the FileSystem cache
+   * and grabs the shutdown hook, removes it from the list of active shutdown
+   * hooks, and hangs onto it until later. Then, after we're properly done with
+   * our graceful shutdown, we can execute the hdfs hook manually to make sure
+   * loose ends are tied up.
+   *
+   * This seems quite fragile and susceptible to breaking if Hadoop changes
+   * anything about the way this cleanup is managed. Keep an eye on things.
+   */
+  private Thread suppressHdfsShutdownHook() {
+    try {
+      Field field = FileSystem.class.getDeclaredField ("clientFinalizer");
+      field.setAccessible(true);
+      Thread hdfsClientFinalizer = (Thread)field.get(null);
+      if (hdfsClientFinalizer == null) {
+        throw new RuntimeException("client finalizer is null, can't suppress!");
+      }
+      Runtime.getRuntime().removeShutdownHook(hdfsClientFinalizer);
+      return hdfsClientFinalizer;
+
+    } catch (NoSuchFieldException nsfe) {
+      LOG.fatal("Couldn't find field 'clientFinalizer' in FileSystem!", nsfe);
+      throw new RuntimeException("Failed to suppress HDFS shutdown hook");
+    } catch (IllegalAccessException iae) {
+      LOG.fatal("Couldn't access field 'clientFinalizer' in FileSystem!", iae);
+      throw new RuntimeException("Failed to suppress HDFS shutdown hook");
+    }
+  }
+
   /**
    * Report the status of the server. A server is online once all the startup
    * is completed (setting up filesystem, starting service threads, etc.). This
@@ -677,7 +736,7 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
       join(this.logRoller);
   }
 
-  private void join(final Thread t) {
+  void join(final Thread t) {
     while (t.isAlive()) {
       try {
         t.join();
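
Note: the core idea of the fix above (deregister the HDFS shutdown hook, finish the region server's own orderly shutdown, then run the suppressed hook by hand) can be illustrated without Hadoop's reflection trick. The following is a minimal, self-contained Java sketch of that suppress-and-replay pattern; the class and hook names are illustrative only and appear nowhere in the patch or in Hadoop.

// Standalone sketch of the suppress-and-replay shutdown hook pattern used by
// the patch above. Names here are illustrative; this is not HBase or Hadoop code.
public class ShutdownHookReplayDemo {

  public static void main(String[] args) {
    // Stand-in for HDFS's client finalizer: a hook that closes shared
    // resources as soon as the JVM starts shutting down.
    final Thread resourceCloser = new Thread("resource-closer") {
      public void run() {
        System.out.println("closing shared resources");
      }
    };
    Runtime.getRuntime().addShutdownHook(resourceCloser);

    // Suppress it: deregister the hook so the JVM will not run it
    // concurrently with our own shutdown work.
    boolean removed = Runtime.getRuntime().removeShutdownHook(resourceCloser);
    System.out.println("resource-closer suppressed: " + removed);

    // Our own hook finishes its cleanup first, then replays the suppressed
    // hook by hand, so shared resources stay open until we are done with them.
    Runtime.getRuntime().addShutdownHook(new Thread("orderly-shutdown") {
      public void run() {
        System.out.println("flushing caches, finishing our own cleanup");
        resourceCloser.start();      // run the suppressed hook manually
        try {
          resourceCloser.join();     // and wait for it to complete
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
        }
      }
    });

    System.out.println("main exiting; shutdown hooks run next");
  }
}

This mirrors the sequencing the patch establishes for HRegionServer: regions are flushed and closed before the HDFS client is allowed to close the underlying filesystems, which is what prevents the DroppedSnapshotException-driven data loss.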