HBASE-1052 Stopping a HRegionServer with unflushed cache causes data loss from org.apache.hadoop.hbase.DroppedSnapshotException
git-svn-id: https://svn.apache.org/repos/asf/hadoop/hbase/branches/0.18@725825 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
6e6c79d93e
commit
d7540b9af4
|
@ -6,6 +6,8 @@ Release 0.18.2 - Unreleased
|
||||||
HBASE-602 HBase Crash when network card has a IPv6 address
|
HBASE-602 HBase Crash when network card has a IPv6 address
|
||||||
HBASE-927 We don't recover if HRS hosting -ROOT-/.META. goes down -
|
HBASE-927 We don't recover if HRS hosting -ROOT-/.META. goes down -
|
||||||
(back port from trunk)
|
(back port from trunk)
|
||||||
|
HBASE-1052 Stopping a HRegionServer with unflushed cache causes data loss
|
||||||
|
from org.apache.hadoop.hbase.DroppedSnapshotException
|
||||||
|
|
||||||
IMPROVEMENTS
|
IMPROVEMENTS
|
||||||
HBASE-1046 Narrow getClosestRowBefore by passing column family (backport)
|
HBASE-1046 Narrow getClosestRowBefore by passing column family (backport)
|
||||||
|
|
|
@ -22,6 +22,7 @@ package org.apache.hadoop.hbase.regionserver;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.lang.Thread.UncaughtExceptionHandler;
|
import java.lang.Thread.UncaughtExceptionHandler;
|
||||||
import java.lang.reflect.Constructor;
|
import java.lang.reflect.Constructor;
|
||||||
|
import java.lang.reflect.Field;
|
||||||
import java.net.InetSocketAddress;
|
import java.net.InetSocketAddress;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
|
@ -172,25 +173,34 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
|
||||||
*/
|
*/
|
||||||
class ShutdownThread extends Thread {
|
class ShutdownThread extends Thread {
|
||||||
private final HRegionServer instance;
|
private final HRegionServer instance;
|
||||||
|
private final Thread mainThread;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @param instance
|
* @param instance
|
||||||
|
* @param mainThread
|
||||||
*/
|
*/
|
||||||
public ShutdownThread(HRegionServer instance) {
|
public ShutdownThread(HRegionServer instance, Thread mainThread) {
|
||||||
this.instance = instance;
|
this.instance = instance;
|
||||||
|
this.mainThread = mainThread;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void run() {
|
public void run() {
|
||||||
LOG.info("Starting shutdown thread.");
|
LOG.info("Starting shutdown thread.");
|
||||||
|
|
||||||
// tell the region server to stop and wait for it to complete
|
// tell the region server to stop
|
||||||
instance.stop();
|
instance.stop();
|
||||||
instance.join();
|
|
||||||
|
// Wait for main thread to exit.
|
||||||
|
instance.join(mainThread);
|
||||||
|
|
||||||
LOG.info("Shutdown thread complete");
|
LOG.info("Shutdown thread complete");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// We need to call HDFS shutdown when we are done shutting down
|
||||||
|
private Thread hdfsShutdownThread;
|
||||||
|
|
||||||
// Compactions
|
// Compactions
|
||||||
final CompactSplitThread compactSplitThread;
|
final CompactSplitThread compactSplitThread;
|
||||||
|
|
||||||
|
@ -270,10 +280,6 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
|
||||||
for(int i = 0; i < nbBlocks; i++) {
|
for(int i = 0; i < nbBlocks; i++) {
|
||||||
reservedSpace.add(new byte[DEFAULT_SIZE_RESERVATION_BLOCK]);
|
reservedSpace.add(new byte[DEFAULT_SIZE_RESERVATION_BLOCK]);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Register shutdown hook for HRegionServer, runs an orderly shutdown
|
|
||||||
// when a kill signal is received
|
|
||||||
Runtime.getRuntime().addShutdownHook(new ShutdownThread(this));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -484,6 +490,15 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
|
||||||
this.hbaseMaster = null;
|
this.hbaseMaster = null;
|
||||||
}
|
}
|
||||||
join();
|
join();
|
||||||
|
|
||||||
|
LOG.info("Running hdfs shutdown thread");
|
||||||
|
hdfsShutdownThread.start();
|
||||||
|
try {
|
||||||
|
hdfsShutdownThread.join();
|
||||||
|
LOG.info("Hdfs shutdown thread completed.");
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
LOG.warn("hdfsShutdownThread.join() was interrupted", e);
|
||||||
|
}
|
||||||
LOG.info(Thread.currentThread().getName() + " exiting");
|
LOG.info(Thread.currentThread().getName() + " exiting");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -514,6 +529,13 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
|
||||||
// to defaults).
|
// to defaults).
|
||||||
this.conf.set("fs.default.name", this.conf.get("hbase.rootdir"));
|
this.conf.set("fs.default.name", this.conf.get("hbase.rootdir"));
|
||||||
this.fs = FileSystem.get(this.conf);
|
this.fs = FileSystem.get(this.conf);
|
||||||
|
|
||||||
|
// Register shutdown hook for HRegionServer, runs an orderly shutdown
|
||||||
|
// when a kill signal is received
|
||||||
|
Runtime.getRuntime().addShutdownHook(new ShutdownThread(this,
|
||||||
|
Thread.currentThread()));
|
||||||
|
this.hdfsShutdownThread = suppressHdfsShutdownHook();
|
||||||
|
|
||||||
this.rootDir = new Path(this.conf.get(HConstants.HBASE_DIR));
|
this.rootDir = new Path(this.conf.get(HConstants.HBASE_DIR));
|
||||||
this.log = setupHLog();
|
this.log = setupHLog();
|
||||||
startServiceThreads();
|
startServiceThreads();
|
||||||
|
@ -529,6 +551,43 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* So, HDFS caches FileSystems so when you call FileSystem.get it's fast. In
|
||||||
|
* order to make sure things are cleaned up, it also creates a shutdown hook
|
||||||
|
* so that all filesystems can be closed when the process is terminated. This
|
||||||
|
* conveniently runs concurrently with our own shutdown handler, and
|
||||||
|
* therefore causes all the filesystems to be closed before the server can do
|
||||||
|
* all its necessary cleanup.
|
||||||
|
*
|
||||||
|
* The crazy dirty reflection in this method sneaks into the FileSystem cache
|
||||||
|
* and grabs the shutdown hook, removes it from the list of active shutdown
|
||||||
|
* hooks, and hangs onto it until later. Then, after we're properly done with
|
||||||
|
* our graceful shutdown, we can execute the hdfs hook manually to make sure
|
||||||
|
* loose ends are tied up.
|
||||||
|
*
|
||||||
|
* This seems quite fragile and susceptible to breaking if Hadoop changes
|
||||||
|
* anything about the way this cleanup is managed. Keep an eye on things.
|
||||||
|
*/
|
||||||
|
private Thread suppressHdfsShutdownHook() {
|
||||||
|
try {
|
||||||
|
Field field = FileSystem.class.getDeclaredField ("clientFinalizer");
|
||||||
|
field.setAccessible(true);
|
||||||
|
Thread hdfsClientFinalizer = (Thread)field.get(null);
|
||||||
|
if (hdfsClientFinalizer == null) {
|
||||||
|
throw new RuntimeException("client finalizer is null, can't suppress!");
|
||||||
|
}
|
||||||
|
Runtime.getRuntime().removeShutdownHook(hdfsClientFinalizer);
|
||||||
|
return hdfsClientFinalizer;
|
||||||
|
|
||||||
|
} catch (NoSuchFieldException nsfe) {
|
||||||
|
LOG.fatal("Couldn't find field 'clientFinalizer' in FileSystem!", nsfe);
|
||||||
|
throw new RuntimeException("Failed to suppress HDFS shutdown hook");
|
||||||
|
} catch (IllegalAccessException iae) {
|
||||||
|
LOG.fatal("Couldn't access field 'clientFinalizer' in FileSystem!", iae);
|
||||||
|
throw new RuntimeException("Failed to suppress HDFS shutdown hook");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Report the status of the server. A server is online once all the startup
|
* Report the status of the server. A server is online once all the startup
|
||||||
* is completed (setting up filesystem, starting service threads, etc.). This
|
* is completed (setting up filesystem, starting service threads, etc.). This
|
||||||
|
@ -677,7 +736,7 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
|
||||||
join(this.logRoller);
|
join(this.logRoller);
|
||||||
}
|
}
|
||||||
|
|
||||||
private void join(final Thread t) {
|
void join(final Thread t) {
|
||||||
while (t.isAlive()) {
|
while (t.isAlive()) {
|
||||||
try {
|
try {
|
||||||
t.join();
|
t.join();
|
||||||
|
|
Loading…
Reference in New Issue