HADOOP-13738. DiskChecker should perform some disk IO.

This commit is contained in:
Arpit Agarwal 2016-11-01 18:02:23 -07:00
parent bd7f5911c7
commit 1b6ecaf016
2 changed files with 309 additions and 28 deletions

View File

@ -19,14 +19,23 @@
package org.apache.hadoop.util;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicReference;
import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.io.IOUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Class that provides utility functions for checking disk problem
@ -34,6 +43,8 @@ import org.apache.hadoop.fs.permission.FsPermission;
@InterfaceAudience.Private
@InterfaceStability.Unstable
public class DiskChecker {
public static final Logger LOG = LoggerFactory.getLogger(DiskChecker.class);
public static class DiskErrorException extends IOException {
public DiskErrorException(String msg) {
super(msg);
@ -49,7 +60,12 @@ public class DiskChecker {
super(msg);
}
}
// Provider that abstracts some FileOutputStream operations for
// testability.
private static AtomicReference<FileIoProvider> fileIoProvider =
new AtomicReference<>(new DefaultFileIoProvider());
/**
* Create the directory if it doesn't exist and check that dir is readable,
* writable and executable
@ -63,6 +79,7 @@ public class DiskChecker {
+ dir.toString());
}
checkAccessByFileMethods(dir);
doDiskIo(dir);
}
/**
@ -80,6 +97,7 @@ public class DiskChecker {
throws DiskErrorException, IOException {
mkdirsWithExistsAndPermissionCheck(localFS, dir, expected);
checkAccessByFileMethods(localFS.pathToFile(dir));
doDiskIo(localFS.pathToFile(dir));
}
/**
@ -173,4 +191,143 @@ public class DiskChecker {
if (created || !localFS.getFileStatus(dir).getPermission().equals(expected))
localFS.setPermission(dir, expected);
}
// State related to running disk IO checks.
private static final String DISK_IO_FILE_PREFIX =
"DiskChecker.OK_TO_DELETE_.";
@VisibleForTesting
static final int DISK_IO_MAX_ITERATIONS = 3;
/**
* Performs some disk IO by writing to a new file in the given directory
* and sync'ing file contents to disk.
*
* This increases the likelihood of catching catastrophic disk/controller
* failures sooner.
*
* @param dir directory to be checked.
* @throws DiskErrorException if we hit an error while trying to perform
* disk IO against the file.
*/
private static void doDiskIo(File dir) throws DiskErrorException {
try {
IOException ioe = null;
for (int i = 0; i < DISK_IO_MAX_ITERATIONS; ++i) {
final File file = getFileNameForDiskIoCheck(dir, i+1);
try {
diskIoCheckWithoutNativeIo(file);
return;
} catch (IOException e) {
// Let's retry a few times before we really give up and
// declare the disk as bad.
ioe = e;
}
}
throw ioe; // Just rethrow the last exception to signal failure.
} catch(IOException e) {
throw new DiskErrorException("Error checking directory " + dir, e);
}
}
/**
* Try to perform some disk IO by writing to the given file
* without using Native IO.
*
* @param file
* @throws IOException if there was a non-retriable error.
*/
private static void diskIoCheckWithoutNativeIo(File file)
throws IOException {
FileOutputStream fos = null;
try {
final FileIoProvider provider = fileIoProvider.get();
fos = provider.get(file);
provider.write(fos, new byte[1]);
fos.getFD().sync();
fos.close();
fos = null;
if (!file.delete() && file.exists()) {
throw new IOException("Failed to delete " + file);
}
file = null;
} finally {
IOUtils.cleanup(null, fos);
FileUtils.deleteQuietly(file);
}
}
/**
* Generate a path name for a test file under the given directory.
*
* @return file object.
*/
@VisibleForTesting
static File getFileNameForDiskIoCheck(File dir, int iterationCount) {
if (iterationCount < DISK_IO_MAX_ITERATIONS) {
// Use file names of the format prefix.001 by default.
return new File(dir,
DISK_IO_FILE_PREFIX + String.format("%03d", iterationCount));
} else {
// If the first few checks then fail, try using a randomly generated
// file name.
return new File(dir, DISK_IO_FILE_PREFIX + UUID.randomUUID());
}
}
/**
* An interface that abstracts operations on {@link FileOutputStream}
* objects for testability.
*/
interface FileIoProvider {
FileOutputStream get(File f) throws FileNotFoundException;
void write(FileOutputStream fos, byte[] data) throws IOException;
}
/**
* The default implementation of {@link FileIoProvider}.
*/
private static class DefaultFileIoProvider implements FileIoProvider {
/**
* See {@link FileOutputStream#FileOutputStream(File)}.
*/
@Override
public FileOutputStream get(File f) throws FileNotFoundException {
return new FileOutputStream(f);
}
/**
* See {@link FileOutputStream#write(byte[])}.
*/
@Override
public void write(FileOutputStream fos, byte[] data) throws IOException {
fos.write(data);
}
}
/**
* Replace the {@link FileIoProvider} for tests.
* This method MUST NOT be used outside of unit tests.
*
* @param newFosProvider
* @return the old FileIoProvider.
*/
@VisibleForTesting
static FileIoProvider replaceFileOutputStreamProvider(
FileIoProvider newFosProvider) {
return fileIoProvider.getAndSet(newFosProvider);
}
/**
* Retrieve the current {@link FileIoProvider}.
* This method MUST NOT be used outside of unit tests.
*
* @return the current FileIoProvider.
*/
@VisibleForTesting
static FileIoProvider getFileOutputStreamProvider() {
return fileIoProvider.get();
}
}

View File

@ -19,7 +19,11 @@ package org.apache.hadoop.util;
import java.io.*;
import java.nio.file.Files;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.util.DiskChecker.FileIoProvider;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import static org.junit.Assert.*;
@ -33,27 +37,46 @@ import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.util.DiskChecker.DiskErrorException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class TestDiskChecker {
final FsPermission defaultPerm = new FsPermission("755");
final FsPermission invalidPerm = new FsPermission("000");
public static final Logger LOG =
LoggerFactory.getLogger(TestDiskChecker.class);
@Test (timeout = 30000)
private final FsPermission defaultPerm = new FsPermission("755");
private final FsPermission invalidPerm = new FsPermission("000");
private FileIoProvider fileIoProvider = null;
@Before
public void setup() {
// Some tests replace the static field DiskChecker#fileIoProvider.
// Cache it so we can restore it after each test completes.
fileIoProvider = DiskChecker.getFileOutputStreamProvider();
}
@After
public void cleanup() {
DiskChecker.replaceFileOutputStreamProvider(fileIoProvider);
}
@Test(timeout = 30000)
public void testMkdirs_dirExists() throws Throwable {
_mkdirs(true, defaultPerm, defaultPerm);
}
@Test (timeout = 30000)
@Test(timeout = 30000)
public void testMkdirs_noDir() throws Throwable {
_mkdirs(false, defaultPerm, defaultPerm);
}
@Test (timeout = 30000)
@Test(timeout = 30000)
public void testMkdirs_dirExists_badUmask() throws Throwable {
_mkdirs(true, defaultPerm, invalidPerm);
}
@Test (timeout = 30000)
@Test(timeout = 30000)
public void testMkdirs_noDir_badUmask() throws Throwable {
_mkdirs(false, defaultPerm, invalidPerm);
}
@ -78,34 +101,33 @@ public class TestDiskChecker {
verify(fs).getFileStatus(dir);
verify(stat).getPermission();
}
}
catch (DiskErrorException e) {
} catch (DiskErrorException e) {
if (before != after)
assertTrue(e.getMessage().startsWith("Incorrect permission"));
}
}
@Test (timeout = 30000)
@Test(timeout = 30000)
public void testCheckDir_normal() throws Throwable {
_checkDirs(true, new FsPermission("755"), true);
}
@Test (timeout = 30000)
@Test(timeout = 30000)
public void testCheckDir_notDir() throws Throwable {
_checkDirs(false, new FsPermission("000"), false);
}
@Test (timeout = 30000)
@Test(timeout = 30000)
public void testCheckDir_notReadable() throws Throwable {
_checkDirs(true, new FsPermission("000"), false);
}
@Test (timeout = 30000)
@Test(timeout = 30000)
public void testCheckDir_notWritable() throws Throwable {
_checkDirs(true, new FsPermission("444"), false);
}
@Test (timeout = 30000)
@Test(timeout = 30000)
public void testCheckDir_notListable() throws Throwable {
_checkDirs(true, new FsPermission("666"), false); // not listable
}
@ -134,13 +156,15 @@ public class TestDiskChecker {
throws Throwable {
File localDir = isDir ? createTempDir() : createTempFile();
Shell.execCommand(Shell.getSetPermissionCommand(String.format("%04o",
perm.toShort()), false, localDir.getAbsolutePath()));
perm.toShort()), false, localDir.getAbsolutePath()));
try {
DiskChecker.checkDir(FileSystem.getLocal(new Configuration()),
new Path(localDir.getAbsolutePath()), perm);
assertTrue("checkDir success", success);
new Path(localDir.getAbsolutePath()), perm);
assertTrue("checkDir success, expected failure", success);
} catch (DiskErrorException e) {
assertFalse("checkDir success", success);
if (success) {
throw e; // Unexpected exception!
}
}
localDir.delete();
}
@ -150,27 +174,27 @@ public class TestDiskChecker {
* permission for result of mapper.
*/
@Test (timeout = 30000)
@Test(timeout = 30000)
public void testCheckDir_normal_local() throws Throwable {
checkDirs(true, "755", true);
}
@Test (timeout = 30000)
@Test(timeout = 30000)
public void testCheckDir_notDir_local() throws Throwable {
checkDirs(false, "000", false);
}
@Test (timeout = 30000)
@Test(timeout = 30000)
public void testCheckDir_notReadable_local() throws Throwable {
checkDirs(true, "000", false);
}
@Test (timeout = 30000)
@Test(timeout = 30000)
public void testCheckDir_notWritable_local() throws Throwable {
checkDirs(true, "444", false);
}
@Test (timeout = 30000)
@Test(timeout = 30000)
public void testCheckDir_notListable_local() throws Throwable {
checkDirs(true, "666", false);
}
@ -179,16 +203,116 @@ public class TestDiskChecker {
throws Throwable {
File localDir = isDir ? createTempDir() : createTempFile();
Shell.execCommand(Shell.getSetPermissionCommand(perm, false,
localDir.getAbsolutePath()));
localDir.getAbsolutePath()));
try {
DiskChecker.checkDir(localDir);
assertTrue("checkDir success", success);
assertTrue("checkDir success, expected failure", success);
} catch (DiskErrorException e) {
e.printStackTrace();
assertFalse("checkDir success", success);
if (success) {
throw e; // Unexpected exception!
}
}
localDir.delete();
System.out.println("checkDir success: " + success);
}
/**
* Verify DiskChecker ignores at least 2 transient file creation errors.
*/
@Test(timeout = 30000)
public void testDiskIoIgnoresTransientCreateErrors() throws Throwable {
DiskChecker.replaceFileOutputStreamProvider(new TestFileIoProvider(
DiskChecker.DISK_IO_MAX_ITERATIONS - 1, 0));
checkDirs(true, "755", true);
}
/**
* Verify DiskChecker bails after 3 file creation errors.
*/
@Test(timeout = 30000)
public void testDiskIoDetectsCreateErrors() throws Throwable {
DiskChecker.replaceFileOutputStreamProvider(new TestFileIoProvider(
DiskChecker.DISK_IO_MAX_ITERATIONS, 0));
checkDirs(true, "755", false);
}
/**
* Verify DiskChecker ignores at least 2 transient file write errors.
*/
@Test(timeout = 30000)
public void testDiskIoIgnoresTransientWriteErrors() throws Throwable {
DiskChecker.replaceFileOutputStreamProvider(new TestFileIoProvider(
0, DiskChecker.DISK_IO_MAX_ITERATIONS - 1));
checkDirs(true, "755", true);
}
/**
* Verify DiskChecker bails after 3 file write errors.
*/
@Test(timeout = 30000)
public void testDiskIoDetectsWriteErrors() throws Throwable {
DiskChecker.replaceFileOutputStreamProvider(new TestFileIoProvider(
0, DiskChecker.DISK_IO_MAX_ITERATIONS));
checkDirs(true, "755", false);
}
/**
* Verify DiskChecker's test file naming scheme.
*/
@Test(timeout = 30000)
public void testDiskIoFileNaming() throws Throwable {
final File rootDir = new File("/");
assertTrue(".001".matches("\\.00\\d$"));
for (int i = 1; i < DiskChecker.DISK_IO_MAX_ITERATIONS; ++i) {
final File file = DiskChecker.getFileNameForDiskIoCheck(rootDir, i);
assertTrue(
"File name does not match expected pattern: " + file,
file.toString().matches("^.*\\.[0-9]+$"));
}
final File guidFile = DiskChecker.getFileNameForDiskIoCheck(
rootDir, DiskChecker.DISK_IO_MAX_ITERATIONS);
assertTrue(
"File name does not match expected pattern: " + guidFile,
guidFile.toString().matches("^.*\\.[A-Za-z0-9-]+$"));
}
/**
* A dummy {@link DiskChecker#FileIoProvider} that can throw a programmable
* number of times.
*/
private static class TestFileIoProvider implements FileIoProvider {
private final AtomicInteger numCreateCalls = new AtomicInteger(0);
private final AtomicInteger numWriteCalls = new AtomicInteger(0);
private final int numTimesToThrowOnCreate;
private final int numTimesToThrowOnWrite;
public TestFileIoProvider(
int numTimesToThrowOnCreate, int numTimesToThrowOnWrite) {
this.numTimesToThrowOnCreate = numTimesToThrowOnCreate;
this.numTimesToThrowOnWrite = numTimesToThrowOnWrite;
}
/**
* {@inheritDoc}
*/
@Override
public FileOutputStream get(File f) throws FileNotFoundException {
if (numCreateCalls.getAndIncrement() < numTimesToThrowOnCreate) {
throw new FileNotFoundException("Dummy exception for testing");
}
// Can't mock final class FileOutputStream.
return new FileOutputStream(f);
}
/**
* {@inheritDoc}
*/
@Override
public void write(FileOutputStream fos, byte[] data) throws IOException {
if (numWriteCalls.getAndIncrement() < numTimesToThrowOnWrite) {
throw new IOException("Dummy exception for testing");
}
fos.write(data);
}
}
}