Revert "HADOOP-16888. [JDK11] Support JDK11 in the precommit job. Contributed by"

Incorrect commit message

This reverts commit 749d7c0027.
This commit is contained in:
Kihwal Lee 2020-02-27 09:16:55 -06:00
parent 749d7c0027
commit de7edf58bd
9 changed files with 146 additions and 402 deletions

View File

@ -49,34 +49,6 @@ public class ThreadUtil {
} }
} }
/**
 * Join a thread as uninterruptible.
 * The call continues to block until {@code toJoin} terminates, even when the
 * caller thread is interrupted in the meantime.
 * Every {@link InterruptedException} received while waiting is logged, and
 * the caller's interrupt status is restored before the method returns.
 *
 * @param toJoin the thread to join on.
 */
public static void joinUninterruptibly(Thread toJoin) {
  boolean interrupted = false;
  try {
    while (true) {
      try {
        toJoin.join();
        return;
      } catch (InterruptedException e) {
        // Remember the interrupt and keep waiting; it is re-asserted below.
        interrupted = true;
        LOG.warn("interrupted while joining", e);
      }
    }
  } finally {
    if (interrupted) {
      // Restore the interrupt status that was cleared when join() threw.
      Thread.currentThread().interrupt();
    }
  }
}
/** /**
* Convenience method that returns a resource as inputstream from the * Convenience method that returns a resource as inputstream from the
* classpath. * classpath.

View File

@ -61,6 +61,7 @@ import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer; import org.mockito.stubbing.Answer;
import com.google.common.base.Joiner; import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.base.Supplier; import com.google.common.base.Supplier;
import com.google.common.collect.Sets; import com.google.common.collect.Sets;
@ -377,15 +378,11 @@ public abstract class GenericTestUtils {
* time * time
* @throws InterruptedException if the method is interrupted while waiting * @throws InterruptedException if the method is interrupted while waiting
*/ */
public static void waitFor(final Supplier<Boolean> check, public static void waitFor(Supplier<Boolean> check, int checkEveryMillis,
final long checkEveryMillis, final long waitForMillis) int waitForMillis) throws TimeoutException, InterruptedException {
throws TimeoutException, InterruptedException { Preconditions.checkNotNull(check, ERROR_MISSING_ARGUMENT);
if (check == null) { Preconditions.checkArgument(waitForMillis >= checkEveryMillis,
throw new NullPointerException(ERROR_MISSING_ARGUMENT); ERROR_INVALID_ARGUMENT);
}
if (waitForMillis < checkEveryMillis) {
throw new IllegalArgumentException(ERROR_INVALID_ARGUMENT);
}
long st = Time.monotonicNow(); long st = Time.monotonicNow();
boolean result = check.get(); boolean result = check.get();

View File

@ -49,7 +49,6 @@ import java.util.concurrent.FutureTask;
import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import javax.management.ObjectName; import javax.management.ObjectName;
import org.apache.hadoop.HadoopIllegalArgumentException; import org.apache.hadoop.HadoopIllegalArgumentException;
@ -325,12 +324,7 @@ public class BlockManager implements BlockStatsMXBean {
/** Redundancy thread. */ /** Redundancy thread. */
private final Daemon redundancyThread = new Daemon(new RedundancyMonitor()); private final Daemon redundancyThread = new Daemon(new RedundancyMonitor());
/**
* Timestamp marking the end time of {@link #redundancyThread}'s full cycle.
* This value can be checked by the Junit tests to verify that the
* {@link #redundancyThread} has run at least one full iteration.
*/
private final AtomicLong lastRedundancyCycleTS = new AtomicLong(-1);
/** StorageInfoDefragmenter thread. */ /** StorageInfoDefragmenter thread. */
private final Daemon storageInfoDefragmenterThread = private final Daemon storageInfoDefragmenterThread =
new Daemon(new StorageInfoDefragmenter()); new Daemon(new StorageInfoDefragmenter());
@ -4800,17 +4794,6 @@ public class BlockManager implements BlockStatsMXBean {
return neededReconstruction.size(); return neededReconstruction.size();
} }
/**
 * Reports the value currently held in {@link #lastRedundancyCycleTS}, the
 * timestamp of the last full cycle of {@link #redundancyThread}. Intended for
 * the Junit tests, which use it to block until the timestamp is updated.
 *
 * @return the current {@link #lastRedundancyCycleTS}.
 */
@VisibleForTesting
public long getLastRedundancyMonitorTS() {
  final long lastCycleEndTs = lastRedundancyCycleTS.get();
  return lastCycleEndTs;
}
/** /**
* Periodically calls computeBlockRecoveryWork(). * Periodically calls computeBlockRecoveryWork().
*/ */
@ -4825,7 +4808,6 @@ public class BlockManager implements BlockStatsMXBean {
computeDatanodeWork(); computeDatanodeWork();
processPendingReconstructions(); processPendingReconstructions();
rescanPostponedMisreplicatedBlocks(); rescanPostponedMisreplicatedBlocks();
lastRedundancyCycleTS.set(Time.monotonicNow());
} }
TimeUnit.MILLISECONDS.sleep(redundancyRecheckIntervalMs); TimeUnit.MILLISECONDS.sleep(redundancyRecheckIntervalMs);
} catch (Throwable t) { } catch (Throwable t) {

View File

@ -90,7 +90,6 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_REPLICATION_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SNAPSHOT_DIFF_LISTING_LIMIT; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SNAPSHOT_DIFF_LISTING_LIMIT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SNAPSHOT_DIFF_LISTING_LIMIT_DEFAULT; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SNAPSHOT_DIFF_LISTING_LIMIT_DEFAULT;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_STORAGE_POLICY_ENABLED_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_STORAGE_POLICY_ENABLED_KEY;
import static org.apache.hadoop.hdfs.server.namenode.FSDirStatAndListingOp.*; import static org.apache.hadoop.hdfs.server.namenode.FSDirStatAndListingOp.*;
@ -117,6 +116,7 @@ import java.io.DataInput;
import java.io.DataOutputStream; import java.io.DataOutputStream;
import java.io.File; import java.io.File;
import java.io.FileNotFoundException; import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException; import java.io.IOException;
import java.io.OutputStreamWriter; import java.io.OutputStreamWriter;
import java.io.PrintWriter; import java.io.PrintWriter;
@ -311,7 +311,6 @@ import org.apache.hadoop.util.Daemon;
import org.apache.hadoop.util.DataChecksum; import org.apache.hadoop.util.DataChecksum;
import org.apache.hadoop.util.ReflectionUtils; import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.util.VersionInfo; import org.apache.hadoop.util.VersionInfo;
import org.apache.log4j.Logger; import org.apache.log4j.Logger;
import org.apache.log4j.Appender; import org.apache.log4j.Appender;
@ -490,12 +489,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
// A daemon to periodically clean up corrupt lazyPersist files // A daemon to periodically clean up corrupt lazyPersist files
// from the name space. // from the name space.
Daemon lazyPersistFileScrubber = null; Daemon lazyPersistFileScrubber = null;
/**
* Timestamp marking the end time of {@link #lazyPersistFileScrubber}'s full
* cycle. This value can be checked by the Junit tests to verify that the
* {@link #lazyPersistFileScrubber} has run at least one full iteration.
*/
private final AtomicLong lazyPersistFileScrubberTS = new AtomicLong(0);
// Executor to warm up EDEK cache // Executor to warm up EDEK cache
private ExecutorService edekCacheLoader = null; private ExecutorService edekCacheLoader = null;
private final int edekCacheLoaderDelay; private final int edekCacheLoaderDelay;
@ -654,20 +648,6 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
return leaseManager; return leaseManager;
} }
/**
 * Reports the timestamp of the last full cycle of the
 * {@link #lazyPersistFileScrubber} daemon. Intended for the Junit tests,
 * which use it to block until {@link #lazyPersistFileScrubberTS} is updated.
 *
 * @return the current {@link #lazyPersistFileScrubberTS}, or {@code -1} when
 *         {@link #lazyPersistFileScrubber} is null.
 */
@VisibleForTesting
public long getLazyPersistFileScrubberTS() {
  if (lazyPersistFileScrubber == null) {
    // No scrubber daemon is set, so there is no cycle timestamp to report.
    return -1;
  }
  return lazyPersistFileScrubberTS.get();
}
public boolean isHaEnabled() { public boolean isHaEnabled() {
return haEnabled; return haEnabled;
} }
@ -4198,12 +4178,10 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
try { try {
if (!isInSafeMode()) { if (!isInSafeMode()) {
clearCorruptLazyPersistFiles(); clearCorruptLazyPersistFiles();
// set the timeStamp of last Cycle.
lazyPersistFileScrubberTS.set(Time.monotonicNow());
} else { } else {
if (FSNamesystem.LOG.isDebugEnabled()) { if (FSNamesystem.LOG.isDebugEnabled()) {
FSNamesystem.LOG.debug("Namenode is in safemode, skipping " FSNamesystem.LOG
+ "scrubbing of corrupted lazy-persist files."); .debug("Namenode is in safemode, skipping scrubbing of corrupted lazy-persist files.");
} }
} }
} catch (Exception e) { } catch (Exception e) {

View File

@ -2010,15 +2010,6 @@ public class DFSTestUtil {
GenericTestUtils.setLogLevel(NameNode.blockStateChangeLog, level); GenericTestUtils.setLogLevel(NameNode.blockStateChangeLog, level);
} }
/**
 * Applies the given slf4j level to the NameNode-related loggers:
 * {@code FSNamesystem.LOG}, {@code BlockManager.LOG},
 * {@code LeaseManager.LOG}, {@code NameNode.LOG},
 * {@code NameNode.stateChangeLog} and {@code NameNode.blockStateChangeLog}.
 *
 * @param level the slf4j level passed to
 *              {@code GenericTestUtils.setLogLevel} for each logger.
 */
public static void setNameNodeLogLevel(org.slf4j.event.Level level) {
GenericTestUtils.setLogLevel(FSNamesystem.LOG, level);
GenericTestUtils.setLogLevel(BlockManager.LOG, level);
GenericTestUtils.setLogLevel(LeaseManager.LOG, level);
GenericTestUtils.setLogLevel(NameNode.LOG, level);
GenericTestUtils.setLogLevel(NameNode.stateChangeLog, level);
GenericTestUtils.setLogLevel(NameNode.blockStateChangeLog, level);
}
/** /**
* Get the NamenodeProtocol RPC proxy for the NN associated with this * Get the NamenodeProtocol RPC proxy for the NN associated with this
* DFSClient object * DFSClient object
@ -2301,13 +2292,16 @@ public class DFSTestUtil {
public Boolean get() { public Boolean get() {
try { try {
final int currentValue = Integer.parseInt(jmx.getValue(metricName)); final int currentValue = Integer.parseInt(jmx.getValue(metricName));
LOG.info("Waiting for " + metricName +
" to reach value " + expectedValue +
", current value = " + currentValue);
return currentValue == expectedValue; return currentValue == expectedValue;
} catch (Exception e) { } catch (Exception e) {
throw new RuntimeException( throw new RuntimeException(
"Test failed due to unexpected exception", e); "Test failed due to unexpected exception", e);
} }
} }
}, 50, 60000); }, 1000, 60000);
} }
/** /**

View File

@ -17,11 +17,7 @@
*/ */
package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl; package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
import java.util.HashMap; import com.google.common.base.Supplier;
import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys; import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import static org.apache.hadoop.fs.CreateFlag.CREATE; import static org.apache.hadoop.fs.CreateFlag.CREATE;
@ -47,14 +43,6 @@ import java.util.concurrent.TimeoutException;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
import org.apache.commons.io.IOUtils; import org.apache.commons.io.IOUtils;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
@ -80,10 +68,10 @@ import org.apache.hadoop.io.nativeio.NativeIO;
import org.apache.hadoop.net.unix.TemporarySocketDirectory; import org.apache.hadoop.net.unix.TemporarySocketDirectory;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.test.GenericTestUtils;
import org.apache.log4j.Level;
import org.junit.After; import org.junit.After;
import org.junit.Rule; import org.junit.Rule;
import org.junit.rules.Timeout; import org.junit.rules.Timeout;
import org.slf4j.event.Level;
public abstract class LazyPersistTestCase { public abstract class LazyPersistTestCase {
static final byte LAZY_PERSIST_POLICY_ID = (byte) 15; static final byte LAZY_PERSIST_POLICY_ID = (byte) 15;
@ -93,33 +81,17 @@ public abstract class LazyPersistTestCase {
GenericTestUtils.setLogLevel(FsDatasetImpl.LOG, Level.DEBUG); GenericTestUtils.setLogLevel(FsDatasetImpl.LOG, Level.DEBUG);
} }
protected static final Logger LOG =
LoggerFactory.getLogger(LazyPersistTestCase.class);
protected static final int BLOCK_SIZE = 5 * 1024 * 1024; protected static final int BLOCK_SIZE = 5 * 1024 * 1024;
protected static final int BUFFER_LENGTH = 4096; protected static final int BUFFER_LENGTH = 4096;
protected static final int LAZY_WRITE_FILE_SCRUBBER_INTERVAL_SEC = 3; private static final long HEARTBEAT_INTERVAL_SEC = 1;
protected static final int LAZY_WRITER_INTERVAL_SEC = 1; private static final int HEARTBEAT_RECHECK_INTERVAL_MSEC = 500;
protected static final short REPL_FACTOR = 1;
private static final String JMX_RAM_DISK_METRICS_PATTERN = "^RamDisk"; private static final String JMX_RAM_DISK_METRICS_PATTERN = "^RamDisk";
private static final String JMX_SERVICE_NAME = "DataNode"; private static final String JMX_SERVICE_NAME = "DataNode";
private static final long HEARTBEAT_INTERVAL_SEC = 1; protected static final int LAZY_WRITE_FILE_SCRUBBER_INTERVAL_SEC = 3;
private static final int HEARTBEAT_RECHECK_INTERVAL_MS = 500; protected static final int LAZY_WRITER_INTERVAL_SEC = 1;
private static final long WAIT_FOR_FBR_MS = protected static final Logger LOG =
TimeUnit.SECONDS.toMillis(10); LoggerFactory.getLogger(LazyPersistTestCase.class);
private static final long WAIT_FOR_STORAGE_TYPES_MS = protected static final short REPL_FACTOR = 1;
TimeUnit.SECONDS.toMillis(30);
private static final long WAIT_FOR_ASYNC_DELETE_MS =
TimeUnit.SECONDS.toMillis(10);
private static final long WAIT_FOR_DN_SHUTDOWN_MS =
TimeUnit.SECONDS.toMillis(30);
private static final long WAIT_FOR_REDUNDANCY_MS =
TimeUnit.SECONDS
.toMillis(2 * DFS_NAMENODE_REDUNDANCY_INTERVAL_SECONDS_DEFAULT);
private static final long WAIT_FOR_LAZY_SCRUBBER_MS =
TimeUnit.SECONDS.toMillis(2 * LAZY_WRITE_FILE_SCRUBBER_INTERVAL_SEC);
private static final long WAIT_POLL_INTERVAL_MS = 10;
private static final long WAIT_POLL_INTERVAL_LARGE_MS = 20;
protected final long osPageSize = protected final long osPageSize =
NativeIO.POSIX.getCacheManipulator().getOperatingSystemPageSize(); NativeIO.POSIX.getCacheManipulator().getOperatingSystemPageSize();
@ -162,11 +134,13 @@ public abstract class LazyPersistTestCase {
Path path, StorageType storageType) Path path, StorageType storageType)
throws IOException, TimeoutException, InterruptedException { throws IOException, TimeoutException, InterruptedException {
// Ensure that returned block locations returned are correct! // Ensure that returned block locations returned are correct!
LOG.info("Ensure path: {} is on StorageType: {}", path, storageType); LOG.info("Ensure path: " + path + " is on StorageType: " + storageType);
assertThat(fs.exists(path), is(true)); assertThat(fs.exists(path), is(true));
long fileLength = client.getFileInfo(path.toString()).getLen(); long fileLength = client.getFileInfo(path.toString()).getLen();
GenericTestUtils.waitFor(() -> { GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override
public Boolean get() {
try { try {
LocatedBlocks locatedBlocks = LocatedBlocks locatedBlocks =
client.getLocatedBlocks(path.toString(), 0, fileLength); client.getLocatedBlocks(path.toString(), 0, fileLength);
@ -180,63 +154,58 @@ public abstract class LazyPersistTestCase {
LOG.warn("Exception got in ensureFileReplicasOnStorageType()", ioe); LOG.warn("Exception got in ensureFileReplicasOnStorageType()", ioe);
return false; return false;
} }
}, WAIT_POLL_INTERVAL_MS, WAIT_FOR_STORAGE_TYPES_MS); }
}, 100, 30 * 1000);
return client.getLocatedBlocks(path.toString(), 0, fileLength); return client.getLocatedBlocks(path.toString(), 0, fileLength);
} }
/** /**
* Make sure at least one non-transient volume has a saved copy of the * Make sure at least one non-transient volume has a saved copy of the replica.
* replica. An infinite loop is used to ensure the async lazy persist tasks * An infinite loop is used to ensure the async lazy persist tasks are completely
* are completely done before verification. * done before verification. Caller of ensureLazyPersistBlocksAreSaved expects
* Caller of this method expects either a successful pass or timeout failure. * either a successful pass or timeout failure.
*
* @param locatedBlocks the collection of blocks and their locations.
* @throws IOException for auto-closeable resources.
* @throws InterruptedException if the thread is interrupted.
* @throws TimeoutException if {@link #WAIT_FOR_STORAGE_TYPES_MS} expires
* before we find a persisted copy for each located
* block.
*/ */
protected final void ensureLazyPersistBlocksAreSaved( protected final void ensureLazyPersistBlocksAreSaved(
final LocatedBlocks locatedBlocks) LocatedBlocks locatedBlocks) throws IOException, InterruptedException {
throws IOException, InterruptedException, TimeoutException {
final String bpid = cluster.getNamesystem().getBlockPoolId(); final String bpid = cluster.getNamesystem().getBlockPoolId();
final Set<Long> persistedBlockIds = new HashSet<Long>(); final Set<Long> persistedBlockIds = new HashSet<Long>();
// We should find a persisted copy for each located block.
try (FsDatasetSpi.FsVolumeReferences volumes = try (FsDatasetSpi.FsVolumeReferences volumes =
cluster.getDataNodes().get(0).getFSDataset().getFsVolumeReferences()) { cluster.getDataNodes().get(0).getFSDataset().getFsVolumeReferences()) {
GenericTestUtils.waitFor(() -> { while (persistedBlockIds.size() < locatedBlocks.getLocatedBlocks()
.size()) {
// Take 1 second sleep before each verification iteration
Thread.sleep(1000);
for (LocatedBlock lb : locatedBlocks.getLocatedBlocks()) { for (LocatedBlock lb : locatedBlocks.getLocatedBlocks()) {
for (FsVolumeSpi v : volumes) { for (FsVolumeSpi v : volumes) {
if (v.isTransientStorage()) { if (v.isTransientStorage()) {
continue; continue;
} }
FsVolumeImpl volume = (FsVolumeImpl) v; FsVolumeImpl volume = (FsVolumeImpl) v;
File lazyPersistDir; File lazyPersistDir =
try {
lazyPersistDir =
volume.getBlockPoolSlice(bpid).getLazypersistDir(); volume.getBlockPoolSlice(bpid).getLazypersistDir();
} catch (IOException ioe) {
return false;
}
long blockId = lb.getBlock().getBlockId(); long blockId = lb.getBlock().getBlockId();
File targetDir = File targetDir =
DatanodeUtil.idToBlockDir(lazyPersistDir, blockId); DatanodeUtil.idToBlockDir(lazyPersistDir, blockId);
File blockFile = new File(targetDir, lb.getBlock().getBlockName()); File blockFile = new File(targetDir, lb.getBlock().getBlockName());
if (blockFile.exists()) { if (blockFile.exists()) {
// Found a persisted copy for this block and added to the Set. // Found a persisted copy for this block and added to the Set
persistedBlockIds.add(blockId); persistedBlockIds.add(blockId);
} }
} }
} }
return (persistedBlockIds.size() ==
locatedBlocks.getLocatedBlocks().size());
}, WAIT_POLL_INTERVAL_LARGE_MS, WAIT_FOR_STORAGE_TYPES_MS);
} }
} }
// We should have found a persisted copy for each located block.
assertThat(persistedBlockIds.size(), is(locatedBlocks.getLocatedBlocks().size()));
}
protected final void makeRandomTestFile(Path path, long length, protected final void makeRandomTestFile(Path path, long length,
boolean isLazyPersist, long seed) throws IOException { boolean isLazyPersist, long seed) throws IOException {
DFSTestUtil.createFile(fs, path, isLazyPersist, BUFFER_LENGTH, length, DFSTestUtil.createFile(fs, path, isLazyPersist, BUFFER_LENGTH, length,
@ -303,7 +272,7 @@ public abstract class LazyPersistTestCase {
} }
conf.setLong(DFS_HEARTBEAT_INTERVAL_KEY, HEARTBEAT_INTERVAL_SEC); conf.setLong(DFS_HEARTBEAT_INTERVAL_KEY, HEARTBEAT_INTERVAL_SEC);
conf.setInt(DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, conf.setInt(DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY,
HEARTBEAT_RECHECK_INTERVAL_MS); HEARTBEAT_RECHECK_INTERVAL_MSEC);
conf.setInt(DFS_DATANODE_LAZY_WRITER_INTERVAL_SEC, conf.setInt(DFS_DATANODE_LAZY_WRITER_INTERVAL_SEC,
LAZY_WRITER_INTERVAL_SEC); LAZY_WRITER_INTERVAL_SEC);
conf.setInt(DFSConfigKeys.DFS_NAMENODE_SAFEMODE_MIN_DATANODES_KEY, 1); conf.setInt(DFSConfigKeys.DFS_NAMENODE_SAFEMODE_MIN_DATANODES_KEY, 1);
@ -366,18 +335,18 @@ public abstract class LazyPersistTestCase {
@Override @Override
public void mlock(String identifier, public void mlock(String identifier,
ByteBuffer mmap, long length) throws IOException { ByteBuffer mmap, long length) throws IOException {
LOG.info("LazyPersistTestCase: faking mlock of {} bytes.", identifier); LOG.info("LazyPersistTestCase: faking mlock of " + identifier + " bytes.");
} }
@Override @Override
public long getMemlockLimit() { public long getMemlockLimit() {
LOG.info("LazyPersistTestCase: fake return {}", Long.MAX_VALUE); LOG.info("LazyPersistTestCase: fake return " + Long.MAX_VALUE);
return Long.MAX_VALUE; return Long.MAX_VALUE;
} }
@Override @Override
public boolean verifyCanMlock() { public boolean verifyCanMlock() {
LOG.info("LazyPersistTestCase: fake return {}", true); LOG.info("LazyPersistTestCase: fake return " + true);
return true; return true;
} }
}); });
@ -445,10 +414,8 @@ public abstract class LazyPersistTestCase {
public void build() throws IOException { public void build() throws IOException {
LazyPersistTestCase.this.startUpCluster( LazyPersistTestCase.this.startUpCluster(
numDatanodes, hasTransientStorage, storageTypes, numDatanodes, hasTransientStorage, storageTypes, ramDiskReplicaCapacity,
ramDiskReplicaCapacity, ramDiskStorageLimit, maxLockedMemory, useScr, useLegacyBlockReaderLocal,
ramDiskStorageLimit, maxLockedMemory, useScr,
useLegacyBlockReaderLocal,
disableScrubber); disableScrubber);
} }
@ -463,44 +430,11 @@ public abstract class LazyPersistTestCase {
private boolean disableScrubber=false; private boolean disableScrubber=false;
} }
/**
* Forces a full blockreport on all the datanodes. The call blocks waiting
* for all blockreports to be received by the namenode.
*
* @throws IOException if an exception is thrown while getting the datanode
* descriptors or triggering the blockreports.
* @throws InterruptedException if the thread receives an interrupt.
* @throws TimeoutException if the reports are not received by
* {@link #WAIT_FOR_FBR_MS}.
*/
protected final void triggerBlockReport() protected final void triggerBlockReport()
throws InterruptedException, TimeoutException, IOException { throws IOException, InterruptedException {
// Trigger block report to NN // Trigger block report to NN
final Map<DatanodeStorageInfo, Integer> reportCountsBefore = DataNodeTestUtils.triggerBlockReport(cluster.getDataNodes().get(0));
new HashMap<>(); Thread.sleep(10 * 1000);
final FSNamesystem fsn = cluster.getNamesystem();
for (DataNode dn : cluster.getDataNodes()) {
final DatanodeDescriptor dnd =
NameNodeAdapter.getDatanode(fsn, dn.getDatanodeId());
final DatanodeStorageInfo storage = dnd.getStorageInfos()[0];
reportCountsBefore.put(storage, storage.getBlockReportCount());
DataNodeTestUtils.triggerBlockReport(dn);
}
// wait for block reports to be received.
GenericTestUtils.waitFor(() -> {
for (Entry<DatanodeStorageInfo, Integer> reportEntry :
reportCountsBefore.entrySet()) {
final DatanodeStorageInfo dnStorageInfo = reportEntry.getKey();
final int cntBefore = reportEntry.getValue();
final int currentCnt = dnStorageInfo.getBlockReportCount();
if (cntBefore == currentCnt) {
// Same count means no report has been received.
return false;
}
}
// If we reach here, then all the block reports have been received.
return true;
}, WAIT_POLL_INTERVAL_MS, WAIT_FOR_FBR_MS);
} }
protected final boolean verifyBlockDeletedFromDir(File dir, protected final boolean verifyBlockDeletedFromDir(File dir,
@ -512,58 +446,51 @@ public abstract class LazyPersistTestCase {
File blockFile = new File(targetDir, lb.getBlock().getBlockName()); File blockFile = new File(targetDir, lb.getBlock().getBlockName());
if (blockFile.exists()) { if (blockFile.exists()) {
LOG.warn("blockFile: " + blockFile.getAbsolutePath() +
" exists after deletion.");
return false; return false;
} }
File metaFile = new File(targetDir, File metaFile = new File(targetDir,
DatanodeUtil.getMetaName(lb.getBlock().getBlockName(), DatanodeUtil.getMetaName(lb.getBlock().getBlockName(),
lb.getBlock().getGenerationStamp())); lb.getBlock().getGenerationStamp()));
if (metaFile.exists()) { if (metaFile.exists()) {
LOG.warn("metaFile: " + metaFile.getAbsolutePath() +
" exists after deletion.");
return false; return false;
} }
} }
return true; return true;
} }
protected final boolean verifyDeletedBlocks(final LocatedBlocks locatedBlocks) protected final boolean verifyDeletedBlocks(LocatedBlocks locatedBlocks)
throws Exception { throws IOException, InterruptedException {
LOG.info("Verifying replica has no saved copy after deletion."); LOG.info("Verifying replica has no saved copy after deletion.");
triggerBlockReport(); triggerBlockReport();
final DataNode dn = cluster.getDataNodes().get(0);
GenericTestUtils.waitFor(() -> { while(
for (DataNode dn1 : cluster.getDataNodes()) { cluster.getFsDatasetTestUtils(0).getPendingAsyncDeletions()
if (cluster.getFsDatasetTestUtils(dn1).getPendingAsyncDeletions() > 0L){
> 0) { Thread.sleep(1000);
return false;
} }
}
return true;
}, WAIT_POLL_INTERVAL_MS, WAIT_FOR_ASYNC_DELETE_MS);
final String bpid = cluster.getNamesystem().getBlockPoolId(); final String bpid = cluster.getNamesystem().getBlockPoolId();
final FsDatasetSpi<?> dataset = dn.getFSDataset(); final FsDatasetSpi<?> dataset =
cluster.getDataNodes().get(0).getFSDataset();
// Make sure deleted replica does not have a copy on either finalized dir of // Make sure deleted replica does not have a copy on either finalized dir of
// transient volume or finalized dir of non-transient volume. // transient volume or finalized dir of non-transient volume
// We need to wait until the async deletion is scheduled.
try (FsDatasetSpi.FsVolumeReferences volumes = try (FsDatasetSpi.FsVolumeReferences volumes =
dataset.getFsVolumeReferences()) { dataset.getFsVolumeReferences()) {
GenericTestUtils.waitFor(() -> {
try {
for (FsVolumeSpi vol : volumes) { for (FsVolumeSpi vol : volumes) {
FsVolumeImpl volume = (FsVolumeImpl) vol; FsVolumeImpl volume = (FsVolumeImpl) vol;
File targetDir = (volume.isTransientStorage()) ? File targetDir = (volume.isTransientStorage()) ?
volume.getBlockPoolSlice(bpid).getFinalizedDir() : volume.getBlockPoolSlice(bpid).getFinalizedDir() :
volume.getBlockPoolSlice(bpid).getLazypersistDir(); volume.getBlockPoolSlice(bpid).getLazypersistDir();
if (!verifyBlockDeletedFromDir(targetDir, locatedBlocks)) { if (verifyBlockDeletedFromDir(targetDir, locatedBlocks) == false) {
return false; return false;
} }
} }
return true;
} catch (IOException ie) {
return false;
}
}, WAIT_POLL_INTERVAL_MS, WAIT_FOR_ASYNC_DELETE_MS);
} }
return true; return true;
} }
@ -604,137 +531,8 @@ public abstract class LazyPersistTestCase {
DFSTestUtil.waitForMetric(jmx, metricName, expectedValue); DFSTestUtil.waitForMetric(jmx, metricName, expectedValue);
} }
protected void triggerEviction(final DataNode dn) { protected void triggerEviction(DataNode dn) {
FsDatasetImpl fsDataset = (FsDatasetImpl) dn.getFSDataset(); FsDatasetImpl fsDataset = (FsDatasetImpl) dn.getFSDataset();
fsDataset.evictLazyPersistBlocks(Long.MAX_VALUE); // Run one eviction cycle. fsDataset.evictLazyPersistBlocks(Long.MAX_VALUE); // Run one eviction cycle.
} }
/**
 * Shuts down every datanode in {@link #cluster}, then polls the client's
 * LIVE datanode report until it is empty. A report that fails with an
 * {@link IOException} counts as "not yet" and is retried on the next poll.
 *
 * @throws TimeoutException if {@link #WAIT_FOR_DN_SHUTDOWN_MS} expires with
 *                          at least one datanode still reported as live.
 * @throws InterruptedException if the thread receives an interrupt.
 */
protected void shutdownDataNodes()
    throws TimeoutException, InterruptedException {
  cluster.shutdownDataNodes();
  GenericTestUtils.waitFor(() -> {
    final DatanodeInfo[] liveNodes;
    try {
      liveNodes = client.datanodeReport(
          HdfsConstants.DatanodeReportType.LIVE);
    } catch (IOException e) {
      return false;
    }
    return liveNodes.length == 0;
  }, WAIT_POLL_INTERVAL_LARGE_MS, WAIT_FOR_DN_SHUTDOWN_MS);
}
/**
 * Polls the block manager's corrupt-replica block iterator until the number
 * of blocks it yields equals {@code corruptCnt}.
 *
 * @param corruptCnt the number of corrupt blocks to wait for before
 *                   resuming.
 * @throws TimeoutException if {@link #WAIT_FOR_REDUNDANCY_MS} expires before
 *                          the corrupt count meets the criteria.
 * @throws InterruptedException if the thread receives an interrupt.
 */
protected void waitForCorruptBlock(final long corruptCnt)
    throws TimeoutException, InterruptedException {
  // wait for the redundancy monitor to mark the file as corrupt.
  GenericTestUtils.waitFor(() -> {
    final Iterator<BlockInfo> corruptBlocks = cluster.getNameNode()
        .getNamesystem().getBlockManager().getCorruptReplicaBlockIterator();
    int seen = 0;
    // Drain the iterator; only the number of entries matters.
    for (; corruptBlocks.hasNext(); corruptBlocks.next()) {
      seen++;
    }
    return seen == corruptCnt;
  }, 2 * WAIT_POLL_INTERVAL_LARGE_MS, WAIT_FOR_REDUNDANCY_MS);
}
/**
 * Blocks until the value returned by
 * {@link FSNamesystem#getLazyPersistFileScrubberTS()} differs from the value
 * observed on entry, i.e. until the lazy-persist file scrubber records a new
 * cycle. Returns immediately when the observed value is {@code -1}
 * (scrubber is disabled).
 *
 * @throws InterruptedException if the thread receives an interrupt.
 * @throws TimeoutException if the timestamp is not updated within
 *                          {@link #WAIT_FOR_LAZY_SCRUBBER_MS}.
 */
protected void waitForScrubberCycle()
    throws TimeoutException, InterruptedException {
  final FSNamesystem namesystem = cluster.getNamesystem();
  final long tsBefore = namesystem.getLazyPersistFileScrubberTS();
  final boolean scrubberDisabled = (tsBefore == -1);
  if (!scrubberDisabled) {
    // Wait for the scrubber to publish a timestamp other than the snapshot.
    GenericTestUtils.waitFor(
        () -> namesystem.getLazyPersistFileScrubberTS() != tsBefore,
        2 * WAIT_POLL_INTERVAL_LARGE_MS, WAIT_FOR_LAZY_SCRUBBER_MS);
  }
}
/**
 * Blocks until the value returned by
 * {@link BlockManager#getLastRedundancyMonitorTS()} differs from the value
 * observed on entry, i.e. until the redundancy monitor records a new cycle.
 *
 * @throws InterruptedException if the thread receives an interrupt.
 * @throws TimeoutException if the timestamp is not updated within
 *                          {@link #WAIT_FOR_REDUNDANCY_MS}.
 */
protected void waitForRedundancyMonitorCycle()
    throws TimeoutException, InterruptedException {
  final BlockManager blockManager =
      cluster.getNamesystem().getBlockManager();
  final long tsBefore = blockManager.getLastRedundancyMonitorTS();
  // Wait for the monitor to publish a timestamp other than the snapshot.
  GenericTestUtils.waitFor(
      () -> blockManager.getLastRedundancyMonitorTS() != tsBefore,
      2 * WAIT_POLL_INTERVAL_LARGE_MS, WAIT_FOR_REDUNDANCY_MS);
}
/**
 * Blocks until {@link BlockManager#getLowRedundancyBlocksCount()} equals the
 * given value.
 *
 * @param cnt the low-redundancy block count to wait for.
 * @throws InterruptedException if the thread receives an interrupt.
 * @throws TimeoutException if the count does not reach {@code cnt} within
 *                          {@link #WAIT_FOR_REDUNDANCY_MS}.
 */
protected void waitForLowRedundancyCount(final long cnt)
    throws TimeoutException, InterruptedException {
  final BlockManager blockManager =
      cluster.getNamesystem().getBlockManager();
  GenericTestUtils.waitFor(
      () -> blockManager.getLowRedundancyBlocksCount() == cnt,
      2 * WAIT_POLL_INTERVAL_LARGE_MS, WAIT_FOR_REDUNDANCY_MS);
}
/**
 * Waits until the existence of a path on the filesystem matches the
 * expectation.
 *
 * @param path of the file to be checked.
 * @param expected {@code true} to wait for the file to exist,
 *                 {@code false} to wait for it to be gone.
 * @throws TimeoutException if the file status does not meet the expected by
 *                          {@link #WAIT_FOR_STORAGE_TYPES_MS}.
 * @throws InterruptedException if the thread receives an interrupt.
 */
protected void waitForFile(final Path path, final boolean expected)
    throws TimeoutException, InterruptedException {
  GenericTestUtils.waitFor(() -> {
    final boolean present;
    try {
      present = fs.exists(path);
    } catch (IOException ioe) {
      // A failed lookup counts as "condition not met"; the next poll retries.
      return false;
    }
    return present == expected;
  }, WAIT_POLL_INTERVAL_MS, WAIT_FOR_STORAGE_TYPES_MS);
}
} }

View File

@ -16,10 +16,11 @@
* limitations under the License. * limitations under the License.
*/ */
package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl; package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
import com.google.common.collect.Iterators;
import com.google.common.util.concurrent.Uninterruptibles;
import org.apache.hadoop.fs.CreateFlag; import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.ThreadUtil;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
@ -32,6 +33,7 @@ import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import static org.apache.hadoop.fs.StorageType.RAM_DISK; import static org.apache.hadoop.fs.StorageType.RAM_DISK;
import static org.apache.hadoop.hdfs.DFSConfigKeys.*;
import static org.hamcrest.core.Is.is; import static org.hamcrest.core.Is.is;
import static org.junit.Assert.assertThat; import static org.junit.Assert.assertThat;
@ -96,20 +98,28 @@ public class TestLazyPersistFiles extends LazyPersistTestCase {
makeTestFile(path1, BLOCK_SIZE, true); makeTestFile(path1, BLOCK_SIZE, true);
ensureFileReplicasOnStorageType(path1, RAM_DISK); ensureFileReplicasOnStorageType(path1, RAM_DISK);
// Stop the DataNode. // Stop the DataNode and sleep for the time it takes the NN to
shutdownDataNodes(); // detect the DN as being dead.
cluster.shutdownDataNodes();
Thread.sleep(30000L);
assertThat(cluster.getNamesystem().getNumDeadDataNodes(), is(1)); assertThat(cluster.getNamesystem().getNumDeadDataNodes(), is(1));
// Next, wait for the redundancy monitor to mark the file as corrupt. // Next, wait for the redundancy monitor to mark the file as corrupt.
waitForRedundancyMonitorCycle(); Thread.sleep(2 * DFS_NAMENODE_REDUNDANCY_INTERVAL_SECONDS_DEFAULT * 1000);
// Wait for the LazyPersistFileScrubber to run // Wait for the LazyPersistFileScrubber to run
waitForScrubberCycle(); Thread.sleep(2 * LAZY_WRITE_FILE_SCRUBBER_INTERVAL_SEC * 1000);
// Ensure that path1 does not exist anymore, whereas path2 does. // Ensure that path1 does not exist anymore, whereas path2 does.
waitForFile(path1, false); assert(!fs.exists(path1));
// We should have zero blocks that needs replication i.e. the one // We should have zero blocks that needs replication i.e. the one
// belonging to path2. This needs a wait. // belonging to path2.
waitForLowRedundancyCount(0L); assertThat(cluster.getNameNode()
.getNamesystem()
.getBlockManager()
.getLowRedundancyBlocksCount(),
is(0L));
} }
@Test @Test
@ -124,14 +134,18 @@ public class TestLazyPersistFiles extends LazyPersistTestCase {
// Stop the DataNode and sleep for the time it takes the NN to // Stop the DataNode and sleep for the time it takes the NN to
// detect the DN as being dead. // detect the DN as being dead.
shutdownDataNodes(); cluster.shutdownDataNodes();
Thread.sleep(30000L);
// Next, wait for the redundancy monitor to mark the file as corrupt.
Thread.sleep(2 * DFS_NAMENODE_REDUNDANCY_INTERVAL_SECONDS_DEFAULT * 1000);
// wait for the redundancy monitor to mark the file as corrupt.
waitForCorruptBlock(1L);
// Wait for the LazyPersistFileScrubber to run // Wait for the LazyPersistFileScrubber to run
waitForScrubberCycle(); Thread.sleep(2 * LAZY_WRITE_FILE_SCRUBBER_INTERVAL_SEC * 1000);
// Ensure that path1 exist. // Ensure that path1 exist.
waitForFile(path1, true); Assert.assertTrue(fs.exists(path1));
} }
/** /**
@ -146,14 +160,20 @@ public class TestLazyPersistFiles extends LazyPersistTestCase {
makeTestFile(path1, BLOCK_SIZE, true); makeTestFile(path1, BLOCK_SIZE, true);
ensureFileReplicasOnStorageType(path1, RAM_DISK); ensureFileReplicasOnStorageType(path1, RAM_DISK);
shutdownDataNodes(); cluster.shutdownDataNodes();
cluster.restartNameNodes(); cluster.restartNameNodes();
// wait for the redundancy monitor to mark the file as corrupt. // wait for the redundancy monitor to mark the file as corrupt.
waitForCorruptBlock(1L); Long corruptBlkCount;
do {
Thread.sleep(DFS_NAMENODE_REDUNDANCY_INTERVAL_SECONDS_DEFAULT * 1000);
corruptBlkCount = (long) Iterators.size(cluster.getNameNode()
.getNamesystem().getBlockManager().getCorruptReplicaBlockIterator());
} while (corruptBlkCount != 1L);
// Ensure path1 exist. // Ensure path1 exist.
waitForFile(path1, true); Assert.assertTrue(fs.exists(path1));
} }
/** /**
@ -195,8 +215,10 @@ public class TestLazyPersistFiles extends LazyPersistTestCase {
threads[i].start(); threads[i].start();
} }
Thread.sleep(500);
for (int i = 0; i < NUM_TASKS; i++) { for (int i = 0; i < NUM_TASKS; i++) {
ThreadUtil.joinUninterruptibly(threads[i]); Uninterruptibles.joinUninterruptibly(threads[i]);
} }
Assert.assertFalse(testFailed.get()); Assert.assertFalse(testFailed.get());
} }
@ -210,7 +232,7 @@ public class TestLazyPersistFiles extends LazyPersistTestCase {
*/ */
@Test @Test
public void testConcurrentWrites() public void testConcurrentWrites()
throws IOException, InterruptedException, TimeoutException { throws IOException, InterruptedException {
getClusterBuilder().setRamDiskReplicaCapacity(9).build(); getClusterBuilder().setRamDiskReplicaCapacity(9).build();
final String METHOD_NAME = GenericTestUtils.getMethodName(); final String METHOD_NAME = GenericTestUtils.getMethodName();
final int SEED = 0xFADED; final int SEED = 0xFADED;
@ -259,11 +281,11 @@ public class TestLazyPersistFiles extends LazyPersistTestCase {
this.seed = seed; this.seed = seed;
this.latch = latch; this.latch = latch;
this.bFail = bFail; this.bFail = bFail;
LOG.info("Creating Writer: {}", id); System.out.println("Creating Writer: " + id);
} }
public void run() { public void run() {
LOG.info("Writer {} starting... ", id); System.out.println("Writer " + id + " starting... ");
int i = 0; int i = 0;
try { try {
for (i = 0; i < paths.length; i++) { for (i = 0; i < paths.length; i++) {
@ -273,8 +295,9 @@ public class TestLazyPersistFiles extends LazyPersistTestCase {
} }
} catch (IOException e) { } catch (IOException e) {
bFail.set(true); bFail.set(true);
LOG.error("Writer exception: writer id:{} testfile: {}", LOG.error("Writer exception: writer id:" + id +
id, paths[i].toString(), e); " testfile: " + paths[i].toString() +
" " + e);
} finally { } finally {
latch.countDown(); latch.countDown();
} }

View File

@ -119,7 +119,7 @@ public class TestLazyPersistReplicaPlacement extends LazyPersistTestCase {
*/ */
@Test @Test
public void testFallbackToDiskPartial() public void testFallbackToDiskPartial()
throws IOException, InterruptedException, TimeoutException { throws IOException, InterruptedException {
getClusterBuilder().setMaxLockedMemory(2 * BLOCK_SIZE).build(); getClusterBuilder().setMaxLockedMemory(2 * BLOCK_SIZE).build();
final String METHOD_NAME = GenericTestUtils.getMethodName(); final String METHOD_NAME = GenericTestUtils.getMethodName();
Path path = new Path("/" + METHOD_NAME + ".dat"); Path path = new Path("/" + METHOD_NAME + ".dat");

View File

@ -21,7 +21,6 @@ package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSTestUtil; import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks; import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.test.GenericTestUtils;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
@ -157,6 +156,7 @@ public class TestLazyWriter extends LazyPersistTestCase {
for (int i = 0; i < NUM_PATHS; ++i) { for (int i = 0; i < NUM_PATHS; ++i) {
makeTestFile(paths[i + NUM_PATHS], BLOCK_SIZE, true); makeTestFile(paths[i + NUM_PATHS], BLOCK_SIZE, true);
triggerBlockReport(); triggerBlockReport();
Thread.sleep(3000);
ensureFileReplicasOnStorageType(paths[i + NUM_PATHS], RAM_DISK); ensureFileReplicasOnStorageType(paths[i + NUM_PATHS], RAM_DISK);
ensureFileReplicasOnStorageType(paths[indexes.get(i)], DEFAULT); ensureFileReplicasOnStorageType(paths[indexes.get(i)], DEFAULT);
for (int j = i + 1; j < NUM_PATHS; ++j) { for (int j = i + 1; j < NUM_PATHS; ++j) {
@ -183,13 +183,13 @@ public class TestLazyWriter extends LazyPersistTestCase {
throws Exception { throws Exception {
getClusterBuilder().build(); getClusterBuilder().build();
final String METHOD_NAME = GenericTestUtils.getMethodName(); final String METHOD_NAME = GenericTestUtils.getMethodName();
final DataNode dn = cluster.getDataNodes().get(0); FsDatasetTestUtil.stopLazyWriter(cluster.getDataNodes().get(0));
FsDatasetTestUtil.stopLazyWriter(dn);
Path path = new Path("/" + METHOD_NAME + ".dat"); Path path = new Path("/" + METHOD_NAME + ".dat");
makeTestFile(path, BLOCK_SIZE, true); makeTestFile(path, BLOCK_SIZE, true);
LocatedBlocks locatedBlocks = LocatedBlocks locatedBlocks =
ensureFileReplicasOnStorageType(path, RAM_DISK); ensureFileReplicasOnStorageType(path, RAM_DISK);
// Delete before persist // Delete before persist
client.delete(path.toString(), false); client.delete(path.toString(), false);
Assert.assertFalse(fs.exists(path)); Assert.assertFalse(fs.exists(path));