HBASE-8207 Don't use hdfs append during lease recovery
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1463957 13f79535-47bb-0310-9956-ffa450edef68
commit ac97504dc5
parent 92e2089d65
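In outline, the patch below drops the old recovery path, which invoked DistributedFileSystem.recoverLease via reflection and then fell back to fs.append() to force lease recovery, and instead calls dfs.recoverLease(p) directly in a retry loop bounded by "hbase.lease.recovery.timeout" (whose default moves from 300000 to 900000 ms). The following is a minimal sketch of that new pattern only, condensed from the recoverFileLease() body in the diff: logging and IOException handling are trimmed, and the wrapper class name is illustrative, not part of the patch.

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hdfs.DistributedFileSystem;

// Sketch only: condensed from the new recoverFileLease() in this patch.
class LeaseRecoverySketch {
  static void recoverLease(DistributedFileSystem dfs, Path p, Configuration conf)
      throws IOException, InterruptedException {
    long startWaiting = EnvironmentEdgeManager.currentTimeMillis();
    long recoveryTimeout = conf.getInt("hbase.lease.recovery.timeout", 900000) + startWaiting;
    boolean recovered = false;
    int nbAttempt = 0;
    while (!recovered) {
      nbAttempt++;
      // recoverLease() is asynchronous: false means recovery has been started but is
      // not yet complete; true means the file is closed or recovery has succeeded.
      recovered = dfs.recoverLease(p);
      if (recovered) break;
      // Try at least twice; past the configured timeout, carry on without the lease.
      if (nbAttempt > 2 && recoveryTimeout < EnvironmentEdgeManager.currentTimeMillis()) break;
      Thread.sleep(nbAttempt < 3 ? 500 : 1000);
    }
  }
}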
@@ -18,122 +18,84 @@
package org.apache.hadoop.hbase.util;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.lang.reflect.InvocationTargetException;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.RemoteExceptionHandler;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException;
import org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InterruptedIOException;

/**
 * Implementation for hdfs
 */
@InterfaceAudience.Public
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class FSHDFSUtils extends FSUtils{
  private static final Log LOG = LogFactory.getLog(FSHDFSUtils.class);

  /**
   * Lease timeout constant, sourced from HDFS upstream.
   * The upstream constant is defined in a private interface, so we
   * can't reuse for compatibility reasons.
   * NOTE: On versions earlier than Hadoop 0.23, the constant is in
   * o.a.h.hdfs.protocol.FSConstants, while for 0.23 and above it is
   * in o.a.h.hdfs.protocol.HdfsConstants cause of HDFS-1620.
   * Recover the lease from HDFS, retrying multiple times.
   */
  public static final long LEASE_SOFTLIMIT_PERIOD = 60 * 1000;

  public static final String TEST_TRIGGER_DFS_APPEND = "hbase.test.trigger.dfs.append";

  @Override
  public void recoverFileLease(final FileSystem fs, final Path p, Configuration conf)
  throws IOException{
    if (!isAppendSupported(conf)) {
      LOG.warn("Running on HDFS without append enabled may result in data loss");
      return;
    }
  throws IOException {
    // lease recovery not needed for local file system case.
    // currently, local file system doesn't implement append either.
    if (!(fs instanceof DistributedFileSystem)) {
      return;
    }
    LOG.info("Recovering file " + p);
    long startWaiting = System.currentTimeMillis();
    DistributedFileSystem dfs = (DistributedFileSystem) fs;

    // Trying recovery
    LOG.info("Recovering file " + p);
    long startWaiting = EnvironmentEdgeManager.currentTimeMillis();
    // Default is 15 minutes. It's huge, but the idea is that if we have a major issue, HDFS
    // usually needs 10 minutes before marking the nodes as dead. So we're putting ourselves
    // beyond that limit 'to be safe'.
    long recoveryTimeout = conf.getInt("hbase.lease.recovery.timeout", 900000) + startWaiting;
    boolean recovered = false;
    long recoveryTimeout = conf.getInt("hbase.lease.recovery.timeout", 300000);
    // conf parameter passed from unit test, indicating whether fs.append() should be triggered
    boolean triggerAppend = conf.getBoolean(TEST_TRIGGER_DFS_APPEND, false);
    Exception ex = null;
    int nbAttempt = 0;
    while (!recovered) {
      nbAttempt++;
      try {
        try {
          DistributedFileSystem dfs = (DistributedFileSystem) fs;
          if (triggerAppend) throw new IOException();
          try {
            recovered = (Boolean) DistributedFileSystem.class.getMethod(
              "recoverLease", new Class[] { Path.class }).invoke(dfs, p);
            if (!recovered) LOG.debug("recoverLease returned false");
          } catch (InvocationTargetException ite) {
            // function was properly called, but threw it's own exception
            throw (IOException) ite.getCause();
          }
        } catch (Exception e) {
          LOG.debug("Failed fs.recoverLease invocation, " + e.toString() +
            ", trying fs.append instead");
          ex = e;
        }
        if (ex != null || System.currentTimeMillis() - startWaiting > recoveryTimeout) {
          LOG.debug("trying fs.append for " + p + " with " + ex);
          ex = null; // assume the following append() call would succeed
          FSDataOutputStream out = fs.append(p);
          out.close();
          recovered = true;
          LOG.debug("fs.append passed");
        }
        if (recovered) break;
        // recoverLease is asynchronous. We expect it to return true at the first call if the
        // file is closed. So, it returns:
        // - false when it starts the lease recovery (i.e. lease recovery not *yet* done
        // - true when the lease recovery has succeeded or the file is closed.
        recovered = dfs.recoverLease(p);
        LOG.info("Attempt " + nbAttempt + " to recoverLease on file " + p +
          " returned " + recovered + ", trying for " +
          (EnvironmentEdgeManager.currentTimeMillis() - startWaiting) + "ms");
      } catch (IOException e) {
        e = RemoteExceptionHandler.checkIOException(e);
        if (e instanceof AlreadyBeingCreatedException) {
          // We expect that we'll get this message while the lease is still
          // within its soft limit, but if we get it past that, it means
          // that the RS is holding onto the file even though it lost its
          // znode. We could potentially abort after some time here.
          long waitedFor = System.currentTimeMillis() - startWaiting;
          if (waitedFor > LEASE_SOFTLIMIT_PERIOD) {
            LOG.warn("Waited " + waitedFor + "ms for lease recovery on " + p +
              ":" + e.getMessage());
          }
        } else if (e instanceof LeaseExpiredException &&
            e.getMessage().contains("File does not exist")) {
        if (e instanceof LeaseExpiredException && e.getMessage().contains("File does not exist")) {
          // This exception comes out instead of FNFE, fix it
          throw new FileNotFoundException(
            "The given HLog wasn't found at " + p.toString());
        } else {
          throw new IOException("Failed to open " + p + " for append", e);
          throw new FileNotFoundException("The given HLog wasn't found at " + p);
        }
        LOG.warn("Got IOException on attempt " + nbAttempt + " to recover lease for file " + p +
          ", retrying.", e);
      }
      try {
        Thread.sleep(1000);
      } catch (InterruptedException ie) {
        InterruptedIOException iioe = new InterruptedIOException();
        iioe.initCause(ie);
        throw iioe;
      if (!recovered) {
        // try at least twice.
        if (nbAttempt > 2 && recoveryTimeout < EnvironmentEdgeManager.currentTimeMillis()) {
          LOG.error("Can't recoverLease after " + nbAttempt + " attempts and " +
            (EnvironmentEdgeManager.currentTimeMillis() - startWaiting) + "ms " + " for " + p +
            " - continuing without the lease, but we could have a data loss.");
        } else {
          try {
            Thread.sleep(nbAttempt < 3 ? 500 : 1000);
          } catch (InterruptedException ie) {
            InterruptedIOException iioe = new InterruptedIOException();
            iioe.initCause(ie);
            throw iioe;
          }
        }
      }
    }
    LOG.info("Finished lease recover attempt for " + p);
  }
}
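Callers do not construct FSHDFSUtils directly; they go through the FSUtils factory, as the modified TestHLog below does. A minimal, hypothetical invocation under that assumption follows; the WAL path and the tightened one-minute timeout are made-up illustrative values, and the usual Hadoop/HBase imports are assumed.

// Sketch only: the path and the 60000 ms timeout are illustrative, not from the patch.
Configuration conf = HBaseConfiguration.create();
conf.setInt("hbase.lease.recovery.timeout", 60000);   // patch default is 900000 ms
FileSystem fs = FileSystem.get(conf);
Path walPath = new Path("/hbase/.logs/rs1,60020,1/rs1%2C60020%2C1.1364943900000");
FSUtils.getInstance(fs, conf).recoverFileLease(fs, walPath, conf);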
@@ -37,9 +37,7 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.regionserver.wal.HLog.Reader;
import org.apache.hadoop.hbase.regionserver.wal.HLogUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSHDFSUtils;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.Coprocessor;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
@@ -196,7 +194,7 @@ public class TestHLog {
    }
    log.close();
    HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf,
      hbaseDir, logdir, this.oldLogDir, this.fs);
      hbaseDir, logdir, oldLogDir, fs);
    List<Path> splits =
      logSplitter.splitLog();
    verifySplits(splits, howmany);
@@ -382,19 +380,11 @@ public class TestHLog {
   */
  @Test
  public void testAppendClose() throws Exception {
    testAppendClose(true);
    testAppendClose(false);
  }

  /*
   * @param triggerDirectAppend whether to trigger direct call of fs.append()
   */
  public void testAppendClose(final boolean triggerDirectAppend) throws Exception {
    byte [] tableName = Bytes.toBytes(getName());
    HRegionInfo regioninfo = new HRegionInfo(tableName,
      HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW, false);

    HLog wal = HLogFactory.createHLog(fs, dir, "hlogdir" + triggerDirectAppend,
    HLog wal = HLogFactory.createHLog(fs, dir, "hlogdir",
      "hlogdir_archive", conf);
    final int total = 20;
@@ -410,7 +400,7 @@ public class TestHLog {
    wal.sync();
    int namenodePort = cluster.getNameNodePort();
    final Path walPath = ((FSHLog) wal).computeFilename();

    // Stop the cluster.  (ensure restart since we're sharing MiniDFSCluster)
    try {
@@ -453,23 +443,21 @@ public class TestHLog {
      Method setLeasePeriod = cluster.getClass()
        .getDeclaredMethod("setLeasePeriod", new Class[]{Long.TYPE, Long.TYPE});
      setLeasePeriod.setAccessible(true);
      setLeasePeriod.invoke(cluster,
        new Object[]{new Long(1000), new Long(1000)});
      setLeasePeriod.invoke(cluster, 1000L, 1000L);
      try {
        Thread.sleep(1000);
      } catch (InterruptedException e) {
        LOG.info(e);
      }

      // Now try recovering the log, like the HMaster would do
      final FileSystem recoveredFs = fs;
      final Configuration rlConf = conf;

      class RecoverLogThread extends Thread {
        public Exception exception = null;
        public void run() {
          try {
            rlConf.setBoolean(FSHDFSUtils.TEST_TRIGGER_DFS_APPEND, triggerDirectAppend);
            FSUtils.getInstance(fs, rlConf)
              .recoverFileLease(recoveredFs, walPath, rlConf);
          } catch (IOException e) {
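The hunk above stops inside RecoverLogThread's catch block; the rest of the method is unchanged by this patch. In outline, the test shortens the MiniDFSCluster lease soft/hard limits via reflection so the old writer's lease expires almost immediately, then runs the recovery on a separate thread so a hang is caught by a bounded join instead of stalling the build. A condensed, hypothetical restatement of that pattern follows; the 30-second join timeout and the final assertion are illustrative additions, and the surrounding test fields (cluster, fs, conf, walPath) plus the usual JUnit/reflection imports are assumed.

// Sketch only: condenses the test flow visible in the hunk above.
Method setLeasePeriod = cluster.getClass()
    .getDeclaredMethod("setLeasePeriod", Long.TYPE, Long.TYPE);
setLeasePeriod.setAccessible(true);
setLeasePeriod.invoke(cluster, 1000L, 1000L);   // soft and hard lease limits, in ms

Thread recoverer = new Thread(new Runnable() {
  public void run() {
    try {
      FSUtils.getInstance(fs, conf).recoverFileLease(fs, walPath, conf);
    } catch (IOException e) {
      throw new RuntimeException(e);
    }
  }
});
recoverer.start();
recoverer.join(30000);                          // bound the wait instead of hanging forever
assertFalse("lease recovery did not complete", recoverer.isAlive());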