From de7edf58bd37a7729301d897c6eda952b9692698 Mon Sep 17 00:00:00 2001
From: Kihwal Lee
Date: Thu, 27 Feb 2020 09:16:55 -0600
Subject: [PATCH] Revert "HADOOP-16888. [JDK11] Support JDK11 in the precommit job. Contributed by"

Incorrect commit message

This reverts commit 749d7c0027b8978f9c07af04031a8fad6d7c18e1.
---
 .../org/apache/hadoop/util/ThreadUtil.java | 28 --
 .../apache/hadoop/test/GenericTestUtils.java | 15 +-
 .../server/blockmanagement/BlockManager.java | 20 +-
 .../hdfs/server/namenode/FSNamesystem.java | 30 +-
 .../org/apache/hadoop/hdfs/DFSTestUtil.java | 14 +-
 .../fsdataset/impl/LazyPersistTestCase.java | 364 ++++--------------
 .../fsdataset/impl/TestLazyPersistFiles.java | 69 ++--
 .../impl/TestLazyPersistReplicaPlacement.java | 2 +-
 .../fsdataset/impl/TestLazyWriter.java | 6 +-
 9 files changed, 146 insertions(+), 402 deletions(-)

diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/ThreadUtil.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/ThreadUtil.java
index f9ea3fcacd3..2cda8a443e4 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/ThreadUtil.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/ThreadUtil.java
@@ -49,34 +49,6 @@ public class ThreadUtil {
     }
   }
 
-  /**
-   * Join a thread as uninterruptible.
-   * The call continues to block until the result is available even when the
-   * caller thread is interrupted.
-   * The method will log any {@link InterruptedException} then will re-interrupt
-   * the thread.
-   *
-   * @param toJoin the thread to Join on.
-   */
-  public static void joinUninterruptibly(Thread toJoin) {
-    boolean interrupted = false;
-    try {
-      while (true) {
-        try {
-          toJoin.join();
-          return;
-        } catch (InterruptedException e) {
-          interrupted = true;
-          LOG.warn("interrupted while sleeping", e);
-        }
-      }
-    } finally {
-      if (interrupted) {
-        Thread.currentThread().interrupt();
-      }
-    }
-  }
-
   /**
    * Convenience method that returns a resource as inputstream from the
    * classpath.
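The hunk above removes ThreadUtil.joinUninterruptibly, a join that keeps waiting even while the calling thread is interrupted and re-asserts the interrupt flag once the join completes; later in this patch the tests fall back to Guava's Uninterruptibles.joinUninterruptibly for the same behavior. A minimal, self-contained sketch of the pattern (JoinUninterruptiblyExample is an illustrative name, not Hadoop code; the removed method additionally logged each InterruptedException):

import java.util.concurrent.TimeUnit;

public final class JoinUninterruptiblyExample {

  /** Join a thread, deferring any interrupt until the join has completed. */
  static void joinUninterruptibly(Thread toJoin) {
    boolean interrupted = false;
    try {
      while (true) {
        try {
          toJoin.join();
          return;
        } catch (InterruptedException e) {
          // Remember the interrupt and keep waiting for the join to finish.
          interrupted = true;
        }
      }
    } finally {
      if (interrupted) {
        // Restore the interrupt status for the caller.
        Thread.currentThread().interrupt();
      }
    }
  }

  public static void main(String[] args) {
    Thread worker = new Thread(() -> {
      try {
        TimeUnit.MILLISECONDS.sleep(200);
      } catch (InterruptedException ignored) {
        // A real worker would decide how to react; the sketch just exits.
      }
    });
    worker.start();
    joinUninterruptibly(worker);
    System.out.println("worker joined, interrupt flag = "
        + Thread.currentThread().isInterrupted());
  }
}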
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/GenericTestUtils.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/GenericTestUtils.java index 9e916348736..0082452e514 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/GenericTestUtils.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/GenericTestUtils.java @@ -61,6 +61,7 @@ import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; import com.google.common.base.Joiner; +import com.google.common.base.Preconditions; import com.google.common.base.Supplier; import com.google.common.collect.Sets; @@ -377,15 +378,11 @@ public abstract class GenericTestUtils { * time * @throws InterruptedException if the method is interrupted while waiting */ - public static void waitFor(final Supplier check, - final long checkEveryMillis, final long waitForMillis) - throws TimeoutException, InterruptedException { - if (check == null) { - throw new NullPointerException(ERROR_MISSING_ARGUMENT); - } - if (waitForMillis < checkEveryMillis) { - throw new IllegalArgumentException(ERROR_INVALID_ARGUMENT); - } + public static void waitFor(Supplier check, int checkEveryMillis, + int waitForMillis) throws TimeoutException, InterruptedException { + Preconditions.checkNotNull(check, ERROR_MISSING_ARGUMENT); + Preconditions.checkArgument(waitForMillis >= checkEveryMillis, + ERROR_INVALID_ARGUMENT); long st = Time.monotonicNow(); boolean result = check.get(); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java index 3185f1de3b7..e7422df7c45 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java @@ -49,7 +49,6 @@ import java.util.concurrent.FutureTask; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicLong; import javax.management.ObjectName; import org.apache.hadoop.HadoopIllegalArgumentException; @@ -325,12 +324,7 @@ public class BlockManager implements BlockStatsMXBean { /** Redundancy thread. */ private final Daemon redundancyThread = new Daemon(new RedundancyMonitor()); - /** - * Timestamp marking the end time of {@link #redundancyThread}'s full cycle. - * This value can be checked by the Junit tests to verify that the - * {@link #redundancyThread} has run at least one full iteration. - */ - private final AtomicLong lastRedundancyCycleTS = new AtomicLong(-1); + /** StorageInfoDefragmenter thread. */ private final Daemon storageInfoDefragmenterThread = new Daemon(new StorageInfoDefragmenter()); @@ -4800,17 +4794,6 @@ public class BlockManager implements BlockStatsMXBean { return neededReconstruction.size(); } - /** - * Used as ad hoc to check the time stamp of the last full cycle of - * {@link #redundancyThread}. This is used by the Junit tests to block until - * {@link #lastRedundancyCycleTS} is updated. - * @return the current {@link #lastRedundancyCycleTS}. - */ - @VisibleForTesting - public long getLastRedundancyMonitorTS() { - return lastRedundancyCycleTS.get(); - } - /** * Periodically calls computeBlockRecoveryWork(). 
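The GenericTestUtils.waitFor hunk in the file above narrows the interval parameters back to int and swaps the explicit NullPointerException and IllegalArgumentException checks for Guava Preconditions. The polling contract itself is unchanged: evaluate the supplier every checkEveryMillis until it returns true or waitForMillis elapses, then fail with a TimeoutException. A simplified, self-contained sketch of that contract, using plain JDK types instead of the Hadoop and Guava classes:

import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;

public final class WaitForExample {

  /** Poll check every checkEveryMillis until it is true or waitForMillis elapses. */
  static void waitFor(Supplier<Boolean> check, long checkEveryMillis,
      long waitForMillis) throws TimeoutException, InterruptedException {
    if (check == null) {
      throw new NullPointerException("check must not be null");
    }
    if (waitForMillis < checkEveryMillis) {
      throw new IllegalArgumentException("total wait must cover at least one poll");
    }
    long deadline = System.currentTimeMillis() + waitForMillis;
    while (!check.get()) {
      if (System.currentTimeMillis() >= deadline) {
        throw new TimeoutException(
            "condition not met within " + waitForMillis + " ms");
      }
      Thread.sleep(checkEveryMillis);
    }
  }

  public static void main(String[] args) throws Exception {
    long begin = System.currentTimeMillis();
    // Condition becomes true roughly 300 ms after the test starts.
    waitFor(() -> System.currentTimeMillis() - begin > 300, 50, 10_000);
    System.out.println("condition reached");
  }
}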
*/ @@ -4825,7 +4808,6 @@ public class BlockManager implements BlockStatsMXBean { computeDatanodeWork(); processPendingReconstructions(); rescanPostponedMisreplicatedBlocks(); - lastRedundancyCycleTS.set(Time.monotonicNow()); } TimeUnit.MILLISECONDS.sleep(redundancyRecheckIntervalMs); } catch (Throwable t) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java index e91776dbda4..f6b6c461b7d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java @@ -90,7 +90,6 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_REPLICATION_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SNAPSHOT_DIFF_LISTING_LIMIT; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SNAPSHOT_DIFF_LISTING_LIMIT_DEFAULT; -import java.util.concurrent.atomic.AtomicLong; import org.apache.hadoop.hdfs.protocol.HdfsConstants; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_STORAGE_POLICY_ENABLED_KEY; import static org.apache.hadoop.hdfs.server.namenode.FSDirStatAndListingOp.*; @@ -117,6 +116,7 @@ import java.io.DataInput; import java.io.DataOutputStream; import java.io.File; import java.io.FileNotFoundException; +import java.io.FileOutputStream; import java.io.IOException; import java.io.OutputStreamWriter; import java.io.PrintWriter; @@ -311,7 +311,6 @@ import org.apache.hadoop.util.Daemon; import org.apache.hadoop.util.DataChecksum; import org.apache.hadoop.util.ReflectionUtils; import org.apache.hadoop.util.StringUtils; -import org.apache.hadoop.util.Time; import org.apache.hadoop.util.VersionInfo; import org.apache.log4j.Logger; import org.apache.log4j.Appender; @@ -490,12 +489,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean, // A daemon to periodically clean up corrupt lazyPersist files // from the name space. Daemon lazyPersistFileScrubber = null; - /** - * Timestamp marking the end time of {@link #lazyPersistFileScrubber}'s full - * cycle. This value can be checked by the Junit tests to verify that the - * {@link #lazyPersistFileScrubber} has run at least one full iteration. - */ - private final AtomicLong lazyPersistFileScrubberTS = new AtomicLong(0); + // Executor to warm up EDEK cache private ExecutorService edekCacheLoader = null; private final int edekCacheLoaderDelay; @@ -654,20 +648,6 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean, return leaseManager; } - /** - * Used as ad hoc to check the time stamp of the last full cycle of {@link - * #lazyPersistFileScrubber} daemon. This is used by the Junit tests to block - * until {@link #lazyPersistFileScrubberTS} is updated. - * - * @return the current {@link #lazyPersistFileScrubberTS} if {@link - * #lazyPersistFileScrubber} is not null. - */ - @VisibleForTesting - public long getLazyPersistFileScrubberTS() { - return lazyPersistFileScrubber == null ? -1 - : lazyPersistFileScrubberTS.get(); - } - public boolean isHaEnabled() { return haEnabled; } @@ -4198,12 +4178,10 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean, try { if (!isInSafeMode()) { clearCorruptLazyPersistFiles(); - // set the timeStamp of last Cycle. 
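The BlockManager hunk above and the FSNamesystem hunk that continues below remove the same test hook: the background daemon stamps an AtomicLong with a monotonic time at the end of each full iteration, and a JUnit test blocks until the stamp advances rather than sleeping for a fixed multiple of the daemon interval. A minimal sketch of that pattern under assumed names (CycleStampedDaemon is illustrative, not Hadoop code):

import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;

public final class CycleStampedDaemon implements Runnable {

  // -1 means "no full cycle has completed yet", mirroring the removed fields.
  private final AtomicLong lastCycleTimestamp = new AtomicLong(-1);
  private volatile boolean running = true;

  long getLastCycleTimestamp() {
    return lastCycleTimestamp.get();
  }

  void stop() {
    running = false;
  }

  @Override
  public void run() {
    while (running) {
      doOneCycle();
      // Publish the end time of the completed iteration for tests to observe.
      lastCycleTimestamp.set(System.currentTimeMillis());
      try {
        Thread.sleep(100);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        return;
      }
    }
  }

  private void doOneCycle() {
    // Placeholder for the daemon's real work.
  }

  /** Block until the daemon finishes at least one full cycle after this call. */
  static void waitForFullCycle(CycleStampedDaemon daemon, long timeoutMillis)
      throws InterruptedException, TimeoutException {
    long before = daemon.getLastCycleTimestamp();
    long deadline = System.currentTimeMillis() + timeoutMillis;
    while (daemon.getLastCycleTimestamp() == before) {
      if (System.currentTimeMillis() > deadline) {
        throw new TimeoutException("daemon did not complete a cycle in time");
      }
      Thread.sleep(20);
    }
  }

  public static void main(String[] args) throws Exception {
    CycleStampedDaemon daemon = new CycleStampedDaemon();
    Thread t = new Thread(daemon);
    t.start();
    waitForFullCycle(daemon, 5_000);
    daemon.stop();
    t.join();
    System.out.println("observed a full daemon cycle");
  }
}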
- lazyPersistFileScrubberTS.set(Time.monotonicNow()); } else { if (FSNamesystem.LOG.isDebugEnabled()) { - FSNamesystem.LOG.debug("Namenode is in safemode, skipping " - + "scrubbing of corrupted lazy-persist files."); + FSNamesystem.LOG + .debug("Namenode is in safemode, skipping scrubbing of corrupted lazy-persist files."); } } } catch (Exception e) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java index cf37c7490b3..2068e41830c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java @@ -2010,15 +2010,6 @@ public class DFSTestUtil { GenericTestUtils.setLogLevel(NameNode.blockStateChangeLog, level); } - public static void setNameNodeLogLevel(org.slf4j.event.Level level) { - GenericTestUtils.setLogLevel(FSNamesystem.LOG, level); - GenericTestUtils.setLogLevel(BlockManager.LOG, level); - GenericTestUtils.setLogLevel(LeaseManager.LOG, level); - GenericTestUtils.setLogLevel(NameNode.LOG, level); - GenericTestUtils.setLogLevel(NameNode.stateChangeLog, level); - GenericTestUtils.setLogLevel(NameNode.blockStateChangeLog, level); - } - /** * Get the NamenodeProtocol RPC proxy for the NN associated with this * DFSClient object @@ -2301,13 +2292,16 @@ public class DFSTestUtil { public Boolean get() { try { final int currentValue = Integer.parseInt(jmx.getValue(metricName)); + LOG.info("Waiting for " + metricName + + " to reach value " + expectedValue + + ", current value = " + currentValue); return currentValue == expectedValue; } catch (Exception e) { throw new RuntimeException( "Test failed due to unexpected exception", e); } } - }, 50, 60000); + }, 1000, 60000); } /** diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/LazyPersistTestCase.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/LazyPersistTestCase.java index e3706f0b21e..ece5739f88f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/LazyPersistTestCase.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/LazyPersistTestCase.java @@ -17,11 +17,7 @@ */ package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl; -import java.util.HashMap; -import java.util.Iterator; -import java.util.Map; -import java.util.Map.Entry; -import java.util.concurrent.TimeUnit; +import com.google.common.base.Supplier; import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys; import static org.apache.hadoop.fs.CreateFlag.CREATE; @@ -47,14 +43,6 @@ import java.util.concurrent.TimeoutException; import com.google.common.base.Preconditions; import org.apache.commons.io.IOUtils; -import org.apache.hadoop.hdfs.protocol.DatanodeInfo; -import org.apache.hadoop.hdfs.protocol.HdfsConstants; -import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo; -import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager; -import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor; -import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo; -import org.apache.hadoop.hdfs.server.namenode.FSNamesystem; -import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import 
org.apache.hadoop.conf.Configuration; @@ -80,10 +68,10 @@ import org.apache.hadoop.io.nativeio.NativeIO; import org.apache.hadoop.net.unix.TemporarySocketDirectory; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.test.GenericTestUtils; +import org.apache.log4j.Level; import org.junit.After; import org.junit.Rule; import org.junit.rules.Timeout; -import org.slf4j.event.Level; public abstract class LazyPersistTestCase { static final byte LAZY_PERSIST_POLICY_ID = (byte) 15; @@ -93,33 +81,17 @@ public abstract class LazyPersistTestCase { GenericTestUtils.setLogLevel(FsDatasetImpl.LOG, Level.DEBUG); } - protected static final Logger LOG = - LoggerFactory.getLogger(LazyPersistTestCase.class); protected static final int BLOCK_SIZE = 5 * 1024 * 1024; protected static final int BUFFER_LENGTH = 4096; - protected static final int LAZY_WRITE_FILE_SCRUBBER_INTERVAL_SEC = 3; - protected static final int LAZY_WRITER_INTERVAL_SEC = 1; - protected static final short REPL_FACTOR = 1; + private static final long HEARTBEAT_INTERVAL_SEC = 1; + private static final int HEARTBEAT_RECHECK_INTERVAL_MSEC = 500; private static final String JMX_RAM_DISK_METRICS_PATTERN = "^RamDisk"; private static final String JMX_SERVICE_NAME = "DataNode"; - private static final long HEARTBEAT_INTERVAL_SEC = 1; - private static final int HEARTBEAT_RECHECK_INTERVAL_MS = 500; - private static final long WAIT_FOR_FBR_MS = - TimeUnit.SECONDS.toMillis(10); - private static final long WAIT_FOR_STORAGE_TYPES_MS = - TimeUnit.SECONDS.toMillis(30); - private static final long WAIT_FOR_ASYNC_DELETE_MS = - TimeUnit.SECONDS.toMillis(10); - private static final long WAIT_FOR_DN_SHUTDOWN_MS = - TimeUnit.SECONDS.toMillis(30); - private static final long WAIT_FOR_REDUNDANCY_MS = - TimeUnit.SECONDS - .toMillis(2 * DFS_NAMENODE_REDUNDANCY_INTERVAL_SECONDS_DEFAULT); - private static final long WAIT_FOR_LAZY_SCRUBBER_MS = - TimeUnit.SECONDS.toMillis(2 * LAZY_WRITE_FILE_SCRUBBER_INTERVAL_SEC); - private static final long WAIT_POLL_INTERVAL_MS = 10; - private static final long WAIT_POLL_INTERVAL_LARGE_MS = 20; - + protected static final int LAZY_WRITE_FILE_SCRUBBER_INTERVAL_SEC = 3; + protected static final int LAZY_WRITER_INTERVAL_SEC = 1; + protected static final Logger LOG = + LoggerFactory.getLogger(LazyPersistTestCase.class); + protected static final short REPL_FACTOR = 1; protected final long osPageSize = NativeIO.POSIX.getCacheManipulator().getOperatingSystemPageSize(); @@ -162,79 +134,76 @@ public abstract class LazyPersistTestCase { Path path, StorageType storageType) throws IOException, TimeoutException, InterruptedException { // Ensure that returned block locations returned are correct! 
- LOG.info("Ensure path: {} is on StorageType: {}", path, storageType); + LOG.info("Ensure path: " + path + " is on StorageType: " + storageType); assertThat(fs.exists(path), is(true)); long fileLength = client.getFileInfo(path.toString()).getLen(); - GenericTestUtils.waitFor(() -> { - try { - LocatedBlocks locatedBlocks = - client.getLocatedBlocks(path.toString(), 0, fileLength); - for (LocatedBlock locatedBlock : locatedBlocks.getLocatedBlocks()) { - if (locatedBlock.getStorageTypes()[0] != storageType) { - return false; + GenericTestUtils.waitFor(new Supplier() { + @Override + public Boolean get() { + try { + LocatedBlocks locatedBlocks = + client.getLocatedBlocks(path.toString(), 0, fileLength); + for (LocatedBlock locatedBlock : locatedBlocks.getLocatedBlocks()) { + if (locatedBlock.getStorageTypes()[0] != storageType) { + return false; + } } + return true; + } catch (IOException ioe) { + LOG.warn("Exception got in ensureFileReplicasOnStorageType()", ioe); + return false; } - return true; - } catch (IOException ioe) { - LOG.warn("Exception got in ensureFileReplicasOnStorageType()", ioe); - return false; } - }, WAIT_POLL_INTERVAL_MS, WAIT_FOR_STORAGE_TYPES_MS); + }, 100, 30 * 1000); return client.getLocatedBlocks(path.toString(), 0, fileLength); } /** - * Make sure at least one non-transient volume has a saved copy of the - * replica. An infinite loop is used to ensure the async lazy persist tasks - * are completely done before verification. - * Caller of this method expects either a successful pass or timeout failure. - * - * @param locatedBlocks the collection of blocks and their locations. - * @throws IOException for aut-closeable resources. - * @throws InterruptedException if the thread is interrupted. - * @throws TimeoutException if {@link #WAIT_FOR_STORAGE_TYPES_MS} expires - * before we find a persisted copy for each located - * block. + * Make sure at least one non-transient volume has a saved copy of the replica. + * An infinite loop is used to ensure the async lazy persist tasks are completely + * done before verification. Caller of ensureLazyPersistBlocksAreSaved expects + * either a successful pass or timeout failure. */ protected final void ensureLazyPersistBlocksAreSaved( - final LocatedBlocks locatedBlocks) - throws IOException, InterruptedException, TimeoutException { + LocatedBlocks locatedBlocks) throws IOException, InterruptedException { final String bpid = cluster.getNamesystem().getBlockPoolId(); final Set persistedBlockIds = new HashSet(); - // We should find a persisted copy for each located block. 
+ try (FsDatasetSpi.FsVolumeReferences volumes = cluster.getDataNodes().get(0).getFSDataset().getFsVolumeReferences()) { - GenericTestUtils.waitFor(() -> { + while (persistedBlockIds.size() < locatedBlocks.getLocatedBlocks() + .size()) { + // Take 1 second sleep before each verification iteration + Thread.sleep(1000); + for (LocatedBlock lb : locatedBlocks.getLocatedBlocks()) { for (FsVolumeSpi v : volumes) { if (v.isTransientStorage()) { continue; } + FsVolumeImpl volume = (FsVolumeImpl) v; - File lazyPersistDir; - try { - lazyPersistDir = - volume.getBlockPoolSlice(bpid).getLazypersistDir(); - } catch (IOException ioe) { - return false; - } + File lazyPersistDir = + volume.getBlockPoolSlice(bpid).getLazypersistDir(); + long blockId = lb.getBlock().getBlockId(); File targetDir = DatanodeUtil.idToBlockDir(lazyPersistDir, blockId); File blockFile = new File(targetDir, lb.getBlock().getBlockName()); if (blockFile.exists()) { - // Found a persisted copy for this block and added to the Set. + // Found a persisted copy for this block and added to the Set persistedBlockIds.add(blockId); } } } - return (persistedBlockIds.size() == - locatedBlocks.getLocatedBlocks().size()); - }, WAIT_POLL_INTERVAL_LARGE_MS, WAIT_FOR_STORAGE_TYPES_MS); + } } + + // We should have found a persisted copy for each located block. + assertThat(persistedBlockIds.size(), is(locatedBlocks.getLocatedBlocks().size())); } protected final void makeRandomTestFile(Path path, long length, @@ -303,7 +272,7 @@ public abstract class LazyPersistTestCase { } conf.setLong(DFS_HEARTBEAT_INTERVAL_KEY, HEARTBEAT_INTERVAL_SEC); conf.setInt(DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, - HEARTBEAT_RECHECK_INTERVAL_MS); + HEARTBEAT_RECHECK_INTERVAL_MSEC); conf.setInt(DFS_DATANODE_LAZY_WRITER_INTERVAL_SEC, LAZY_WRITER_INTERVAL_SEC); conf.setInt(DFSConfigKeys.DFS_NAMENODE_SAFEMODE_MIN_DATANODES_KEY, 1); @@ -366,18 +335,18 @@ public abstract class LazyPersistTestCase { @Override public void mlock(String identifier, ByteBuffer mmap, long length) throws IOException { - LOG.info("LazyPersistTestCase: faking mlock of {} bytes.", identifier); + LOG.info("LazyPersistTestCase: faking mlock of " + identifier + " bytes."); } @Override public long getMemlockLimit() { - LOG.info("LazyPersistTestCase: fake return {}", Long.MAX_VALUE); + LOG.info("LazyPersistTestCase: fake return " + Long.MAX_VALUE); return Long.MAX_VALUE; } @Override public boolean verifyCanMlock() { - LOG.info("LazyPersistTestCase: fake return {}", true); + LOG.info("LazyPersistTestCase: fake return " + true); return true; } }); @@ -445,10 +414,8 @@ public abstract class LazyPersistTestCase { public void build() throws IOException { LazyPersistTestCase.this.startUpCluster( - numDatanodes, hasTransientStorage, storageTypes, - ramDiskReplicaCapacity, - ramDiskStorageLimit, maxLockedMemory, useScr, - useLegacyBlockReaderLocal, + numDatanodes, hasTransientStorage, storageTypes, ramDiskReplicaCapacity, + ramDiskStorageLimit, maxLockedMemory, useScr, useLegacyBlockReaderLocal, disableScrubber); } @@ -463,44 +430,11 @@ public abstract class LazyPersistTestCase { private boolean disableScrubber=false; } - /** - * Forces a full blockreport on all the datatanodes. The call blocks waiting - * for all blockreports to be received by the namenode. - * - * @throws IOException if an exception is thrown while getting the datanode - * descriptors or triggering the blockreports. - * @throws InterruptedException if the thread receives an interrupt. 
- * @throws TimeoutException if the reports are not received by - * {@link #WAIT_FOR_FBR_MS}. - */ protected final void triggerBlockReport() - throws InterruptedException, TimeoutException, IOException { + throws IOException, InterruptedException { // Trigger block report to NN - final Map reportCountsBefore = - new HashMap<>(); - final FSNamesystem fsn = cluster.getNamesystem(); - for (DataNode dn : cluster.getDataNodes()) { - final DatanodeDescriptor dnd = - NameNodeAdapter.getDatanode(fsn, dn.getDatanodeId()); - final DatanodeStorageInfo storage = dnd.getStorageInfos()[0]; - reportCountsBefore.put(storage, storage.getBlockReportCount()); - DataNodeTestUtils.triggerBlockReport(dn); - } - // wait for block reports to be received. - GenericTestUtils.waitFor(() -> { - for (Entry reportEntry : - reportCountsBefore.entrySet()) { - final DatanodeStorageInfo dnStorageInfo = reportEntry.getKey(); - final int cntBefore = reportEntry.getValue(); - final int currentCnt = dnStorageInfo.getBlockReportCount(); - if (cntBefore == currentCnt) { - // Same count means no report has been received. - return false; - } - } - // If we reach here, then all the block reports have been received. - return true; - }, WAIT_POLL_INTERVAL_MS, WAIT_FOR_FBR_MS); + DataNodeTestUtils.triggerBlockReport(cluster.getDataNodes().get(0)); + Thread.sleep(10 * 1000); } protected final boolean verifyBlockDeletedFromDir(File dir, @@ -512,58 +446,51 @@ public abstract class LazyPersistTestCase { File blockFile = new File(targetDir, lb.getBlock().getBlockName()); if (blockFile.exists()) { + LOG.warn("blockFile: " + blockFile.getAbsolutePath() + + " exists after deletion."); return false; } File metaFile = new File(targetDir, DatanodeUtil.getMetaName(lb.getBlock().getBlockName(), lb.getBlock().getGenerationStamp())); if (metaFile.exists()) { + LOG.warn("metaFile: " + metaFile.getAbsolutePath() + + " exists after deletion."); return false; } } return true; } - protected final boolean verifyDeletedBlocks(final LocatedBlocks locatedBlocks) - throws Exception { + protected final boolean verifyDeletedBlocks(LocatedBlocks locatedBlocks) + throws IOException, InterruptedException { LOG.info("Verifying replica has no saved copy after deletion."); triggerBlockReport(); - final DataNode dn = cluster.getDataNodes().get(0); - GenericTestUtils.waitFor(() -> { - for (DataNode dn1 : cluster.getDataNodes()) { - if (cluster.getFsDatasetTestUtils(dn1).getPendingAsyncDeletions() - > 0) { + while( + cluster.getFsDatasetTestUtils(0).getPendingAsyncDeletions() + > 0L){ + Thread.sleep(1000); + } + + final String bpid = cluster.getNamesystem().getBlockPoolId(); + final FsDatasetSpi dataset = + cluster.getDataNodes().get(0).getFSDataset(); + + // Make sure deleted replica does not have a copy on either finalized dir of + // transient volume or finalized dir of non-transient volume + try (FsDatasetSpi.FsVolumeReferences volumes = + dataset.getFsVolumeReferences()) { + for (FsVolumeSpi vol : volumes) { + FsVolumeImpl volume = (FsVolumeImpl) vol; + File targetDir = (volume.isTransientStorage()) ? 
+ volume.getBlockPoolSlice(bpid).getFinalizedDir() : + volume.getBlockPoolSlice(bpid).getLazypersistDir(); + if (verifyBlockDeletedFromDir(targetDir, locatedBlocks) == false) { return false; } } - return true; - }, WAIT_POLL_INTERVAL_MS, WAIT_FOR_ASYNC_DELETE_MS); - - final String bpid = cluster.getNamesystem().getBlockPoolId(); - final FsDatasetSpi dataset = dn.getFSDataset(); - // Make sure deleted replica does not have a copy on either finalized dir of - // transient volume or finalized dir of non-transient volume. - // We need to wait until the asyn deletion is scheduled. - try (FsDatasetSpi.FsVolumeReferences volumes = - dataset.getFsVolumeReferences()) { - GenericTestUtils.waitFor(() -> { - try { - for (FsVolumeSpi vol : volumes) { - FsVolumeImpl volume = (FsVolumeImpl) vol; - File targetDir = (volume.isTransientStorage()) ? - volume.getBlockPoolSlice(bpid).getFinalizedDir() : - volume.getBlockPoolSlice(bpid).getLazypersistDir(); - if (!verifyBlockDeletedFromDir(targetDir, locatedBlocks)) { - return false; - } - } - return true; - } catch (IOException ie) { - return false; - } - }, WAIT_POLL_INTERVAL_MS, WAIT_FOR_ASYNC_DELETE_MS); } return true; } @@ -604,137 +531,8 @@ public abstract class LazyPersistTestCase { DFSTestUtil.waitForMetric(jmx, metricName, expectedValue); } - protected void triggerEviction(final DataNode dn) { + protected void triggerEviction(DataNode dn) { FsDatasetImpl fsDataset = (FsDatasetImpl) dn.getFSDataset(); fsDataset.evictLazyPersistBlocks(Long.MAX_VALUE); // Run one eviction cycle. } - - /** - * Shutdown all datanodes in {@link #cluster}. The call blocks for - * {@link #WAIT_FOR_DN_SHUTDOWN_MS} until client report has no datanode - * labeled as live. - * - * @throws TimeoutException if {@link #WAIT_FOR_DN_SHUTDOWN_MS} expires with - * at least one datanode still alive. - * @throws InterruptedException if the thread receives an interrupt. - */ - protected void shutdownDataNodes() - throws TimeoutException, InterruptedException { - cluster.shutdownDataNodes(); - GenericTestUtils.waitFor(() -> { - try { - DatanodeInfo[] info = client.datanodeReport( - HdfsConstants.DatanodeReportType.LIVE); - return info.length == 0; - } catch (IOException e) { - return false; - } - }, WAIT_POLL_INTERVAL_LARGE_MS, WAIT_FOR_DN_SHUTDOWN_MS); - } - - /** - * Blocks for {@link #WAIT_FOR_REDUNDANCY_MS} waiting for corrupt block count - * to reach a certain count. - * - * @param corruptCnt representing the number of corrupt blocks before - * resuming. - * @throws TimeoutException if {@link #WAIT_FOR_REDUNDANCY_MS} expires with - * corrupt count does not meet the criteria. - * @throws InterruptedException if the thread receives an interrupt. - */ - protected void waitForCorruptBlock(final long corruptCnt) - throws TimeoutException, InterruptedException { - // wait for the redundancy monitor to mark the file as corrupt. - GenericTestUtils.waitFor(() -> { - Iterator bInfoIter = cluster.getNameNode() - .getNamesystem().getBlockManager().getCorruptReplicaBlockIterator(); - int count = 0; - while (bInfoIter.hasNext()) { - bInfoIter.next(); - count++; - } - return corruptCnt == count; - }, 2 * WAIT_POLL_INTERVAL_LARGE_MS, WAIT_FOR_REDUNDANCY_MS); - } - - /** - * Blocks until {@link FSNamesystem#lazyPersistFileScrubber} daemon completes - * a full iteration. - * - * @throws InterruptedException if the thread receives an interrupt. - * @throws TimeoutException - * {@link FSNamesystem#getLazyPersistFileScrubberTS()} - * does not update the timestamp by - * {@link #WAIT_FOR_LAZY_SCRUBBER_MS}. 
- */ - protected void waitForScrubberCycle() - throws TimeoutException, InterruptedException { - // wait for the redundancy monitor to mark the file as corrupt. - final FSNamesystem fsn = cluster.getNamesystem(); - final long lastTimeStamp = fsn.getLazyPersistFileScrubberTS(); - if (lastTimeStamp == -1) { // scrubber is disabled - return; - } - GenericTestUtils.waitFor( - () -> lastTimeStamp != fsn.getLazyPersistFileScrubberTS(), - 2 * WAIT_POLL_INTERVAL_LARGE_MS, WAIT_FOR_LAZY_SCRUBBER_MS); - } - - /** - * Blocks until {@link BlockManager#RedundancyMonitor} daemon completes - * a full iteration. - * - * @throws InterruptedException if the thread receives an interrupt. - * @throws TimeoutException {@link BlockManager#getLastRedundancyMonitorTS()} - * does not update the timestamp by - * {@link #WAIT_FOR_REDUNDANCY_MS}. - */ - protected void waitForRedundancyMonitorCycle() - throws TimeoutException, InterruptedException { - // wait for the redundancy monitor to mark the file as corrupt. - final BlockManager bm = cluster.getNamesystem().getBlockManager(); - final long lastRedundancyTS = - bm.getLastRedundancyMonitorTS(); - - GenericTestUtils.waitFor( - () -> lastRedundancyTS != bm.getLastRedundancyMonitorTS(), - 2 * WAIT_POLL_INTERVAL_LARGE_MS, WAIT_FOR_REDUNDANCY_MS); - } - - /** - * Blocks until {@link BlockManager#lowRedundancyBlocksCount} reaches a - * certain value. - * - * @throws InterruptedException if the thread receives an interrupt. - * @throws TimeoutException {@link BlockManager#getLowRedundancyBlocksCount()} - * does not update the count by - * {@link #WAIT_FOR_REDUNDANCY_MS}. - */ - protected void waitForLowRedundancyCount(final long cnt) - throws TimeoutException, InterruptedException { - final BlockManager bm = cluster.getNamesystem().getBlockManager(); - - GenericTestUtils.waitFor(() -> cnt == bm.getLowRedundancyBlocksCount(), - 2 * WAIT_POLL_INTERVAL_LARGE_MS, WAIT_FOR_REDUNDANCY_MS); - } - - /** - * Blocks until the file status changes on the filesystem. - * - * @param path of the file to be checked. - * @param expected whether a file should exist or not. - * @throws TimeoutException if the file status does not meet the expected by - * {@link #WAIT_FOR_STORAGE_TYPES_MS}. - * @throws InterruptedException if the thread receives an interrupt. - */ - protected void waitForFile(final Path path, final boolean expected) - throws TimeoutException, InterruptedException { - GenericTestUtils.waitFor(() -> { - try { - return expected == fs.exists(path); - } catch (IOException e) { - return false; - } - }, WAIT_POLL_INTERVAL_MS, WAIT_FOR_STORAGE_TYPES_MS); - } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistFiles.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistFiles.java index c0b4b17ea95..04f81278834 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistFiles.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistFiles.java @@ -16,10 +16,11 @@ * limitations under the License. 
*/ package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl; +import com.google.common.collect.Iterators; +import com.google.common.util.concurrent.Uninterruptibles; import org.apache.hadoop.fs.CreateFlag; import org.apache.hadoop.fs.Path; import org.apache.hadoop.test.GenericTestUtils; -import org.apache.hadoop.util.ThreadUtil; import org.junit.Assert; import org.junit.Test; @@ -32,6 +33,7 @@ import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import static org.apache.hadoop.fs.StorageType.RAM_DISK; +import static org.apache.hadoop.hdfs.DFSConfigKeys.*; import static org.hamcrest.core.Is.is; import static org.junit.Assert.assertThat; @@ -74,7 +76,7 @@ public class TestLazyPersistFiles extends LazyPersistTestCase { makeTestFile(path, BLOCK_SIZE, true); try { - client.truncate(path.toString(), BLOCK_SIZE / 2); + client.truncate(path.toString(), BLOCK_SIZE/2); fail("Truncate to LazyPersist file did not fail as expected"); } catch (Throwable t) { LOG.info("Got expected exception ", t); @@ -96,20 +98,28 @@ public class TestLazyPersistFiles extends LazyPersistTestCase { makeTestFile(path1, BLOCK_SIZE, true); ensureFileReplicasOnStorageType(path1, RAM_DISK); - // Stop the DataNode. - shutdownDataNodes(); + // Stop the DataNode and sleep for the time it takes the NN to + // detect the DN as being dead. + cluster.shutdownDataNodes(); + Thread.sleep(30000L); assertThat(cluster.getNamesystem().getNumDeadDataNodes(), is(1)); // Next, wait for the redundancy monitor to mark the file as corrupt. - waitForRedundancyMonitorCycle(); + Thread.sleep(2 * DFS_NAMENODE_REDUNDANCY_INTERVAL_SECONDS_DEFAULT * 1000); + // Wait for the LazyPersistFileScrubber to run - waitForScrubberCycle(); + Thread.sleep(2 * LAZY_WRITE_FILE_SCRUBBER_INTERVAL_SEC * 1000); + // Ensure that path1 does not exist anymore, whereas path2 does. - waitForFile(path1, false); + assert(!fs.exists(path1)); // We should have zero blocks that needs replication i.e. the one - // belonging to path2. This needs a wait. - waitForLowRedundancyCount(0L); + // belonging to path2. + assertThat(cluster.getNameNode() + .getNamesystem() + .getBlockManager() + .getLowRedundancyBlocksCount(), + is(0L)); } @Test @@ -124,14 +134,18 @@ public class TestLazyPersistFiles extends LazyPersistTestCase { // Stop the DataNode and sleep for the time it takes the NN to // detect the DN as being dead. - shutdownDataNodes(); + cluster.shutdownDataNodes(); + Thread.sleep(30000L); + + // Next, wait for the redundancy monitor to mark the file as corrupt. + Thread.sleep(2 * DFS_NAMENODE_REDUNDANCY_INTERVAL_SECONDS_DEFAULT * 1000); - // wait for the redundancy monitor to mark the file as corrupt. - waitForCorruptBlock(1L); // Wait for the LazyPersistFileScrubber to run - waitForScrubberCycle(); + Thread.sleep(2 * LAZY_WRITE_FILE_SCRUBBER_INTERVAL_SEC * 1000); + // Ensure that path1 exist. - waitForFile(path1, true); + Assert.assertTrue(fs.exists(path1)); + } /** @@ -146,14 +160,20 @@ public class TestLazyPersistFiles extends LazyPersistTestCase { makeTestFile(path1, BLOCK_SIZE, true); ensureFileReplicasOnStorageType(path1, RAM_DISK); - shutdownDataNodes(); + cluster.shutdownDataNodes(); cluster.restartNameNodes(); // wait for the redundancy monitor to mark the file as corrupt. 
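From this point the revert swaps the condition-based helpers (waitForRedundancyMonitorCycle, waitForScrubberCycle, waitForFile, waitForCorruptBlock) for fixed Thread.sleep calls and, in the next hunk, an unbounded do/while loop that polls the corrupt-replica count. A hand-rolled poll like that is usually safer with a deadline; a hedged sketch follows (countCorruptReplicas is a stand-in for the namesystem query in the test, not a real API):

import java.util.concurrent.TimeoutException;
import java.util.function.LongSupplier;

public final class BoundedPollExample {

  /** Poll counter every pollMillis until it returns expected or timeoutMillis elapses. */
  static void waitForCount(LongSupplier counter, long expected,
      long pollMillis, long timeoutMillis)
      throws InterruptedException, TimeoutException {
    long deadline = System.currentTimeMillis() + timeoutMillis;
    while (true) {
      long current = counter.getAsLong();
      if (current == expected) {
        return;
      }
      if (System.currentTimeMillis() > deadline) {
        throw new TimeoutException(
            "expected count " + expected + " but still at " + current);
      }
      Thread.sleep(pollMillis);
    }
  }

  public static void main(String[] args) throws Exception {
    long start = System.currentTimeMillis();
    // Stand-in for counting corrupt replica blocks; reaches 1 after about 300 ms.
    LongSupplier countCorruptReplicas =
        () -> (System.currentTimeMillis() - start > 300) ? 1L : 0L;
    waitForCount(countCorruptReplicas, 1L, 50, 10_000);
    System.out.println("corrupt replica count reached 1");
  }
}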
- waitForCorruptBlock(1L); + Long corruptBlkCount; + do { + Thread.sleep(DFS_NAMENODE_REDUNDANCY_INTERVAL_SECONDS_DEFAULT * 1000); + corruptBlkCount = (long) Iterators.size(cluster.getNameNode() + .getNamesystem().getBlockManager().getCorruptReplicaBlockIterator()); + } while (corruptBlkCount != 1L); + // Ensure path1 exist. - waitForFile(path1, true); + Assert.assertTrue(fs.exists(path1)); } /** @@ -195,8 +215,10 @@ public class TestLazyPersistFiles extends LazyPersistTestCase { threads[i].start(); } + Thread.sleep(500); + for (int i = 0; i < NUM_TASKS; i++) { - ThreadUtil.joinUninterruptibly(threads[i]); + Uninterruptibles.joinUninterruptibly(threads[i]); } Assert.assertFalse(testFailed.get()); } @@ -210,7 +232,7 @@ public class TestLazyPersistFiles extends LazyPersistTestCase { */ @Test public void testConcurrentWrites() - throws IOException, InterruptedException, TimeoutException { + throws IOException, InterruptedException { getClusterBuilder().setRamDiskReplicaCapacity(9).build(); final String METHOD_NAME = GenericTestUtils.getMethodName(); final int SEED = 0xFADED; @@ -259,11 +281,11 @@ public class TestLazyPersistFiles extends LazyPersistTestCase { this.seed = seed; this.latch = latch; this.bFail = bFail; - LOG.info("Creating Writer: {}", id); + System.out.println("Creating Writer: " + id); } public void run() { - LOG.info("Writer {} starting... ", id); + System.out.println("Writer " + id + " starting... "); int i = 0; try { for (i = 0; i < paths.length; i++) { @@ -273,8 +295,9 @@ public class TestLazyPersistFiles extends LazyPersistTestCase { } } catch (IOException e) { bFail.set(true); - LOG.error("Writer exception: writer id:{} testfile: {}", - id, paths[i].toString(), e); + LOG.error("Writer exception: writer id:" + id + + " testfile: " + paths[i].toString() + + " " + e); } finally { latch.countDown(); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistReplicaPlacement.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistReplicaPlacement.java index b6413ec6246..c16dbe5604d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistReplicaPlacement.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistReplicaPlacement.java @@ -119,7 +119,7 @@ public class TestLazyPersistReplicaPlacement extends LazyPersistTestCase { */ @Test public void testFallbackToDiskPartial() - throws IOException, InterruptedException, TimeoutException { + throws IOException, InterruptedException { getClusterBuilder().setMaxLockedMemory(2 * BLOCK_SIZE).build(); final String METHOD_NAME = GenericTestUtils.getMethodName(); Path path = new Path("/" + METHOD_NAME + ".dat"); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyWriter.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyWriter.java index 56cc41e37fe..16807640fd0 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyWriter.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyWriter.java @@ -21,7 +21,6 @@ package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl; import org.apache.hadoop.fs.Path; import 
org.apache.hadoop.hdfs.DFSTestUtil; import org.apache.hadoop.hdfs.protocol.LocatedBlocks; -import org.apache.hadoop.hdfs.server.datanode.DataNode; import org.apache.hadoop.test.GenericTestUtils; import org.junit.Assert; import org.junit.Test; @@ -157,6 +156,7 @@ public class TestLazyWriter extends LazyPersistTestCase { for (int i = 0; i < NUM_PATHS; ++i) { makeTestFile(paths[i + NUM_PATHS], BLOCK_SIZE, true); triggerBlockReport(); + Thread.sleep(3000); ensureFileReplicasOnStorageType(paths[i + NUM_PATHS], RAM_DISK); ensureFileReplicasOnStorageType(paths[indexes.get(i)], DEFAULT); for (int j = i + 1; j < NUM_PATHS; ++j) { @@ -183,13 +183,13 @@ public class TestLazyWriter extends LazyPersistTestCase { throws Exception { getClusterBuilder().build(); final String METHOD_NAME = GenericTestUtils.getMethodName(); - final DataNode dn = cluster.getDataNodes().get(0); - FsDatasetTestUtil.stopLazyWriter(dn); + FsDatasetTestUtil.stopLazyWriter(cluster.getDataNodes().get(0)); Path path = new Path("/" + METHOD_NAME + ".dat"); makeTestFile(path, BLOCK_SIZE, true); LocatedBlocks locatedBlocks = ensureFileReplicasOnStorageType(path, RAM_DISK); + // Delete before persist client.delete(path.toString(), false); Assert.assertFalse(fs.exists(path));
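For reaping the writer threads, the TestLazyPersistFiles hunk earlier in this patch replaces the removed ThreadUtil.joinUninterruptibly with Guava's Uninterruptibles.joinUninterruptibly. A short usage sketch of that Guava call (the class and the writer threads here are illustrative only):

import com.google.common.util.concurrent.Uninterruptibles;

public final class UninterruptibleJoinUsage {

  public static void main(String[] args) {
    Thread[] writers = new Thread[4];
    for (int i = 0; i < writers.length; i++) {
      final int id = i;
      writers[i] = new Thread(() -> System.out.println("writer " + id + " done"));
      writers[i].start();
    }
    for (Thread writer : writers) {
      // Blocks until the thread dies; if the current thread is interrupted
      // while waiting, the interrupt flag is re-asserted before returning.
      Uninterruptibles.joinUninterruptibly(writer);
    }
    System.out.println("all writers joined");
  }
}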