HBASE-4169 FSUtils LeaseRecovery for non HDFS FileSystems
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1155159 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
989390de47
commit
a2d7e57e71
|
@ -363,6 +363,7 @@ Release 0.91.0 - Unreleased
|
||||||
HBASE-4158 Upgrade pom.xml to surefire 2.9 (Aaron Kushner & Mikhail)
|
HBASE-4158 Upgrade pom.xml to surefire 2.9 (Aaron Kushner & Mikhail)
|
||||||
HBASE-3899 Add ability for delayed RPC calls to set return value
|
HBASE-3899 Add ability for delayed RPC calls to set return value
|
||||||
immediately at call return. (Vlad Dogaru via todd)
|
immediately at call return. (Vlad Dogaru via todd)
|
||||||
|
HBASE-4169 FSUtils LeaseRecovery for non HDFS FileSystems (Lohit Vijayarenu)
|
||||||
|
|
||||||
TASKS
|
TASKS
|
||||||
HBASE-3559 Move report of split to master OFF the heartbeat channel
|
HBASE-3559 Move report of split to master OFF the heartbeat channel
|
||||||
|
|
|
@ -19,8 +19,6 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.hadoop.hbase.regionserver.wal;
|
package org.apache.hadoop.hbase.regionserver.wal;
|
||||||
|
|
||||||
import static org.apache.hadoop.hbase.util.FSUtils.recoverFileLease;
|
|
||||||
|
|
||||||
import java.io.EOFException;
|
import java.io.EOFException;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.io.InterruptedIOException;
|
import java.io.InterruptedIOException;
|
||||||
|
@ -57,6 +55,7 @@ import org.apache.hadoop.hbase.util.Bytes;
|
||||||
import org.apache.hadoop.hbase.util.CancelableProgressable;
|
import org.apache.hadoop.hbase.util.CancelableProgressable;
|
||||||
import org.apache.hadoop.hbase.util.ClassSize;
|
import org.apache.hadoop.hbase.util.ClassSize;
|
||||||
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
||||||
|
import org.apache.hadoop.hbase.util.FSHDFSUtils;
|
||||||
import org.apache.hadoop.hbase.util.FSUtils;
|
import org.apache.hadoop.hbase.util.FSUtils;
|
||||||
import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
|
import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
|
||||||
import org.apache.hadoop.io.MultipleIOException;
|
import org.apache.hadoop.io.MultipleIOException;
|
||||||
|
@ -688,7 +687,7 @@ public class HLogSplitter {
|
||||||
}
|
}
|
||||||
|
|
||||||
try {
|
try {
|
||||||
recoverFileLease(fs, path, conf);
|
FSUtils.getInstance(fs, conf).recoverFileLease(fs, path, conf);
|
||||||
try {
|
try {
|
||||||
in = getReader(fs, path, conf);
|
in = getReader(fs, path, conf);
|
||||||
} catch (EOFException e) {
|
} catch (EOFException e) {
|
||||||
|
|
|
@ -0,0 +1,118 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.hbase.util;
|
||||||
|
|
||||||
|
import java.io.FileNotFoundException;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.io.InterruptedIOException;
|
||||||
|
import java.lang.reflect.InvocationTargetException;
|
||||||
|
|
||||||
|
import org.apache.commons.logging.Log;
|
||||||
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||||
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
|
import org.apache.hadoop.fs.Path;
|
||||||
|
import org.apache.hadoop.hbase.RemoteExceptionHandler;
|
||||||
|
import org.apache.hadoop.hdfs.DistributedFileSystem;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.FSConstants;
|
||||||
|
import org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException;
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Implementation for hdfs
|
||||||
|
*/
|
||||||
|
public class FSHDFSUtils extends FSUtils{
|
||||||
|
private static final Log LOG = LogFactory.getLog(FSHDFSUtils.class);
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Recover file lease. Used when a file might be suspect to be had been left open
|
||||||
|
* by another process. <code>p</code>
|
||||||
|
* @param fs
|
||||||
|
* @param p
|
||||||
|
* @param append True if append supported
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
public void recoverFileLease(final FileSystem fs, final Path p, Configuration conf)
|
||||||
|
throws IOException{
|
||||||
|
if (!isAppendSupported(conf)) {
|
||||||
|
LOG.warn("Running on HDFS without append enabled may result in data loss");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
// lease recovery not needed for local file system case.
|
||||||
|
// currently, local file system doesn't implement append either.
|
||||||
|
if (!(fs instanceof DistributedFileSystem)) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
LOG.info("Recovering file " + p);
|
||||||
|
long startWaiting = System.currentTimeMillis();
|
||||||
|
|
||||||
|
// Trying recovery
|
||||||
|
boolean recovered = false;
|
||||||
|
while (!recovered) {
|
||||||
|
try {
|
||||||
|
try {
|
||||||
|
if (fs instanceof DistributedFileSystem) {
|
||||||
|
DistributedFileSystem dfs = (DistributedFileSystem)fs;
|
||||||
|
DistributedFileSystem.class.getMethod("recoverLease",
|
||||||
|
new Class[] {Path.class}).invoke(dfs, p);
|
||||||
|
} else {
|
||||||
|
throw new Exception("Not a DistributedFileSystem");
|
||||||
|
}
|
||||||
|
} catch (InvocationTargetException ite) {
|
||||||
|
// function was properly called, but threw it's own exception
|
||||||
|
throw (IOException) ite.getCause();
|
||||||
|
} catch (Exception e) {
|
||||||
|
LOG.debug("Failed fs.recoverLease invocation, " + e.toString() +
|
||||||
|
", trying fs.append instead");
|
||||||
|
FSDataOutputStream out = fs.append(p);
|
||||||
|
out.close();
|
||||||
|
}
|
||||||
|
recovered = true;
|
||||||
|
} catch (IOException e) {
|
||||||
|
e = RemoteExceptionHandler.checkIOException(e);
|
||||||
|
if (e instanceof AlreadyBeingCreatedException) {
|
||||||
|
// We expect that we'll get this message while the lease is still
|
||||||
|
// within its soft limit, but if we get it past that, it means
|
||||||
|
// that the RS is holding onto the file even though it lost its
|
||||||
|
// znode. We could potentially abort after some time here.
|
||||||
|
long waitedFor = System.currentTimeMillis() - startWaiting;
|
||||||
|
if (waitedFor > FSConstants.LEASE_SOFTLIMIT_PERIOD) {
|
||||||
|
LOG.warn("Waited " + waitedFor + "ms for lease recovery on " + p +
|
||||||
|
":" + e.getMessage());
|
||||||
|
}
|
||||||
|
} else if (e instanceof LeaseExpiredException &&
|
||||||
|
e.getMessage().contains("File does not exist")) {
|
||||||
|
// This exception comes out instead of FNFE, fix it
|
||||||
|
throw new FileNotFoundException(
|
||||||
|
"The given HLog wasn't found at " + p.toString());
|
||||||
|
} else {
|
||||||
|
throw new IOException("Failed to open " + p + " for append", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
try {
|
||||||
|
Thread.sleep(1000);
|
||||||
|
} catch (InterruptedException ex) {
|
||||||
|
new InterruptedIOException().initCause(ex);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
LOG.info("Finished lease recover attempt for " + p);
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,43 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.hbase.util;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
|
import org.apache.hadoop.fs.Path;
|
||||||
|
import org.apache.hadoop.fs.permission.FsPermission;
|
||||||
|
import org.apache.commons.logging.Log;
|
||||||
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Generic non-hdfs implementation.
|
||||||
|
* May not work (doesn't work on nfs for example).
|
||||||
|
*/
|
||||||
|
public class FSNonHDFSUtils {
|
||||||
|
private static final Log LOG = LogFactory.getLog(FSNonHDFSUtils.class);
|
||||||
|
|
||||||
|
public static void recoverFileLease(final FileSystem fs, final Path p,
|
||||||
|
Configuration conf) throws IOException {
|
||||||
|
LOG.info("Recovering file " + p.toString() +
|
||||||
|
" by changing permission to readonly");
|
||||||
|
FsPermission roPerm = new FsPermission((short) 0444);
|
||||||
|
fs.setPermission(p, roPerm);
|
||||||
|
}
|
||||||
|
}
|
|
@ -35,18 +35,15 @@ import org.apache.hadoop.hbase.RemoteExceptionHandler;
|
||||||
import org.apache.hadoop.hbase.master.HMaster;
|
import org.apache.hadoop.hbase.master.HMaster;
|
||||||
import org.apache.hadoop.hbase.regionserver.HRegion;
|
import org.apache.hadoop.hbase.regionserver.HRegion;
|
||||||
import org.apache.hadoop.hdfs.DistributedFileSystem;
|
import org.apache.hadoop.hdfs.DistributedFileSystem;
|
||||||
import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException;
|
|
||||||
import org.apache.hadoop.hdfs.protocol.FSConstants;
|
import org.apache.hadoop.hdfs.protocol.FSConstants;
|
||||||
import org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException;
|
|
||||||
import org.apache.hadoop.io.SequenceFile;
|
import org.apache.hadoop.io.SequenceFile;
|
||||||
|
import org.apache.hadoop.util.ReflectionUtils;
|
||||||
import org.apache.hadoop.util.StringUtils;
|
import org.apache.hadoop.util.StringUtils;
|
||||||
|
|
||||||
import java.io.DataInputStream;
|
import java.io.DataInputStream;
|
||||||
import java.io.EOFException;
|
import java.io.EOFException;
|
||||||
import java.io.FileNotFoundException;
|
import java.io.FileNotFoundException;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.io.InterruptedIOException;
|
|
||||||
import java.lang.reflect.InvocationTargetException;
|
|
||||||
import java.net.URI;
|
import java.net.URI;
|
||||||
import java.net.URISyntaxException;
|
import java.net.URISyntaxException;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
|
@ -57,15 +54,25 @@ import java.util.Map;
|
||||||
/**
|
/**
|
||||||
* Utility methods for interacting with the underlying file system.
|
* Utility methods for interacting with the underlying file system.
|
||||||
*/
|
*/
|
||||||
public class FSUtils {
|
public abstract class FSUtils {
|
||||||
private static final Log LOG = LogFactory.getLog(FSUtils.class);
|
private static final Log LOG = LogFactory.getLog(FSUtils.class);
|
||||||
|
|
||||||
/**
|
protected FSUtils() {
|
||||||
* Not instantiable
|
|
||||||
*/
|
|
||||||
private FSUtils() {
|
|
||||||
super();
|
super();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static FSUtils getInstance(FileSystem fs, Configuration conf) {
|
||||||
|
String scheme = fs.getUri().getScheme();
|
||||||
|
if (scheme == null) {
|
||||||
|
LOG.warn("Could not find scheme for uri " +
|
||||||
|
fs.getUri() + ", default to hdfs");
|
||||||
|
scheme = "hdfs";
|
||||||
|
}
|
||||||
|
Class<?> fsUtilsClass = conf.getClass("hbase.fsutil." +
|
||||||
|
scheme + ".impl", FSHDFSUtils.class); // Default to HDFS impl
|
||||||
|
FSUtils fsUtils = (FSUtils)ReflectionUtils.newInstance(fsUtilsClass, conf);
|
||||||
|
return fsUtils;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Delete if exists.
|
* Delete if exists.
|
||||||
|
@ -768,79 +775,17 @@ public class FSUtils {
|
||||||
return scheme.equalsIgnoreCase("hdfs");
|
return scheme.equalsIgnoreCase("hdfs");
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
/**
|
||||||
* Recover file lease. Used when a file might be suspect to be had been left open by another process. <code>p</code>
|
* Recover file lease. Used when a file might be suspect
|
||||||
* @param fs
|
* of having been left open by another process.
|
||||||
* @param p
|
* @param fs FileSystem handle
|
||||||
* @param append True if append supported
|
* @param p Path of file to recover lease
|
||||||
|
* @param conf Configuration handle
|
||||||
* @throws IOException
|
* @throws IOException
|
||||||
*/
|
*/
|
||||||
public static void recoverFileLease(final FileSystem fs, final Path p, Configuration conf)
|
public abstract void recoverFileLease(final FileSystem fs, final Path p,
|
||||||
throws IOException{
|
Configuration conf) throws IOException;
|
||||||
if (!isAppendSupported(conf)) {
|
|
||||||
LOG.warn("Running on HDFS without append enabled may result in data loss");
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
// lease recovery not needed for local file system case.
|
|
||||||
// currently, local file system doesn't implement append either.
|
|
||||||
if (!(fs instanceof DistributedFileSystem)) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
LOG.info("Recovering file " + p);
|
|
||||||
long startWaiting = System.currentTimeMillis();
|
|
||||||
|
|
||||||
// Trying recovery
|
|
||||||
boolean recovered = false;
|
|
||||||
while (!recovered) {
|
|
||||||
try {
|
|
||||||
try {
|
|
||||||
if (fs instanceof DistributedFileSystem) {
|
|
||||||
DistributedFileSystem dfs = (DistributedFileSystem)fs;
|
|
||||||
DistributedFileSystem.class.getMethod("recoverLease",
|
|
||||||
new Class[] {Path.class}).invoke(dfs, p);
|
|
||||||
} else {
|
|
||||||
throw new Exception("Not a DistributedFileSystem");
|
|
||||||
}
|
|
||||||
} catch (InvocationTargetException ite) {
|
|
||||||
// function was properly called, but threw it's own exception
|
|
||||||
throw (IOException) ite.getCause();
|
|
||||||
} catch (Exception e) {
|
|
||||||
LOG.debug("Failed fs.recoverLease invocation, " + e.toString() +
|
|
||||||
", trying fs.append instead");
|
|
||||||
FSDataOutputStream out = fs.append(p);
|
|
||||||
out.close();
|
|
||||||
}
|
|
||||||
recovered = true;
|
|
||||||
} catch (IOException e) {
|
|
||||||
e = RemoteExceptionHandler.checkIOException(e);
|
|
||||||
if (e instanceof AlreadyBeingCreatedException) {
|
|
||||||
// We expect that we'll get this message while the lease is still
|
|
||||||
// within its soft limit, but if we get it past that, it means
|
|
||||||
// that the RS is holding onto the file even though it lost its
|
|
||||||
// znode. We could potentially abort after some time here.
|
|
||||||
long waitedFor = System.currentTimeMillis() - startWaiting;
|
|
||||||
if (waitedFor > FSConstants.LEASE_SOFTLIMIT_PERIOD) {
|
|
||||||
LOG.warn("Waited " + waitedFor + "ms for lease recovery on " + p +
|
|
||||||
":" + e.getMessage());
|
|
||||||
}
|
|
||||||
} else if (e instanceof LeaseExpiredException &&
|
|
||||||
e.getMessage().contains("File does not exist")) {
|
|
||||||
// This exception comes out instead of FNFE, fix it
|
|
||||||
throw new FileNotFoundException(
|
|
||||||
"The given HLog wasn't found at " + p.toString());
|
|
||||||
} else {
|
|
||||||
throw new IOException("Failed to open " + p + " for append", e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
try {
|
|
||||||
Thread.sleep(1000);
|
|
||||||
} catch (InterruptedException ex) {
|
|
||||||
new InterruptedIOException().initCause(ex);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
LOG.info("Finished lease recover attempt for " + p);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @param fs
|
* @param fs
|
||||||
* @param rootdir
|
* @param rootdir
|
||||||
|
|
|
@ -651,7 +651,8 @@ public class RegionSplitter {
|
||||||
fs.rename(tmpFile, splitFile);
|
fs.rename(tmpFile, splitFile);
|
||||||
} else {
|
} else {
|
||||||
LOG.debug("_balancedSplit file found. Replay log to restore state...");
|
LOG.debug("_balancedSplit file found. Replay log to restore state...");
|
||||||
FSUtils.recoverFileLease(fs, splitFile, table.getConfiguration());
|
FSUtils.getInstance(fs, table.getConfiguration())
|
||||||
|
.recoverFileLease(fs, splitFile, table.getConfiguration());
|
||||||
|
|
||||||
// parse split file and process remaining splits
|
// parse split file and process remaining splits
|
||||||
FSDataInputStream tmpIn = fs.open(splitFile);
|
FSDataInputStream tmpIn = fs.open(splitFile);
|
||||||
|
|
|
@ -408,7 +408,8 @@ public class TestHLog {
|
||||||
public Exception exception = null;
|
public Exception exception = null;
|
||||||
public void run() {
|
public void run() {
|
||||||
try {
|
try {
|
||||||
FSUtils.recoverFileLease(recoveredFs, walPath, rlConf);
|
FSUtils.getInstance(fs, rlConf)
|
||||||
|
.recoverFileLease(recoveredFs, walPath, rlConf);
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
exception = e;
|
exception = e;
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue