From b92477c638c9b9235868bfd13518e36545139c90 Mon Sep 17 00:00:00 2001
From: Kihwal Lee
Date: Thu, 27 Feb 2020 09:45:12 -0600
Subject: [PATCH] HDFS-15147. LazyPersistTestCase wait logic is error-prone.
 Contributed by Ahmed Hussein.

(cherry picked from commit 27cfda708ef66dfbe5f52a5f1e716298a294f3f7)

Conflicts:
	hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
	hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
	hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/LazyPersistTestCase.java
---
 .../org/apache/hadoop/util/ThreadUtil.java    |  28 ++
 .../apache/hadoop/test/GenericTestUtils.java  |  15 +-
 .../server/blockmanagement/BlockManager.java  |  21 +-
 .../hdfs/server/namenode/FSNamesystem.java    |  31 +-
 .../org/apache/hadoop/hdfs/DFSTestUtil.java   |  14 +-
 .../fsdataset/impl/LazyPersistTestCase.java   | 364 ++++++++++++++----
 .../fsdataset/impl/TestLazyPersistFiles.java  |  69 ++--
 .../impl/TestLazyPersistReplicaPlacement.java |   2 +-
 .../fsdataset/impl/TestLazyWriter.java        |   6 +-
 9 files changed, 406 insertions(+), 144 deletions(-)

diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/ThreadUtil.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/ThreadUtil.java
index 2cda8a443e4..f9ea3fcacd3 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/ThreadUtil.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/ThreadUtil.java
@@ -49,6 +49,34 @@ public class ThreadUtil {
     }
   }
 
+  /**
+   * Join a thread uninterruptibly.
+   * The call continues to block until the joined thread terminates, even
+   * when the calling thread is interrupted.
+   * The method logs any {@link InterruptedException} and then re-interrupts
+   * the thread.
+   *
+   * @param toJoin the thread to join on.
+   */
+  public static void joinUninterruptibly(Thread toJoin) {
+    boolean interrupted = false;
+    try {
+      while (true) {
+        try {
+          toJoin.join();
+          return;
+        } catch (InterruptedException e) {
+          interrupted = true;
+          LOG.warn("interrupted while joining", e);
+        }
+      }
+    } finally {
+      if (interrupted) {
+        Thread.currentThread().interrupt();
+      }
+    }
+  }
+
   /**
    * Convenience method that returns a resource as inputstream from the
    * classpath.
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/GenericTestUtils.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/GenericTestUtils.java
index 547990735e5..ba5644ffc24 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/GenericTestUtils.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/GenericTestUtils.java
@@ -60,7 +60,6 @@ import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
 import com.google.common.base.Joiner;
-import com.google.common.base.Preconditions;
 import com.google.common.base.Supplier;
 import com.google.common.collect.Sets;
 
@@ -369,11 +368,15 @@ public abstract class GenericTestUtils {
    *        time
    * @throws InterruptedException if the method is interrupted while waiting
    */
-  public static void waitFor(Supplier<Boolean> check, int checkEveryMillis,
-      int waitForMillis) throws TimeoutException, InterruptedException {
-    Preconditions.checkNotNull(check, ERROR_MISSING_ARGUMENT);
-    Preconditions.checkArgument(waitForMillis >= checkEveryMillis,
-        ERROR_INVALID_ARGUMENT);
+  public static void waitFor(final Supplier<Boolean> check,
+      final long checkEveryMillis, final long waitForMillis)
+      throws TimeoutException, InterruptedException {
+    if (check == null) {
+      throw new NullPointerException(ERROR_MISSING_ARGUMENT);
+    }
+    if (waitForMillis < checkEveryMillis) {
+      throw new IllegalArgumentException(ERROR_INVALID_ARGUMENT);
+    }
 
     long st = Time.monotonicNow();
     boolean result = check.get();
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index 34c6fd1bb17..17d5603634e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -48,6 +48,8 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.FutureTask;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
+
+import java.util.concurrent.atomic.AtomicLong;
 import javax.management.ObjectName;
 
 import org.apache.hadoop.HadoopIllegalArgumentException;
@@ -321,7 +323,12 @@ public class BlockManager implements BlockStatsMXBean {
 
   /** Redundancy thread. */
   private final Daemon redundancyThread = new Daemon(new RedundancyMonitor());
-
+  /**
+   * Timestamp marking the end time of {@link #redundancyThread}'s full cycle.
+   * This value can be checked by the JUnit tests to verify that the
+   * {@link #redundancyThread} has run at least one full iteration.
+   */
+  private final AtomicLong lastRedundancyCycleTS = new AtomicLong(-1);
   /** StorageInfoDefragmenter thread. */
   private final Daemon storageInfoDefragmenterThread =
       new Daemon(new StorageInfoDefragmenter());
@@ -4780,6 +4787,17 @@ public class BlockManager implements BlockStatsMXBean {
     return neededReconstruction.size();
   }
 
+  /**
+   * Used ad hoc to check the timestamp of the last full cycle of
+   * {@link #redundancyThread}. This is used by the JUnit tests to block until
+   * {@link #lastRedundancyCycleTS} is updated.
+   * @return the current {@link #lastRedundancyCycleTS}.
+   */
+  @VisibleForTesting
+  public long getLastRedundancyMonitorTS() {
+    return lastRedundancyCycleTS.get();
+  }
+
   /**
    * Periodically calls computeBlockRecoveryWork().
   */
@@ -4794,6 +4812,7 @@ public class BlockManager implements BlockStatsMXBean {
           computeDatanodeWork();
           processPendingReconstructions();
           rescanPostponedMisreplicatedBlocks();
+          lastRedundancyCycleTS.set(Time.monotonicNow());
         }
         TimeUnit.MILLISECONDS.sleep(redundancyRecheckIntervalMs);
       } catch (Throwable t) {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
index 474da2f9fff..59a380c5c6b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
@@ -89,6 +89,8 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_REPLICATION_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_REPLICATION_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SNAPSHOT_DIFF_LISTING_LIMIT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SNAPSHOT_DIFF_LISTING_LIMIT_DEFAULT;
+
+import java.util.concurrent.atomic.AtomicLong;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import static org.apache.hadoop.hdfs.server.namenode.FSDirStatAndListingOp.*;
 import static org.apache.hadoop.ha.HAServiceProtocol.HAServiceState.ACTIVE;
@@ -113,7 +115,6 @@ import java.io.DataInput;
 import java.io.DataOutputStream;
 import java.io.File;
 import java.io.FileNotFoundException;
-import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.OutputStreamWriter;
 import java.io.PrintWriter;
@@ -308,6 +309,7 @@ import org.apache.hadoop.util.Daemon;
 import org.apache.hadoop.util.DataChecksum;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.Time;
 import org.apache.hadoop.util.VersionInfo;
 import org.apache.log4j.Appender;
 import org.apache.log4j.AsyncAppender;
@@ -486,7 +488,12 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
   // A daemon to periodically clean up corrupt lazyPersist files
   // from the name space.
   Daemon lazyPersistFileScrubber = null;
-
+  /**
+   * Timestamp marking the end time of {@link #lazyPersistFileScrubber}'s full
+   * cycle. This value can be checked by the JUnit tests to verify that the
+   * {@link #lazyPersistFileScrubber} has run at least one full iteration.
+   */
+  private final AtomicLong lazyPersistFileScrubberTS = new AtomicLong(0);
   // Executor to warm up EDEK cache
   private ExecutorService edekCacheLoader = null;
   private final int edekCacheLoaderDelay;
@@ -645,6 +652,20 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     return leaseManager;
   }
 
+  /**
+   * Used ad hoc to check the timestamp of the last full cycle of the {@link
+   * #lazyPersistFileScrubber} daemon. This is used by the JUnit tests to block
+   * until {@link #lazyPersistFileScrubberTS} is updated.
+   *
+   * @return the current {@link #lazyPersistFileScrubberTS} if {@link
+   *         #lazyPersistFileScrubber} is not null.
+   */
+  @VisibleForTesting
+  public long getLazyPersistFileScrubberTS() {
+    return lazyPersistFileScrubber == null ? -1
+        : lazyPersistFileScrubberTS.get();
+  }
+
   public boolean isHaEnabled() {
     return haEnabled;
   }
@@ -4116,10 +4137,12 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
         try {
           if (!isInSafeMode()) {
             clearCorruptLazyPersistFiles();
+            // Set the timestamp of the last cycle.
+            lazyPersistFileScrubberTS.set(Time.monotonicNow());
           } else {
             if (FSNamesystem.LOG.isDebugEnabled()) {
-              FSNamesystem.LOG
-                  .debug("Namenode is in safemode, skipping scrubbing of corrupted lazy-persist files.");
+              FSNamesystem.LOG.debug("Namenode is in safemode, skipping "
+                  + "scrubbing of corrupted lazy-persist files.");
             }
           }
         } catch (Exception e) {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
index 3d0d8828467..2e26e5f43f6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
@@ -2000,6 +2000,15 @@ public class DFSTestUtil {
     GenericTestUtils.setLogLevel(NameNode.blockStateChangeLog, level);
   }
 
+  public static void setNameNodeLogLevel(org.slf4j.event.Level level) {
+    GenericTestUtils.setLogLevel(FSNamesystem.LOG, level);
+    GenericTestUtils.setLogLevel(BlockManager.LOG, level);
+    GenericTestUtils.setLogLevel(LeaseManager.LOG, level);
+    GenericTestUtils.setLogLevel(NameNode.LOG, level);
+    GenericTestUtils.setLogLevel(NameNode.stateChangeLog, level);
+    GenericTestUtils.setLogLevel(NameNode.blockStateChangeLog, level);
+  }
+
   /**
    * Get the NamenodeProtocol RPC proxy for the NN associated with this
    * DFSClient object
@@ -2282,15 +2291,12 @@ public class DFSTestUtil {
       public Boolean get() {
         try {
           final int currentValue = Integer.parseInt(jmx.getValue(metricName));
-          LOG.info("Waiting for " + metricName +
-              " to reach value " + expectedValue +
-              ", current value = " + currentValue);
           return currentValue == expectedValue;
         } catch (Exception e) {
           throw new UnhandledException("Test failed due to unexpected exception", e);
         }
       }
-    }, 1000, 60000);
+    }, 50, 60000);
   }
 
   /**
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/LazyPersistTestCase.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/LazyPersistTestCase.java
index aae59ddc5e9..52fabf32592 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/LazyPersistTestCase.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/LazyPersistTestCase.java
@@ -17,7 +17,11 @@
  */
 package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
 
-import com.google.common.base.Supplier;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.TimeUnit;
 import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
 
 import static org.apache.hadoop.fs.CreateFlag.CREATE;
@@ -45,6 +49,14 @@ import com.google.common.base.Preconditions;
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
+import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
+import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CreateFlag;
 import org.apache.hadoop.fs.FSDataOutputStream;
@@ -68,10 +80,13 @@ import org.apache.hadoop.io.nativeio.NativeIO;
 import org.apache.hadoop.net.unix.TemporarySocketDirectory;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.test.GenericTestUtils;
-import org.apache.log4j.Level;
 import org.junit.After;
 import org.junit.Rule;
 import org.junit.rules.Timeout;
+import org.slf4j.event.Level;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 
 public abstract class LazyPersistTestCase {
   static final byte LAZY_PERSIST_POLICY_ID = (byte) 15;
@@ -81,16 +96,33 @@ public abstract class LazyPersistTestCase {
     GenericTestUtils.setLogLevel(FsDatasetImpl.LOG, Level.DEBUG);
   }
 
+  protected static final Logger LOG =
+      LoggerFactory.getLogger(LazyPersistTestCase.class);
   protected static final int BLOCK_SIZE = 5 * 1024 * 1024;
   protected static final int BUFFER_LENGTH = 4096;
-  private static final long HEARTBEAT_INTERVAL_SEC = 1;
-  private static final int HEARTBEAT_RECHECK_INTERVAL_MSEC = 500;
-  private static final String JMX_RAM_DISK_METRICS_PATTERN = "^RamDisk";
-  private static final String JMX_SERVICE_NAME = "DataNode";
   protected static final int LAZY_WRITE_FILE_SCRUBBER_INTERVAL_SEC = 3;
   protected static final int LAZY_WRITER_INTERVAL_SEC = 1;
-  protected static final Log LOG = LogFactory.getLog(LazyPersistTestCase.class);
   protected static final short REPL_FACTOR = 1;
+  private static final String JMX_RAM_DISK_METRICS_PATTERN = "^RamDisk";
+  private static final String JMX_SERVICE_NAME = "DataNode";
+  private static final long HEARTBEAT_INTERVAL_SEC = 1;
+  private static final int HEARTBEAT_RECHECK_INTERVAL_MS = 500;
+  private static final long WAIT_FOR_FBR_MS =
+      TimeUnit.SECONDS.toMillis(10);
+  private static final long WAIT_FOR_STORAGE_TYPES_MS =
+      TimeUnit.SECONDS.toMillis(30);
+  private static final long WAIT_FOR_ASYNC_DELETE_MS =
+      TimeUnit.SECONDS.toMillis(10);
+  private static final long WAIT_FOR_DN_SHUTDOWN_MS =
+      TimeUnit.SECONDS.toMillis(30);
+  private static final long WAIT_FOR_REDUNDANCY_MS =
+      TimeUnit.SECONDS
+          .toMillis(2 * DFS_NAMENODE_REDUNDANCY_INTERVAL_SECONDS_DEFAULT);
+  private static final long WAIT_FOR_LAZY_SCRUBBER_MS =
+      TimeUnit.SECONDS.toMillis(2 * LAZY_WRITE_FILE_SCRUBBER_INTERVAL_SEC);
+  private static final long WAIT_POLL_INTERVAL_MS = 10;
+  private static final long WAIT_POLL_INTERVAL_LARGE_MS = 20;
+
   protected final long osPageSize =
       NativeIO.POSIX.getCacheManipulator().getOperatingSystemPageSize();
 
@@ -133,76 +165,79 @@ public abstract class LazyPersistTestCase {
       Path path, StorageType storageType)
       throws IOException, TimeoutException, InterruptedException {
     // Ensure that the returned block locations are correct!
- LOG.info("Ensure path: " + path + " is on StorageType: " + storageType); + LOG.info("Ensure path: {} is on StorageType: {}", path, storageType); assertThat(fs.exists(path), is(true)); long fileLength = client.getFileInfo(path.toString()).getLen(); - GenericTestUtils.waitFor(new Supplier() { - @Override - public Boolean get() { - try { - LocatedBlocks locatedBlocks = - client.getLocatedBlocks(path.toString(), 0, fileLength); - for (LocatedBlock locatedBlock : locatedBlocks.getLocatedBlocks()) { - if (locatedBlock.getStorageTypes()[0] != storageType) { - return false; - } + GenericTestUtils.waitFor(() -> { + try { + LocatedBlocks locatedBlocks = + client.getLocatedBlocks(path.toString(), 0, fileLength); + for (LocatedBlock locatedBlock : locatedBlocks.getLocatedBlocks()) { + if (locatedBlock.getStorageTypes()[0] != storageType) { + return false; } - return true; - } catch (IOException ioe) { - LOG.warn("Exception got in ensureFileReplicasOnStorageType()", ioe); - return false; } + return true; + } catch (IOException ioe) { + LOG.warn("Exception got in ensureFileReplicasOnStorageType()", ioe); + return false; } - }, 100, 30 * 1000); + }, WAIT_POLL_INTERVAL_MS, WAIT_FOR_STORAGE_TYPES_MS); return client.getLocatedBlocks(path.toString(), 0, fileLength); } /** - * Make sure at least one non-transient volume has a saved copy of the replica. - * An infinite loop is used to ensure the async lazy persist tasks are completely - * done before verification. Caller of ensureLazyPersistBlocksAreSaved expects - * either a successful pass or timeout failure. + * Make sure at least one non-transient volume has a saved copy of the + * replica. An infinite loop is used to ensure the async lazy persist tasks + * are completely done before verification. + * Caller of this method expects either a successful pass or timeout failure. + * + * @param locatedBlocks the collection of blocks and their locations. + * @throws IOException for aut-closeable resources. + * @throws InterruptedException if the thread is interrupted. + * @throws TimeoutException if {@link #WAIT_FOR_STORAGE_TYPES_MS} expires + * before we find a persisted copy for each located + * block. */ protected final void ensureLazyPersistBlocksAreSaved( - LocatedBlocks locatedBlocks) throws IOException, InterruptedException { + final LocatedBlocks locatedBlocks) + throws IOException, InterruptedException, TimeoutException { final String bpid = cluster.getNamesystem().getBlockPoolId(); final Set persistedBlockIds = new HashSet(); - + // We should find a persisted copy for each located block. 
     try (FsDatasetSpi.FsVolumeReferences volumes =
         cluster.getDataNodes().get(0).getFSDataset().getFsVolumeReferences()) {
-      while (persistedBlockIds.size() < locatedBlocks.getLocatedBlocks()
-          .size()) {
-        // Take 1 second sleep before each verification iteration
-        Thread.sleep(1000);
-
+      GenericTestUtils.waitFor(() -> {
         for (LocatedBlock lb : locatedBlocks.getLocatedBlocks()) {
           for (FsVolumeSpi v : volumes) {
             if (v.isTransientStorage()) {
               continue;
             }
-
             FsVolumeImpl volume = (FsVolumeImpl) v;
-            File lazyPersistDir =
-                volume.getBlockPoolSlice(bpid).getLazypersistDir();
-
+            File lazyPersistDir;
+            try {
+              lazyPersistDir =
+                  volume.getBlockPoolSlice(bpid).getLazypersistDir();
+            } catch (IOException ioe) {
+              return false;
+            }
             long blockId = lb.getBlock().getBlockId();
             File targetDir =
                 DatanodeUtil.idToBlockDir(lazyPersistDir, blockId);
             File blockFile = new File(targetDir, lb.getBlock().getBlockName());
             if (blockFile.exists()) {
-              // Found a persisted copy for this block and added to the Set
+              // Found a persisted copy for this block; add it to the set.
               persistedBlockIds.add(blockId);
             }
           }
         }
-      }
+        return (persistedBlockIds.size() ==
+            locatedBlocks.getLocatedBlocks().size());
+      }, WAIT_POLL_INTERVAL_LARGE_MS, WAIT_FOR_STORAGE_TYPES_MS);
     }
-
-    // We should have found a persisted copy for each located block.
-    assertThat(persistedBlockIds.size(), is(locatedBlocks.getLocatedBlocks().size()));
   }
 
   protected final void makeRandomTestFile(Path path, long length,
@@ -271,7 +306,7 @@ public abstract class LazyPersistTestCase {
     }
     conf.setLong(DFS_HEARTBEAT_INTERVAL_KEY, HEARTBEAT_INTERVAL_SEC);
     conf.setInt(DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY,
-        HEARTBEAT_RECHECK_INTERVAL_MSEC);
+        HEARTBEAT_RECHECK_INTERVAL_MS);
     conf.setInt(DFS_DATANODE_LAZY_WRITER_INTERVAL_SEC,
         LAZY_WRITER_INTERVAL_SEC);
     conf.setInt(DFSConfigKeys.DFS_NAMENODE_SAFEMODE_MIN_DATANODES_KEY, 1);
@@ -334,18 +369,18 @@ public abstract class LazyPersistTestCase {
         @Override
         public void mlock(String identifier,
             ByteBuffer mmap, long length) throws IOException {
-          LOG.info("LazyPersistTestCase: faking mlock of " + identifier + " bytes.");
+          LOG.info("LazyPersistTestCase: faking mlock of {} bytes.", identifier);
         }
 
         @Override
         public long getMemlockLimit() {
-          LOG.info("LazyPersistTestCase: fake return " + Long.MAX_VALUE);
+          LOG.info("LazyPersistTestCase: fake return {}", Long.MAX_VALUE);
           return Long.MAX_VALUE;
         }
 
         @Override
         public boolean verifyCanMlock() {
-          LOG.info("LazyPersistTestCase: fake return " + true);
+          LOG.info("LazyPersistTestCase: fake return {}", true);
           return true;
         }
       });
@@ -413,8 +448,10 @@ public abstract class LazyPersistTestCase {
 
     public void build() throws IOException {
       LazyPersistTestCase.this.startUpCluster(
-          numDatanodes, hasTransientStorage, storageTypes, ramDiskReplicaCapacity,
-          ramDiskStorageLimit, maxLockedMemory, useScr, useLegacyBlockReaderLocal,
+          numDatanodes, hasTransientStorage, storageTypes,
+          ramDiskReplicaCapacity,
+          ramDiskStorageLimit, maxLockedMemory, useScr,
+          useLegacyBlockReaderLocal,
           disableScrubber);
     }
 
@@ -429,11 +466,44 @@ public abstract class LazyPersistTestCase {
     private boolean disableScrubber=false;
   }
 
+  /**
+   * Forces a full block report on all the datanodes. The call blocks waiting
+   * for all block reports to be received by the namenode.
+   *
+   * @throws IOException if an exception is thrown while getting the datanode
+   *                     descriptors or triggering the block reports.
+   * @throws InterruptedException if the thread receives an interrupt.
+   * @throws TimeoutException if the reports are not received within
+   *                          {@link #WAIT_FOR_FBR_MS}.
+   */
   protected final void triggerBlockReport()
-      throws IOException, InterruptedException {
+      throws InterruptedException, TimeoutException, IOException {
     // Trigger block report to NN
-    DataNodeTestUtils.triggerBlockReport(cluster.getDataNodes().get(0));
-    Thread.sleep(10 * 1000);
+    final Map<DatanodeStorageInfo, Integer> reportCountsBefore =
+        new HashMap<>();
+    final FSNamesystem fsn = cluster.getNamesystem();
+    for (DataNode dn : cluster.getDataNodes()) {
+      final DatanodeDescriptor dnd =
+          NameNodeAdapter.getDatanode(fsn, dn.getDatanodeId());
+      final DatanodeStorageInfo storage = dnd.getStorageInfos()[0];
+      reportCountsBefore.put(storage, storage.getBlockReportCount());
+      DataNodeTestUtils.triggerBlockReport(dn);
+    }
+    // Wait for the block reports to be received.
+    GenericTestUtils.waitFor(() -> {
+      for (Entry<DatanodeStorageInfo, Integer> reportEntry :
+          reportCountsBefore.entrySet()) {
+        final DatanodeStorageInfo dnStorageInfo = reportEntry.getKey();
+        final int cntBefore = reportEntry.getValue();
+        final int currentCnt = dnStorageInfo.getBlockReportCount();
+        if (cntBefore == currentCnt) {
+          // Same count means no report has been received.
+          return false;
+        }
+      }
+      // If we reach here, then all the block reports have been received.
+      return true;
+    }, WAIT_POLL_INTERVAL_MS, WAIT_FOR_FBR_MS);
   }
 
   protected final boolean verifyBlockDeletedFromDir(File dir,
@@ -445,51 +515,58 @@ public abstract class LazyPersistTestCase {
       File blockFile = new File(targetDir, lb.getBlock().getBlockName());
       if (blockFile.exists()) {
-        LOG.warn("blockFile: " + blockFile.getAbsolutePath() +
-            " exists after deletion.");
         return false;
       }
       File metaFile = new File(targetDir,
           DatanodeUtil.getMetaName(lb.getBlock().getBlockName(),
               lb.getBlock().getGenerationStamp()));
       if (metaFile.exists()) {
-        LOG.warn("metaFile: " + metaFile.getAbsolutePath() +
-            " exists after deletion.");
         return false;
       }
     }
     return true;
   }
 
-  protected final boolean verifyDeletedBlocks(LocatedBlocks locatedBlocks)
-      throws IOException, InterruptedException {
+  protected final boolean verifyDeletedBlocks(final LocatedBlocks locatedBlocks)
+      throws Exception {
 
     LOG.info("Verifying replica has no saved copy after deletion.");
     triggerBlockReport();
+    final DataNode dn = cluster.getDataNodes().get(0);
 
-    while(
-      cluster.getFsDatasetTestUtils(0).getPendingAsyncDeletions()
-        > 0L){
-      Thread.sleep(1000);
-    }
-
-    final String bpid = cluster.getNamesystem().getBlockPoolId();
-    final FsDatasetSpi<?> dataset =
-        cluster.getDataNodes().get(0).getFSDataset();
-
-    // Make sure deleted replica does not have a copy on either finalized dir of
-    // transient volume or finalized dir of non-transient volume
-    try (FsDatasetSpi.FsVolumeReferences volumes =
-        dataset.getFsVolumeReferences()) {
-      for (FsVolumeSpi vol : volumes) {
-        FsVolumeImpl volume = (FsVolumeImpl) vol;
-        File targetDir = (volume.isTransientStorage()) ?
-            volume.getBlockPoolSlice(bpid).getFinalizedDir() :
-            volume.getBlockPoolSlice(bpid).getLazypersistDir();
-        if (verifyBlockDeletedFromDir(targetDir, locatedBlocks) == false) {
+    GenericTestUtils.waitFor(() -> {
+      for (DataNode dn1 : cluster.getDataNodes()) {
+        if (cluster.getFsDatasetTestUtils(dn1).getPendingAsyncDeletions()
+            > 0) {
           return false;
         }
       }
+      return true;
+    }, WAIT_POLL_INTERVAL_MS, WAIT_FOR_ASYNC_DELETE_MS);
+
+    final String bpid = cluster.getNamesystem().getBlockPoolId();
+    final FsDatasetSpi<?> dataset = dn.getFSDataset();
+    // Make sure deleted replica does not have a copy on either finalized dir
+    // of transient volume or finalized dir of non-transient volume.
+    // We need to wait until the async deletion is scheduled.
+    try (FsDatasetSpi.FsVolumeReferences volumes =
+        dataset.getFsVolumeReferences()) {
+      GenericTestUtils.waitFor(() -> {
+        try {
+          for (FsVolumeSpi vol : volumes) {
+            FsVolumeImpl volume = (FsVolumeImpl) vol;
+            File targetDir = (volume.isTransientStorage()) ?
+                volume.getBlockPoolSlice(bpid).getFinalizedDir() :
+                volume.getBlockPoolSlice(bpid).getLazypersistDir();
+            if (!verifyBlockDeletedFromDir(targetDir, locatedBlocks)) {
+              return false;
+            }
+          }
+          return true;
+        } catch (IOException ie) {
+          return false;
+        }
+      }, WAIT_POLL_INTERVAL_MS, WAIT_FOR_ASYNC_DELETE_MS);
     }
     return true;
   }
@@ -530,8 +607,137 @@ public abstract class LazyPersistTestCase {
     DFSTestUtil.waitForMetric(jmx, metricName, expectedValue);
   }
 
-  protected void triggerEviction(DataNode dn) {
+  protected void triggerEviction(final DataNode dn) {
     FsDatasetImpl fsDataset = (FsDatasetImpl) dn.getFSDataset();
     fsDataset.evictLazyPersistBlocks(Long.MAX_VALUE); // Run one eviction cycle.
   }
+
+  /**
+   * Shuts down all the datanodes in {@link #cluster}. The call blocks for up
+   * to {@link #WAIT_FOR_DN_SHUTDOWN_MS} until the datanode report shows no
+   * live datanodes.
+   *
+   * @throws TimeoutException if {@link #WAIT_FOR_DN_SHUTDOWN_MS} expires with
+   *                          at least one datanode still alive.
+   * @throws InterruptedException if the thread receives an interrupt.
+   */
+  protected void shutdownDataNodes()
+      throws TimeoutException, InterruptedException {
+    cluster.shutdownDataNodes();
+    GenericTestUtils.waitFor(() -> {
+      try {
+        DatanodeInfo[] info = client.datanodeReport(
+            HdfsConstants.DatanodeReportType.LIVE);
+        return info.length == 0;
+      } catch (IOException e) {
+        return false;
+      }
+    }, WAIT_POLL_INTERVAL_LARGE_MS, WAIT_FOR_DN_SHUTDOWN_MS);
+  }
+
+  /**
+   * Blocks for up to {@link #WAIT_FOR_REDUNDANCY_MS}, waiting for the corrupt
+   * block count to reach a certain value.
+   *
+   * @param corruptCnt the number of corrupt blocks to wait for before
+   *                   resuming.
+   * @throws TimeoutException if {@link #WAIT_FOR_REDUNDANCY_MS} expires before
+   *                          the corrupt count meets the criteria.
+   * @throws InterruptedException if the thread receives an interrupt.
+   */
+  protected void waitForCorruptBlock(final long corruptCnt)
+      throws TimeoutException, InterruptedException {
+    // Wait for the redundancy monitor to mark the file as corrupt.
+    GenericTestUtils.waitFor(() -> {
+      Iterator<BlockInfo> bInfoIter = cluster.getNameNode()
+          .getNamesystem().getBlockManager().getCorruptReplicaBlockIterator();
+      int count = 0;
+      while (bInfoIter.hasNext()) {
+        bInfoIter.next();
+        count++;
+      }
+      return corruptCnt == count;
+    }, 2 * WAIT_POLL_INTERVAL_LARGE_MS, WAIT_FOR_REDUNDANCY_MS);
+  }
+
+  /**
+   * Blocks until the {@link FSNamesystem#lazyPersistFileScrubber} daemon
+   * completes a full iteration.
+   *
+   * @throws InterruptedException if the thread receives an interrupt.
+   * @throws TimeoutException if
+   *                          {@link FSNamesystem#getLazyPersistFileScrubberTS()}
+   *                          does not update the timestamp within
+   *                          {@link #WAIT_FOR_LAZY_SCRUBBER_MS}.
+   */
+  protected void waitForScrubberCycle()
+      throws TimeoutException, InterruptedException {
+    // Wait for the lazy persist file scrubber to complete a full cycle.
+    final FSNamesystem fsn = cluster.getNamesystem();
+    final long lastTimeStamp = fsn.getLazyPersistFileScrubberTS();
+    if (lastTimeStamp == -1) { // The scrubber is disabled.
+      return;
+    }
+    GenericTestUtils.waitFor(
+        () -> lastTimeStamp != fsn.getLazyPersistFileScrubberTS(),
+        2 * WAIT_POLL_INTERVAL_LARGE_MS, WAIT_FOR_LAZY_SCRUBBER_MS);
+  }
+
+  /**
+   * Blocks until the {@link BlockManager.RedundancyMonitor} daemon completes
+   * a full iteration.
+   *
+   * @throws InterruptedException if the thread receives an interrupt.
+   * @throws TimeoutException if
+   *                          {@link BlockManager#getLastRedundancyMonitorTS()}
+   *                          does not update the timestamp within
+   *                          {@link #WAIT_FOR_REDUNDANCY_MS}.
+   */
+  protected void waitForRedundancyMonitorCycle()
+      throws TimeoutException, InterruptedException {
+    // Wait for the redundancy monitor to complete a full cycle.
+    final BlockManager bm = cluster.getNamesystem().getBlockManager();
+    final long lastRedundancyTS =
+        bm.getLastRedundancyMonitorTS();
+
+    GenericTestUtils.waitFor(
+        () -> lastRedundancyTS != bm.getLastRedundancyMonitorTS(),
+        2 * WAIT_POLL_INTERVAL_LARGE_MS, WAIT_FOR_REDUNDANCY_MS);
+  }
+
+  /**
+   * Blocks until {@link BlockManager#lowRedundancyBlocksCount} reaches a
+   * certain value.
+   *
+   * @param cnt the low-redundancy block count to wait for.
+   * @throws InterruptedException if the thread receives an interrupt.
+   * @throws TimeoutException if
+   *                          {@link BlockManager#getLowRedundancyBlocksCount()}
+   *                          does not reach the count within
+   *                          {@link #WAIT_FOR_REDUNDANCY_MS}.
+   */
+  protected void waitForLowRedundancyCount(final long cnt)
+      throws TimeoutException, InterruptedException {
+    final BlockManager bm = cluster.getNamesystem().getBlockManager();
+
+    GenericTestUtils.waitFor(() -> cnt == bm.getLowRedundancyBlocksCount(),
+        2 * WAIT_POLL_INTERVAL_LARGE_MS, WAIT_FOR_REDUNDANCY_MS);
+  }
+
+  /**
+   * Blocks until the file status changes on the filesystem.
+   *
+   * @param path the path of the file to be checked.
+   * @param expected whether the file should exist or not.
+   * @throws TimeoutException if the file status does not match the expected
+   *                          value within {@link #WAIT_FOR_STORAGE_TYPES_MS}.
+   * @throws InterruptedException if the thread receives an interrupt.
+   */
+  protected void waitForFile(final Path path, final boolean expected)
+      throws TimeoutException, InterruptedException {
+    GenericTestUtils.waitFor(() -> {
+      try {
+        return expected == fs.exists(path);
+      } catch (IOException e) {
+        return false;
+      }
+    }, WAIT_POLL_INTERVAL_MS, WAIT_FOR_STORAGE_TYPES_MS);
+  }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistFiles.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistFiles.java
index 04f81278834..c0b4b17ea95 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistFiles.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistFiles.java
@@ -16,11 +16,10 @@
  * limitations under the License.
  */
 package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
 
-import com.google.common.collect.Iterators;
-import com.google.common.util.concurrent.Uninterruptibles;
 import org.apache.hadoop.fs.CreateFlag;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.util.ThreadUtil;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -33,7 +32,6 @@ import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import static org.apache.hadoop.fs.StorageType.RAM_DISK;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.*;
 import static org.hamcrest.core.Is.is;
 import static org.junit.Assert.assertThat;
 
@@ -76,7 +74,7 @@ public class TestLazyPersistFiles extends LazyPersistTestCase {
     makeTestFile(path, BLOCK_SIZE, true);
 
     try {
-      client.truncate(path.toString(), BLOCK_SIZE/2);
+      client.truncate(path.toString(), BLOCK_SIZE / 2);
       fail("Truncate to LazyPersist file did not fail as expected");
     } catch (Throwable t) {
       LOG.info("Got expected exception ", t);
@@ -98,28 +96,20 @@ public class TestLazyPersistFiles extends LazyPersistTestCase {
     makeTestFile(path1, BLOCK_SIZE, true);
     ensureFileReplicasOnStorageType(path1, RAM_DISK);
 
-    // Stop the DataNode and sleep for the time it takes the NN to
-    // detect the DN as being dead.
-    cluster.shutdownDataNodes();
-    Thread.sleep(30000L);
+    // Stop the DataNode.
+    shutdownDataNodes();
     assertThat(cluster.getNamesystem().getNumDeadDataNodes(), is(1));
 
     // Next, wait for the redundancy monitor to mark the file as corrupt.
-    Thread.sleep(2 * DFS_NAMENODE_REDUNDANCY_INTERVAL_SECONDS_DEFAULT * 1000);
-
+    waitForRedundancyMonitorCycle();
     // Wait for the LazyPersistFileScrubber to run
-    Thread.sleep(2 * LAZY_WRITE_FILE_SCRUBBER_INTERVAL_SEC * 1000);
-
+    waitForScrubberCycle();
     // Ensure that path1 does not exist anymore, whereas path2 does.
-    assert(!fs.exists(path1));
+    waitForFile(path1, false);
 
     // We should have zero blocks that need replication, i.e. the one
-    // belonging to path2.
-    assertThat(cluster.getNameNode()
-        .getNamesystem()
-        .getBlockManager()
-        .getLowRedundancyBlocksCount(),
-        is(0L));
+    // belonging to path2. This needs a wait.
+    waitForLowRedundancyCount(0L);
   }
 
   @Test
@@ -134,18 +124,14 @@ public class TestLazyPersistFiles extends LazyPersistTestCase {
     makeTestFile(path1, BLOCK_SIZE, true);
     ensureFileReplicasOnStorageType(path1, RAM_DISK);
 
     // Stop the DataNode and wait for the time it takes the NN to
     // detect the DN as being dead.
-    cluster.shutdownDataNodes();
-    Thread.sleep(30000L);
-
-    // Next, wait for the redundancy monitor to mark the file as corrupt.
-    Thread.sleep(2 * DFS_NAMENODE_REDUNDANCY_INTERVAL_SECONDS_DEFAULT * 1000);
+    shutdownDataNodes();
 
+    // Wait for the redundancy monitor to mark the file as corrupt.
+    waitForCorruptBlock(1L);
     // Wait for the LazyPersistFileScrubber to run
-    Thread.sleep(2 * LAZY_WRITE_FILE_SCRUBBER_INTERVAL_SEC * 1000);
-
+    waitForScrubberCycle();
     // Ensure that path1 exists.
-    Assert.assertTrue(fs.exists(path1));
-
+    waitForFile(path1, true);
   }
 
   /**
@@ -160,20 +146,14 @@ public class TestLazyPersistFiles extends LazyPersistTestCase {
     makeTestFile(path1, BLOCK_SIZE, true);
     ensureFileReplicasOnStorageType(path1, RAM_DISK);
 
-    cluster.shutdownDataNodes();
+    shutdownDataNodes();
 
     cluster.restartNameNodes();
 
     // wait for the redundancy monitor to mark the file as corrupt.
-    Long corruptBlkCount;
-    do {
-      Thread.sleep(DFS_NAMENODE_REDUNDANCY_INTERVAL_SECONDS_DEFAULT * 1000);
-      corruptBlkCount = (long) Iterators.size(cluster.getNameNode()
-          .getNamesystem().getBlockManager().getCorruptReplicaBlockIterator());
-    } while (corruptBlkCount != 1L);
-
+    waitForCorruptBlock(1L);
     // Ensure path1 exists.
-    Assert.assertTrue(fs.exists(path1));
+    waitForFile(path1, true);
   }
 
   /**
@@ -215,10 +195,8 @@ public class TestLazyPersistFiles extends LazyPersistTestCase {
       threads[i].start();
     }
 
-    Thread.sleep(500);
-
     for (int i = 0; i < NUM_TASKS; i++) {
-      Uninterruptibles.joinUninterruptibly(threads[i]);
+      ThreadUtil.joinUninterruptibly(threads[i]);
    }
     Assert.assertFalse(testFailed.get());
   }
@@ -232,7 +210,7 @@ public class TestLazyPersistFiles extends LazyPersistTestCase {
    */
   @Test
   public void testConcurrentWrites()
-      throws IOException, InterruptedException {
+      throws IOException, InterruptedException, TimeoutException {
     getClusterBuilder().setRamDiskReplicaCapacity(9).build();
     final String METHOD_NAME = GenericTestUtils.getMethodName();
     final int SEED = 0xFADED;
@@ -281,11 +259,11 @@ public class TestLazyPersistFiles extends LazyPersistTestCase {
       this.seed = seed;
       this.latch = latch;
       this.bFail = bFail;
-      System.out.println("Creating Writer: " + id);
+      LOG.info("Creating Writer: {}", id);
     }
 
     public void run() {
-      System.out.println("Writer " + id + " starting... ");
+      LOG.info("Writer {} starting... ", id);
       int i = 0;
       try {
         for (i = 0; i < paths.length; i++) {
@@ -295,9 +273,8 @@ public class TestLazyPersistFiles extends LazyPersistTestCase {
         }
       } catch (IOException e) {
         bFail.set(true);
-        LOG.error("Writer exception: writer id:" + id +
-            " testfile: " + paths[i].toString() +
-            " " + e);
+        LOG.error("Writer exception: writer id:{} testfile: {}",
+            id, paths[i].toString(), e);
       } finally {
         latch.countDown();
       }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistReplicaPlacement.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistReplicaPlacement.java
index c16dbe5604d..b6413ec6246 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistReplicaPlacement.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistReplicaPlacement.java
@@ -119,7 +119,7 @@ public class TestLazyPersistReplicaPlacement extends LazyPersistTestCase {
    */
   @Test
   public void testFallbackToDiskPartial()
-      throws IOException, InterruptedException {
+      throws IOException, InterruptedException, TimeoutException {
     getClusterBuilder().setMaxLockedMemory(2 * BLOCK_SIZE).build();
     final String METHOD_NAME = GenericTestUtils.getMethodName();
     Path path = new Path("/" + METHOD_NAME + ".dat");
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyWriter.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyWriter.java
index 16807640fd0..56cc41e37fe 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyWriter.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyWriter.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
 
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.junit.Assert;
 import org.junit.Test;
@@ -156,7 +157,6 @@ public class TestLazyWriter extends LazyPersistTestCase {
     for (int i = 0; i < NUM_PATHS; ++i) {
       makeTestFile(paths[i + NUM_PATHS], BLOCK_SIZE, true);
       triggerBlockReport();
-      Thread.sleep(3000);
       ensureFileReplicasOnStorageType(paths[i + NUM_PATHS], RAM_DISK);
       ensureFileReplicasOnStorageType(paths[indexes.get(i)], DEFAULT);
 
       for (int j = i + 1; j < NUM_PATHS; ++j) {
@@ -183,13 +183,13 @@ public class TestLazyWriter extends LazyPersistTestCase {
       throws Exception {
     getClusterBuilder().build();
     final String METHOD_NAME = GenericTestUtils.getMethodName();
-    FsDatasetTestUtil.stopLazyWriter(cluster.getDataNodes().get(0));
+    final DataNode dn = cluster.getDataNodes().get(0);
+    FsDatasetTestUtil.stopLazyWriter(dn);
 
     Path path = new Path("/" + METHOD_NAME + ".dat");
     makeTestFile(path, BLOCK_SIZE, true);
     LocatedBlocks locatedBlocks =
         ensureFileReplicasOnStorageType(path, RAM_DISK);
-
     // Delete before persist
     client.delete(path.toString(), false);
     Assert.assertFalse(fs.exists(path));