HDFS-11274. Datanode should only check the failed volume upon IO errors. Contributed by Xiaoyu Yao.

Xiaoyu Yao 2016-12-28 22:08:13 -08:00 committed by Arpit Agarwal
parent ec80de3ccc
commit eafaddca1a
23 changed files with 377 additions and 265 deletions

View File

@ -276,10 +276,9 @@ class BlockReceiver implements Closeable {
IOException cause = DatanodeUtil.getCauseIfDiskError(ioe); IOException cause = DatanodeUtil.getCauseIfDiskError(ioe);
DataNode.LOG.warn("IOException in BlockReceiver constructor" DataNode.LOG.warn("IOException in BlockReceiver constructor"
+ (cause == null ? "" : ". Cause is "), cause); + (cause == null ? "" : ". Cause is "), cause);
if (cause != null) {
if (cause != null) { // possible disk error
ioe = cause; ioe = cause;
datanode.checkDiskErrorAsync(); // Volume error check moved to FileIoProvider
} }
throw ioe; throw ioe;
@ -361,9 +360,8 @@ class BlockReceiver implements Closeable {
if (measuredFlushTime) { if (measuredFlushTime) {
datanode.metrics.addFlushNanos(flushTotalNanos); datanode.metrics.addFlushNanos(flushTotalNanos);
} }
// disk check
if(ioe != null) { if(ioe != null) {
datanode.checkDiskErrorAsync(); // Volume error check moved to FileIoProvider
throw ioe; throw ioe;
} }
} }
@ -786,7 +784,7 @@ class BlockReceiver implements Closeable {
manageWriterOsCache(offsetInBlock); manageWriterOsCache(offsetInBlock);
} }
} catch (IOException iex) { } catch (IOException iex) {
datanode.checkDiskErrorAsync(); // Volume error check moved to FileIoProvider
throw iex; throw iex;
} }
} }
@ -1395,7 +1393,7 @@ class BlockReceiver implements Closeable {
} catch (IOException e) { } catch (IOException e) {
LOG.warn("IOException in BlockReceiver.run(): ", e); LOG.warn("IOException in BlockReceiver.run(): ", e);
if (running) { if (running) {
datanode.checkDiskErrorAsync(); // Volume error check moved to FileIoProvider
LOG.info(myString, e); LOG.info(myString, e);
running = false; running = false;
if (!Thread.interrupted()) { // failure not caused by interruption if (!Thread.interrupted()) { // failure not caused by interruption

View File

@ -37,7 +37,7 @@ import java.util.concurrent.atomic.AtomicLong;
*/ */
@InterfaceAudience.Private @InterfaceAudience.Private
@InterfaceStability.Unstable @InterfaceStability.Unstable
public class CountingFileIoEvents implements FileIoEvents { public class CountingFileIoEvents extends FileIoEvents {
private final Map<OPERATION, Counts> counts; private final Map<OPERATION, Counts> counts;
private static class Counts { private static class Counts {
@ -90,7 +90,6 @@ public class CountingFileIoEvents implements FileIoEvents {
public void onFailure( public void onFailure(
@Nullable FsVolumeSpi volume, OPERATION op, Exception e, long begin) { @Nullable FsVolumeSpi volume, OPERATION op, Exception e, long begin) {
counts.get(op).failures.incrementAndGet(); counts.get(op).failures.incrementAndGet();
} }
@Override @Override

View File

@ -371,6 +371,7 @@ public class DataNode extends ReconfigurableBase
SaslDataTransferServer saslServer; SaslDataTransferServer saslServer;
private final boolean getHdfsBlockLocationsEnabled; private final boolean getHdfsBlockLocationsEnabled;
private ObjectName dataNodeInfoBeanName; private ObjectName dataNodeInfoBeanName;
// Test verification only
private volatile long lastDiskErrorCheck; private volatile long lastDiskErrorCheck;
private String supergroup; private String supergroup;
private boolean isPermissionEnabled; private boolean isPermissionEnabled;
@ -408,7 +409,7 @@ public class DataNode extends ReconfigurableBase
this.tracer = createTracer(conf); this.tracer = createTracer(conf);
this.tracerConfigurationManager = this.tracerConfigurationManager =
new TracerConfigurationManager(DATANODE_HTRACE_PREFIX, conf); new TracerConfigurationManager(DATANODE_HTRACE_PREFIX, conf);
this.fileIoProvider = new FileIoProvider(conf); this.fileIoProvider = new FileIoProvider(conf, this);
this.fileDescriptorPassingDisabledReason = null; this.fileDescriptorPassingDisabledReason = null;
this.maxNumberOfBlocksToLog = 0; this.maxNumberOfBlocksToLog = 0;
this.confVersion = null; this.confVersion = null;
@ -433,7 +434,7 @@ public class DataNode extends ReconfigurableBase
this.tracer = createTracer(conf); this.tracer = createTracer(conf);
this.tracerConfigurationManager = this.tracerConfigurationManager =
new TracerConfigurationManager(DATANODE_HTRACE_PREFIX, conf); new TracerConfigurationManager(DATANODE_HTRACE_PREFIX, conf);
this.fileIoProvider = new FileIoProvider(conf); this.fileIoProvider = new FileIoProvider(conf, this);
this.blockScanner = new BlockScanner(this); this.blockScanner = new BlockScanner(this);
this.lastDiskErrorCheck = 0; this.lastDiskErrorCheck = 0;
this.maxNumberOfBlocksToLog = conf.getLong(DFS_MAX_NUM_BLOCKS_TO_LOG_KEY, this.maxNumberOfBlocksToLog = conf.getLong(DFS_MAX_NUM_BLOCKS_TO_LOG_KEY,
@ -783,7 +784,7 @@ public class DataNode extends ReconfigurableBase
/** /**
* Remove volumes from DataNode. * Remove volumes from DataNode.
* See {@link #removeVolumes(Set, boolean)} for details. * See {@link #removeVolumes(Collection, boolean)} for details.
* *
* @param locations the StorageLocations of the volumes to be removed. * @param locations the StorageLocations of the volumes to be removed.
* @throws IOException * @throws IOException
@ -810,7 +811,7 @@ public class DataNode extends ReconfigurableBase
* <ul>Reset configuration DATA_DIR and {@link #dataDirs} to represent * <ul>Reset configuration DATA_DIR and {@link #dataDirs} to represent
* active volumes.</ul> * active volumes.</ul>
* </li> * </li>
* @param absoluteVolumePaths the absolute path of volumes. * @param storageLocations the absolute path of volumes.
* @param clearFailure if true, clears the failure information related to the * @param clearFailure if true, clears the failure information related to the
* volumes. * volumes.
* @throws IOException * @throws IOException
@ -1258,7 +1259,7 @@ public class DataNode extends ReconfigurableBase
* If conf's CONFIG_PROPERTY_SIMULATED property is set * If conf's CONFIG_PROPERTY_SIMULATED property is set
* then a simulated storage based data node is created. * then a simulated storage based data node is created.
* *
* @param dataDirs - only for a non-simulated storage data node * @param dataDirectories - only for a non-simulated storage data node
* @throws IOException * @throws IOException
*/ */
void startDataNode(List<StorageLocation> dataDirectories, void startDataNode(List<StorageLocation> dataDirectories,
@ -2020,13 +2021,36 @@ public class DataNode extends ReconfigurableBase
tracer.close(); tracer.close();
} }
/** /**
* Check if there is a disk failure asynchronously and if so, handle the error * Check if there is a disk failure asynchronously
* and if so, handle the error.
*/ */
@VisibleForTesting
public void checkDiskErrorAsync() { public void checkDiskErrorAsync() {
volumeChecker.checkAllVolumesAsync( volumeChecker.checkAllVolumesAsync(
data, new DatasetVolumeChecker.Callback() { data, new DatasetVolumeChecker.Callback() {
@Override
public void call(Set<FsVolumeSpi> healthyVolumes,
Set<FsVolumeSpi> failedVolumes) {
if (failedVolumes.size() > 0) {
LOG.warn("checkDiskErrorAsync callback got {} failed volumes: {}",
failedVolumes.size(), failedVolumes);
} else {
LOG.debug("checkDiskErrorAsync: no volume failures detected");
}
lastDiskErrorCheck = Time.monotonicNow();
handleVolumeFailures(failedVolumes);
}
});
}
/**
* Check if there is a disk failure asynchronously
* and if so, handle the error.
*/
public void checkDiskErrorAsync(FsVolumeSpi volume) {
volumeChecker.checkVolume(
volume, new DatasetVolumeChecker.Callback() {
@Override @Override
public void call(Set<FsVolumeSpi> healthyVolumes, public void call(Set<FsVolumeSpi> healthyVolumes,
Set<FsVolumeSpi> failedVolumes) { Set<FsVolumeSpi> failedVolumes) {
@ -2037,14 +2061,15 @@ public class DataNode extends ReconfigurableBase
LOG.debug("checkDiskErrorAsync: no volume failures detected"); LOG.debug("checkDiskErrorAsync: no volume failures detected");
} }
lastDiskErrorCheck = Time.monotonicNow(); lastDiskErrorCheck = Time.monotonicNow();
DataNode.this.handleVolumeFailures(failedVolumes); handleVolumeFailures(failedVolumes);
} }
}); });
} }
private void handleDiskError(String errMsgr) { private void handleDiskError(String failedVolumes) {
final boolean hasEnoughResources = data.hasEnoughResource(); final boolean hasEnoughResources = data.hasEnoughResource();
LOG.warn("DataNode.handleDiskError: Keep Running: " + hasEnoughResources); LOG.warn("DataNode.handleDiskError on : [" + failedVolumes +
"] Keep Running: " + hasEnoughResources);
// If we have enough active valid volumes then we do not want to // If we have enough active valid volumes then we do not want to
// shutdown the DN completely. // shutdown the DN completely.
@ -2054,7 +2079,7 @@ public class DataNode extends ReconfigurableBase
//inform NameNodes //inform NameNodes
for(BPOfferService bpos: blockPoolManager.getAllNamenodeThreads()) { for(BPOfferService bpos: blockPoolManager.getAllNamenodeThreads()) {
bpos.trySendErrorReport(dpError, errMsgr); bpos.trySendErrorReport(dpError, failedVolumes);
} }
if(hasEnoughResources) { if(hasEnoughResources) {
@ -2062,7 +2087,8 @@ public class DataNode extends ReconfigurableBase
return; // do not shutdown return; // do not shutdown
} }
LOG.warn("DataNode is shutting down: " + errMsgr); LOG.warn("DataNode is shutting down due to failed volumes: ["
+ failedVolumes + "]");
shouldRun = false; shouldRun = false;
} }
@ -2412,8 +2438,11 @@ public class DataNode extends ReconfigurableBase
} }
LOG.warn(bpReg + ":Failed to transfer " + b + " to " + LOG.warn(bpReg + ":Failed to transfer " + b + " to " +
targets[0] + " got ", ie); targets[0] + " got ", ie);
// check if there are any disk problem // disk check moved to FileIoProvider
checkDiskErrorAsync(); IOException cause = DatanodeUtil.getCauseIfDiskError(ie);
if (cause != null) { // possible disk error
LOG.warn("IOException in DataTransfer#run(). Cause is ", cause);
}
} finally { } finally {
xmitsInProgress.getAndDecrement(); xmitsInProgress.getAndDecrement();
IOUtils.closeStream(blockSender); IOUtils.closeStream(blockSender);
@ -3167,28 +3196,36 @@ public class DataNode extends ReconfigurableBase
} }
private void handleVolumeFailures(Set<FsVolumeSpi> unhealthyVolumes) { private void handleVolumeFailures(Set<FsVolumeSpi> unhealthyVolumes) {
if (unhealthyVolumes.isEmpty()) {
LOG.debug("handleVolumeFailures done with empty " +
"unhealthyVolumes");
return;
}
data.handleVolumeFailures(unhealthyVolumes); data.handleVolumeFailures(unhealthyVolumes);
final Set<File> unhealthyDirs = new HashSet<>(unhealthyVolumes.size()); final Set<File> unhealthyDirs = new HashSet<>(unhealthyVolumes.size());
if (!unhealthyVolumes.isEmpty()) { StringBuilder sb = new StringBuilder("DataNode failed volumes:");
StringBuilder sb = new StringBuilder("DataNode failed volumes:"); for (FsVolumeSpi vol : unhealthyVolumes) {
for (FsVolumeSpi vol : unhealthyVolumes) { unhealthyDirs.add(new File(vol.getBasePath()).getAbsoluteFile());
unhealthyDirs.add(new File(vol.getBasePath()).getAbsoluteFile()); sb.append(vol).append(";");
sb.append(vol).append(";");
}
try {
// Remove all unhealthy volumes from DataNode.
removeVolumes(unhealthyDirs, false);
} catch (IOException e) {
LOG.warn("Error occurred when removing unhealthy storage dirs: "
+ e.getMessage(), e);
}
LOG.info(sb.toString());
handleDiskError(sb.toString());
} }
try {
// Remove all unhealthy volumes from DataNode.
removeVolumes(unhealthyDirs, false);
} catch (IOException e) {
LOG.warn("Error occurred when removing unhealthy storage dirs: "
+ e.getMessage(), e);
}
if (LOG.isDebugEnabled()) {
LOG.debug(sb.toString());
}
// send blockreport regarding volume failure
handleDiskError(sb.toString());
} }
@VisibleForTesting
public long getLastDiskErrorCheck() { public long getLastDiskErrorCheck() {
return lastDiskErrorCheck; return lastDiskErrorCheck;
} }
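For illustration, a minimal sketch of the calling pattern the new per-volume overload enables, assuming the DataNode#checkDiskErrorAsync(FsVolumeSpi) and FsVolumeSpi types from this patch; the helper class and its names are hypothetical and stand in for any code path that performs IO against a known volume:

    import java.io.IOException;
    import java.util.concurrent.Callable;
    import org.apache.hadoop.hdfs.server.datanode.DataNode;
    import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;

    /** Hypothetical helper; only the catch block illustrates the new pattern. */
    class PerVolumeIoRunner {
      private final DataNode datanode;

      PerVolumeIoRunner(DataNode datanode) {
        this.datanode = datanode;
      }

      <T> T runOnVolume(FsVolumeSpi volume, Callable<T> io) throws Exception {
        try {
          return io.call();
        } catch (IOException ioe) {
          // Before this patch, callers invoked datanode.checkDiskErrorAsync(),
          // which scheduled a check of every volume. Now only the volume that
          // actually saw the IO error is checked.
          datanode.checkDiskErrorAsync(volume);
          throw ioe;
        }
      }
    }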

View File

@ -31,7 +31,7 @@ import javax.annotation.Nullable;
*/ */
@InterfaceAudience.Private @InterfaceAudience.Private
@InterfaceStability.Unstable @InterfaceStability.Unstable
public final class DefaultFileIoEvents implements FileIoEvents { public final class DefaultFileIoEvents extends FileIoEvents {
@Override @Override
public long beforeMetadataOp( public long beforeMetadataOp(
@Nullable FsVolumeSpi volume, OPERATION op) { @Nullable FsVolumeSpi volume, OPERATION op) {

View File

@ -855,7 +855,7 @@ public class DirectoryScanner implements Runnable {
} catch (IOException ioe) { } catch (IOException ioe) {
LOG.warn("Exception occured while compiling report: ", ioe); LOG.warn("Exception occured while compiling report: ", ioe);
// Initiate a check on disk failure. // Initiate a check on disk failure.
datanode.checkDiskErrorAsync(); datanode.checkDiskErrorAsync(volume);
// Ignore this directory and proceed. // Ignore this directory and proceed.
return report; return report;
} }

View File

@ -32,7 +32,7 @@ import javax.annotation.Nullable;
*/ */
@InterfaceAudience.Private @InterfaceAudience.Private
@InterfaceStability.Unstable @InterfaceStability.Unstable
public interface FileIoEvents { public abstract class FileIoEvents {
/** /**
* Invoked before a filesystem metadata operation. * Invoked before a filesystem metadata operation.
@ -42,7 +42,7 @@ public interface FileIoEvents {
* @return timestamp at which the operation was started. 0 if * @return timestamp at which the operation was started. 0 if
* unavailable. * unavailable.
*/ */
long beforeMetadataOp(@Nullable FsVolumeSpi volume, OPERATION op); abstract long beforeMetadataOp(@Nullable FsVolumeSpi volume, OPERATION op);
/** /**
* Invoked after a filesystem metadata operation has completed. * Invoked after a filesystem metadata operation has completed.
@ -52,7 +52,8 @@ public interface FileIoEvents {
* @param begin timestamp at which the operation was started. 0 * @param begin timestamp at which the operation was started. 0
* if unavailable. * if unavailable.
*/ */
void afterMetadataOp(@Nullable FsVolumeSpi volume, OPERATION op, long begin); abstract void afterMetadataOp(@Nullable FsVolumeSpi volume, OPERATION op,
long begin);
/** /**
* Invoked before a read/write/flush/channel transfer operation. * Invoked before a read/write/flush/channel transfer operation.
@ -63,7 +64,8 @@ public interface FileIoEvents {
* @return timestamp at which the operation was started. 0 if * @return timestamp at which the operation was started. 0 if
* unavailable. * unavailable.
*/ */
long beforeFileIo(@Nullable FsVolumeSpi volume, OPERATION op, long len); abstract long beforeFileIo(@Nullable FsVolumeSpi volume, OPERATION op,
long len);
/** /**
@ -76,22 +78,38 @@ public interface FileIoEvents {
* @return timestamp at which the operation was started. 0 if * @return timestamp at which the operation was started. 0 if
* unavailable. * unavailable.
*/ */
void afterFileIo(@Nullable FsVolumeSpi volume, OPERATION op, abstract void afterFileIo(@Nullable FsVolumeSpi volume, OPERATION op,
long begin, long len); long begin, long len);
/** /**
* Invoked if an operation fails with an exception. * Invoked if an operation fails with an exception.
* @param volume target volume for the operation. Null if unavailable. * @param volume target volume for the operation. Null if unavailable.
* @param op type of operation. * @param op type of operation.
* @param e Exception encountered during the operation. * @param e Exception encountered during the operation.
* @param begin time at which the operation was started. * @param begin time at which the operation was started.
*/ */
void onFailure( abstract void onFailure(
@Nullable FsVolumeSpi volume, OPERATION op, Exception e, long begin); @Nullable FsVolumeSpi volume, OPERATION op, Exception e, long begin);
/**
* Invoked by FileIoProvider if an operation fails with an exception.
* @param datanode datanode that runs volume check upon volume io failure
* @param volume target volume for the operation. Null if unavailable.
* @param op type of operation.
* @param e Exception encountered during the operation.
* @param begin time at which the operation was started.
*/
void onFailure(DataNode datanode,
@Nullable FsVolumeSpi volume, OPERATION op, Exception e, long begin) {
onFailure(volume, op, e, begin);
if (datanode != null && volume != null) {
datanode.checkDiskErrorAsync(volume);
}
}
/** /**
* Return statistics as a JSON string. * Return statistics as a JSON string.
* @return * @return
*/ */
@Nullable String getStatistics(); @Nullable abstract String getStatistics();
} }

View File

@ -79,12 +79,16 @@ public class FileIoProvider {
FileIoProvider.class); FileIoProvider.class);
private final FileIoEvents eventHooks; private final FileIoEvents eventHooks;
private final DataNode datanode;
/** /**
* @param conf Configuration object. May be null. When null, * @param conf Configuration object. May be null. When null,
* the event handlers are no-ops. * the event handlers are no-ops.
* @param datanode datanode that owns this FileIoProvider. Used for
* IO error based volume checker callback
*/ */
public FileIoProvider(@Nullable Configuration conf) { public FileIoProvider(@Nullable Configuration conf,
final DataNode datanode) {
if (conf != null) { if (conf != null) {
final Class<? extends FileIoEvents> clazz = conf.getClass( final Class<? extends FileIoEvents> clazz = conf.getClass(
DFSConfigKeys.DFS_DATANODE_FILE_IO_EVENTS_CLASS_KEY, DFSConfigKeys.DFS_DATANODE_FILE_IO_EVENTS_CLASS_KEY,
@ -94,6 +98,7 @@ public class FileIoProvider {
} else { } else {
eventHooks = new DefaultFileIoEvents(); eventHooks = new DefaultFileIoEvents();
} }
this.datanode = datanode;
} }
/** /**
@ -139,7 +144,7 @@ public class FileIoProvider {
f.flush(); f.flush();
eventHooks.afterFileIo(volume, FLUSH, begin, 0); eventHooks.afterFileIo(volume, FLUSH, begin, 0);
} catch (Exception e) { } catch (Exception e) {
eventHooks.onFailure(volume, FLUSH, e, begin); eventHooks.onFailure(datanode, volume, FLUSH, e, begin);
throw e; throw e;
} }
} }
@ -157,7 +162,7 @@ public class FileIoProvider {
fos.getChannel().force(true); fos.getChannel().force(true);
eventHooks.afterFileIo(volume, SYNC, begin, 0); eventHooks.afterFileIo(volume, SYNC, begin, 0);
} catch (Exception e) { } catch (Exception e) {
eventHooks.onFailure(volume, SYNC, e, begin); eventHooks.onFailure(datanode, volume, SYNC, e, begin);
throw e; throw e;
} }
} }
@ -176,7 +181,7 @@ public class FileIoProvider {
NativeIO.POSIX.syncFileRangeIfPossible(outFd, offset, numBytes, flags); NativeIO.POSIX.syncFileRangeIfPossible(outFd, offset, numBytes, flags);
eventHooks.afterFileIo(volume, SYNC, begin, 0); eventHooks.afterFileIo(volume, SYNC, begin, 0);
} catch (Exception e) { } catch (Exception e) {
eventHooks.onFailure(volume, SYNC, e, begin); eventHooks.onFailure(datanode, volume, SYNC, e, begin);
throw e; throw e;
} }
} }
@ -196,7 +201,7 @@ public class FileIoProvider {
identifier, outFd, offset, length, flags); identifier, outFd, offset, length, flags);
eventHooks.afterMetadataOp(volume, FADVISE, begin); eventHooks.afterMetadataOp(volume, FADVISE, begin);
} catch (Exception e) { } catch (Exception e) {
eventHooks.onFailure(volume, FADVISE, e, begin); eventHooks.onFailure(datanode, volume, FADVISE, e, begin);
throw e; throw e;
} }
} }
@ -214,7 +219,7 @@ public class FileIoProvider {
eventHooks.afterMetadataOp(volume, DELETE, begin); eventHooks.afterMetadataOp(volume, DELETE, begin);
return deleted; return deleted;
} catch (Exception e) { } catch (Exception e) {
eventHooks.onFailure(volume, DELETE, e, begin); eventHooks.onFailure(datanode, volume, DELETE, e, begin);
throw e; throw e;
} }
} }
@ -236,7 +241,7 @@ public class FileIoProvider {
} }
return deleted; return deleted;
} catch (Exception e) { } catch (Exception e) {
eventHooks.onFailure(volume, DELETE, e, begin); eventHooks.onFailure(datanode, volume, DELETE, e, begin);
throw e; throw e;
} }
} }
@ -264,7 +269,7 @@ public class FileIoProvider {
waitTime, transferTime); waitTime, transferTime);
eventHooks.afterFileIo(volume, TRANSFER, begin, count); eventHooks.afterFileIo(volume, TRANSFER, begin, count);
} catch (Exception e) { } catch (Exception e) {
eventHooks.onFailure(volume, TRANSFER, e, begin); eventHooks.onFailure(datanode, volume, TRANSFER, e, begin);
throw e; throw e;
} }
} }
@ -285,7 +290,7 @@ public class FileIoProvider {
eventHooks.afterMetadataOp(volume, OPEN, begin); eventHooks.afterMetadataOp(volume, OPEN, begin);
return created; return created;
} catch (Exception e) { } catch (Exception e) {
eventHooks.onFailure(volume, OPEN, e, begin); eventHooks.onFailure(datanode, volume, OPEN, e, begin);
throw e; throw e;
} }
} }
@ -312,7 +317,7 @@ public class FileIoProvider {
return fis; return fis;
} catch(Exception e) { } catch(Exception e) {
org.apache.commons.io.IOUtils.closeQuietly(fis); org.apache.commons.io.IOUtils.closeQuietly(fis);
eventHooks.onFailure(volume, OPEN, e, begin); eventHooks.onFailure(datanode, volume, OPEN, e, begin);
throw e; throw e;
} }
} }
@ -328,7 +333,7 @@ public class FileIoProvider {
* @param f File object. * @param f File object.
* @param append if true, then bytes will be written to the end of the * @param append if true, then bytes will be written to the end of the
* file rather than the beginning. * file rather than the beginning.
* @param FileOutputStream to the given file object. * @return FileOutputStream to the given file object.
* @throws FileNotFoundException * @throws FileNotFoundException
*/ */
public FileOutputStream getFileOutputStream( public FileOutputStream getFileOutputStream(
@ -342,7 +347,7 @@ public class FileIoProvider {
return fos; return fos;
} catch(Exception e) { } catch(Exception e) {
org.apache.commons.io.IOUtils.closeQuietly(fos); org.apache.commons.io.IOUtils.closeQuietly(fos);
eventHooks.onFailure(volume, OPEN, e, begin); eventHooks.onFailure(datanode, volume, OPEN, e, begin);
throw e; throw e;
} }
} }
@ -372,7 +377,7 @@ public class FileIoProvider {
* before delegating to the wrapped stream. * before delegating to the wrapped stream.
* *
* @param volume target volume. null if unavailable. * @param volume target volume. null if unavailable.
* @param f File object. * @param fd File descriptor object.
* @return FileOutputStream to the given file object. * @return FileOutputStream to the given file object.
* @throws FileNotFoundException * @throws FileNotFoundException
*/ */
@ -407,7 +412,7 @@ public class FileIoProvider {
return fis; return fis;
} catch(Exception e) { } catch(Exception e) {
org.apache.commons.io.IOUtils.closeQuietly(fis); org.apache.commons.io.IOUtils.closeQuietly(fis);
eventHooks.onFailure(volume, OPEN, e, begin); eventHooks.onFailure(datanode, volume, OPEN, e, begin);
throw e; throw e;
} }
} }
@ -438,7 +443,7 @@ public class FileIoProvider {
return fis; return fis;
} catch(Exception e) { } catch(Exception e) {
org.apache.commons.io.IOUtils.closeQuietly(fis); org.apache.commons.io.IOUtils.closeQuietly(fis);
eventHooks.onFailure(volume, OPEN, e, begin); eventHooks.onFailure(datanode, volume, OPEN, e, begin);
throw e; throw e;
} }
} }
@ -468,7 +473,7 @@ public class FileIoProvider {
return raf; return raf;
} catch(Exception e) { } catch(Exception e) {
org.apache.commons.io.IOUtils.closeQuietly(raf); org.apache.commons.io.IOUtils.closeQuietly(raf);
eventHooks.onFailure(volume, OPEN, e, begin); eventHooks.onFailure(datanode, volume, OPEN, e, begin);
throw e; throw e;
} }
} }
@ -487,7 +492,7 @@ public class FileIoProvider {
eventHooks.afterMetadataOp(volume, DELETE, begin); eventHooks.afterMetadataOp(volume, DELETE, begin);
return deleted; return deleted;
} catch(Exception e) { } catch(Exception e) {
eventHooks.onFailure(volume, DELETE, e, begin); eventHooks.onFailure(datanode, volume, DELETE, e, begin);
throw e; throw e;
} }
} }
@ -508,7 +513,7 @@ public class FileIoProvider {
FileUtil.replaceFile(src, target); FileUtil.replaceFile(src, target);
eventHooks.afterMetadataOp(volume, MOVE, begin); eventHooks.afterMetadataOp(volume, MOVE, begin);
} catch(Exception e) { } catch(Exception e) {
eventHooks.onFailure(volume, MOVE, e, begin); eventHooks.onFailure(datanode, volume, MOVE, e, begin);
throw e; throw e;
} }
} }
@ -530,7 +535,7 @@ public class FileIoProvider {
Storage.rename(src, target); Storage.rename(src, target);
eventHooks.afterMetadataOp(volume, MOVE, begin); eventHooks.afterMetadataOp(volume, MOVE, begin);
} catch(Exception e) { } catch(Exception e) {
eventHooks.onFailure(volume, MOVE, e, begin); eventHooks.onFailure(datanode, volume, MOVE, e, begin);
throw e; throw e;
} }
} }
@ -552,7 +557,7 @@ public class FileIoProvider {
FileUtils.moveFile(src, target); FileUtils.moveFile(src, target);
eventHooks.afterMetadataOp(volume, MOVE, begin); eventHooks.afterMetadataOp(volume, MOVE, begin);
} catch(Exception e) { } catch(Exception e) {
eventHooks.onFailure(volume, MOVE, e, begin); eventHooks.onFailure(datanode, volume, MOVE, e, begin);
throw e; throw e;
} }
} }
@ -576,7 +581,7 @@ public class FileIoProvider {
Files.move(src, target, options); Files.move(src, target, options);
eventHooks.afterMetadataOp(volume, MOVE, begin); eventHooks.afterMetadataOp(volume, MOVE, begin);
} catch(Exception e) { } catch(Exception e) {
eventHooks.onFailure(volume, MOVE, e, begin); eventHooks.onFailure(datanode, volume, MOVE, e, begin);
throw e; throw e;
} }
} }
@ -625,7 +630,7 @@ public class FileIoProvider {
Storage.nativeCopyFileUnbuffered(src, target, preserveFileDate); Storage.nativeCopyFileUnbuffered(src, target, preserveFileDate);
eventHooks.afterFileIo(volume, NATIVE_COPY, begin, length); eventHooks.afterFileIo(volume, NATIVE_COPY, begin, length);
} catch(Exception e) { } catch(Exception e) {
eventHooks.onFailure(volume, NATIVE_COPY, e, begin); eventHooks.onFailure(datanode, volume, NATIVE_COPY, e, begin);
throw e; throw e;
} }
} }
@ -650,7 +655,7 @@ public class FileIoProvider {
isDirectory = !created && dir.isDirectory(); isDirectory = !created && dir.isDirectory();
eventHooks.afterMetadataOp(volume, MKDIRS, begin); eventHooks.afterMetadataOp(volume, MKDIRS, begin);
} catch(Exception e) { } catch(Exception e) {
eventHooks.onFailure(volume, MKDIRS, e, begin); eventHooks.onFailure(datanode, volume, MKDIRS, e, begin);
throw e; throw e;
} }
@ -676,7 +681,7 @@ public class FileIoProvider {
succeeded = dir.isDirectory() || dir.mkdirs(); succeeded = dir.isDirectory() || dir.mkdirs();
eventHooks.afterMetadataOp(volume, MKDIRS, begin); eventHooks.afterMetadataOp(volume, MKDIRS, begin);
} catch(Exception e) { } catch(Exception e) {
eventHooks.onFailure(volume, MKDIRS, e, begin); eventHooks.onFailure(datanode, volume, MKDIRS, e, begin);
throw e; throw e;
} }
@ -702,7 +707,7 @@ public class FileIoProvider {
eventHooks.afterMetadataOp(volume, LIST, begin); eventHooks.afterMetadataOp(volume, LIST, begin);
return children; return children;
} catch(Exception e) { } catch(Exception e) {
eventHooks.onFailure(volume, LIST, e, begin); eventHooks.onFailure(datanode, volume, LIST, e, begin);
throw e; throw e;
} }
} }
@ -712,7 +717,7 @@ public class FileIoProvider {
* {@link FileUtil#listFiles(File)}. * {@link FileUtil#listFiles(File)}.
* *
* @param volume target volume. null if unavailable. * @param volume target volume. null if unavailable.
* @param Driectory to be listed. * @param dir directory to be listed.
* @return array of strings representing the directory entries. * @return array of strings representing the directory entries.
* @throws IOException * @throws IOException
*/ */
@ -724,7 +729,7 @@ public class FileIoProvider {
eventHooks.afterMetadataOp(volume, LIST, begin); eventHooks.afterMetadataOp(volume, LIST, begin);
return children; return children;
} catch(Exception e) { } catch(Exception e) {
eventHooks.onFailure(volume, LIST, e, begin); eventHooks.onFailure(datanode, volume, LIST, e, begin);
throw e; throw e;
} }
} }
@ -747,7 +752,7 @@ public class FileIoProvider {
eventHooks.afterMetadataOp(volume, LIST, begin); eventHooks.afterMetadataOp(volume, LIST, begin);
return children; return children;
} catch(Exception e) { } catch(Exception e) {
eventHooks.onFailure(volume, LIST, e, begin); eventHooks.onFailure(datanode, volume, LIST, e, begin);
throw e; throw e;
} }
} }
@ -769,7 +774,7 @@ public class FileIoProvider {
eventHooks.afterMetadataOp(volume, LIST, begin); eventHooks.afterMetadataOp(volume, LIST, begin);
return count; return count;
} catch(Exception e) { } catch(Exception e) {
eventHooks.onFailure(volume, LIST, e, begin); eventHooks.onFailure(datanode, volume, LIST, e, begin);
throw e; throw e;
} }
} }
@ -788,7 +793,7 @@ public class FileIoProvider {
eventHooks.afterMetadataOp(volume, EXISTS, begin); eventHooks.afterMetadataOp(volume, EXISTS, begin);
return exists; return exists;
} catch(Exception e) { } catch(Exception e) {
eventHooks.onFailure(volume, EXISTS, e, begin); eventHooks.onFailure(datanode, volume, EXISTS, e, begin);
throw e; throw e;
} }
} }
@ -829,7 +834,7 @@ public class FileIoProvider {
eventHooks.afterFileIo(volume, READ, begin, 1); eventHooks.afterFileIo(volume, READ, begin, 1);
return b; return b;
} catch(Exception e) { } catch(Exception e) {
eventHooks.onFailure(volume, READ, e, begin); eventHooks.onFailure(datanode, volume, READ, e, begin);
throw e; throw e;
} }
} }
@ -845,7 +850,7 @@ public class FileIoProvider {
eventHooks.afterFileIo(volume, READ, begin, numBytesRead); eventHooks.afterFileIo(volume, READ, begin, numBytesRead);
return numBytesRead; return numBytesRead;
} catch(Exception e) { } catch(Exception e) {
eventHooks.onFailure(volume, READ, e, begin); eventHooks.onFailure(datanode, volume, READ, e, begin);
throw e; throw e;
} }
} }
@ -861,7 +866,7 @@ public class FileIoProvider {
eventHooks.afterFileIo(volume, READ, begin, numBytesRead); eventHooks.afterFileIo(volume, READ, begin, numBytesRead);
return numBytesRead; return numBytesRead;
} catch(Exception e) { } catch(Exception e) {
eventHooks.onFailure(volume, READ, e, begin); eventHooks.onFailure(datanode, volume, READ, e, begin);
throw e; throw e;
} }
} }
@ -903,7 +908,7 @@ public class FileIoProvider {
super.write(b); super.write(b);
eventHooks.afterFileIo(volume, WRITE, begin, 1); eventHooks.afterFileIo(volume, WRITE, begin, 1);
} catch(Exception e) { } catch(Exception e) {
eventHooks.onFailure(volume, WRITE, e, begin); eventHooks.onFailure(datanode, volume, WRITE, e, begin);
throw e; throw e;
} }
} }
@ -918,7 +923,7 @@ public class FileIoProvider {
super.write(b); super.write(b);
eventHooks.afterFileIo(volume, WRITE, begin, b.length); eventHooks.afterFileIo(volume, WRITE, begin, b.length);
} catch(Exception e) { } catch(Exception e) {
eventHooks.onFailure(volume, WRITE, e, begin); eventHooks.onFailure(datanode, volume, WRITE, e, begin);
throw e; throw e;
} }
} }
@ -933,7 +938,7 @@ public class FileIoProvider {
super.write(b, off, len); super.write(b, off, len);
eventHooks.afterFileIo(volume, WRITE, begin, len); eventHooks.afterFileIo(volume, WRITE, begin, len);
} catch(Exception e) { } catch(Exception e) {
eventHooks.onFailure(volume, WRITE, e, begin); eventHooks.onFailure(datanode, volume, WRITE, e, begin);
throw e; throw e;
} }
} }
@ -961,7 +966,7 @@ public class FileIoProvider {
eventHooks.afterFileIo(volume, READ, begin, 1); eventHooks.afterFileIo(volume, READ, begin, 1);
return b; return b;
} catch(Exception e) { } catch(Exception e) {
eventHooks.onFailure(volume, READ, e, begin); eventHooks.onFailure(datanode, volume, READ, e, begin);
throw e; throw e;
} }
} }
@ -974,7 +979,7 @@ public class FileIoProvider {
eventHooks.afterFileIo(volume, READ, begin, numBytesRead); eventHooks.afterFileIo(volume, READ, begin, numBytesRead);
return numBytesRead; return numBytesRead;
} catch(Exception e) { } catch(Exception e) {
eventHooks.onFailure(volume, READ, e, begin); eventHooks.onFailure(datanode, volume, READ, e, begin);
throw e; throw e;
} }
} }
@ -987,7 +992,7 @@ public class FileIoProvider {
eventHooks.afterFileIo(volume, READ, begin, numBytesRead); eventHooks.afterFileIo(volume, READ, begin, numBytesRead);
return numBytesRead; return numBytesRead;
} catch(Exception e) { } catch(Exception e) {
eventHooks.onFailure(volume, READ, e, begin); eventHooks.onFailure(datanode, volume, READ, e, begin);
throw e; throw e;
} }
} }
@ -999,7 +1004,7 @@ public class FileIoProvider {
super.write(b); super.write(b);
eventHooks.afterFileIo(volume, WRITE, begin, 1); eventHooks.afterFileIo(volume, WRITE, begin, 1);
} catch(Exception e) { } catch(Exception e) {
eventHooks.onFailure(volume, WRITE, e, begin); eventHooks.onFailure(datanode, volume, WRITE, e, begin);
throw e; throw e;
} }
} }
@ -1011,7 +1016,7 @@ public class FileIoProvider {
super.write(b); super.write(b);
eventHooks.afterFileIo(volume, WRITE, begin, b.length); eventHooks.afterFileIo(volume, WRITE, begin, b.length);
} catch(Exception e) { } catch(Exception e) {
eventHooks.onFailure(volume, WRITE, e, begin); eventHooks.onFailure(datanode, volume, WRITE, e, begin);
throw e; throw e;
} }
} }
@ -1023,7 +1028,7 @@ public class FileIoProvider {
super.write(b, off, len); super.write(b, off, len);
eventHooks.afterFileIo(volume, WRITE, begin, len); eventHooks.afterFileIo(volume, WRITE, begin, len);
} catch(Exception e) { } catch(Exception e) {
eventHooks.onFailure(volume, WRITE, e, begin); eventHooks.onFailure(datanode, volume, WRITE, e, begin);
throw e; throw e;
} }
} }
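A hedged usage sketch of the reworked FileIoProvider: it is now constructed with the owning DataNode, and any wrapped operation that throws reports the failure together with the datanode and the volume, which is what drives the targeted volume check. Only the FileIoProvider constructor and the two-argument getFileOutputStream come from this patch; the surrounding class, file argument, and package placement are assumptions:

    package org.apache.hadoop.hdfs.server.datanode;

    import java.io.File;
    import java.io.FileOutputStream;
    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;

    /** Hypothetical write path showing the failure-reporting flow. */
    class VolumeWriter {
      private final FileIoProvider fileIoProvider;

      VolumeWriter(Configuration conf, DataNode datanode) {
        // The DataNode reference added by this patch lets every wrapped IO
        // failure schedule a check of the affected volume.
        this.fileIoProvider = new FileIoProvider(conf, datanode);
      }

      void writeMarker(FsVolumeSpi volume, File f) throws IOException {
        try (FileOutputStream fos =
                 fileIoProvider.getFileOutputStream(volume, f)) {
          // Any IOException below reaches onFailure(datanode, volume, ...),
          // which in turn calls datanode.checkDiskErrorAsync(volume).
          fos.write(1);
        }
      }
    }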

View File

@ -30,7 +30,7 @@ import javax.annotation.Nullable;
* related operations on datanode volumes. * related operations on datanode volumes.
*/ */
@InterfaceAudience.Private @InterfaceAudience.Private
class ProfilingFileIoEvents implements FileIoEvents { class ProfilingFileIoEvents extends FileIoEvents {
@Override @Override
public long beforeMetadataOp(@Nullable FsVolumeSpi volume, public long beforeMetadataOp(@Nullable FsVolumeSpi volume,

View File

@ -64,7 +64,7 @@ abstract public class ReplicaInfo extends Block
/** This is used by some tests and FsDatasetUtil#computeChecksum. */ /** This is used by some tests and FsDatasetUtil#computeChecksum. */
private static final FileIoProvider DEFAULT_FILE_IO_PROVIDER = private static final FileIoProvider DEFAULT_FILE_IO_PROVIDER =
new FileIoProvider(null); new FileIoProvider(null, null);
/** /**
* Constructor * Constructor

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.hdfs.server.datanode.checker; package org.apache.hadoop.hdfs.server.datanode.checker;
import com.google.common.base.Optional;
import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListenableFuture;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.classification.InterfaceStability;
@ -43,10 +44,10 @@ public interface AsyncChecker<K, V> {
* @param context the interpretation of the context depends on the * @param context the interpretation of the context depends on the
* target. * target.
* *
* @return returns a {@link ListenableFuture} that can be used to * @return returns a {@link Optional of ListenableFuture} that can be used to
* retrieve the result of the asynchronous check. * retrieve the result of the asynchronous check.
*/ */
ListenableFuture<V> schedule(Checkable<K, V> target, K context); Optional<ListenableFuture<V>> schedule(Checkable<K, V> target, K context);
/** /**
* Cancel all executing checks and wait for them to complete. * Cancel all executing checks and wait for them to complete.

View File

@ -19,6 +19,7 @@
package org.apache.hadoop.hdfs.server.datanode.checker; package org.apache.hadoop.hdfs.server.datanode.checker;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
import com.google.common.collect.Sets; import com.google.common.collect.Sets;
import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.FutureCallback;
@ -191,18 +192,26 @@ public class DatasetVolumeChecker {
for (int i = 0; i < references.size(); ++i) { for (int i = 0; i < references.size(); ++i) {
final FsVolumeReference reference = references.getReference(i); final FsVolumeReference reference = references.getReference(i);
allVolumes.add(reference.getVolume()); Optional<ListenableFuture<VolumeCheckResult>> olf =
ListenableFuture<VolumeCheckResult> future =
delegateChecker.schedule(reference.getVolume(), IGNORED_CONTEXT); delegateChecker.schedule(reference.getVolume(), IGNORED_CONTEXT);
LOG.info("Scheduled health check for volume {}", reference.getVolume()); LOG.info("Scheduled health check for volume {}", reference.getVolume());
Futures.addCallback(future, new ResultHandler( if (olf.isPresent()) {
reference, healthyVolumes, failedVolumes, numVolumes, new Callback() { allVolumes.add(reference.getVolume());
@Override Futures.addCallback(olf.get(),
public void call(Set<FsVolumeSpi> ignored1, new ResultHandler(reference, healthyVolumes, failedVolumes,
Set<FsVolumeSpi> ignored2) { numVolumes, new Callback() {
@Override
public void call(Set<FsVolumeSpi> ignored1,
Set<FsVolumeSpi> ignored2) {
latch.countDown();
}
}));
} else {
IOUtils.cleanup(null, reference);
if (numVolumes.decrementAndGet() == 0) {
latch.countDown(); latch.countDown();
} }
})); }
} }
// Wait until our timeout elapses, after which we give up on // Wait until our timeout elapses, after which we give up on
@ -263,18 +272,26 @@ public class DatasetVolumeChecker {
final Set<FsVolumeSpi> healthyVolumes = new HashSet<>(); final Set<FsVolumeSpi> healthyVolumes = new HashSet<>();
final Set<FsVolumeSpi> failedVolumes = new HashSet<>(); final Set<FsVolumeSpi> failedVolumes = new HashSet<>();
final AtomicLong numVolumes = new AtomicLong(references.size()); final AtomicLong numVolumes = new AtomicLong(references.size());
boolean added = false;
LOG.info("Checking {} volumes", references.size()); LOG.info("Checking {} volumes", references.size());
for (int i = 0; i < references.size(); ++i) { for (int i = 0; i < references.size(); ++i) {
final FsVolumeReference reference = references.getReference(i); final FsVolumeReference reference = references.getReference(i);
// The context parameter is currently ignored. // The context parameter is currently ignored.
ListenableFuture<VolumeCheckResult> future = Optional<ListenableFuture<VolumeCheckResult>> olf =
delegateChecker.schedule(reference.getVolume(), IGNORED_CONTEXT); delegateChecker.schedule(reference.getVolume(), IGNORED_CONTEXT);
Futures.addCallback(future, new ResultHandler( if (olf.isPresent()) {
reference, healthyVolumes, failedVolumes, numVolumes, callback)); added = true;
Futures.addCallback(olf.get(),
new ResultHandler(reference, healthyVolumes, failedVolumes,
numVolumes, callback));
} else {
IOUtils.cleanup(null, reference);
numVolumes.decrementAndGet();
}
} }
numAsyncDatasetChecks.incrementAndGet(); numAsyncDatasetChecks.incrementAndGet();
return true; return added;
} }
/** /**
@ -291,7 +308,7 @@ public class DatasetVolumeChecker {
} }
/** /**
* Check a single volume, returning a {@link ListenableFuture} * Check a single volume asynchronously, returning a {@link ListenableFuture}
* that can be used to retrieve the final result. * that can be used to retrieve the final result.
* *
* If the volume cannot be referenced then it is already closed and * If the volume cannot be referenced then it is already closed and
@ -305,21 +322,32 @@ public class DatasetVolumeChecker {
public boolean checkVolume( public boolean checkVolume(
final FsVolumeSpi volume, final FsVolumeSpi volume,
Callback callback) { Callback callback) {
if (volume == null) {
LOG.debug("Cannot schedule check on null volume");
return false;
}
FsVolumeReference volumeReference; FsVolumeReference volumeReference;
try { try {
volumeReference = volume.obtainReference(); volumeReference = volume.obtainReference();
} catch (ClosedChannelException e) { } catch (ClosedChannelException e) {
// The volume has already been closed. // The volume has already been closed.
callback.call(new HashSet<FsVolumeSpi>(), new HashSet<FsVolumeSpi>());
return false; return false;
} }
ListenableFuture<VolumeCheckResult> future =
Optional<ListenableFuture<VolumeCheckResult>> olf =
delegateChecker.schedule(volume, IGNORED_CONTEXT); delegateChecker.schedule(volume, IGNORED_CONTEXT);
numVolumeChecks.incrementAndGet(); if (olf.isPresent()) {
Futures.addCallback(future, new ResultHandler( numVolumeChecks.incrementAndGet();
volumeReference, new HashSet<FsVolumeSpi>(), new HashSet<FsVolumeSpi>(), Futures.addCallback(olf.get(),
new AtomicLong(1), callback)); new ResultHandler(volumeReference, new HashSet<FsVolumeSpi>(),
return true; new HashSet<FsVolumeSpi>(),
new AtomicLong(1), callback));
return true;
} else {
IOUtils.cleanup(null, volumeReference);
}
return false;
} }
/** /**
@ -343,8 +371,8 @@ public class DatasetVolumeChecker {
* successful, add the volume here. * successful, add the volume here.
* @param failedVolumes set of failed volumes. If the disk check fails, * @param failedVolumes set of failed volumes. If the disk check fails,
* add the volume here. * add the volume here.
* @param semaphore semaphore used to trigger callback invocation. * @param volumeCounter volumeCounter used to trigger callback invocation.
* @param callback invoked when the semaphore can be successfully acquired. * @param callback invoked when the volumeCounter reaches 0.
*/ */
ResultHandler(FsVolumeReference reference, ResultHandler(FsVolumeReference reference,
Set<FsVolumeSpi> healthyVolumes, Set<FsVolumeSpi> healthyVolumes,

View File

@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs.server.datanode.checker;
import static org.apache.hadoop.hdfs.DFSConfigKeys.*; import static org.apache.hadoop.hdfs.DFSConfigKeys.*;
import com.google.common.base.Optional;
import com.google.common.collect.Maps; import com.google.common.collect.Maps;
import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.google.common.util.concurrent.ThreadFactoryBuilder;
@ -158,8 +159,11 @@ public class StorageLocationChecker {
// Start parallel disk check operations on all StorageLocations. // Start parallel disk check operations on all StorageLocations.
for (StorageLocation location : dataDirs) { for (StorageLocation location : dataDirs) {
goodLocations.put(location, true); goodLocations.put(location, true);
futures.put(location, Optional<ListenableFuture<VolumeCheckResult>> olf =
delegateChecker.schedule(location, context)); delegateChecker.schedule(location, context);
if (olf.isPresent()) {
futures.put(location, olf.get());
}
} }
if (maxVolumeFailuresTolerated >= dataDirs.size()) { if (maxVolumeFailuresTolerated >= dataDirs.size()) {

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.hdfs.server.datanode.checker; package org.apache.hadoop.hdfs.server.datanode.checker;
import com.google.common.base.Optional;
import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListenableFuture;
@ -101,13 +102,11 @@ public class ThrottledAsyncChecker<K, V> implements AsyncChecker<K, V> {
* will receive the same Future. * will receive the same Future.
*/ */
@Override @Override
public synchronized ListenableFuture<V> schedule( public Optional<ListenableFuture<V>> schedule(
final Checkable<K, V> target, final Checkable<K, V> target, final K context) {
final K context) { LOG.info("Scheduling a check for {}", target);
LOG.debug("Scheduling a check of {}", target);
if (checksInProgress.containsKey(target)) { if (checksInProgress.containsKey(target)) {
return checksInProgress.get(target); return Optional.absent();
} }
if (completedChecks.containsKey(target)) { if (completedChecks.containsKey(target)) {
@ -115,11 +114,9 @@ public class ThrottledAsyncChecker<K, V> implements AsyncChecker<K, V> {
final long msSinceLastCheck = timer.monotonicNow() - result.completedAt; final long msSinceLastCheck = timer.monotonicNow() - result.completedAt;
if (msSinceLastCheck < minMsBetweenChecks) { if (msSinceLastCheck < minMsBetweenChecks) {
LOG.debug("Skipped checking {}. Time since last check {}ms " + LOG.debug("Skipped checking {}. Time since last check {}ms " +
"is less than the min gap {}ms.", "is less than the min gap {}ms.",
target, msSinceLastCheck, minMsBetweenChecks); target, msSinceLastCheck, minMsBetweenChecks);
return result.result != null ? return Optional.absent();
Futures.immediateFuture(result.result) :
Futures.<V>immediateFailedFuture(result.exception);
} }
} }
@ -132,7 +129,7 @@ public class ThrottledAsyncChecker<K, V> implements AsyncChecker<K, V> {
}); });
checksInProgress.put(target, lf); checksInProgress.put(target, lf);
addResultCachingCallback(target, lf); addResultCachingCallback(target, lf);
return lf; return Optional.of(lf);
} }
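A short sketch of how callers are expected to consume the Optional-returning schedule(), assuming it sits alongside the checker classes touched by this patch; the helper class is hypothetical:

    package org.apache.hadoop.hdfs.server.datanode.checker;

    import com.google.common.base.Optional;
    import com.google.common.util.concurrent.FutureCallback;
    import com.google.common.util.concurrent.Futures;
    import com.google.common.util.concurrent.ListenableFuture;

    /** Hypothetical helper showing the intended use of the Optional result. */
    final class CheckSchedulingExample {
      private CheckSchedulingExample() {}

      static <K, V> boolean scheduleWithCallback(AsyncChecker<K, V> checker,
          Checkable<K, V> target, K context, FutureCallback<V> callback) {
        Optional<ListenableFuture<V>> olf = checker.schedule(target, context);
        if (olf.isPresent()) {
          // A new check was actually started; attach the caller's callback.
          Futures.addCallback(olf.get(), callback);
          return true;
        }
        // Optional.absent(): a check of this target is already in progress,
        // or one completed within the minimum gap, so nothing was scheduled.
        return false;
      }
    }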
/** /**

View File

@ -277,7 +277,10 @@ class BlockPoolSlice {
fileIoProvider.getFileOutputStream(volume, outFile), "UTF-8")) { fileIoProvider.getFileOutputStream(volume, outFile), "UTF-8")) {
// mtime is written last, so that truncated writes won't be valid. // mtime is written last, so that truncated writes won't be valid.
out.write(Long.toString(used) + " " + Long.toString(timer.now())); out.write(Long.toString(used) + " " + Long.toString(timer.now()));
fileIoProvider.flush(volume, out); // This is only called as part of the volume shutdown.
// We explicitly avoid calling flush with fileIoProvider which triggers
// volume check upon io exception to avoid cyclic volume checks.
out.flush();
} }
} catch (IOException ioe) { } catch (IOException ioe) {
// If write failed, the volume might be bad. Since the cache file is // If write failed, the volume might be bad. Since the cache file is

View File

@ -1974,17 +1974,22 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
*/ */
File validateBlockFile(String bpid, long blockId) { File validateBlockFile(String bpid, long blockId) {
//Should we check for metadata file too? //Should we check for metadata file too?
final File f; File f = null;
ReplicaInfo info;
try(AutoCloseableLock lock = datasetLock.acquire()) { try(AutoCloseableLock lock = datasetLock.acquire()) {
f = getFile(bpid, blockId, false); info = volumeMap.get(bpid, blockId);
if (info != null) {
f = info.getBlockFile();
}
} }
if(f != null ) { if(f != null ) {
if(f.exists()) if(f.exists()) {
return f; return f;
}
// if file is not null, but doesn't exist - possibly disk failed // if file is not null, but doesn't exist - possibly disk failed
datanode.checkDiskErrorAsync(); datanode.checkDiskErrorAsync(info.getVolume());
} }
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {

View File

@ -79,7 +79,7 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder;
/** /**
* The underlying volume used to store replica. * The underlying volume used to store replica.
* *
* It uses the {@link FsDatasetImpl} object for synchronization. * It uses the {@link FsDatasetImpl} object for synchronization.
*/ */
@InterfaceAudience.Private @InterfaceAudience.Private
@ -98,7 +98,7 @@ public class FsVolumeImpl implements FsVolumeSpi {
private final Map<String, BlockPoolSlice> bpSlices private final Map<String, BlockPoolSlice> bpSlices
= new ConcurrentHashMap<String, BlockPoolSlice>(); = new ConcurrentHashMap<String, BlockPoolSlice>();
private final File currentDir; // <StorageDirectory>/current private final File currentDir; // <StorageDirectory>/current
private final DF usage; private final DF usage;
private final long reserved; private final long reserved;
private CloseableReferenceCount reference = new CloseableReferenceCount(); private CloseableReferenceCount reference = new CloseableReferenceCount();
@ -120,7 +120,7 @@ public class FsVolumeImpl implements FsVolumeSpi {
* contention. * contention.
*/ */
protected ThreadPoolExecutor cacheExecutor; protected ThreadPoolExecutor cacheExecutor;
FsVolumeImpl(FsDatasetImpl dataset, String storageID, File currentDir, FsVolumeImpl(FsDatasetImpl dataset, String storageID, File currentDir,
Configuration conf, StorageType storageType) throws IOException { Configuration conf, StorageType storageType) throws IOException {
this.dataset = dataset; this.dataset = dataset;
@ -137,7 +137,8 @@ public class FsVolumeImpl implements FsVolumeSpi {
this.configuredCapacity = -1; this.configuredCapacity = -1;
// dataset.datanode may be null in some tests. // dataset.datanode may be null in some tests.
this.fileIoProvider = dataset.datanode != null ? this.fileIoProvider = dataset.datanode != null ?
dataset.datanode.getFileIoProvider() : new FileIoProvider(conf); dataset.datanode.getFileIoProvider() :
new FileIoProvider(conf, dataset.datanode);
cacheExecutor = initializeCacheExecutor(parent); cacheExecutor = initializeCacheExecutor(parent);
this.metrics = DataNodeVolumeMetrics.create(conf, parent.getAbsolutePath()); this.metrics = DataNodeVolumeMetrics.create(conf, parent.getAbsolutePath());
} }
@ -288,7 +289,7 @@ public class FsVolumeImpl implements FsVolumeSpi {
File getCurrentDir() { File getCurrentDir() {
return currentDir; return currentDir;
} }
File getRbwDir(String bpid) throws IOException { File getRbwDir(String bpid) throws IOException {
return getBlockPoolSlice(bpid).getRbwDir(); return getBlockPoolSlice(bpid).getRbwDir();
} }
@ -358,11 +359,11 @@ public class FsVolumeImpl implements FsVolumeSpi {
long getBlockPoolUsed(String bpid) throws IOException { long getBlockPoolUsed(String bpid) throws IOException {
return getBlockPoolSlice(bpid).getDfsUsed(); return getBlockPoolSlice(bpid).getDfsUsed();
} }
/** /**
* Return either the configured capacity of the file system if configured; or * Return either the configured capacity of the file system if configured; or
* the capacity of the file system excluding space reserved for non-HDFS. * the capacity of the file system excluding space reserved for non-HDFS.
* *
* @return the unreserved number of bytes left in this filesystem. May be * @return the unreserved number of bytes left in this filesystem. May be
* zero. * zero.
*/ */
@ -389,7 +390,7 @@ public class FsVolumeImpl implements FsVolumeSpi {
/* /*
* Calculate the available space of the filesystem, excluding space reserved * Calculate the available space of the filesystem, excluding space reserved
* for non-HDFS and space reserved for RBW * for non-HDFS and space reserved for RBW
* *
* @return the available number of bytes left in this filesystem. May be zero. * @return the available number of bytes left in this filesystem. May be zero.
*/ */
@Override @Override
@ -460,7 +461,7 @@ public class FsVolumeImpl implements FsVolumeSpi {
public String getBasePath() { public String getBasePath() {
return currentDir.getParent(); return currentDir.getParent();
} }
@Override @Override
public boolean isTransientStorage() { public boolean isTransientStorage() {
return storageType.isTransient(); return storageType.isTransient();
@ -481,9 +482,9 @@ public class FsVolumeImpl implements FsVolumeSpi {
*/ */
@Override @Override
public String[] getBlockPoolList() { public String[] getBlockPoolList() {
return bpSlices.keySet().toArray(new String[bpSlices.keySet().size()]); return bpSlices.keySet().toArray(new String[bpSlices.keySet().size()]);
} }
/** /**
* Temporary files. They get moved to the finalized block directory when * Temporary files. They get moved to the finalized block directory when
* the block is finalized. * the block is finalized.
@ -692,7 +693,7 @@ public class FsVolumeImpl implements FsVolumeSpi {
LOG.trace("getSubdirEntries({}, {}): no entries found in {}", LOG.trace("getSubdirEntries({}, {}): no entries found in {}",
storageID, bpid, dir.getAbsolutePath()); storageID, bpid, dir.getAbsolutePath());
} else { } else {
LOG.trace("getSubdirEntries({}, {}): listed {} entries in {}", LOG.trace("getSubdirEntries({}, {}): listed {} entries in {}",
storageID, bpid, entries.size(), dir.getAbsolutePath()); storageID, bpid, entries.size(), dir.getAbsolutePath());
} }
cache = entries; cache = entries;
@ -910,7 +911,7 @@ public class FsVolumeImpl implements FsVolumeSpi {
} }
return VolumeCheckResult.HEALTHY; return VolumeCheckResult.HEALTHY;
} }
void getVolumeMap(ReplicaMap volumeMap, void getVolumeMap(ReplicaMap volumeMap,
final RamDiskReplicaTracker ramDiskReplicaMap) final RamDiskReplicaTracker ramDiskReplicaMap)
throws IOException { throws IOException {
@ -918,7 +919,7 @@ public class FsVolumeImpl implements FsVolumeSpi {
s.getVolumeMap(volumeMap, ramDiskReplicaMap); s.getVolumeMap(volumeMap, ramDiskReplicaMap);
} }
} }
void getVolumeMap(String bpid, ReplicaMap volumeMap, void getVolumeMap(String bpid, ReplicaMap volumeMap,
final RamDiskReplicaTracker ramDiskReplicaMap) final RamDiskReplicaTracker ramDiskReplicaMap)
throws IOException { throws IOException {
@ -966,7 +967,7 @@ public class FsVolumeImpl implements FsVolumeSpi {
} }
bpSlices.put(bpid, bp); bpSlices.put(bpid, bp);
} }
void shutdownBlockPool(String bpid, BlockListAsLongs blocksListsAsLongs) { void shutdownBlockPool(String bpid, BlockListAsLongs blocksListsAsLongs) {
BlockPoolSlice bp = bpSlices.get(bpid); BlockPoolSlice bp = bpSlices.get(bpid);
if (bp != null) { if (bp != null) {
@ -992,7 +993,7 @@ public class FsVolumeImpl implements FsVolumeSpi {
} }
return true; return true;
} }
void deleteBPDirectories(String bpid, boolean force) throws IOException { void deleteBPDirectories(String bpid, boolean force) throws IOException {
File volumeCurrentDir = this.getCurrentDir(); File volumeCurrentDir = this.getCurrentDir();
File bpDir = new File(volumeCurrentDir, bpid); File bpDir = new File(volumeCurrentDir, bpid);
@ -1000,7 +1001,7 @@ public class FsVolumeImpl implements FsVolumeSpi {
// nothing to be deleted // nothing to be deleted
return; return;
} }
File tmpDir = new File(bpDir, DataStorage.STORAGE_DIR_TMP); File tmpDir = new File(bpDir, DataStorage.STORAGE_DIR_TMP);
File bpCurrentDir = new File(bpDir, DataStorage.STORAGE_DIR_CURRENT); File bpCurrentDir = new File(bpDir, DataStorage.STORAGE_DIR_CURRENT);
File finalizedDir = new File(bpCurrentDir, File finalizedDir = new File(bpCurrentDir,
DataStorage.STORAGE_DIR_FINALIZED); DataStorage.STORAGE_DIR_FINALIZED);
@ -1049,12 +1050,12 @@ public class FsVolumeImpl implements FsVolumeSpi {
public String getStorageID() { public String getStorageID() {
return storageID; return storageID;
} }
@Override @Override
public StorageType getStorageType() { public StorageType getStorageType() {
return storageType; return storageType;
} }
DatanodeStorage toDatanodeStorage() { DatanodeStorage toDatanodeStorage() {
return new DatanodeStorage(storageID, DatanodeStorage.State.NORMAL, storageType); return new DatanodeStorage(storageID, DatanodeStorage.State.NORMAL, storageType);
} }

View File

@ -589,7 +589,7 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
} }
registerMBean(datanodeUuid); registerMBean(datanodeUuid);
this.fileIoProvider = new FileIoProvider(conf); this.fileIoProvider = new FileIoProvider(conf, datanode);
this.storage = new SimulatedStorage( this.storage = new SimulatedStorage(
conf.getLong(CONFIG_PROPERTY_CAPACITY, DEFAULT_CAPACITY), conf.getLong(CONFIG_PROPERTY_CAPACITY, DEFAULT_CAPACITY),
conf.getEnum(CONFIG_PROPERTY_STATE, DEFAULT_STATE)); conf.getEnum(CONFIG_PROPERTY_STATE, DEFAULT_STATE));

View File

@ -942,7 +942,7 @@ public class TestDataNodeHotSwapVolumes {
DataNodeTestUtils.injectDataDirFailure(dirToFail); DataNodeTestUtils.injectDataDirFailure(dirToFail);
// Call and wait DataNode to detect disk failure. // Call and wait DataNode to detect disk failure.
long lastDiskErrorCheck = dn.getLastDiskErrorCheck(); long lastDiskErrorCheck = dn.getLastDiskErrorCheck();
dn.checkDiskErrorAsync(); dn.checkDiskErrorAsync(failedVolume);
while (dn.getLastDiskErrorCheck() == lastDiskErrorCheck) { while (dn.getLastDiskErrorCheck() == lastDiskErrorCheck) {
Thread.sleep(100); Thread.sleep(100);
} }

View File

@ -17,7 +17,7 @@
*/ */
package org.apache.hadoop.hdfs.server.datanode; package org.apache.hadoop.hdfs.server.datanode;
import static org.apache.hadoop.test.MetricsAsserts.assertCounter; import static org.apache.hadoop.test.MetricsAsserts.getLongCounter;
import static org.apache.hadoop.test.MetricsAsserts.getMetrics; import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
import static org.hamcrest.core.Is.is; import static org.hamcrest.core.Is.is;
import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertArrayEquals;
@ -76,7 +76,7 @@ public class TestDataNodeVolumeFailureReporting {
final int WAIT_FOR_HEARTBEATS = 3000; final int WAIT_FOR_HEARTBEATS = 3000;
// Wait at least (2 * re-check + 10 * heartbeat) seconds for // Wait at least (2 * re-check + 10 * heartbeat) seconds for
// a datanode to be considered dead by the namenode. // a datanode to be considered dead by the namenode.
final int WAIT_FOR_DEATH = 15000; final int WAIT_FOR_DEATH = 15000;
@Before @Before
@ -158,7 +158,7 @@ public class TestDataNodeVolumeFailureReporting {
assert (WAIT_FOR_HEARTBEATS * 10) > WAIT_FOR_DEATH; assert (WAIT_FOR_HEARTBEATS * 10) > WAIT_FOR_DEATH;
// Eventually the NN should report two volume failures // Eventually the NN should report two volume failures
DFSTestUtil.waitForDatanodeStatus(dm, 3, 0, 2, DFSTestUtil.waitForDatanodeStatus(dm, 3, 0, 2,
origCapacity - (1*dnCapacity), WAIT_FOR_HEARTBEATS); origCapacity - (1*dnCapacity), WAIT_FOR_HEARTBEATS);
checkAggregateFailuresAtNameNode(true, 2); checkAggregateFailuresAtNameNode(true, 2);
checkFailuresAtNameNode(dm, dns.get(0), true, dn1Vol1.getAbsolutePath()); checkFailuresAtNameNode(dm, dns.get(0), true, dn1Vol1.getAbsolutePath());
@ -185,7 +185,7 @@ public class TestDataNodeVolumeFailureReporting {
* did not grow or shrink the data volume while the test was running). * did not grow or shrink the data volume while the test was running).
*/ */
dnCapacity = DFSTestUtil.getDatanodeCapacity(dm, 0); dnCapacity = DFSTestUtil.getDatanodeCapacity(dm, 0);
DFSTestUtil.waitForDatanodeStatus(dm, 3, 0, 3, DFSTestUtil.waitForDatanodeStatus(dm, 3, 0, 3,
origCapacity - (3*dnCapacity), WAIT_FOR_HEARTBEATS); origCapacity - (3*dnCapacity), WAIT_FOR_HEARTBEATS);
checkAggregateFailuresAtNameNode(true, 3); checkAggregateFailuresAtNameNode(true, 3);
checkFailuresAtNameNode(dm, dns.get(0), true, dn1Vol1.getAbsolutePath()); checkFailuresAtNameNode(dm, dns.get(0), true, dn1Vol1.getAbsolutePath());
@ -211,7 +211,7 @@ public class TestDataNodeVolumeFailureReporting {
dn3Vol2.getAbsolutePath()); dn3Vol2.getAbsolutePath());
// The NN considers the DN dead // The NN considers the DN dead
DFSTestUtil.waitForDatanodeStatus(dm, 2, 1, 2, DFSTestUtil.waitForDatanodeStatus(dm, 2, 1, 2,
origCapacity - (4*dnCapacity), WAIT_FOR_HEARTBEATS); origCapacity - (4*dnCapacity), WAIT_FOR_HEARTBEATS);
checkAggregateFailuresAtNameNode(true, 2); checkAggregateFailuresAtNameNode(true, 2);
checkFailuresAtNameNode(dm, dns.get(0), true, dn1Vol1.getAbsolutePath()); checkFailuresAtNameNode(dm, dns.get(0), true, dn1Vol1.getAbsolutePath());
@ -236,7 +236,7 @@ public class TestDataNodeVolumeFailureReporting {
* and that the volume failure count should be reported as zero by * and that the volume failure count should be reported as zero by
* both the metrics and the NN. * both the metrics and the NN.
*/ */
DFSTestUtil.waitForDatanodeStatus(dm, 3, 0, 0, origCapacity, DFSTestUtil.waitForDatanodeStatus(dm, 3, 0, 0, origCapacity,
WAIT_FOR_HEARTBEATS); WAIT_FOR_HEARTBEATS);
checkAggregateFailuresAtNameNode(true, 0); checkAggregateFailuresAtNameNode(true, 0);
dns = cluster.getDataNodes(); dns = cluster.getDataNodes();
@ -259,8 +259,8 @@ public class TestDataNodeVolumeFailureReporting {
long origCapacity = DFSTestUtil.getLiveDatanodeCapacity(dm); long origCapacity = DFSTestUtil.getLiveDatanodeCapacity(dm);
long dnCapacity = DFSTestUtil.getDatanodeCapacity(dm, 0); long dnCapacity = DFSTestUtil.getDatanodeCapacity(dm, 0);
// Fail the first volume on both datanodes (we have to keep the // Fail the first volume on both datanodes (we have to keep the
// third healthy so one node in the pipeline will not fail). // third healthy so one node in the pipeline will not fail).
File dn1Vol1 = new File(dataDir, "data"+(2*0+1)); File dn1Vol1 = new File(dataDir, "data"+(2*0+1));
File dn2Vol1 = new File(dataDir, "data"+(2*1+1)); File dn2Vol1 = new File(dataDir, "data"+(2*1+1));
DataNodeTestUtils.injectDataDirFailure(dn1Vol1, dn2Vol1); DataNodeTestUtils.injectDataDirFailure(dn1Vol1, dn2Vol1);
@ -271,7 +271,7 @@ public class TestDataNodeVolumeFailureReporting {
ArrayList<DataNode> dns = cluster.getDataNodes(); ArrayList<DataNode> dns = cluster.getDataNodes();
// The NN reports two volumes failures // The NN reports two volumes failures
DFSTestUtil.waitForDatanodeStatus(dm, 3, 0, 2, DFSTestUtil.waitForDatanodeStatus(dm, 3, 0, 2,
origCapacity - (1*dnCapacity), WAIT_FOR_HEARTBEATS); origCapacity - (1*dnCapacity), WAIT_FOR_HEARTBEATS);
checkAggregateFailuresAtNameNode(true, 2); checkAggregateFailuresAtNameNode(true, 2);
checkFailuresAtNameNode(dm, dns.get(0), true, dn1Vol1.getAbsolutePath()); checkFailuresAtNameNode(dm, dns.get(0), true, dn1Vol1.getAbsolutePath());
@ -318,6 +318,12 @@ public class TestDataNodeVolumeFailureReporting {
DFSTestUtil.createFile(fs, file1, 1024, (short)3, 1L); DFSTestUtil.createFile(fs, file1, 1024, (short)3, 1L);
DFSTestUtil.waitReplication(fs, file1, (short)3); DFSTestUtil.waitReplication(fs, file1, (short)3);
// Create additional file to trigger failure based volume check on dn1Vol2
// and dn2Vol2.
Path file2 = new Path("/test2");
DFSTestUtil.createFile(fs, file2, 1024, (short)3, 1L);
DFSTestUtil.waitReplication(fs, file2, (short)3);
ArrayList<DataNode> dns = cluster.getDataNodes(); ArrayList<DataNode> dns = cluster.getDataNodes();
assertTrue("DN1 should be up", dns.get(0).isDatanodeUp()); assertTrue("DN1 should be up", dns.get(0).isDatanodeUp());
assertTrue("DN2 should be up", dns.get(1).isDatanodeUp()); assertTrue("DN2 should be up", dns.get(1).isDatanodeUp());
@ -536,8 +542,6 @@ public class TestDataNodeVolumeFailureReporting {
private void checkFailuresAtDataNode(DataNode dn, private void checkFailuresAtDataNode(DataNode dn,
long expectedVolumeFailuresCounter, boolean expectCapacityKnown, long expectedVolumeFailuresCounter, boolean expectCapacityKnown,
String... expectedFailedVolumes) throws Exception { String... expectedFailedVolumes) throws Exception {
assertCounter("VolumeFailures", expectedVolumeFailuresCounter,
getMetrics(dn.getMetrics().name()));
FsDatasetSpi<?> fsd = dn.getFSDataset(); FsDatasetSpi<?> fsd = dn.getFSDataset();
assertEquals(expectedFailedVolumes.length, fsd.getNumFailedVolumes()); assertEquals(expectedFailedVolumes.length, fsd.getNumFailedVolumes());
assertArrayEquals(expectedFailedVolumes, fsd.getFailedStorageLocations()); assertArrayEquals(expectedFailedVolumes, fsd.getFailedStorageLocations());
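For illustration (not part of this patch): the exact-equality assertion on the VolumeFailures metric is removed above because failure-triggered volume checks can legitimately push the counter past the expected value. If a bound is still wanted, a lower-bound assertion along these lines would fit; this is a sketch only, relying on the MetricsAsserts helpers whose imports were switched earlier in this file:

    // Sketch: assert a lower bound rather than exact equality, since extra
    // failure-triggered checks may bump the counter further.
    long volumeFailures =
        getLongCounter("VolumeFailures", getMetrics(dn.getMetrics().name()));
    assertTrue("Expected at least " + expectedVolumeFailuresCounter
            + " volume failures but found " + volumeFailures,
        volumeFailures >= expectedVolumeFailuresCounter);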

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.hdfs.server.datanode.checker; package org.apache.hadoop.hdfs.server.datanode.checker;
import com.google.common.base.Optional;
import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListenableFuture;
import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.HdfsConfiguration;
@ -103,24 +104,28 @@ public class TestDatasetVolumeChecker {
/** /**
* Request a check and ensure it triggered {@link FsVolumeSpi#check}. * Request a check and ensure it triggered {@link FsVolumeSpi#check}.
*/ */
checker.checkVolume(volume, new DatasetVolumeChecker.Callback() { boolean result =
@Override checker.checkVolume(volume, new DatasetVolumeChecker.Callback() {
public void call(Set<FsVolumeSpi> healthyVolumes, @Override
Set<FsVolumeSpi> failedVolumes) { public void call(Set<FsVolumeSpi> healthyVolumes,
numCallbackInvocations.incrementAndGet(); Set<FsVolumeSpi> failedVolumes) {
if (expectedVolumeHealth != null && expectedVolumeHealth != FAILED) { numCallbackInvocations.incrementAndGet();
assertThat(healthyVolumes.size(), is(1)); if (expectedVolumeHealth != null &&
assertThat(failedVolumes.size(), is(0)); expectedVolumeHealth != FAILED) {
} else { assertThat(healthyVolumes.size(), is(1));
assertThat(healthyVolumes.size(), is(0)); assertThat(failedVolumes.size(), is(0));
assertThat(failedVolumes.size(), is(1)); } else {
} assertThat(healthyVolumes.size(), is(0));
} assertThat(failedVolumes.size(), is(1));
}); }
}
});
// Ensure that the check was invoked at least once. // Ensure that the check was invoked at least once.
verify(volume, times(1)).check(any(VolumeCheckContext.class)); verify(volume, times(1)).check(any(VolumeCheckContext.class));
assertThat(numCallbackInvocations.get(), is(1L)); if (result) {
assertThat(numCallbackInvocations.get(), is(1L));
}
} }
/** /**
@ -172,7 +177,7 @@ public class TestDatasetVolumeChecker {
checker.setDelegateChecker(new DummyChecker()); checker.setDelegateChecker(new DummyChecker());
final AtomicLong numCallbackInvocations = new AtomicLong(0); final AtomicLong numCallbackInvocations = new AtomicLong(0);
checker.checkAllVolumesAsync( boolean result = checker.checkAllVolumesAsync(
dataset, new DatasetVolumeChecker.Callback() { dataset, new DatasetVolumeChecker.Callback() {
@Override @Override
public void call( public void call(
@ -192,7 +197,9 @@ public class TestDatasetVolumeChecker {
}); });
// The callback should be invoked exactly once. // The callback should be invoked exactly once.
assertThat(numCallbackInvocations.get(), is(1L)); if (result) {
assertThat(numCallbackInvocations.get(), is(1L));
}
// Ensure each volume's check() method was called exactly once. // Ensure each volume's check() method was called exactly once.
for (FsVolumeSpi volume : volumes) { for (FsVolumeSpi volume : volumes) {
@ -206,15 +213,18 @@ public class TestDatasetVolumeChecker {
*/ */
static class DummyChecker static class DummyChecker
implements AsyncChecker<VolumeCheckContext, VolumeCheckResult> { implements AsyncChecker<VolumeCheckContext, VolumeCheckResult> {
@Override @Override
public ListenableFuture<VolumeCheckResult> schedule( public Optional<ListenableFuture<VolumeCheckResult>> schedule(
Checkable<VolumeCheckContext, VolumeCheckResult> target, Checkable<VolumeCheckContext, VolumeCheckResult> target,
VolumeCheckContext context) { VolumeCheckContext context) {
try { try {
return Futures.immediateFuture(target.check(context)); return Optional.of(
Futures.immediateFuture(target.check(context)));
} catch (Exception e) { } catch (Exception e) {
LOG.info("check routine threw exception " + e); LOG.info("check routine threw exception " + e);
return Futures.immediateFailedFuture(e); return Optional.of(
Futures.<VolumeCheckResult>immediateFailedFuture(e));
} }
} }
@ -259,4 +269,4 @@ public class TestDatasetVolumeChecker {
} }
return volumes; return volumes;
} }
} }
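For illustration (not part of this patch): DummyChecker above follows the updated AsyncChecker contract, where schedule() returns a Guava Optional that is absent when no new check was started. A small self-contained sketch of the caller-side pattern; the Scheduler interface is invented for the example:

    import com.google.common.base.Optional;
    import com.google.common.util.concurrent.Futures;
    import com.google.common.util.concurrent.ListenableFuture;

    // Sketch only: present => a new check was started, absent => throttled.
    public class OptionalScheduleSketch {
      interface Scheduler<V> {
        Optional<ListenableFuture<V>> schedule(String target);
      }

      public static void main(String[] args) throws Exception {
        Scheduler<Boolean> once = new Scheduler<Boolean>() {
          private boolean scheduled = false;
          @Override
          public Optional<ListenableFuture<Boolean>> schedule(String target) {
            if (scheduled) {
              return Optional.absent();    // throttled: nothing new started
            }
            scheduled = true;
            return Optional.of(Futures.immediateFuture(true));
          }
        };

        Optional<ListenableFuture<Boolean>> first = once.schedule("volume-1");
        Optional<ListenableFuture<Boolean>> second = once.schedule("volume-1");
        System.out.println(first.isPresent() && first.get().get());  // true
        System.out.println(second.isPresent());                      // false
      }
    }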

View File

@ -131,29 +131,6 @@ public class TestDatasetVolumeCheckerFailures {
assertThat(checker.getNumSkippedChecks(), is(1L)); assertThat(checker.getNumSkippedChecks(), is(1L));
} }
@Test(timeout=60000)
public void testMinGapIsEnforcedForASyncChecks() throws Exception {
final List<FsVolumeSpi> volumes =
TestDatasetVolumeChecker.makeVolumes(1, VolumeCheckResult.HEALTHY);
final FsDatasetSpi<FsVolumeSpi> dataset =
TestDatasetVolumeChecker.makeDataset(volumes);
final DatasetVolumeChecker checker = new DatasetVolumeChecker(conf, timer);
checker.checkAllVolumesAsync(dataset, null);
assertThat(checker.getNumAsyncDatasetChecks(), is(1L));
// Re-check without advancing the timer. Ensure the check is skipped.
checker.checkAllVolumesAsync(dataset, null);
assertThat(checker.getNumAsyncDatasetChecks(), is(1L));
assertThat(checker.getNumSkippedChecks(), is(1L));
// Re-check after advancing the timer. Ensure the check is performed.
timer.advance(MIN_DISK_CHECK_GAP_MS);
checker.checkAllVolumesAsync(dataset, null);
assertThat(checker.getNumAsyncDatasetChecks(), is(2L));
assertThat(checker.getNumSkippedChecks(), is(1L));
}
/** /**
* Create a mock FsVolumeSpi whose {@link FsVolumeSpi#check} routine * Create a mock FsVolumeSpi whose {@link FsVolumeSpi#check} routine
* hangs forever. * hangs forever.

View File

@ -18,15 +18,14 @@
package org.apache.hadoop.hdfs.server.datanode.checker; package org.apache.hadoop.hdfs.server.datanode.checker;
import com.google.common.base.Optional;
import com.google.common.base.Supplier; import com.google.common.base.Supplier;
import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListenableFuture;
import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.FakeTimer; import org.apache.hadoop.util.FakeTimer;
import org.junit.Rule;
import org.junit.Test; import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -38,10 +37,7 @@ import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.core.Is.isA;
import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail; import static org.junit.Assert.fail;
@ -53,9 +49,6 @@ public class TestThrottledAsyncChecker {
LoggerFactory.getLogger(TestThrottledAsyncChecker.class); LoggerFactory.getLogger(TestThrottledAsyncChecker.class);
private static final long MIN_ERROR_CHECK_GAP = 1000; private static final long MIN_ERROR_CHECK_GAP = 1000;
@Rule
public ExpectedException thrown = ExpectedException.none();
/** /**
* Test various scheduling combinations to ensure scheduling and * Test various scheduling combinations to ensure scheduling and
* throttling behave as expected. * throttling behave as expected.
@ -70,34 +63,34 @@ public class TestThrottledAsyncChecker {
getExecutorService()); getExecutorService());
// check target1 and ensure we get back the expected result. // check target1 and ensure we get back the expected result.
assertTrue(checker.schedule(target1, true).get()); assertTrue(checker.schedule(target1, true).isPresent());
assertThat(target1.numChecks.get(), is(1L)); waitTestCheckableCheckCount(target1, 1L);
// Check target1 again without advancing the timer. target1 should not // Check target1 again without advancing the timer. target1 should not
// be checked again and the cached result should be returned. // be checked again.
assertTrue(checker.schedule(target1, true).get()); assertFalse(checker.schedule(target1, true).isPresent());
assertThat(target1.numChecks.get(), is(1L)); waitTestCheckableCheckCount(target1, 1L);
// Schedule target2 scheduled without advancing the timer. // Schedule target2 scheduled without advancing the timer.
// target2 should be checked as it has never been checked before. // target2 should be checked as it has never been checked before.
assertTrue(checker.schedule(target2, true).get()); assertTrue(checker.schedule(target2, true).isPresent());
assertThat(target2.numChecks.get(), is(1L)); waitTestCheckableCheckCount(target2, 1L);
// Advance the timer but just short of the min gap. // Advance the timer but just short of the min gap.
// Neither target1 nor target2 should be checked again. // Neither target1 nor target2 should be checked again.
timer.advance(MIN_ERROR_CHECK_GAP - 1); timer.advance(MIN_ERROR_CHECK_GAP - 1);
assertTrue(checker.schedule(target1, true).get()); assertFalse(checker.schedule(target1, true).isPresent());
assertThat(target1.numChecks.get(), is(1L)); waitTestCheckableCheckCount(target1, 1L);
assertTrue(checker.schedule(target2, true).get()); assertFalse(checker.schedule(target2, true).isPresent());
assertThat(target2.numChecks.get(), is(1L)); waitTestCheckableCheckCount(target2, 1L);
// Advance the timer again. // Advance the timer again.
// Both targets should be checked now. // Both targets should be checked now.
timer.advance(MIN_ERROR_CHECK_GAP); timer.advance(MIN_ERROR_CHECK_GAP);
assertTrue(checker.schedule(target1, true).get()); assertTrue(checker.schedule(target1, true).isPresent());
assertThat(target1.numChecks.get(), is(2L)); waitTestCheckableCheckCount(target1, 2L);
assertTrue(checker.schedule(target2, true).get()); assertTrue(checker.schedule(target2, true).isPresent());
assertThat(target1.numChecks.get(), is(2L)); waitTestCheckableCheckCount(target2, 2L);
} }
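For illustration (not part of this patch): the assertions above encode the throttling rule: a target is re-checked only once the minimum gap has elapsed on the timer, otherwise schedule() hands back an absent Optional. A simplified, self-contained model of that rule (not ThrottledAsyncChecker itself):

    import com.google.common.base.Optional;

    import java.util.HashMap;
    import java.util.Map;

    // Sketch only: a target may be re-checked only after the minimum gap has
    // elapsed on the (manually advanced) clock since its last check.
    public class MinGapThrottleSketch {
      private final long minGapMs;
      private long now = 0;                           // stands in for FakeTimer
      private final Map<String, Long> lastCheck = new HashMap<>();

      MinGapThrottleSketch(long minGapMs) { this.minGapMs = minGapMs; }

      void advance(long ms) { now += ms; }

      Optional<Long> schedule(String target) {
        Long last = lastCheck.get(target);
        if (last != null && now - last < minGapMs) {
          return Optional.absent();                   // throttled
        }
        lastCheck.put(target, now);
        return Optional.of(now);                      // check runs at this time
      }

      public static void main(String[] args) {
        MinGapThrottleSketch t = new MinGapThrottleSketch(1000);
        System.out.println(t.schedule("v1").isPresent()); // true, first check
        System.out.println(t.schedule("v1").isPresent()); // false, within gap
        t.advance(999);
        System.out.println(t.schedule("v1").isPresent()); // false, still within gap
        t.advance(1);
        System.out.println(t.schedule("v1").isPresent()); // true, gap elapsed
      }
    }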
@Test (timeout=60000) @Test (timeout=60000)
@ -109,13 +102,16 @@ public class TestThrottledAsyncChecker {
new ThrottledAsyncChecker<>(timer, MIN_ERROR_CHECK_GAP, new ThrottledAsyncChecker<>(timer, MIN_ERROR_CHECK_GAP,
getExecutorService()); getExecutorService());
ListenableFuture<Boolean> lf = checker.schedule(target, true); Optional<ListenableFuture<Boolean>> olf =
Futures.addCallback(lf, callback); checker.schedule(target, true);
if (olf.isPresent()) {
Futures.addCallback(olf.get(), callback);
}
// Request immediate cancellation. // Request immediate cancellation.
checker.shutdownAndWait(0, TimeUnit.MILLISECONDS); checker.shutdownAndWait(0, TimeUnit.MILLISECONDS);
try { try {
assertFalse(lf.get()); assertFalse(olf.get().get());
fail("Failed to get expected InterruptedException"); fail("Failed to get expected InterruptedException");
} catch (ExecutionException ee) { } catch (ExecutionException ee) {
assertTrue(ee.getCause() instanceof InterruptedException); assertTrue(ee.getCause() instanceof InterruptedException);
@ -130,27 +126,33 @@ public class TestThrottledAsyncChecker {
final ThrottledAsyncChecker<Boolean, Boolean> checker = final ThrottledAsyncChecker<Boolean, Boolean> checker =
new ThrottledAsyncChecker<>(timer, MIN_ERROR_CHECK_GAP, new ThrottledAsyncChecker<>(timer, MIN_ERROR_CHECK_GAP,
getExecutorService()); getExecutorService());
final ListenableFuture<Boolean> lf1 = checker.schedule(target, true); final Optional<ListenableFuture<Boolean>> olf1 =
final ListenableFuture<Boolean> lf2 = checker.schedule(target, true); checker.schedule(target, true);
// Ensure that concurrent requests return the same future object. final Optional<ListenableFuture<Boolean>> olf2 =
assertTrue(lf1 == lf2); checker.schedule(target, true);
// Ensure that concurrent requests return the future object
// for the first caller.
assertTrue(olf1.isPresent());
assertFalse(olf2.isPresent());
// Unblock the latch and wait for it to finish execution. // Unblock the latch and wait for it to finish execution.
target.latch.countDown(); target.latch.countDown();
lf1.get(); olf1.get().get();
GenericTestUtils.waitFor(new Supplier<Boolean>() { GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override @Override
public Boolean get() { public Boolean get() {
// We should not get back the same future as before. // We should get an absent Optional.
// This can take a short while until the internal callback in // This can take a short while until the internal callback in
// ThrottledAsyncChecker is scheduled for execution. // ThrottledAsyncChecker is scheduled for execution.
// Also this should not trigger a new check operation as the timer // Also this should not trigger a new check operation as the timer
// was not advanced. If it does trigger a new check then the test // was not advanced. If it does trigger a new check then the test
// will fail with a timeout. // will fail with a timeout.
final ListenableFuture<Boolean> lf3 = checker.schedule(target, true); final Optional<ListenableFuture<Boolean>> olf3 =
return lf3 != lf2; checker.schedule(target, true);
return !olf3.isPresent();
} }
}, 100, 10000); }, 100, 10000);
} }
@ -168,15 +170,30 @@ public class TestThrottledAsyncChecker {
new ThrottledAsyncChecker<>(timer, MIN_ERROR_CHECK_GAP, new ThrottledAsyncChecker<>(timer, MIN_ERROR_CHECK_GAP,
getExecutorService()); getExecutorService());
assertTrue(checker.schedule(target1, true).get()); assertTrue(checker.schedule(target1, true).isPresent());
assertThat(target1.numChecks.get(), is(1L)); waitTestCheckableCheckCount(target1, 1L);
timer.advance(MIN_ERROR_CHECK_GAP + 1); timer.advance(MIN_ERROR_CHECK_GAP + 1);
assertFalse(checker.schedule(target1, false).get()); assertTrue(checker.schedule(target1, false).isPresent());
assertThat(target1.numChecks.get(), is(2L)); waitTestCheckableCheckCount(target1, 2L);
} }
private void waitTestCheckableCheckCount(
final TestCheckableBase target,
final long expectedChecks) throws Exception {
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override
public Boolean get() {
// This can take a short while until the internal callback in
// ThrottledAsyncChecker is scheduled for execution.
// If it does trigger a new check then the test
// will fail with a timeout.
return target.getTotalChecks() == expectedChecks;
}
}, 100, 10000);
}
/** /**
* Ensure that the exeption from a failed check is cached * Ensure that the exception from a failed check is cached
* and returned without re-running the check when the minimum * and returned without re-running the check when the minimum
* gap has not elapsed. * gap has not elapsed.
* *
@ -190,13 +207,11 @@ public class TestThrottledAsyncChecker {
new ThrottledAsyncChecker<>(timer, MIN_ERROR_CHECK_GAP, new ThrottledAsyncChecker<>(timer, MIN_ERROR_CHECK_GAP,
getExecutorService()); getExecutorService());
thrown.expectCause(isA(DummyException.class)); assertTrue(checker.schedule(target1, true).isPresent());
checker.schedule(target1, true).get(); waitTestCheckableCheckCount(target1, 1L);
assertThat(target1.numChecks.get(), is(1L));
thrown.expectCause(isA(DummyException.class)); assertFalse(checker.schedule(target1, true).isPresent());
checker.schedule(target1, true).get(); waitTestCheckableCheckCount(target1, 1L);
assertThat(target1.numChecks.get(), is(2L));
} }
/** /**
@ -206,28 +221,38 @@ public class TestThrottledAsyncChecker {
return new ScheduledThreadPoolExecutor(1); return new ScheduledThreadPoolExecutor(1);
} }
private abstract static class TestCheckableBase
implements Checkable<Boolean, Boolean> {
protected final AtomicLong numChecks = new AtomicLong(0);
public long getTotalChecks() {
return numChecks.get();
}
public void incrTotalChecks() {
numChecks.incrementAndGet();
}
}
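For illustration (not part of this patch): TestCheckableBase above centralizes check counting for the two test doubles. The underlying contract is just a single check(context) call; here is a tiny standalone sketch, with the Checkable interface redeclared locally so the example compiles on its own:

    import java.util.concurrent.atomic.AtomicLong;

    // Sketch only: a Checkable returns a result (or throws) for one check call.
    public class CheckableSketch {
      interface Checkable<K, V> {
        V check(K context) throws Exception;
      }

      static class CountingCheckable implements Checkable<Boolean, Boolean> {
        private final AtomicLong checks = new AtomicLong();

        @Override
        public Boolean check(Boolean context) {
          checks.incrementAndGet();   // mirrors incrTotalChecks() above
          return context;             // echo the input, like NoOpCheckable
        }

        long totalChecks() { return checks.get(); }
      }

      public static void main(String[] args) throws Exception {
        CountingCheckable target = new CountingCheckable();
        System.out.println(target.check(true));    // true
        System.out.println(target.totalChecks());  // 1
      }
    }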
/** /**
* A Checkable that just returns its input. * A Checkable that just returns its input.
*/ */
private static class NoOpCheckable private static class NoOpCheckable
implements Checkable<Boolean, Boolean> { extends TestCheckableBase {
private final AtomicLong numChecks = new AtomicLong(0);
@Override @Override
public Boolean check(Boolean context) { public Boolean check(Boolean context) {
numChecks.incrementAndGet(); incrTotalChecks();
return context; return context;
} }
} }
private static class ThrowingCheckable private static class ThrowingCheckable
implements Checkable<Boolean, Boolean> { extends TestCheckableBase {
private final AtomicLong numChecks = new AtomicLong(0);
@Override @Override
public Boolean check(Boolean context) throws DummyException { public Boolean check(Boolean context) throws DummyException {
numChecks.incrementAndGet(); incrTotalChecks();
throw new DummyException(); throw new DummyException();
} }
} }
private static class DummyException extends Exception { private static class DummyException extends Exception {

View File

@ -159,7 +159,7 @@ public class TestFsDatasetImpl {
this.conf = new Configuration(); this.conf = new Configuration();
this.conf.setLong(DFS_DATANODE_SCAN_PERIOD_HOURS_KEY, 0); this.conf.setLong(DFS_DATANODE_SCAN_PERIOD_HOURS_KEY, 0);
final FileIoProvider fileIoProvider = new FileIoProvider(conf); final FileIoProvider fileIoProvider = new FileIoProvider(conf, null);
when(datanode.getFileIoProvider()).thenReturn(fileIoProvider); when(datanode.getFileIoProvider()).thenReturn(fileIoProvider);
when(datanode.getConf()).thenReturn(conf); when(datanode.getConf()).thenReturn(conf);
final DNConf dnConf = new DNConf(datanode); final DNConf dnConf = new DNConf(datanode);