From eafaddca1a69dd02c6e72f0b21b66f58b0c18dab Mon Sep 17 00:00:00 2001
From: Xiaoyu Yao
Date: Wed, 28 Dec 2016 22:08:13 -0800
Subject: [PATCH] HDFS-11274. Datanode should only check the failed volume
 upon IO errors. Contributed by Xiaoyu Yao.

---
 .../hdfs/server/datanode/BlockReceiver.java | 12 +-
 .../server/datanode/CountingFileIoEvents.java | 3 +-
 .../hadoop/hdfs/server/datanode/DataNode.java | 97 +++++++++----
 .../server/datanode/DefaultFileIoEvents.java | 2 +-
 .../server/datanode/DirectoryScanner.java | 2 +-
 .../hdfs/server/datanode/FileIoEvents.java | 36 +++--
 .../hdfs/server/datanode/FileIoProvider.java | 89 ++++++------
 .../datanode/ProfilingFileIoEvents.java | 2 +-
 .../hdfs/server/datanode/ReplicaInfo.java | 2 +-
 .../server/datanode/checker/AsyncChecker.java | 5 +-
 .../checker/DatasetVolumeChecker.java | 72 +++++++---
 .../checker/StorageLocationChecker.java | 8 +-
 .../checker/ThrottledAsyncChecker.java | 19 ++-
 .../fsdataset/impl/BlockPoolSlice.java | 5 +-
 .../fsdataset/impl/FsDatasetImpl.java | 13 +-
 .../datanode/fsdataset/impl/FsVolumeImpl.java | 39 +++---
 .../server/datanode/SimulatedFSDataset.java | 2 +-
 .../datanode/TestDataNodeHotSwapVolumes.java | 2 +-
 .../TestDataNodeVolumeFailureReporting.java | 26 ++--
 .../checker/TestDatasetVolumeChecker.java | 52 ++++---
 .../TestDatasetVolumeCheckerFailures.java | 23 ----
 .../checker/TestThrottledAsyncChecker.java | 129 +++++++++++-------
 .../fsdataset/impl/TestFsDatasetImpl.java | 2 +-
 23 files changed, 377 insertions(+), 265 deletions(-)

diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
index 7fdffb4b7e3..517a70958af 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
@@ -276,10 +276,9 @@ class BlockReceiver implements Closeable {
       IOException cause = DatanodeUtil.getCauseIfDiskError(ioe);
       DataNode.LOG.warn("IOException in BlockReceiver constructor"
           + (cause == null ? "" : ". 
Cause is "), cause); - - if (cause != null) { // possible disk error + if (cause != null) { ioe = cause; - datanode.checkDiskErrorAsync(); + // Volume error check moved to FileIoProvider } throw ioe; @@ -361,9 +360,8 @@ class BlockReceiver implements Closeable { if (measuredFlushTime) { datanode.metrics.addFlushNanos(flushTotalNanos); } - // disk check if(ioe != null) { - datanode.checkDiskErrorAsync(); + // Volume error check moved to FileIoProvider throw ioe; } } @@ -786,7 +784,7 @@ class BlockReceiver implements Closeable { manageWriterOsCache(offsetInBlock); } } catch (IOException iex) { - datanode.checkDiskErrorAsync(); + // Volume error check moved to FileIoProvider throw iex; } } @@ -1395,7 +1393,7 @@ class BlockReceiver implements Closeable { } catch (IOException e) { LOG.warn("IOException in BlockReceiver.run(): ", e); if (running) { - datanode.checkDiskErrorAsync(); + // Volume error check moved to FileIoProvider LOG.info(myString, e); running = false; if (!Thread.interrupted()) { // failure not caused by interruption diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/CountingFileIoEvents.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/CountingFileIoEvents.java index a70c151b7a4..7c6bfd6332d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/CountingFileIoEvents.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/CountingFileIoEvents.java @@ -37,7 +37,7 @@ import java.util.concurrent.atomic.AtomicLong; */ @InterfaceAudience.Private @InterfaceStability.Unstable -public class CountingFileIoEvents implements FileIoEvents { +public class CountingFileIoEvents extends FileIoEvents { private final Map counts; private static class Counts { @@ -90,7 +90,6 @@ public class CountingFileIoEvents implements FileIoEvents { public void onFailure( @Nullable FsVolumeSpi volume, OPERATION op, Exception e, long begin) { counts.get(op).failures.incrementAndGet(); - } @Override diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java index bddde1fdfa7..dc7d2675170 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java @@ -371,6 +371,7 @@ public class DataNode extends ReconfigurableBase SaslDataTransferServer saslServer; private final boolean getHdfsBlockLocationsEnabled; private ObjectName dataNodeInfoBeanName; + // Test verification only private volatile long lastDiskErrorCheck; private String supergroup; private boolean isPermissionEnabled; @@ -408,7 +409,7 @@ public class DataNode extends ReconfigurableBase this.tracer = createTracer(conf); this.tracerConfigurationManager = new TracerConfigurationManager(DATANODE_HTRACE_PREFIX, conf); - this.fileIoProvider = new FileIoProvider(conf); + this.fileIoProvider = new FileIoProvider(conf, this); this.fileDescriptorPassingDisabledReason = null; this.maxNumberOfBlocksToLog = 0; this.confVersion = null; @@ -433,7 +434,7 @@ public class DataNode extends ReconfigurableBase this.tracer = createTracer(conf); this.tracerConfigurationManager = new TracerConfigurationManager(DATANODE_HTRACE_PREFIX, conf); - this.fileIoProvider = new FileIoProvider(conf); + 
this.fileIoProvider = new FileIoProvider(conf, this); this.blockScanner = new BlockScanner(this); this.lastDiskErrorCheck = 0; this.maxNumberOfBlocksToLog = conf.getLong(DFS_MAX_NUM_BLOCKS_TO_LOG_KEY, @@ -783,7 +784,7 @@ public class DataNode extends ReconfigurableBase /** * Remove volumes from DataNode. - * See {@link #removeVolumes(Set, boolean)} for details. + * See {@link #removeVolumes(Collection, boolean)} for details. * * @param locations the StorageLocations of the volumes to be removed. * @throws IOException @@ -810,7 +811,7 @@ public class DataNode extends ReconfigurableBase * * - * @param absoluteVolumePaths the absolute path of volumes. + * @param storageLocations the absolute path of volumes. * @param clearFailure if true, clears the failure information related to the * volumes. * @throws IOException @@ -1258,7 +1259,7 @@ public class DataNode extends ReconfigurableBase * If conf's CONFIG_PROPERTY_SIMULATED property is set * then a simulated storage based data node is created. * - * @param dataDirs - only for a non-simulated storage data node + * @param dataDirectories - only for a non-simulated storage data node * @throws IOException */ void startDataNode(List dataDirectories, @@ -2020,13 +2021,36 @@ public class DataNode extends ReconfigurableBase tracer.close(); } - /** - * Check if there is a disk failure asynchronously and if so, handle the error + * Check if there is a disk failure asynchronously + * and if so, handle the error. */ + @VisibleForTesting public void checkDiskErrorAsync() { volumeChecker.checkAllVolumesAsync( data, new DatasetVolumeChecker.Callback() { + @Override + public void call(Set healthyVolumes, + Set failedVolumes) { + if (failedVolumes.size() > 0) { + LOG.warn("checkDiskErrorAsync callback got {} failed volumes: {}", + failedVolumes.size(), failedVolumes); + } else { + LOG.debug("checkDiskErrorAsync: no volume failures detected"); + } + lastDiskErrorCheck = Time.monotonicNow(); + handleVolumeFailures(failedVolumes); + } + }); + } + + /** + * Check if there is a disk failure asynchronously + * and if so, handle the error. + */ + public void checkDiskErrorAsync(FsVolumeSpi volume) { + volumeChecker.checkVolume( + volume, new DatasetVolumeChecker.Callback() { @Override public void call(Set healthyVolumes, Set failedVolumes) { @@ -2037,14 +2061,15 @@ public class DataNode extends ReconfigurableBase LOG.debug("checkDiskErrorAsync: no volume failures detected"); } lastDiskErrorCheck = Time.monotonicNow(); - DataNode.this.handleVolumeFailures(failedVolumes); + handleVolumeFailures(failedVolumes); } }); } - private void handleDiskError(String errMsgr) { + private void handleDiskError(String failedVolumes) { final boolean hasEnoughResources = data.hasEnoughResource(); - LOG.warn("DataNode.handleDiskError: Keep Running: " + hasEnoughResources); + LOG.warn("DataNode.handleDiskError on : [" + failedVolumes + + "] Keep Running: " + hasEnoughResources); // If we have enough active valid volumes then we do not want to // shutdown the DN completely. 
@@ -2054,7 +2079,7 @@ public class DataNode extends ReconfigurableBase //inform NameNodes for(BPOfferService bpos: blockPoolManager.getAllNamenodeThreads()) { - bpos.trySendErrorReport(dpError, errMsgr); + bpos.trySendErrorReport(dpError, failedVolumes); } if(hasEnoughResources) { @@ -2062,7 +2087,8 @@ public class DataNode extends ReconfigurableBase return; // do not shutdown } - LOG.warn("DataNode is shutting down: " + errMsgr); + LOG.warn("DataNode is shutting down due to failed volumes: [" + + failedVolumes + "]"); shouldRun = false; } @@ -2412,8 +2438,11 @@ public class DataNode extends ReconfigurableBase } LOG.warn(bpReg + ":Failed to transfer " + b + " to " + targets[0] + " got ", ie); - // check if there are any disk problem - checkDiskErrorAsync(); + // disk check moved to FileIoProvider + IOException cause = DatanodeUtil.getCauseIfDiskError(ie); + if (cause != null) { // possible disk error + LOG.warn("IOException in DataTransfer#run(). Cause is ", cause); + } } finally { xmitsInProgress.getAndDecrement(); IOUtils.closeStream(blockSender); @@ -3167,28 +3196,36 @@ public class DataNode extends ReconfigurableBase } private void handleVolumeFailures(Set unhealthyVolumes) { + if (unhealthyVolumes.isEmpty()) { + LOG.debug("handleVolumeFailures done with empty " + + "unhealthyVolumes"); + return; + } + data.handleVolumeFailures(unhealthyVolumes); final Set unhealthyDirs = new HashSet<>(unhealthyVolumes.size()); - if (!unhealthyVolumes.isEmpty()) { - StringBuilder sb = new StringBuilder("DataNode failed volumes:"); - for (FsVolumeSpi vol : unhealthyVolumes) { - unhealthyDirs.add(new File(vol.getBasePath()).getAbsoluteFile()); - sb.append(vol).append(";"); - } - - try { - // Remove all unhealthy volumes from DataNode. - removeVolumes(unhealthyDirs, false); - } catch (IOException e) { - LOG.warn("Error occurred when removing unhealthy storage dirs: " - + e.getMessage(), e); - } - LOG.info(sb.toString()); - handleDiskError(sb.toString()); + StringBuilder sb = new StringBuilder("DataNode failed volumes:"); + for (FsVolumeSpi vol : unhealthyVolumes) { + unhealthyDirs.add(new File(vol.getBasePath()).getAbsoluteFile()); + sb.append(vol).append(";"); } + + try { + // Remove all unhealthy volumes from DataNode. 
+ removeVolumes(unhealthyDirs, false); + } catch (IOException e) { + LOG.warn("Error occurred when removing unhealthy storage dirs: " + + e.getMessage(), e); + } + if (LOG.isDebugEnabled()) { + LOG.debug(sb.toString()); + } + // send blockreport regarding volume failure + handleDiskError(sb.toString()); } + @VisibleForTesting public long getLastDiskErrorCheck() { return lastDiskErrorCheck; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DefaultFileIoEvents.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DefaultFileIoEvents.java index bd4932ba1f9..6a12aae4338 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DefaultFileIoEvents.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DefaultFileIoEvents.java @@ -31,7 +31,7 @@ import javax.annotation.Nullable; */ @InterfaceAudience.Private @InterfaceStability.Unstable -public final class DefaultFileIoEvents implements FileIoEvents { +public final class DefaultFileIoEvents extends FileIoEvents { @Override public long beforeMetadataOp( @Nullable FsVolumeSpi volume, OPERATION op) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java index d9a6749f1fa..15208c4ffb6 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java @@ -855,7 +855,7 @@ public class DirectoryScanner implements Runnable { } catch (IOException ioe) { LOG.warn("Exception occured while compiling report: ", ioe); // Initiate a check on disk failure. - datanode.checkDiskErrorAsync(); + datanode.checkDiskErrorAsync(volume); // Ignore this directory and proceed. return report; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FileIoEvents.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FileIoEvents.java index 48e703f73be..10f2a0c4881 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FileIoEvents.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FileIoEvents.java @@ -32,7 +32,7 @@ import javax.annotation.Nullable; */ @InterfaceAudience.Private @InterfaceStability.Unstable -public interface FileIoEvents { +public abstract class FileIoEvents { /** * Invoked before a filesystem metadata operation. @@ -42,7 +42,7 @@ public interface FileIoEvents { * @return timestamp at which the operation was started. 0 if * unavailable. */ - long beforeMetadataOp(@Nullable FsVolumeSpi volume, OPERATION op); + abstract long beforeMetadataOp(@Nullable FsVolumeSpi volume, OPERATION op); /** * Invoked after a filesystem metadata operation has completed. @@ -52,7 +52,8 @@ public interface FileIoEvents { * @param begin timestamp at which the operation was started. 0 * if unavailable. */ - void afterMetadataOp(@Nullable FsVolumeSpi volume, OPERATION op, long begin); + abstract void afterMetadataOp(@Nullable FsVolumeSpi volume, OPERATION op, + long begin); /** * Invoked before a read/write/flush/channel transfer operation. 
@@ -63,7 +64,8 @@ public interface FileIoEvents { * @return timestamp at which the operation was started. 0 if * unavailable. */ - long beforeFileIo(@Nullable FsVolumeSpi volume, OPERATION op, long len); + abstract long beforeFileIo(@Nullable FsVolumeSpi volume, OPERATION op, + long len); /** @@ -76,22 +78,38 @@ public interface FileIoEvents { * @return timestamp at which the operation was started. 0 if * unavailable. */ - void afterFileIo(@Nullable FsVolumeSpi volume, OPERATION op, - long begin, long len); + abstract void afterFileIo(@Nullable FsVolumeSpi volume, OPERATION op, + long begin, long len); /** * Invoked if an operation fails with an exception. - * @param volume target volume for the operation. Null if unavailable. + * @param volume target volume for the operation. Null if unavailable. * @param op type of operation. * @param e Exception encountered during the operation. * @param begin time at which the operation was started. */ - void onFailure( + abstract void onFailure( @Nullable FsVolumeSpi volume, OPERATION op, Exception e, long begin); + /** + * Invoked by FileIoProvider if an operation fails with an exception. + * @param datanode datanode that runs volume check upon volume io failure + * @param volume target volume for the operation. Null if unavailable. + * @param op type of operation. + * @param e Exception encountered during the operation. + * @param begin time at which the operation was started. + */ + void onFailure(DataNode datanode, + @Nullable FsVolumeSpi volume, OPERATION op, Exception e, long begin) { + onFailure(volume, op, e, begin); + if (datanode != null && volume != null) { + datanode.checkDiskErrorAsync(volume); + } + } + /** * Return statistics as a JSON string. * @return */ - @Nullable String getStatistics(); + @Nullable abstract String getStatistics(); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FileIoProvider.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FileIoProvider.java index df7718e56a5..4a711794eee 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FileIoProvider.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FileIoProvider.java @@ -79,12 +79,16 @@ public class FileIoProvider { FileIoProvider.class); private final FileIoEvents eventHooks; + private final DataNode datanode; /** * @param conf Configuration object. May be null. When null, * the event handlers are no-ops. + * @param datanode datanode that owns this FileIoProvider. 
Used for + * IO error based volume checker callback */ - public FileIoProvider(@Nullable Configuration conf) { + public FileIoProvider(@Nullable Configuration conf, + final DataNode datanode) { if (conf != null) { final Class clazz = conf.getClass( DFSConfigKeys.DFS_DATANODE_FILE_IO_EVENTS_CLASS_KEY, @@ -94,6 +98,7 @@ public class FileIoProvider { } else { eventHooks = new DefaultFileIoEvents(); } + this.datanode = datanode; } /** @@ -139,7 +144,7 @@ public class FileIoProvider { f.flush(); eventHooks.afterFileIo(volume, FLUSH, begin, 0); } catch (Exception e) { - eventHooks.onFailure(volume, FLUSH, e, begin); + eventHooks.onFailure(datanode, volume, FLUSH, e, begin); throw e; } } @@ -157,7 +162,7 @@ public class FileIoProvider { fos.getChannel().force(true); eventHooks.afterFileIo(volume, SYNC, begin, 0); } catch (Exception e) { - eventHooks.onFailure(volume, SYNC, e, begin); + eventHooks.onFailure(datanode, volume, SYNC, e, begin); throw e; } } @@ -176,7 +181,7 @@ public class FileIoProvider { NativeIO.POSIX.syncFileRangeIfPossible(outFd, offset, numBytes, flags); eventHooks.afterFileIo(volume, SYNC, begin, 0); } catch (Exception e) { - eventHooks.onFailure(volume, SYNC, e, begin); + eventHooks.onFailure(datanode, volume, SYNC, e, begin); throw e; } } @@ -196,7 +201,7 @@ public class FileIoProvider { identifier, outFd, offset, length, flags); eventHooks.afterMetadataOp(volume, FADVISE, begin); } catch (Exception e) { - eventHooks.onFailure(volume, FADVISE, e, begin); + eventHooks.onFailure(datanode, volume, FADVISE, e, begin); throw e; } } @@ -214,7 +219,7 @@ public class FileIoProvider { eventHooks.afterMetadataOp(volume, DELETE, begin); return deleted; } catch (Exception e) { - eventHooks.onFailure(volume, DELETE, e, begin); + eventHooks.onFailure(datanode, volume, DELETE, e, begin); throw e; } } @@ -236,7 +241,7 @@ public class FileIoProvider { } return deleted; } catch (Exception e) { - eventHooks.onFailure(volume, DELETE, e, begin); + eventHooks.onFailure(datanode, volume, DELETE, e, begin); throw e; } } @@ -264,7 +269,7 @@ public class FileIoProvider { waitTime, transferTime); eventHooks.afterFileIo(volume, TRANSFER, begin, count); } catch (Exception e) { - eventHooks.onFailure(volume, TRANSFER, e, begin); + eventHooks.onFailure(datanode, volume, TRANSFER, e, begin); throw e; } } @@ -285,7 +290,7 @@ public class FileIoProvider { eventHooks.afterMetadataOp(volume, OPEN, begin); return created; } catch (Exception e) { - eventHooks.onFailure(volume, OPEN, e, begin); + eventHooks.onFailure(datanode, volume, OPEN, e, begin); throw e; } } @@ -312,7 +317,7 @@ public class FileIoProvider { return fis; } catch(Exception e) { org.apache.commons.io.IOUtils.closeQuietly(fis); - eventHooks.onFailure(volume, OPEN, e, begin); + eventHooks.onFailure(datanode, volume, OPEN, e, begin); throw e; } } @@ -328,7 +333,7 @@ public class FileIoProvider { * @param f File object. * @param append if true, then bytes will be written to the end of the * file rather than the beginning. - * @param FileOutputStream to the given file object. + * @return FileOutputStream to the given file object. * @throws FileNotFoundException */ public FileOutputStream getFileOutputStream( @@ -342,7 +347,7 @@ public class FileIoProvider { return fos; } catch(Exception e) { org.apache.commons.io.IOUtils.closeQuietly(fos); - eventHooks.onFailure(volume, OPEN, e, begin); + eventHooks.onFailure(datanode, volume, OPEN, e, begin); throw e; } } @@ -372,7 +377,7 @@ public class FileIoProvider { * before delegating to the wrapped stream. 
* * @param volume target volume. null if unavailable. - * @param f File object. + * @param fd File descriptor object. * @return FileOutputStream to the given file object. * @throws FileNotFoundException */ @@ -407,7 +412,7 @@ public class FileIoProvider { return fis; } catch(Exception e) { org.apache.commons.io.IOUtils.closeQuietly(fis); - eventHooks.onFailure(volume, OPEN, e, begin); + eventHooks.onFailure(datanode, volume, OPEN, e, begin); throw e; } } @@ -438,7 +443,7 @@ public class FileIoProvider { return fis; } catch(Exception e) { org.apache.commons.io.IOUtils.closeQuietly(fis); - eventHooks.onFailure(volume, OPEN, e, begin); + eventHooks.onFailure(datanode, volume, OPEN, e, begin); throw e; } } @@ -468,7 +473,7 @@ public class FileIoProvider { return raf; } catch(Exception e) { org.apache.commons.io.IOUtils.closeQuietly(raf); - eventHooks.onFailure(volume, OPEN, e, begin); + eventHooks.onFailure(datanode, volume, OPEN, e, begin); throw e; } } @@ -487,7 +492,7 @@ public class FileIoProvider { eventHooks.afterMetadataOp(volume, DELETE, begin); return deleted; } catch(Exception e) { - eventHooks.onFailure(volume, DELETE, e, begin); + eventHooks.onFailure(datanode, volume, DELETE, e, begin); throw e; } } @@ -508,7 +513,7 @@ public class FileIoProvider { FileUtil.replaceFile(src, target); eventHooks.afterMetadataOp(volume, MOVE, begin); } catch(Exception e) { - eventHooks.onFailure(volume, MOVE, e, begin); + eventHooks.onFailure(datanode, volume, MOVE, e, begin); throw e; } } @@ -530,7 +535,7 @@ public class FileIoProvider { Storage.rename(src, target); eventHooks.afterMetadataOp(volume, MOVE, begin); } catch(Exception e) { - eventHooks.onFailure(volume, MOVE, e, begin); + eventHooks.onFailure(datanode, volume, MOVE, e, begin); throw e; } } @@ -552,7 +557,7 @@ public class FileIoProvider { FileUtils.moveFile(src, target); eventHooks.afterMetadataOp(volume, MOVE, begin); } catch(Exception e) { - eventHooks.onFailure(volume, MOVE, e, begin); + eventHooks.onFailure(datanode, volume, MOVE, e, begin); throw e; } } @@ -576,7 +581,7 @@ public class FileIoProvider { Files.move(src, target, options); eventHooks.afterMetadataOp(volume, MOVE, begin); } catch(Exception e) { - eventHooks.onFailure(volume, MOVE, e, begin); + eventHooks.onFailure(datanode, volume, MOVE, e, begin); throw e; } } @@ -625,7 +630,7 @@ public class FileIoProvider { Storage.nativeCopyFileUnbuffered(src, target, preserveFileDate); eventHooks.afterFileIo(volume, NATIVE_COPY, begin, length); } catch(Exception e) { - eventHooks.onFailure(volume, NATIVE_COPY, e, begin); + eventHooks.onFailure(datanode, volume, NATIVE_COPY, e, begin); throw e; } } @@ -650,7 +655,7 @@ public class FileIoProvider { isDirectory = !created && dir.isDirectory(); eventHooks.afterMetadataOp(volume, MKDIRS, begin); } catch(Exception e) { - eventHooks.onFailure(volume, MKDIRS, e, begin); + eventHooks.onFailure(datanode, volume, MKDIRS, e, begin); throw e; } @@ -676,7 +681,7 @@ public class FileIoProvider { succeeded = dir.isDirectory() || dir.mkdirs(); eventHooks.afterMetadataOp(volume, MKDIRS, begin); } catch(Exception e) { - eventHooks.onFailure(volume, MKDIRS, e, begin); + eventHooks.onFailure(datanode, volume, MKDIRS, e, begin); throw e; } @@ -702,7 +707,7 @@ public class FileIoProvider { eventHooks.afterMetadataOp(volume, LIST, begin); return children; } catch(Exception e) { - eventHooks.onFailure(volume, LIST, e, begin); + eventHooks.onFailure(datanode, volume, LIST, e, begin); throw e; } } @@ -712,7 +717,7 @@ public class FileIoProvider { * {@link 
FileUtil#listFiles(File)}. * * @param volume target volume. null if unavailable. - * @param Driectory to be listed. + * @param dir directory to be listed. * @return array of strings representing the directory entries. * @throws IOException */ @@ -724,7 +729,7 @@ public class FileIoProvider { eventHooks.afterMetadataOp(volume, LIST, begin); return children; } catch(Exception e) { - eventHooks.onFailure(volume, LIST, e, begin); + eventHooks.onFailure(datanode, volume, LIST, e, begin); throw e; } } @@ -747,7 +752,7 @@ public class FileIoProvider { eventHooks.afterMetadataOp(volume, LIST, begin); return children; } catch(Exception e) { - eventHooks.onFailure(volume, LIST, e, begin); + eventHooks.onFailure(datanode, volume, LIST, e, begin); throw e; } } @@ -769,7 +774,7 @@ public class FileIoProvider { eventHooks.afterMetadataOp(volume, LIST, begin); return count; } catch(Exception e) { - eventHooks.onFailure(volume, LIST, e, begin); + eventHooks.onFailure(datanode, volume, LIST, e, begin); throw e; } } @@ -788,7 +793,7 @@ public class FileIoProvider { eventHooks.afterMetadataOp(volume, EXISTS, begin); return exists; } catch(Exception e) { - eventHooks.onFailure(volume, EXISTS, e, begin); + eventHooks.onFailure(datanode, volume, EXISTS, e, begin); throw e; } } @@ -829,7 +834,7 @@ public class FileIoProvider { eventHooks.afterFileIo(volume, READ, begin, 1); return b; } catch(Exception e) { - eventHooks.onFailure(volume, READ, e, begin); + eventHooks.onFailure(datanode, volume, READ, e, begin); throw e; } } @@ -845,7 +850,7 @@ public class FileIoProvider { eventHooks.afterFileIo(volume, READ, begin, numBytesRead); return numBytesRead; } catch(Exception e) { - eventHooks.onFailure(volume, READ, e, begin); + eventHooks.onFailure(datanode, volume, READ, e, begin); throw e; } } @@ -861,7 +866,7 @@ public class FileIoProvider { eventHooks.afterFileIo(volume, READ, begin, numBytesRead); return numBytesRead; } catch(Exception e) { - eventHooks.onFailure(volume, READ, e, begin); + eventHooks.onFailure(datanode, volume, READ, e, begin); throw e; } } @@ -903,7 +908,7 @@ public class FileIoProvider { super.write(b); eventHooks.afterFileIo(volume, WRITE, begin, 1); } catch(Exception e) { - eventHooks.onFailure(volume, WRITE, e, begin); + eventHooks.onFailure(datanode, volume, WRITE, e, begin); throw e; } } @@ -918,7 +923,7 @@ public class FileIoProvider { super.write(b); eventHooks.afterFileIo(volume, WRITE, begin, b.length); } catch(Exception e) { - eventHooks.onFailure(volume, WRITE, e, begin); + eventHooks.onFailure(datanode, volume, WRITE, e, begin); throw e; } } @@ -933,7 +938,7 @@ public class FileIoProvider { super.write(b, off, len); eventHooks.afterFileIo(volume, WRITE, begin, len); } catch(Exception e) { - eventHooks.onFailure(volume, WRITE, e, begin); + eventHooks.onFailure(datanode, volume, WRITE, e, begin); throw e; } } @@ -961,7 +966,7 @@ public class FileIoProvider { eventHooks.afterFileIo(volume, READ, begin, 1); return b; } catch(Exception e) { - eventHooks.onFailure(volume, READ, e, begin); + eventHooks.onFailure(datanode, volume, READ, e, begin); throw e; } } @@ -974,7 +979,7 @@ public class FileIoProvider { eventHooks.afterFileIo(volume, READ, begin, numBytesRead); return numBytesRead; } catch(Exception e) { - eventHooks.onFailure(volume, READ, e, begin); + eventHooks.onFailure(datanode, volume, READ, e, begin); throw e; } } @@ -987,7 +992,7 @@ public class FileIoProvider { eventHooks.afterFileIo(volume, READ, begin, numBytesRead); return numBytesRead; } catch(Exception e) { - 
eventHooks.onFailure(volume, READ, e, begin); + eventHooks.onFailure(datanode, volume, READ, e, begin); throw e; } } @@ -999,7 +1004,7 @@ public class FileIoProvider { super.write(b); eventHooks.afterFileIo(volume, WRITE, begin, 1); } catch(Exception e) { - eventHooks.onFailure(volume, WRITE, e, begin); + eventHooks.onFailure(datanode, volume, WRITE, e, begin); throw e; } } @@ -1011,7 +1016,7 @@ public class FileIoProvider { super.write(b); eventHooks.afterFileIo(volume, WRITE, begin, b.length); } catch(Exception e) { - eventHooks.onFailure(volume, WRITE, e, begin); + eventHooks.onFailure(datanode, volume, WRITE, e, begin); throw e; } } @@ -1023,7 +1028,7 @@ public class FileIoProvider { super.write(b, off, len); eventHooks.afterFileIo(volume, WRITE, begin, len); } catch(Exception e) { - eventHooks.onFailure(volume, WRITE, e, begin); + eventHooks.onFailure(datanode, volume, WRITE, e, begin); throw e; } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ProfilingFileIoEvents.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ProfilingFileIoEvents.java index 5835fe81635..affd0934993 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ProfilingFileIoEvents.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ProfilingFileIoEvents.java @@ -30,7 +30,7 @@ import javax.annotation.Nullable; * related operations on datanode volumes. */ @InterfaceAudience.Private -class ProfilingFileIoEvents implements FileIoEvents { +class ProfilingFileIoEvents extends FileIoEvents { @Override public long beforeMetadataOp(@Nullable FsVolumeSpi volume, diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java index 47ca1f12b63..9817f97d96c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java @@ -64,7 +64,7 @@ abstract public class ReplicaInfo extends Block /** This is used by some tests and FsDatasetUtil#computeChecksum. */ private static final FileIoProvider DEFAULT_FILE_IO_PROVIDER = - new FileIoProvider(null); + new FileIoProvider(null, null); /** * Constructor diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/AsyncChecker.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/AsyncChecker.java index 1d534a369d4..997c0cba19a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/AsyncChecker.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/AsyncChecker.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hdfs.server.datanode.checker; +import com.google.common.base.Optional; import com.google.common.util.concurrent.ListenableFuture; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; @@ -43,10 +44,10 @@ public interface AsyncChecker { * @param context the interpretation of the context depends on the * target. 
* - * @return returns a {@link ListenableFuture} that can be used to + * @return returns a {@link Optional of ListenableFuture} that can be used to * retrieve the result of the asynchronous check. */ - ListenableFuture schedule(Checkable target, K context); + Optional> schedule(Checkable target, K context); /** * Cancel all executing checks and wait for them to complete. diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/DatasetVolumeChecker.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/DatasetVolumeChecker.java index 8f346dcbef1..5ef3eec1868 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/DatasetVolumeChecker.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/DatasetVolumeChecker.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hdfs.server.datanode.checker; import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Optional; import com.google.common.base.Preconditions; import com.google.common.collect.Sets; import com.google.common.util.concurrent.FutureCallback; @@ -191,18 +192,26 @@ public class DatasetVolumeChecker { for (int i = 0; i < references.size(); ++i) { final FsVolumeReference reference = references.getReference(i); - allVolumes.add(reference.getVolume()); - ListenableFuture future = + Optional> olf = delegateChecker.schedule(reference.getVolume(), IGNORED_CONTEXT); LOG.info("Scheduled health check for volume {}", reference.getVolume()); - Futures.addCallback(future, new ResultHandler( - reference, healthyVolumes, failedVolumes, numVolumes, new Callback() { - @Override - public void call(Set ignored1, - Set ignored2) { + if (olf.isPresent()) { + allVolumes.add(reference.getVolume()); + Futures.addCallback(olf.get(), + new ResultHandler(reference, healthyVolumes, failedVolumes, + numVolumes, new Callback() { + @Override + public void call(Set ignored1, + Set ignored2) { + latch.countDown(); + } + })); + } else { + IOUtils.cleanup(null, reference); + if (numVolumes.decrementAndGet() == 0) { latch.countDown(); } - })); + } } // Wait until our timeout elapses, after which we give up on @@ -263,18 +272,26 @@ public class DatasetVolumeChecker { final Set healthyVolumes = new HashSet<>(); final Set failedVolumes = new HashSet<>(); final AtomicLong numVolumes = new AtomicLong(references.size()); + boolean added = false; LOG.info("Checking {} volumes", references.size()); for (int i = 0; i < references.size(); ++i) { final FsVolumeReference reference = references.getReference(i); // The context parameter is currently ignored. - ListenableFuture future = + Optional> olf = delegateChecker.schedule(reference.getVolume(), IGNORED_CONTEXT); - Futures.addCallback(future, new ResultHandler( - reference, healthyVolumes, failedVolumes, numVolumes, callback)); + if (olf.isPresent()) { + added = true; + Futures.addCallback(olf.get(), + new ResultHandler(reference, healthyVolumes, failedVolumes, + numVolumes, callback)); + } else { + IOUtils.cleanup(null, reference); + numVolumes.decrementAndGet(); + } } numAsyncDatasetChecks.incrementAndGet(); - return true; + return added; } /** @@ -291,7 +308,7 @@ public class DatasetVolumeChecker { } /** - * Check a single volume, returning a {@link ListenableFuture} + * Check a single volume asynchronously, returning a {@link ListenableFuture} * that can be used to retrieve the final result. 
* * If the volume cannot be referenced then it is already closed and @@ -305,21 +322,32 @@ public class DatasetVolumeChecker { public boolean checkVolume( final FsVolumeSpi volume, Callback callback) { + if (volume == null) { + LOG.debug("Cannot schedule check on null volume"); + return false; + } + FsVolumeReference volumeReference; try { volumeReference = volume.obtainReference(); } catch (ClosedChannelException e) { // The volume has already been closed. - callback.call(new HashSet(), new HashSet()); return false; } - ListenableFuture future = + + Optional> olf = delegateChecker.schedule(volume, IGNORED_CONTEXT); - numVolumeChecks.incrementAndGet(); - Futures.addCallback(future, new ResultHandler( - volumeReference, new HashSet(), new HashSet(), - new AtomicLong(1), callback)); - return true; + if (olf.isPresent()) { + numVolumeChecks.incrementAndGet(); + Futures.addCallback(olf.get(), + new ResultHandler(volumeReference, new HashSet(), + new HashSet(), + new AtomicLong(1), callback)); + return true; + } else { + IOUtils.cleanup(null, volumeReference); + } + return false; } /** @@ -343,8 +371,8 @@ public class DatasetVolumeChecker { * successful, add the volume here. * @param failedVolumes set of failed volumes. If the disk check fails, * add the volume here. - * @param semaphore semaphore used to trigger callback invocation. - * @param callback invoked when the semaphore can be successfully acquired. + * @param volumeCounter volumeCounter used to trigger callback invocation. + * @param callback invoked when the volumeCounter reaches 0. */ ResultHandler(FsVolumeReference reference, Set healthyVolumes, diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/StorageLocationChecker.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/StorageLocationChecker.java index 7337ad07f33..6e323e0be88 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/StorageLocationChecker.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/StorageLocationChecker.java @@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs.server.datanode.checker; import static org.apache.hadoop.hdfs.DFSConfigKeys.*; +import com.google.common.base.Optional; import com.google.common.collect.Maps; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ThreadFactoryBuilder; @@ -158,8 +159,11 @@ public class StorageLocationChecker { // Start parallel disk check operations on all StorageLocations. 
for (StorageLocation location : dataDirs) { goodLocations.put(location, true); - futures.put(location, - delegateChecker.schedule(location, context)); + Optional> olf = + delegateChecker.schedule(location, context); + if (olf.isPresent()) { + futures.put(location, olf.get()); + } } if (maxVolumeFailuresTolerated >= dataDirs.size()) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/ThrottledAsyncChecker.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/ThrottledAsyncChecker.java index 89db88e07e5..ad56c617079 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/ThrottledAsyncChecker.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/ThrottledAsyncChecker.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hdfs.server.datanode.checker; +import com.google.common.base.Optional; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; @@ -101,13 +102,11 @@ public class ThrottledAsyncChecker implements AsyncChecker { * will receive the same Future. */ @Override - public synchronized ListenableFuture schedule( - final Checkable target, - final K context) { - LOG.debug("Scheduling a check of {}", target); - + public Optional> schedule( + final Checkable target, final K context) { + LOG.info("Scheduling a check for {}", target); if (checksInProgress.containsKey(target)) { - return checksInProgress.get(target); + return Optional.absent(); } if (completedChecks.containsKey(target)) { @@ -115,11 +114,9 @@ public class ThrottledAsyncChecker implements AsyncChecker { final long msSinceLastCheck = timer.monotonicNow() - result.completedAt; if (msSinceLastCheck < minMsBetweenChecks) { LOG.debug("Skipped checking {}. Time since last check {}ms " + - "is less than the min gap {}ms.", + "is less than the min gap {}ms.", target, msSinceLastCheck, minMsBetweenChecks); - return result.result != null ? - Futures.immediateFuture(result.result) : - Futures.immediateFailedFuture(result.exception); + return Optional.absent(); } } @@ -132,7 +129,7 @@ public class ThrottledAsyncChecker implements AsyncChecker { }); checksInProgress.put(target, lf); addResultCachingCallback(target, lf); - return lf; + return Optional.of(lf); } /** diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java index cd445576516..31cf39f700a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java @@ -277,7 +277,10 @@ class BlockPoolSlice { fileIoProvider.getFileOutputStream(volume, outFile), "UTF-8")) { // mtime is written last, so that truncated writes won't be valid. out.write(Long.toString(used) + " " + Long.toString(timer.now())); - fileIoProvider.flush(volume, out); + // This is only called as part of the volume shutdown. + // We explicitly avoid calling flush with fileIoProvider which triggers + // volume check upon io exception to avoid cyclic volume checks. 
+ out.flush(); } } catch (IOException ioe) { // If write failed, the volume might be bad. Since the cache file is diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java index e37934e3ff4..cc1d0c1f3bb 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java @@ -1974,17 +1974,22 @@ class FsDatasetImpl implements FsDatasetSpi { */ File validateBlockFile(String bpid, long blockId) { //Should we check for metadata file too? - final File f; + File f = null; + ReplicaInfo info; try(AutoCloseableLock lock = datasetLock.acquire()) { - f = getFile(bpid, blockId, false); + info = volumeMap.get(bpid, blockId); + if (info != null) { + f = info.getBlockFile(); + } } if(f != null ) { - if(f.exists()) + if(f.exists()) { return f; + } // if file is not null, but doesn't exist - possibly disk failed - datanode.checkDiskErrorAsync(); + datanode.checkDiskErrorAsync(info.getVolume()); } if (LOG.isDebugEnabled()) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java index b1ce931d3fb..71d93ae2bb6 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java @@ -79,7 +79,7 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder; /** * The underlying volume used to store replica. - * + * * It uses the {@link FsDatasetImpl} object for synchronization. */ @InterfaceAudience.Private @@ -98,7 +98,7 @@ public class FsVolumeImpl implements FsVolumeSpi { private final Map bpSlices = new ConcurrentHashMap(); private final File currentDir; // /current - private final DF usage; + private final DF usage; private final long reserved; private CloseableReferenceCount reference = new CloseableReferenceCount(); @@ -120,7 +120,7 @@ public class FsVolumeImpl implements FsVolumeSpi { * contention. */ protected ThreadPoolExecutor cacheExecutor; - + FsVolumeImpl(FsDatasetImpl dataset, String storageID, File currentDir, Configuration conf, StorageType storageType) throws IOException { this.dataset = dataset; @@ -137,7 +137,8 @@ public class FsVolumeImpl implements FsVolumeSpi { this.configuredCapacity = -1; // dataset.datanode may be null in some tests. this.fileIoProvider = dataset.datanode != null ? 
- dataset.datanode.getFileIoProvider() : new FileIoProvider(conf); + dataset.datanode.getFileIoProvider() : + new FileIoProvider(conf, dataset.datanode); cacheExecutor = initializeCacheExecutor(parent); this.metrics = DataNodeVolumeMetrics.create(conf, parent.getAbsolutePath()); } @@ -288,7 +289,7 @@ public class FsVolumeImpl implements FsVolumeSpi { File getCurrentDir() { return currentDir; } - + File getRbwDir(String bpid) throws IOException { return getBlockPoolSlice(bpid).getRbwDir(); } @@ -358,11 +359,11 @@ public class FsVolumeImpl implements FsVolumeSpi { long getBlockPoolUsed(String bpid) throws IOException { return getBlockPoolSlice(bpid).getDfsUsed(); } - + /** * Return either the configured capacity of the file system if configured; or * the capacity of the file system excluding space reserved for non-HDFS. - * + * * @return the unreserved number of bytes left in this filesystem. May be * zero. */ @@ -389,7 +390,7 @@ public class FsVolumeImpl implements FsVolumeSpi { /* * Calculate the available space of the filesystem, excluding space reserved * for non-HDFS and space reserved for RBW - * + * * @return the available number of bytes left in this filesystem. May be zero. */ @Override @@ -460,7 +461,7 @@ public class FsVolumeImpl implements FsVolumeSpi { public String getBasePath() { return currentDir.getParent(); } - + @Override public boolean isTransientStorage() { return storageType.isTransient(); @@ -481,9 +482,9 @@ public class FsVolumeImpl implements FsVolumeSpi { */ @Override public String[] getBlockPoolList() { - return bpSlices.keySet().toArray(new String[bpSlices.keySet().size()]); + return bpSlices.keySet().toArray(new String[bpSlices.keySet().size()]); } - + /** * Temporary files. They get moved to the finalized block directory when * the block is finalized. 
@@ -692,7 +693,7 @@ public class FsVolumeImpl implements FsVolumeSpi { LOG.trace("getSubdirEntries({}, {}): no entries found in {}", storageID, bpid, dir.getAbsolutePath()); } else { - LOG.trace("getSubdirEntries({}, {}): listed {} entries in {}", + LOG.trace("getSubdirEntries({}, {}): listed {} entries in {}", storageID, bpid, entries.size(), dir.getAbsolutePath()); } cache = entries; @@ -910,7 +911,7 @@ public class FsVolumeImpl implements FsVolumeSpi { } return VolumeCheckResult.HEALTHY; } - + void getVolumeMap(ReplicaMap volumeMap, final RamDiskReplicaTracker ramDiskReplicaMap) throws IOException { @@ -918,7 +919,7 @@ public class FsVolumeImpl implements FsVolumeSpi { s.getVolumeMap(volumeMap, ramDiskReplicaMap); } } - + void getVolumeMap(String bpid, ReplicaMap volumeMap, final RamDiskReplicaTracker ramDiskReplicaMap) throws IOException { @@ -966,7 +967,7 @@ public class FsVolumeImpl implements FsVolumeSpi { } bpSlices.put(bpid, bp); } - + void shutdownBlockPool(String bpid, BlockListAsLongs blocksListsAsLongs) { BlockPoolSlice bp = bpSlices.get(bpid); if (bp != null) { @@ -992,7 +993,7 @@ public class FsVolumeImpl implements FsVolumeSpi { } return true; } - + void deleteBPDirectories(String bpid, boolean force) throws IOException { File volumeCurrentDir = this.getCurrentDir(); File bpDir = new File(volumeCurrentDir, bpid); @@ -1000,7 +1001,7 @@ public class FsVolumeImpl implements FsVolumeSpi { // nothing to be deleted return; } - File tmpDir = new File(bpDir, DataStorage.STORAGE_DIR_TMP); + File tmpDir = new File(bpDir, DataStorage.STORAGE_DIR_TMP); File bpCurrentDir = new File(bpDir, DataStorage.STORAGE_DIR_CURRENT); File finalizedDir = new File(bpCurrentDir, DataStorage.STORAGE_DIR_FINALIZED); @@ -1049,12 +1050,12 @@ public class FsVolumeImpl implements FsVolumeSpi { public String getStorageID() { return storageID; } - + @Override public StorageType getStorageType() { return storageType; } - + DatanodeStorage toDatanodeStorage() { return new DatanodeStorage(storageID, DatanodeStorage.State.NORMAL, storageType); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java index db5042b72a1..dd2ca99f62b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java @@ -589,7 +589,7 @@ public class SimulatedFSDataset implements FsDatasetSpi { } registerMBean(datanodeUuid); - this.fileIoProvider = new FileIoProvider(conf); + this.fileIoProvider = new FileIoProvider(conf, datanode); this.storage = new SimulatedStorage( conf.getLong(CONFIG_PROPERTY_CAPACITY, DEFAULT_CAPACITY), conf.getEnum(CONFIG_PROPERTY_STATE, DEFAULT_STATE)); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeHotSwapVolumes.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeHotSwapVolumes.java index 25f4d5af416..600769beaa7 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeHotSwapVolumes.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeHotSwapVolumes.java @@ -942,7 +942,7 @@ public class TestDataNodeHotSwapVolumes { 
DataNodeTestUtils.injectDataDirFailure(dirToFail); // Call and wait DataNode to detect disk failure. long lastDiskErrorCheck = dn.getLastDiskErrorCheck(); - dn.checkDiskErrorAsync(); + dn.checkDiskErrorAsync(failedVolume); while (dn.getLastDiskErrorCheck() == lastDiskErrorCheck) { Thread.sleep(100); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailureReporting.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailureReporting.java index 23ec8a72804..79a52bb1507 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailureReporting.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailureReporting.java @@ -17,7 +17,7 @@ */ package org.apache.hadoop.hdfs.server.datanode; -import static org.apache.hadoop.test.MetricsAsserts.assertCounter; +import static org.apache.hadoop.test.MetricsAsserts.getLongCounter; import static org.apache.hadoop.test.MetricsAsserts.getMetrics; import static org.hamcrest.core.Is.is; import static org.junit.Assert.assertArrayEquals; @@ -76,7 +76,7 @@ public class TestDataNodeVolumeFailureReporting { final int WAIT_FOR_HEARTBEATS = 3000; // Wait at least (2 * re-check + 10 * heartbeat) seconds for - // a datanode to be considered dead by the namenode. + // a datanode to be considered dead by the namenode. final int WAIT_FOR_DEATH = 15000; @Before @@ -158,7 +158,7 @@ public class TestDataNodeVolumeFailureReporting { assert (WAIT_FOR_HEARTBEATS * 10) > WAIT_FOR_DEATH; // Eventually the NN should report two volume failures - DFSTestUtil.waitForDatanodeStatus(dm, 3, 0, 2, + DFSTestUtil.waitForDatanodeStatus(dm, 3, 0, 2, origCapacity - (1*dnCapacity), WAIT_FOR_HEARTBEATS); checkAggregateFailuresAtNameNode(true, 2); checkFailuresAtNameNode(dm, dns.get(0), true, dn1Vol1.getAbsolutePath()); @@ -185,7 +185,7 @@ public class TestDataNodeVolumeFailureReporting { * did not grow or shrink the data volume while the test was running). */ dnCapacity = DFSTestUtil.getDatanodeCapacity(dm, 0); - DFSTestUtil.waitForDatanodeStatus(dm, 3, 0, 3, + DFSTestUtil.waitForDatanodeStatus(dm, 3, 0, 3, origCapacity - (3*dnCapacity), WAIT_FOR_HEARTBEATS); checkAggregateFailuresAtNameNode(true, 3); checkFailuresAtNameNode(dm, dns.get(0), true, dn1Vol1.getAbsolutePath()); @@ -211,7 +211,7 @@ public class TestDataNodeVolumeFailureReporting { dn3Vol2.getAbsolutePath()); // The NN considers the DN dead - DFSTestUtil.waitForDatanodeStatus(dm, 2, 1, 2, + DFSTestUtil.waitForDatanodeStatus(dm, 2, 1, 2, origCapacity - (4*dnCapacity), WAIT_FOR_HEARTBEATS); checkAggregateFailuresAtNameNode(true, 2); checkFailuresAtNameNode(dm, dns.get(0), true, dn1Vol1.getAbsolutePath()); @@ -236,7 +236,7 @@ public class TestDataNodeVolumeFailureReporting { * and that the volume failure count should be reported as zero by * both the metrics and the NN. 
*/ - DFSTestUtil.waitForDatanodeStatus(dm, 3, 0, 0, origCapacity, + DFSTestUtil.waitForDatanodeStatus(dm, 3, 0, 0, origCapacity, WAIT_FOR_HEARTBEATS); checkAggregateFailuresAtNameNode(true, 0); dns = cluster.getDataNodes(); @@ -259,8 +259,8 @@ public class TestDataNodeVolumeFailureReporting { long origCapacity = DFSTestUtil.getLiveDatanodeCapacity(dm); long dnCapacity = DFSTestUtil.getDatanodeCapacity(dm, 0); - // Fail the first volume on both datanodes (we have to keep the - // third healthy so one node in the pipeline will not fail). + // Fail the first volume on both datanodes (we have to keep the + // third healthy so one node in the pipeline will not fail). File dn1Vol1 = new File(dataDir, "data"+(2*0+1)); File dn2Vol1 = new File(dataDir, "data"+(2*1+1)); DataNodeTestUtils.injectDataDirFailure(dn1Vol1, dn2Vol1); @@ -271,7 +271,7 @@ public class TestDataNodeVolumeFailureReporting { ArrayList dns = cluster.getDataNodes(); // The NN reports two volumes failures - DFSTestUtil.waitForDatanodeStatus(dm, 3, 0, 2, + DFSTestUtil.waitForDatanodeStatus(dm, 3, 0, 2, origCapacity - (1*dnCapacity), WAIT_FOR_HEARTBEATS); checkAggregateFailuresAtNameNode(true, 2); checkFailuresAtNameNode(dm, dns.get(0), true, dn1Vol1.getAbsolutePath()); @@ -318,6 +318,12 @@ public class TestDataNodeVolumeFailureReporting { DFSTestUtil.createFile(fs, file1, 1024, (short)3, 1L); DFSTestUtil.waitReplication(fs, file1, (short)3); + // Create additional file to trigger failure based volume check on dn1Vol2 + // and dn2Vol2. + Path file2 = new Path("/test2"); + DFSTestUtil.createFile(fs, file2, 1024, (short)3, 1L); + DFSTestUtil.waitReplication(fs, file2, (short)3); + ArrayList dns = cluster.getDataNodes(); assertTrue("DN1 should be up", dns.get(0).isDatanodeUp()); assertTrue("DN2 should be up", dns.get(1).isDatanodeUp()); @@ -536,8 +542,6 @@ public class TestDataNodeVolumeFailureReporting { private void checkFailuresAtDataNode(DataNode dn, long expectedVolumeFailuresCounter, boolean expectCapacityKnown, String... expectedFailedVolumes) throws Exception { - assertCounter("VolumeFailures", expectedVolumeFailuresCounter, - getMetrics(dn.getMetrics().name())); FsDatasetSpi fsd = dn.getFSDataset(); assertEquals(expectedFailedVolumes.length, fsd.getNumFailedVolumes()); assertArrayEquals(expectedFailedVolumes, fsd.getFailedStorageLocations()); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/checker/TestDatasetVolumeChecker.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/checker/TestDatasetVolumeChecker.java index 940e73bad8f..d6b4af9dbee 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/checker/TestDatasetVolumeChecker.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/checker/TestDatasetVolumeChecker.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hdfs.server.datanode.checker; +import com.google.common.base.Optional; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import org.apache.hadoop.hdfs.HdfsConfiguration; @@ -103,24 +104,28 @@ public class TestDatasetVolumeChecker { /** * Request a check and ensure it triggered {@link FsVolumeSpi#check}. 
*/ - checker.checkVolume(volume, new DatasetVolumeChecker.Callback() { - @Override - public void call(Set healthyVolumes, - Set failedVolumes) { - numCallbackInvocations.incrementAndGet(); - if (expectedVolumeHealth != null && expectedVolumeHealth != FAILED) { - assertThat(healthyVolumes.size(), is(1)); - assertThat(failedVolumes.size(), is(0)); - } else { - assertThat(healthyVolumes.size(), is(0)); - assertThat(failedVolumes.size(), is(1)); - } - } - }); + boolean result = + checker.checkVolume(volume, new DatasetVolumeChecker.Callback() { + @Override + public void call(Set healthyVolumes, + Set failedVolumes) { + numCallbackInvocations.incrementAndGet(); + if (expectedVolumeHealth != null && + expectedVolumeHealth != FAILED) { + assertThat(healthyVolumes.size(), is(1)); + assertThat(failedVolumes.size(), is(0)); + } else { + assertThat(healthyVolumes.size(), is(0)); + assertThat(failedVolumes.size(), is(1)); + } + } + }); // Ensure that the check was invoked at least once. verify(volume, times(1)).check(any(VolumeCheckContext.class)); - assertThat(numCallbackInvocations.get(), is(1L)); + if (result) { + assertThat(numCallbackInvocations.get(), is(1L)); + } } /** @@ -172,7 +177,7 @@ public class TestDatasetVolumeChecker { checker.setDelegateChecker(new DummyChecker()); final AtomicLong numCallbackInvocations = new AtomicLong(0); - checker.checkAllVolumesAsync( + boolean result = checker.checkAllVolumesAsync( dataset, new DatasetVolumeChecker.Callback() { @Override public void call( @@ -192,7 +197,9 @@ public class TestDatasetVolumeChecker { }); // The callback should be invoked exactly once. - assertThat(numCallbackInvocations.get(), is(1L)); + if (result) { + assertThat(numCallbackInvocations.get(), is(1L)); + } // Ensure each volume's check() method was called exactly once. 
     for (FsVolumeSpi volume : volumes) {
@@ -206,15 +213,18 @@ public class TestDatasetVolumeChecker {
    */
   static class DummyChecker
       implements AsyncChecker<VolumeCheckContext, VolumeCheckResult> {
+
     @Override
-    public ListenableFuture<VolumeCheckResult> schedule(
+    public Optional<ListenableFuture<VolumeCheckResult>> schedule(
         Checkable<VolumeCheckContext, VolumeCheckResult> target,
         VolumeCheckContext context) {
       try {
-        return Futures.immediateFuture(target.check(context));
+        return Optional.of(
+            Futures.immediateFuture(target.check(context)));
       } catch (Exception e) {
         LOG.info("check routine threw exception " + e);
-        return Futures.immediateFailedFuture(e);
+        return Optional.of(
+            Futures.immediateFailedFuture(e));
       }
     }
@@ -259,4 +269,4 @@ public class TestDatasetVolumeChecker {
     }
     return volumes;
   }
-}
\ No newline at end of file
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/checker/TestDatasetVolumeCheckerFailures.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/checker/TestDatasetVolumeCheckerFailures.java
index 6f3d748e6e3..04d8bccb87b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/checker/TestDatasetVolumeCheckerFailures.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/checker/TestDatasetVolumeCheckerFailures.java
@@ -131,29 +131,6 @@ public class TestDatasetVolumeCheckerFailures {
     assertThat(checker.getNumSkippedChecks(), is(1L));
   }
 
-  @Test(timeout=60000)
-  public void testMinGapIsEnforcedForASyncChecks() throws Exception {
-    final List<FsVolumeSpi> volumes =
-        TestDatasetVolumeChecker.makeVolumes(1, VolumeCheckResult.HEALTHY);
-    final FsDatasetSpi<? extends FsVolumeSpi> dataset =
-        TestDatasetVolumeChecker.makeDataset(volumes);
-    final DatasetVolumeChecker checker = new DatasetVolumeChecker(conf, timer);
-
-    checker.checkAllVolumesAsync(dataset, null);
-    assertThat(checker.getNumAsyncDatasetChecks(), is(1L));
-
-    // Re-check without advancing the timer. Ensure the check is skipped.
-    checker.checkAllVolumesAsync(dataset, null);
-    assertThat(checker.getNumAsyncDatasetChecks(), is(1L));
-    assertThat(checker.getNumSkippedChecks(), is(1L));
-
-    // Re-check after advancing the timer. Ensure the check is performed.
-    timer.advance(MIN_DISK_CHECK_GAP_MS);
-    checker.checkAllVolumesAsync(dataset, null);
-    assertThat(checker.getNumAsyncDatasetChecks(), is(2L));
-    assertThat(checker.getNumSkippedChecks(), is(1L));
-  }
-
   /**
    * Create a mock FsVolumeSpi whose {@link FsVolumeSpi#check} routine
    * hangs forever.
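
The TestDatasetVolumeChecker changes above exercise the new boolean return of checkVolume() and checkAllVolumesAsync(): a false return means the request was throttled and the registered callback will not be invoked. A minimal caller sketch, illustrative only and not part of this patch (the checker and volume variables are assumed to exist; the Callback shape follows the tests above):

    // Hypothetical caller; "checker" is a DatasetVolumeChecker and "volume"
    // an FsVolumeSpi set up elsewhere. A false return means the volume was
    // checked recently and no callback will fire for this request.
    boolean scheduled = checker.checkVolume(volume,
        new DatasetVolumeChecker.Callback() {
          @Override
          public void call(Set<FsVolumeSpi> healthyVolumes,
                           Set<FsVolumeSpi> failedVolumes) {
            // Runs only when a check was actually scheduled.
          }
        });
    if (!scheduled) {
      // Throttled: do not wait for the callback.
    }
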
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/checker/TestThrottledAsyncChecker.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/checker/TestThrottledAsyncChecker.java
index 30bdc0849de..11097724d4e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/checker/TestThrottledAsyncChecker.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/checker/TestThrottledAsyncChecker.java
@@ -18,15 +18,14 @@
 package org.apache.hadoop.hdfs.server.datanode.checker;
 
+import com.google.common.base.Optional;
 import com.google.common.base.Supplier;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.util.FakeTimer;
-import org.junit.Rule;
 import org.junit.Test;
-import org.junit.rules.ExpectedException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -38,10 +37,7 @@
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 
-import static org.hamcrest.CoreMatchers.is;
-import static org.hamcrest.core.Is.isA;
 import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -53,9 +49,6 @@ public class TestThrottledAsyncChecker {
       LoggerFactory.getLogger(TestThrottledAsyncChecker.class);
   private static final long MIN_ERROR_CHECK_GAP = 1000;
 
-  @Rule
-  public ExpectedException thrown = ExpectedException.none();
-
   /**
    * Test various scheduling combinations to ensure scheduling and
    * throttling behave as expected.
@@ -70,34 +63,34 @@ public class TestThrottledAsyncChecker {
             getExecutorService());
 
     // check target1 and ensure we get back the expected result.
-    assertTrue(checker.schedule(target1, true).get());
-    assertThat(target1.numChecks.get(), is(1L));
+    assertTrue(checker.schedule(target1, true).isPresent());
+    waitTestCheckableCheckCount(target1, 1L);
 
     // Check target1 again without advancing the timer. target1 should not
-    // be checked again and the cached result should be returned.
-    assertTrue(checker.schedule(target1, true).get());
-    assertThat(target1.numChecks.get(), is(1L));
+    // be checked again.
+    assertFalse(checker.schedule(target1, true).isPresent());
+    waitTestCheckableCheckCount(target1, 1L);
 
     // Schedule target2 scheduled without advancing the timer.
     // target2 should be checked as it has never been checked before.
-    assertTrue(checker.schedule(target2, true).get());
-    assertThat(target2.numChecks.get(), is(1L));
+    assertTrue(checker.schedule(target2, true).isPresent());
+    waitTestCheckableCheckCount(target2, 1L);
 
     // Advance the timer but just short of the min gap.
    // Neither target1 nor target2 should be checked again.
     timer.advance(MIN_ERROR_CHECK_GAP - 1);
-    assertTrue(checker.schedule(target1, true).get());
-    assertThat(target1.numChecks.get(), is(1L));
-    assertTrue(checker.schedule(target2, true).get());
-    assertThat(target2.numChecks.get(), is(1L));
+    assertFalse(checker.schedule(target1, true).isPresent());
+    waitTestCheckableCheckCount(target1, 1L);
+    assertFalse(checker.schedule(target2, true).isPresent());
+    waitTestCheckableCheckCount(target2, 1L);
 
     // Advance the timer again.
     // Both targets should be checked now.
     timer.advance(MIN_ERROR_CHECK_GAP);
-    assertTrue(checker.schedule(target1, true).get());
-    assertThat(target1.numChecks.get(), is(2L));
-    assertTrue(checker.schedule(target2, true).get());
-    assertThat(target1.numChecks.get(), is(2L));
+    assertTrue(checker.schedule(target1, true).isPresent());
+    waitTestCheckableCheckCount(target1, 2L);
+    assertTrue(checker.schedule(target2, true).isPresent());
+    waitTestCheckableCheckCount(target2, 2L);
   }
 
   @Test (timeout=60000)
@@ -109,13 +102,16 @@ public class TestThrottledAsyncChecker {
         new ThrottledAsyncChecker<>(timer, MIN_ERROR_CHECK_GAP,
             getExecutorService());
 
-    ListenableFuture<Boolean> lf = checker.schedule(target, true);
-    Futures.addCallback(lf, callback);
+    Optional<ListenableFuture<Boolean>> olf =
+        checker.schedule(target, true);
+    if (olf.isPresent()) {
+      Futures.addCallback(olf.get(), callback);
+    }
 
     // Request immediate cancellation.
     checker.shutdownAndWait(0, TimeUnit.MILLISECONDS);
     try {
-      assertFalse(lf.get());
+      assertFalse(olf.get().get());
       fail("Failed to get expected InterruptedException");
     } catch (ExecutionException ee) {
       assertTrue(ee.getCause() instanceof InterruptedException);
@@ -130,27 +126,33 @@ public class TestThrottledAsyncChecker {
     final ThrottledAsyncChecker<Boolean, Boolean> checker =
         new ThrottledAsyncChecker<>(timer, MIN_ERROR_CHECK_GAP,
             getExecutorService());
-    final ListenableFuture<Boolean> lf1 = checker.schedule(target, true);
-    final ListenableFuture<Boolean> lf2 = checker.schedule(target, true);
+    final Optional<ListenableFuture<Boolean>> olf1 =
+        checker.schedule(target, true);
 
-    // Ensure that concurrent requests return the same future object.
-    assertTrue(lf1 == lf2);
+    final Optional<ListenableFuture<Boolean>> olf2 =
+        checker.schedule(target, true);
+
+    // Ensure that concurrent requests return the future object
+    // for the first caller.
+    assertTrue(olf1.isPresent());
+    assertFalse(olf2.isPresent());
 
     // Unblock the latch and wait for it to finish execution.
     target.latch.countDown();
-    lf1.get();
+    olf1.get().get();
 
     GenericTestUtils.waitFor(new Supplier<Boolean>() {
       @Override
       public Boolean get() {
-        // We should not get back the same future as before.
+        // We should get an absent Optional.
         // This can take a short while until the internal callback in
         // ThrottledAsyncChecker is scheduled for execution.
         // Also this should not trigger a new check operation as the timer
         // was not advanced. If it does trigger a new check then the test
         // will fail with a timeout.
-        final ListenableFuture<Boolean> lf3 = checker.schedule(target, true);
-        return lf3 != lf2;
+        final Optional<ListenableFuture<Boolean>> olf3 =
+            checker.schedule(target, true);
+        return !olf3.isPresent();
       }
     }, 100, 10000);
   }
@@ -168,15 +170,30 @@ public class TestThrottledAsyncChecker {
         new ThrottledAsyncChecker<>(timer, MIN_ERROR_CHECK_GAP,
             getExecutorService());
 
-    assertTrue(checker.schedule(target1, true).get());
-    assertThat(target1.numChecks.get(), is(1L));
+    assertTrue(checker.schedule(target1, true).isPresent());
+    waitTestCheckableCheckCount(target1, 1L);
     timer.advance(MIN_ERROR_CHECK_GAP + 1);
-    assertFalse(checker.schedule(target1, false).get());
-    assertThat(target1.numChecks.get(), is(2L));
+    assertTrue(checker.schedule(target1, false).isPresent());
+    waitTestCheckableCheckCount(target1, 2L);
+  }
 
+  private void waitTestCheckableCheckCount(
+      final TestCheckableBase target,
+      final long expectedChecks) throws Exception {
+    GenericTestUtils.waitFor(new Supplier<Boolean>() {
+      @Override
+      public Boolean get() {
+        // This can take a short while until the internal callback in
+        // ThrottledAsyncChecker is scheduled for execution.
+        // If it does trigger a new check then the test
+        // will fail with a timeout.
+        return target.getTotalChecks() == expectedChecks;
+      }
+    }, 100, 10000);
+  }
 
   /**
-   * Ensure that the exeption from a failed check is cached
+   * Ensure that the exception from a failed check is cached
    * and returned without re-running the check when the minimum
    * gap has not elapsed.
    *
@@ -190,13 +207,11 @@ public class TestThrottledAsyncChecker {
         new ThrottledAsyncChecker<>(timer, MIN_ERROR_CHECK_GAP,
             getExecutorService());
 
-    thrown.expectCause(isA(DummyException.class));
-    checker.schedule(target1, true).get();
-    assertThat(target1.numChecks.get(), is(1L));
+    assertTrue(checker.schedule(target1, true).isPresent());
+    waitTestCheckableCheckCount(target1, 1L);
 
-    thrown.expectCause(isA(DummyException.class));
-    checker.schedule(target1, true).get();
-    assertThat(target1.numChecks.get(), is(2L));
+    assertFalse(checker.schedule(target1, true).isPresent());
+    waitTestCheckableCheckCount(target1, 1L);
   }
 
   /**
@@ -206,28 +221,38 @@ public class TestThrottledAsyncChecker {
     return new ScheduledThreadPoolExecutor(1);
   }
 
+  private abstract static class TestCheckableBase
+      implements Checkable<Boolean, Boolean> {
+    protected final AtomicLong numChecks = new AtomicLong(0);
+
+    public long getTotalChecks() {
+      return numChecks.get();
+    }
+
+    public void incrTotalChecks() {
+      numChecks.incrementAndGet();
+    }
+  }
+
   /**
    * A Checkable that just returns its input.
    */
   private static class NoOpCheckable
-      implements Checkable<Boolean, Boolean> {
-    private final AtomicLong numChecks = new AtomicLong(0);
+      extends TestCheckableBase {
     @Override
     public Boolean check(Boolean context) {
-      numChecks.incrementAndGet();
+      incrTotalChecks();
       return context;
     }
   }
 
   private static class ThrowingCheckable
-      implements Checkable<Boolean, Boolean> {
-    private final AtomicLong numChecks = new AtomicLong(0);
+      extends TestCheckableBase {
     @Override
     public Boolean check(Boolean context) throws DummyException {
-      numChecks.incrementAndGet();
+      incrTotalChecks();
       throw new DummyException();
     }
-
   }
 
   private static class DummyException extends Exception {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java
index 10b2f561a64..77b5258e67b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java
@@ -159,7 +159,7 @@ public class TestFsDatasetImpl {
     this.conf = new Configuration();
     this.conf.setLong(DFS_DATANODE_SCAN_PERIOD_HOURS_KEY, 0);
 
-    final FileIoProvider fileIoProvider = new FileIoProvider(conf);
+    final FileIoProvider fileIoProvider = new FileIoProvider(conf, null);
     when(datanode.getFileIoProvider()).thenReturn(fileIoProvider);
     when(datanode.getConf()).thenReturn(conf);
     final DNConf dnConf = new DNConf(datanode);
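
The TestThrottledAsyncChecker changes above reflect the new ThrottledAsyncChecker#schedule contract: it now returns an Optional that is absent when the check is throttled, instead of handing back a cached future. A minimal usage sketch, illustrative only and not part of this patch ("checker" and "target" are assumed to exist; the Optional handling and the two-argument Futures.addCallback call mirror the updated tests):

    // Hypothetical caller of the new schedule() API. An absent Optional means
    // the target was checked recently and no new check was scheduled, so no
    // callback is attached in that case.
    Optional<ListenableFuture<Boolean>> olf = checker.schedule(target, true);
    if (olf.isPresent()) {
      Futures.addCallback(olf.get(), new FutureCallback<Boolean>() {
        @Override
        public void onSuccess(Boolean result) {
          // The check ran; act on its result.
        }

        @Override
        public void onFailure(Throwable t) {
          // The check threw an exception; handle the failure.
        }
      });
    }
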