HDFS-15147. LazyPersistTestCase wait logic is error-prone. Contributed by Ahmed Hussein.

(cherry picked from commit 27cfda708e)

Conflicts:
	hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
	hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
	hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/LazyPersistTestCase.java
parent 9ccefe2262
commit b92477c638
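A note on the overall pattern: the patch replaces fixed Thread.sleep() waits in the tests with condition polling through GenericTestUtils.waitFor, which retries a check every few milliseconds and fails with a TimeoutException only if the condition never holds. A minimal sketch of the pattern — the condition and the timeouts here are illustrative, not taken from the patch:

    import java.util.concurrent.TimeoutException;
    import java.util.concurrent.atomic.AtomicBoolean;
    import org.apache.hadoop.test.GenericTestUtils;

    public class WaitPatternSketch {
      public static void main(String[] args)
          throws TimeoutException, InterruptedException {
        AtomicBoolean done = new AtomicBoolean(false);
        new Thread(() -> done.set(true)).start();
        // Poll every 10 ms; fail after 30 s instead of sleeping for a
        // fixed, hope-it-is-long-enough interval.
        GenericTestUtils.waitFor(done::get, 10, 30000);
      }
    }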
@@ -49,6 +49,34 @@ public class ThreadUtil {
     }
   }
 
+  /**
+   * Join a thread as uninterruptible.
+   * The call continues to block until the result is available even when the
+   * caller thread is interrupted.
+   * The method will log any {@link InterruptedException} then will re-interrupt
+   * the thread.
+   *
+   * @param toJoin the thread to Join on.
+   */
+  public static void joinUninterruptibly(Thread toJoin) {
+    boolean interrupted = false;
+    try {
+      while (true) {
+        try {
+          toJoin.join();
+          return;
+        } catch (InterruptedException e) {
+          interrupted = true;
+          LOG.warn("interrupted while sleeping", e);
+        }
+      }
+    } finally {
+      if (interrupted) {
+        Thread.currentThread().interrupt();
+      }
+    }
+  }
+
   /**
    * Convenience method that returns a resource as inputstream from the
    * classpath.
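A usage sketch for the new helper: joining worker threads from a test without losing a pending interrupt. The worker body is illustrative only:

    import org.apache.hadoop.util.ThreadUtil;

    public class JoinSketch {
      public static void main(String[] args) {
        Thread worker = new Thread(() -> {
          // Simulate some work.
          System.out.println("worker running");
        });
        worker.start();
        // Blocks until the worker finishes even if the caller is
        // interrupted; the interrupt flag is re-set before returning.
        ThreadUtil.joinUninterruptibly(worker);
      }
    }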
@@ -60,7 +60,6 @@ import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
 import com.google.common.base.Joiner;
-import com.google.common.base.Preconditions;
 import com.google.common.base.Supplier;
 import com.google.common.collect.Sets;
 
@@ -369,11 +368,15 @@ public abstract class GenericTestUtils {
    * time
    * @throws InterruptedException if the method is interrupted while waiting
    */
-  public static void waitFor(Supplier<Boolean> check, int checkEveryMillis,
-      int waitForMillis) throws TimeoutException, InterruptedException {
-    Preconditions.checkNotNull(check, ERROR_MISSING_ARGUMENT);
-    Preconditions.checkArgument(waitForMillis >= checkEveryMillis,
-        ERROR_INVALID_ARGUMENT);
+  public static void waitFor(final Supplier<Boolean> check,
+      final long checkEveryMillis, final long waitForMillis)
+      throws TimeoutException, InterruptedException {
+    if (check == null) {
+      throw new NullPointerException(ERROR_MISSING_ARGUMENT);
+    }
+    if (waitForMillis < checkEveryMillis) {
+      throw new IllegalArgumentException(ERROR_INVALID_ARGUMENT);
+    }
 
     long st = Time.monotonicNow();
     boolean result = check.get();
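With the widened signature the intervals are longs and the precondition failures are explicit unchecked exceptions, so a bad call site fails immediately. A sketch with illustrative values:

    import org.apache.hadoop.test.GenericTestUtils;

    public class WaitForArgsSketch {
      public static void main(String[] args) throws Exception {
        try {
          // waitForMillis (10) < checkEveryMillis (1000) is rejected up front.
          GenericTestUtils.waitFor(() -> true, 1000, 10);
        } catch (IllegalArgumentException expected) {
          System.out.println("rejected: " + expected.getMessage());
        }
      }
    }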
@@ -48,6 +48,8 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.FutureTask;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
+
+import java.util.concurrent.atomic.AtomicLong;
 import javax.management.ObjectName;
 
 import org.apache.hadoop.HadoopIllegalArgumentException;
@@ -321,7 +323,12 @@ public class BlockManager implements BlockStatsMXBean {
 
   /** Redundancy thread. */
   private final Daemon redundancyThread = new Daemon(new RedundancyMonitor());
+  /**
+   * Timestamp marking the end time of {@link #redundancyThread}'s full cycle.
+   * This value can be checked by the Junit tests to verify that the
+   * {@link #redundancyThread} has run at least one full iteration.
+   */
+  private final AtomicLong lastRedundancyCycleTS = new AtomicLong(-1);
   /** StorageInfoDefragmenter thread. */
   private final Daemon storageInfoDefragmenterThread =
       new Daemon(new StorageInfoDefragmenter());
@@ -4780,6 +4787,17 @@ public class BlockManager implements BlockStatsMXBean {
     return neededReconstruction.size();
   }
 
+  /**
+   * Used as ad hoc to check the time stamp of the last full cycle of
+   * {@link #redundancyThread}. This is used by the Junit tests to block until
+   * {@link #lastRedundancyCycleTS} is updated.
+   * @return the current {@link #lastRedundancyCycleTS}.
+   */
+  @VisibleForTesting
+  public long getLastRedundancyMonitorTS() {
+    return lastRedundancyCycleTS.get();
+  }
+
   /**
    * Periodically calls computeBlockRecoveryWork().
    */
@@ -4794,6 +4812,7 @@ public class BlockManager implements BlockStatsMXBean {
           computeDatanodeWork();
           processPendingReconstructions();
           rescanPostponedMisreplicatedBlocks();
+          lastRedundancyCycleTS.set(Time.monotonicNow());
         }
         TimeUnit.MILLISECONDS.sleep(redundancyRecheckIntervalMs);
       } catch (Throwable t) {
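The new timestamp lets a test block until the RedundancyMonitor has provably finished a fresh cycle instead of sleeping for a multiple of the recheck interval. A sketch of that use; bm is assumed to be the BlockManager of a running MiniDFSCluster, and the poll interval and timeout are illustrative:

    // Wait for one full RedundancyMonitor iteration.
    final long before = bm.getLastRedundancyMonitorTS();
    GenericTestUtils.waitFor(
        () -> bm.getLastRedundancyMonitorTS() != before, 20, 60000);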
@@ -89,6 +89,8 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_REPLICATION_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_REPLICATION_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SNAPSHOT_DIFF_LISTING_LIMIT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SNAPSHOT_DIFF_LISTING_LIMIT_DEFAULT;
+
+import java.util.concurrent.atomic.AtomicLong;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import static org.apache.hadoop.hdfs.server.namenode.FSDirStatAndListingOp.*;
 import static org.apache.hadoop.ha.HAServiceProtocol.HAServiceState.ACTIVE;
@@ -113,7 +115,6 @@ import java.io.DataInput;
 import java.io.DataOutputStream;
 import java.io.File;
 import java.io.FileNotFoundException;
-import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.OutputStreamWriter;
 import java.io.PrintWriter;
@@ -308,6 +309,7 @@ import org.apache.hadoop.util.Daemon;
 import org.apache.hadoop.util.DataChecksum;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.Time;
 import org.apache.hadoop.util.VersionInfo;
 import org.apache.log4j.Appender;
 import org.apache.log4j.AsyncAppender;
@@ -486,7 +488,12 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
   // A daemon to periodically clean up corrupt lazyPersist files
   // from the name space.
   Daemon lazyPersistFileScrubber = null;
+  /**
+   * Timestamp marking the end time of {@link #lazyPersistFileScrubber}'s full
+   * cycle. This value can be checked by the Junit tests to verify that the
+   * {@link #lazyPersistFileScrubber} has run at least one full iteration.
+   */
+  private final AtomicLong lazyPersistFileScrubberTS = new AtomicLong(0);
   // Executor to warm up EDEK cache
   private ExecutorService edekCacheLoader = null;
   private final int edekCacheLoaderDelay;
@@ -645,6 +652,20 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     return leaseManager;
   }
 
+  /**
+   * Used as ad hoc to check the time stamp of the last full cycle of {@link
+   * #lazyPersistFileScrubber} daemon. This is used by the Junit tests to block
+   * until {@link #lazyPersistFileScrubberTS} is updated.
+   *
+   * @return the current {@link #lazyPersistFileScrubberTS} if {@link
+   *         #lazyPersistFileScrubber} is not null.
+   */
+  @VisibleForTesting
+  public long getLazyPersistFileScrubberTS() {
+    return lazyPersistFileScrubber == null ? -1
+        : lazyPersistFileScrubberTS.get();
+  }
+
   public boolean isHaEnabled() {
     return haEnabled;
   }
@@ -4116,10 +4137,12 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
         try {
           if (!isInSafeMode()) {
             clearCorruptLazyPersistFiles();
+            // set the timeStamp of last Cycle.
+            lazyPersistFileScrubberTS.set(Time.monotonicNow());
           } else {
             if (FSNamesystem.LOG.isDebugEnabled()) {
-              FSNamesystem.LOG
-                  .debug("Namenode is in safemode, skipping scrubbing of corrupted lazy-persist files.");
+              FSNamesystem.LOG.debug("Namenode is in safemode, skipping "
                  + "scrubbing of corrupted lazy-persist files.");
             }
           }
         } catch (Exception e) {
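The same idea on the namesystem side; the getter returns -1 when the scrubber daemon is disabled, so callers can skip the wait entirely. A sketch; fsn is assumed to be the FSNamesystem of a running MiniDFSCluster, and the poll interval and timeout are illustrative:

    // Block until the scrubber finishes a fresh full cycle.
    final long last = fsn.getLazyPersistFileScrubberTS();
    if (last != -1) { // -1 means the scrubber is disabled.
      GenericTestUtils.waitFor(
          () -> fsn.getLazyPersistFileScrubberTS() != last, 20, 6000);
    }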
@@ -2000,6 +2000,15 @@ public class DFSTestUtil {
     GenericTestUtils.setLogLevel(NameNode.blockStateChangeLog, level);
   }
 
+  public static void setNameNodeLogLevel(org.slf4j.event.Level level) {
+    GenericTestUtils.setLogLevel(FSNamesystem.LOG, level);
+    GenericTestUtils.setLogLevel(BlockManager.LOG, level);
+    GenericTestUtils.setLogLevel(LeaseManager.LOG, level);
+    GenericTestUtils.setLogLevel(NameNode.LOG, level);
+    GenericTestUtils.setLogLevel(NameNode.stateChangeLog, level);
+    GenericTestUtils.setLogLevel(NameNode.blockStateChangeLog, level);
+  }
+
   /**
    * Get the NamenodeProtocol RPC proxy for the NN associated with this
    * DFSClient object
@@ -2282,15 +2291,12 @@ public class DFSTestUtil {
       public Boolean get() {
         try {
           final int currentValue = Integer.parseInt(jmx.getValue(metricName));
-          LOG.info("Waiting for " + metricName +
-              " to reach value " + expectedValue +
-              ", current value = " + currentValue);
           return currentValue == expectedValue;
         } catch (Exception e) {
           throw new UnhandledException("Test failed due to unexpected exception", e);
         }
       }
-    }, 1000, 60000);
+    }, 50, 60000);
   }
 
   /**
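The slf4j overload mirrors the existing log4j variant, so tests that have moved to slf4j loggers can raise all NameNode-side log levels in one call; a sketch:

    import org.apache.hadoop.hdfs.DFSTestUtil;
    import org.slf4j.event.Level;

    // Raise FSNamesystem, BlockManager, LeaseManager and NameNode
    // loggers to DEBUG for the duration of a test.
    DFSTestUtil.setNameNodeLogLevel(Level.DEBUG);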
@@ -17,7 +17,11 @@
  */
 package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
 
-import com.google.common.base.Supplier;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.TimeUnit;
 import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
 
 import static org.apache.hadoop.fs.CreateFlag.CREATE;
@@ -45,6 +49,14 @@ import com.google.common.base.Preconditions;
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
+import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
+import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CreateFlag;
 import org.apache.hadoop.fs.FSDataOutputStream;
@@ -68,10 +80,13 @@ import org.apache.hadoop.io.nativeio.NativeIO;
 import org.apache.hadoop.net.unix.TemporarySocketDirectory;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.test.GenericTestUtils;
-import org.apache.log4j.Level;
 import org.junit.After;
 import org.junit.Rule;
 import org.junit.rules.Timeout;
+import org.slf4j.event.Level;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 
 public abstract class LazyPersistTestCase {
   static final byte LAZY_PERSIST_POLICY_ID = (byte) 15;
@@ -81,16 +96,33 @@ public abstract class LazyPersistTestCase {
     GenericTestUtils.setLogLevel(FsDatasetImpl.LOG, Level.DEBUG);
   }
 
+  protected static final Logger LOG =
+      LoggerFactory.getLogger(LazyPersistTestCase.class);
   protected static final int BLOCK_SIZE = 5 * 1024 * 1024;
   protected static final int BUFFER_LENGTH = 4096;
-  private static final long HEARTBEAT_INTERVAL_SEC = 1;
-  private static final int HEARTBEAT_RECHECK_INTERVAL_MSEC = 500;
-  private static final String JMX_RAM_DISK_METRICS_PATTERN = "^RamDisk";
-  private static final String JMX_SERVICE_NAME = "DataNode";
   protected static final int LAZY_WRITE_FILE_SCRUBBER_INTERVAL_SEC = 3;
   protected static final int LAZY_WRITER_INTERVAL_SEC = 1;
-  protected static final Log LOG = LogFactory.getLog(LazyPersistTestCase.class);
   protected static final short REPL_FACTOR = 1;
+  private static final String JMX_RAM_DISK_METRICS_PATTERN = "^RamDisk";
+  private static final String JMX_SERVICE_NAME = "DataNode";
+  private static final long HEARTBEAT_INTERVAL_SEC = 1;
+  private static final int HEARTBEAT_RECHECK_INTERVAL_MS = 500;
+  private static final long WAIT_FOR_FBR_MS =
+      TimeUnit.SECONDS.toMillis(10);
+  private static final long WAIT_FOR_STORAGE_TYPES_MS =
+      TimeUnit.SECONDS.toMillis(30);
+  private static final long WAIT_FOR_ASYNC_DELETE_MS =
+      TimeUnit.SECONDS.toMillis(10);
+  private static final long WAIT_FOR_DN_SHUTDOWN_MS =
+      TimeUnit.SECONDS.toMillis(30);
+  private static final long WAIT_FOR_REDUNDANCY_MS =
+      TimeUnit.SECONDS
+          .toMillis(2 * DFS_NAMENODE_REDUNDANCY_INTERVAL_SECONDS_DEFAULT);
+  private static final long WAIT_FOR_LAZY_SCRUBBER_MS =
+      TimeUnit.SECONDS.toMillis(2 * LAZY_WRITE_FILE_SCRUBBER_INTERVAL_SEC);
+  private static final long WAIT_POLL_INTERVAL_MS = 10;
+  private static final long WAIT_POLL_INTERVAL_LARGE_MS = 20;
 
   protected final long osPageSize =
       NativeIO.POSIX.getCacheManipulator().getOperatingSystemPageSize();
@@ -133,13 +165,11 @@ public abstract class LazyPersistTestCase {
       Path path, StorageType storageType)
       throws IOException, TimeoutException, InterruptedException {
     // Ensure that returned block locations returned are correct!
-    LOG.info("Ensure path: " + path + " is on StorageType: " + storageType);
+    LOG.info("Ensure path: {} is on StorageType: {}", path, storageType);
     assertThat(fs.exists(path), is(true));
     long fileLength = client.getFileInfo(path.toString()).getLen();
 
-    GenericTestUtils.waitFor(new Supplier<Boolean>() {
-      @Override
-      public Boolean get() {
+    GenericTestUtils.waitFor(() -> {
       try {
         LocatedBlocks locatedBlocks =
             client.getLocatedBlocks(path.toString(), 0, fileLength);
@@ -153,58 +183,63 @@ public abstract class LazyPersistTestCase {
         LOG.warn("Exception got in ensureFileReplicasOnStorageType()", ioe);
         return false;
       }
-      }
-    }, 100, 30 * 1000);
+    }, WAIT_POLL_INTERVAL_MS, WAIT_FOR_STORAGE_TYPES_MS);
 
     return client.getLocatedBlocks(path.toString(), 0, fileLength);
   }
 
   /**
-   * Make sure at least one non-transient volume has a saved copy of the replica.
-   * An infinite loop is used to ensure the async lazy persist tasks are completely
-   * done before verification. Caller of ensureLazyPersistBlocksAreSaved expects
-   * either a successful pass or timeout failure.
+   * Make sure at least one non-transient volume has a saved copy of the
+   * replica. An infinite loop is used to ensure the async lazy persist tasks
+   * are completely done before verification.
+   * Caller of this method expects either a successful pass or timeout failure.
+   *
+   * @param locatedBlocks the collection of blocks and their locations.
+   * @throws IOException for auto-closeable resources.
+   * @throws InterruptedException if the thread is interrupted.
+   * @throws TimeoutException if {@link #WAIT_FOR_STORAGE_TYPES_MS} expires
+   *                          before we find a persisted copy for each located
+   *                          block.
    */
   protected final void ensureLazyPersistBlocksAreSaved(
-      LocatedBlocks locatedBlocks) throws IOException, InterruptedException {
+      final LocatedBlocks locatedBlocks)
+      throws IOException, InterruptedException, TimeoutException {
     final String bpid = cluster.getNamesystem().getBlockPoolId();
-
     final Set<Long> persistedBlockIds = new HashSet<Long>();
+    // We should find a persisted copy for each located block.
     try (FsDatasetSpi.FsVolumeReferences volumes =
        cluster.getDataNodes().get(0).getFSDataset().getFsVolumeReferences()) {
-      while (persistedBlockIds.size() < locatedBlocks.getLocatedBlocks()
-          .size()) {
-        // Take 1 second sleep before each verification iteration
-        Thread.sleep(1000);
-
+      GenericTestUtils.waitFor(() -> {
         for (LocatedBlock lb : locatedBlocks.getLocatedBlocks()) {
           for (FsVolumeSpi v : volumes) {
             if (v.isTransientStorage()) {
              continue;
            }
 
            FsVolumeImpl volume = (FsVolumeImpl) v;
-            File lazyPersistDir =
-                volume.getBlockPoolSlice(bpid).getLazypersistDir();
+            File lazyPersistDir;
+            try {
+              lazyPersistDir =
+                  volume.getBlockPoolSlice(bpid).getLazypersistDir();
+            } catch (IOException ioe) {
+              return false;
+            }
            long blockId = lb.getBlock().getBlockId();
            File targetDir =
                DatanodeUtil.idToBlockDir(lazyPersistDir, blockId);
            File blockFile = new File(targetDir, lb.getBlock().getBlockName());
            if (blockFile.exists()) {
-              // Found a persisted copy for this block and added to the Set
+              // Found a persisted copy for this block and added to the Set.
              persistedBlockIds.add(blockId);
            }
          }
        }
-      }
-    }
-
-    // We should have found a persisted copy for each located block.
-    assertThat(persistedBlockIds.size(), is(locatedBlocks.getLocatedBlocks().size()));
+        return (persistedBlockIds.size() ==
+            locatedBlocks.getLocatedBlocks().size());
+      }, WAIT_POLL_INTERVAL_LARGE_MS, WAIT_FOR_STORAGE_TYPES_MS);
+    }
   }
 
   protected final void makeRandomTestFile(Path path, long length,
       boolean isLazyPersist, long seed) throws IOException {
     DFSTestUtil.createFile(fs, path, isLazyPersist, BUFFER_LENGTH, length,
@@ -271,7 +306,7 @@ public abstract class LazyPersistTestCase {
     }
     conf.setLong(DFS_HEARTBEAT_INTERVAL_KEY, HEARTBEAT_INTERVAL_SEC);
     conf.setInt(DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY,
-        HEARTBEAT_RECHECK_INTERVAL_MSEC);
+        HEARTBEAT_RECHECK_INTERVAL_MS);
     conf.setInt(DFS_DATANODE_LAZY_WRITER_INTERVAL_SEC,
        LAZY_WRITER_INTERVAL_SEC);
     conf.setInt(DFSConfigKeys.DFS_NAMENODE_SAFEMODE_MIN_DATANODES_KEY, 1);
@@ -334,18 +369,18 @@ public abstract class LazyPersistTestCase {
       @Override
       public void mlock(String identifier,
           ByteBuffer mmap, long length) throws IOException {
-        LOG.info("LazyPersistTestCase: faking mlock of " + identifier + " bytes.");
+        LOG.info("LazyPersistTestCase: faking mlock of {} bytes.", identifier);
       }
 
       @Override
       public long getMemlockLimit() {
-        LOG.info("LazyPersistTestCase: fake return " + Long.MAX_VALUE);
+        LOG.info("LazyPersistTestCase: fake return {}", Long.MAX_VALUE);
        return Long.MAX_VALUE;
       }
 
       @Override
       public boolean verifyCanMlock() {
-        LOG.info("LazyPersistTestCase: fake return " + true);
+        LOG.info("LazyPersistTestCase: fake return {}", true);
        return true;
       }
     });
@@ -413,8 +448,10 @@ public abstract class LazyPersistTestCase {
 
     public void build() throws IOException {
       LazyPersistTestCase.this.startUpCluster(
-          numDatanodes, hasTransientStorage, storageTypes, ramDiskReplicaCapacity,
-          ramDiskStorageLimit, maxLockedMemory, useScr, useLegacyBlockReaderLocal,
+          numDatanodes, hasTransientStorage, storageTypes,
+          ramDiskReplicaCapacity,
+          ramDiskStorageLimit, maxLockedMemory, useScr,
+          useLegacyBlockReaderLocal,
           disableScrubber);
     }
@@ -429,11 +466,44 @@ public abstract class LazyPersistTestCase {
     private boolean disableScrubber=false;
   }
 
+  /**
+   * Forces a full blockreport on all the datanodes. The call blocks waiting
+   * for all blockreports to be received by the namenode.
+   *
+   * @throws IOException if an exception is thrown while getting the datanode
+   *                     descriptors or triggering the blockreports.
+   * @throws InterruptedException if the thread receives an interrupt.
+   * @throws TimeoutException if the reports are not received within
+   *                          {@link #WAIT_FOR_FBR_MS}.
+   */
   protected final void triggerBlockReport()
-      throws IOException, InterruptedException {
+      throws InterruptedException, TimeoutException, IOException {
     // Trigger block report to NN
-    DataNodeTestUtils.triggerBlockReport(cluster.getDataNodes().get(0));
-    Thread.sleep(10 * 1000);
+    final Map<DatanodeStorageInfo, Integer> reportCountsBefore =
+        new HashMap<>();
+    final FSNamesystem fsn = cluster.getNamesystem();
+    for (DataNode dn : cluster.getDataNodes()) {
+      final DatanodeDescriptor dnd =
+          NameNodeAdapter.getDatanode(fsn, dn.getDatanodeId());
+      final DatanodeStorageInfo storage = dnd.getStorageInfos()[0];
+      reportCountsBefore.put(storage, storage.getBlockReportCount());
+      DataNodeTestUtils.triggerBlockReport(dn);
+    }
+    // wait for block reports to be received.
+    GenericTestUtils.waitFor(() -> {
+      for (Entry<DatanodeStorageInfo, Integer> reportEntry :
+          reportCountsBefore.entrySet()) {
+        final DatanodeStorageInfo dnStorageInfo = reportEntry.getKey();
+        final int cntBefore = reportEntry.getValue();
+        final int currentCnt = dnStorageInfo.getBlockReportCount();
+        if (cntBefore == currentCnt) {
+          // Same count means no report has been received.
+          return false;
+        }
+      }
+      // If we reach here, then all the block reports have been received.
+      return true;
+    }, WAIT_POLL_INTERVAL_MS, WAIT_FOR_FBR_MS);
   }
 
   protected final boolean verifyBlockDeletedFromDir(File dir,
@@ -445,51 +515,58 @@ public abstract class LazyPersistTestCase {
 
       File blockFile = new File(targetDir, lb.getBlock().getBlockName());
       if (blockFile.exists()) {
-        LOG.warn("blockFile: " + blockFile.getAbsolutePath() +
-            " exists after deletion.");
        return false;
       }
       File metaFile = new File(targetDir,
          DatanodeUtil.getMetaName(lb.getBlock().getBlockName(),
              lb.getBlock().getGenerationStamp()));
       if (metaFile.exists()) {
-        LOG.warn("metaFile: " + metaFile.getAbsolutePath() +
-            " exists after deletion.");
        return false;
       }
     }
     return true;
   }
 
-  protected final boolean verifyDeletedBlocks(LocatedBlocks locatedBlocks)
-      throws IOException, InterruptedException {
+  protected final boolean verifyDeletedBlocks(final LocatedBlocks locatedBlocks)
+      throws Exception {
 
     LOG.info("Verifying replica has no saved copy after deletion.");
     triggerBlockReport();
+    final DataNode dn = cluster.getDataNodes().get(0);
 
-    while(
-      cluster.getFsDatasetTestUtils(0).getPendingAsyncDeletions()
-        > 0L){
-      Thread.sleep(1000);
-    }
+    GenericTestUtils.waitFor(() -> {
+      for (DataNode dn1 : cluster.getDataNodes()) {
+        if (cluster.getFsDatasetTestUtils(dn1).getPendingAsyncDeletions()
+            > 0) {
+          return false;
+        }
+      }
+      return true;
+    }, WAIT_POLL_INTERVAL_MS, WAIT_FOR_ASYNC_DELETE_MS);
 
     final String bpid = cluster.getNamesystem().getBlockPoolId();
-    final FsDatasetSpi<?> dataset =
-        cluster.getDataNodes().get(0).getFSDataset();
+    final FsDatasetSpi<?> dataset = dn.getFSDataset();
 
     // Make sure deleted replica does not have a copy on either finalized dir of
-    // transient volume or finalized dir of non-transient volume
+    // transient volume or finalized dir of non-transient volume.
+    // We need to wait until the async deletion is scheduled.
     try (FsDatasetSpi.FsVolumeReferences volumes =
        dataset.getFsVolumeReferences()) {
-      for (FsVolumeSpi vol : volumes) {
-        FsVolumeImpl volume = (FsVolumeImpl) vol;
-        File targetDir = (volume.isTransientStorage()) ?
-            volume.getBlockPoolSlice(bpid).getFinalizedDir() :
-            volume.getBlockPoolSlice(bpid).getLazypersistDir();
-        if (verifyBlockDeletedFromDir(targetDir, locatedBlocks) == false) {
-          return false;
-        }
-      }
+      GenericTestUtils.waitFor(() -> {
+        try {
+          for (FsVolumeSpi vol : volumes) {
+            FsVolumeImpl volume = (FsVolumeImpl) vol;
+            File targetDir = (volume.isTransientStorage()) ?
+                volume.getBlockPoolSlice(bpid).getFinalizedDir() :
+                volume.getBlockPoolSlice(bpid).getLazypersistDir();
+            if (!verifyBlockDeletedFromDir(targetDir, locatedBlocks)) {
+              return false;
+            }
+          }
+          return true;
+        } catch (IOException ie) {
+          return false;
+        }
+      }, WAIT_POLL_INTERVAL_MS, WAIT_FOR_ASYNC_DELETE_MS);
     }
     return true;
   }
@@ -530,8 +607,137 @@ public abstract class LazyPersistTestCase {
     DFSTestUtil.waitForMetric(jmx, metricName, expectedValue);
   }
 
-  protected void triggerEviction(DataNode dn) {
+  protected void triggerEviction(final DataNode dn) {
     FsDatasetImpl fsDataset = (FsDatasetImpl) dn.getFSDataset();
     fsDataset.evictLazyPersistBlocks(Long.MAX_VALUE); // Run one eviction cycle.
   }
+
+  /**
+   * Shutdown all datanodes in {@link #cluster}. The call blocks for
+   * {@link #WAIT_FOR_DN_SHUTDOWN_MS} until the client report has no datanode
+   * labeled as live.
+   *
+   * @throws TimeoutException if {@link #WAIT_FOR_DN_SHUTDOWN_MS} expires with
+   *                          at least one datanode still alive.
+   * @throws InterruptedException if the thread receives an interrupt.
+   */
+  protected void shutdownDataNodes()
+      throws TimeoutException, InterruptedException {
+    cluster.shutdownDataNodes();
+    GenericTestUtils.waitFor(() -> {
+      try {
+        DatanodeInfo[] info = client.datanodeReport(
+            HdfsConstants.DatanodeReportType.LIVE);
+        return info.length == 0;
+      } catch (IOException e) {
+        return false;
+      }
+    }, WAIT_POLL_INTERVAL_LARGE_MS, WAIT_FOR_DN_SHUTDOWN_MS);
+  }
+
+  /**
+   * Blocks for {@link #WAIT_FOR_REDUNDANCY_MS} waiting for the corrupt block
+   * count to reach a certain value.
+   *
+   * @param corruptCnt the number of corrupt blocks to wait for before
+   *                   resuming.
+   * @throws TimeoutException if {@link #WAIT_FOR_REDUNDANCY_MS} expires before
+   *                          the corrupt count meets the criteria.
+   * @throws InterruptedException if the thread receives an interrupt.
+   */
+  protected void waitForCorruptBlock(final long corruptCnt)
+      throws TimeoutException, InterruptedException {
+    // wait for the redundancy monitor to mark the file as corrupt.
+    GenericTestUtils.waitFor(() -> {
+      Iterator<BlockInfo> bInfoIter = cluster.getNameNode()
+          .getNamesystem().getBlockManager().getCorruptReplicaBlockIterator();
+      int count = 0;
+      while (bInfoIter.hasNext()) {
+        bInfoIter.next();
+        count++;
+      }
+      return corruptCnt == count;
+    }, 2 * WAIT_POLL_INTERVAL_LARGE_MS, WAIT_FOR_REDUNDANCY_MS);
+  }
+
+  /**
+   * Blocks until the {@link FSNamesystem#lazyPersistFileScrubber} daemon
+   * completes a full iteration.
+   *
+   * @throws InterruptedException if the thread receives an interrupt.
+   * @throws TimeoutException if
+   *           {@link FSNamesystem#getLazyPersistFileScrubberTS()}
+   *           does not update the timestamp within
+   *           {@link #WAIT_FOR_LAZY_SCRUBBER_MS}.
+   */
+  protected void waitForScrubberCycle()
+      throws TimeoutException, InterruptedException {
+    // wait for the lazy persist file scrubber to complete a full cycle.
+    final FSNamesystem fsn = cluster.getNamesystem();
+    final long lastTimeStamp = fsn.getLazyPersistFileScrubberTS();
+    if (lastTimeStamp == -1) { // scrubber is disabled
+      return;
+    }
+    GenericTestUtils.waitFor(
+        () -> lastTimeStamp != fsn.getLazyPersistFileScrubberTS(),
+        2 * WAIT_POLL_INTERVAL_LARGE_MS, WAIT_FOR_LAZY_SCRUBBER_MS);
+  }
+
+  /**
+   * Blocks until the {@link BlockManager.RedundancyMonitor} daemon completes
+   * a full iteration.
+   *
+   * @throws InterruptedException if the thread receives an interrupt.
+   * @throws TimeoutException if
+   *           {@link BlockManager#getLastRedundancyMonitorTS()}
+   *           does not update the timestamp within
+   *           {@link #WAIT_FOR_REDUNDANCY_MS}.
+   */
+  protected void waitForRedundancyMonitorCycle()
+      throws TimeoutException, InterruptedException {
+    // wait for the redundancy monitor to complete a full cycle.
+    final BlockManager bm = cluster.getNamesystem().getBlockManager();
+    final long lastRedundancyTS = bm.getLastRedundancyMonitorTS();
+
+    GenericTestUtils.waitFor(
+        () -> lastRedundancyTS != bm.getLastRedundancyMonitorTS(),
+        2 * WAIT_POLL_INTERVAL_LARGE_MS, WAIT_FOR_REDUNDANCY_MS);
+  }
+
+  /**
+   * Blocks until {@link BlockManager#getLowRedundancyBlocksCount()} reaches a
+   * certain value.
+   *
+   * @throws InterruptedException if the thread receives an interrupt.
+   * @throws TimeoutException if
+   *           {@link BlockManager#getLowRedundancyBlocksCount()}
+   *           does not reach the count within
+   *           {@link #WAIT_FOR_REDUNDANCY_MS}.
+   */
+  protected void waitForLowRedundancyCount(final long cnt)
+      throws TimeoutException, InterruptedException {
+    final BlockManager bm = cluster.getNamesystem().getBlockManager();
+
+    GenericTestUtils.waitFor(() -> cnt == bm.getLowRedundancyBlocksCount(),
+        2 * WAIT_POLL_INTERVAL_LARGE_MS, WAIT_FOR_REDUNDANCY_MS);
+  }
+
+  /**
+   * Blocks until the file status changes on the filesystem.
+   *
+   * @param path of the file to be checked.
+   * @param expected whether the file should exist or not.
+   * @throws TimeoutException if the file status does not meet the expectation
+   *                          within {@link #WAIT_FOR_STORAGE_TYPES_MS}.
+   * @throws InterruptedException if the thread receives an interrupt.
+   */
+  protected void waitForFile(final Path path, final boolean expected)
+      throws TimeoutException, InterruptedException {
+    GenericTestUtils.waitFor(() -> {
+      try {
+        return expected == fs.exists(path);
+      } catch (IOException e) {
+        return false;
+      }
+    }, WAIT_POLL_INTERVAL_MS, WAIT_FOR_STORAGE_TYPES_MS);
+  }
 }
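Taken together, the new helpers let a subclass express "kill the datanodes, wait for the corruption to be noticed, then check the file" with bounded polling instead of fixed sleeps. A sketch of a test body under those assumptions (path1 and the expected corrupt count are illustrative):

    // Inside a LazyPersistTestCase subclass:
    shutdownDataNodes();       // blocks until no DN is reported live
    waitForCorruptBlock(1L);   // blocks until exactly one corrupt block
    waitForScrubberCycle();    // one full scrubber iteration completes
    waitForFile(path1, false); // polls until the file is gone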
@@ -16,11 +16,10 @@
  * limitations under the License.
  */
 package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
-import com.google.common.collect.Iterators;
-import com.google.common.util.concurrent.Uninterruptibles;
 import org.apache.hadoop.fs.CreateFlag;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.util.ThreadUtil;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -33,7 +32,6 @@ import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import static org.apache.hadoop.fs.StorageType.RAM_DISK;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.*;
 
 import static org.hamcrest.core.Is.is;
 import static org.junit.Assert.assertThat;
@@ -76,7 +74,7 @@ public class TestLazyPersistFiles extends LazyPersistTestCase {
     makeTestFile(path, BLOCK_SIZE, true);
 
     try {
-      client.truncate(path.toString(), BLOCK_SIZE/2);
+      client.truncate(path.toString(), BLOCK_SIZE / 2);
       fail("Truncate to LazyPersist file did not fail as expected");
     } catch (Throwable t) {
       LOG.info("Got expected exception ", t);
@@ -98,28 +96,20 @@ public class TestLazyPersistFiles extends LazyPersistTestCase {
     makeTestFile(path1, BLOCK_SIZE, true);
     ensureFileReplicasOnStorageType(path1, RAM_DISK);
 
-    // Stop the DataNode and sleep for the time it takes the NN to
-    // detect the DN as being dead.
-    cluster.shutdownDataNodes();
-    Thread.sleep(30000L);
+    // Stop the DataNode.
+    shutdownDataNodes();
     assertThat(cluster.getNamesystem().getNumDeadDataNodes(), is(1));
 
     // Next, wait for the redundancy monitor to mark the file as corrupt.
-    Thread.sleep(2 * DFS_NAMENODE_REDUNDANCY_INTERVAL_SECONDS_DEFAULT * 1000);
+    waitForRedundancyMonitorCycle();
 
     // Wait for the LazyPersistFileScrubber to run
-    Thread.sleep(2 * LAZY_WRITE_FILE_SCRUBBER_INTERVAL_SEC * 1000);
+    waitForScrubberCycle();
 
     // Ensure that path1 does not exist anymore, whereas path2 does.
-    assert(!fs.exists(path1));
+    waitForFile(path1, false);
 
     // We should have zero blocks that needs replication i.e. the one
-    // belonging to path2.
-    assertThat(cluster.getNameNode()
-        .getNamesystem()
-        .getBlockManager()
-        .getLowRedundancyBlocksCount(),
-        is(0L));
+    // belonging to path2. This needs a wait.
+    waitForLowRedundancyCount(0L);
   }
 
   @Test
@@ -134,18 +124,14 @@ public class TestLazyPersistFiles extends LazyPersistTestCase {
 
     // Stop the DataNode and sleep for the time it takes the NN to
     // detect the DN as being dead.
-    cluster.shutdownDataNodes();
-    Thread.sleep(30000L);
-
-    // Next, wait for the redundancy monitor to mark the file as corrupt.
-    Thread.sleep(2 * DFS_NAMENODE_REDUNDANCY_INTERVAL_SECONDS_DEFAULT * 1000);
+    shutdownDataNodes();
 
+    // wait for the redundancy monitor to mark the file as corrupt.
+    waitForCorruptBlock(1L);
     // Wait for the LazyPersistFileScrubber to run
-    Thread.sleep(2 * LAZY_WRITE_FILE_SCRUBBER_INTERVAL_SEC * 1000);
+    waitForScrubberCycle();
 
     // Ensure that path1 exist.
-    Assert.assertTrue(fs.exists(path1));
+    waitForFile(path1, true);
   }
 
   /**
@@ -160,20 +146,14 @@ public class TestLazyPersistFiles extends LazyPersistTestCase {
     makeTestFile(path1, BLOCK_SIZE, true);
     ensureFileReplicasOnStorageType(path1, RAM_DISK);
 
-    cluster.shutdownDataNodes();
+    shutdownDataNodes();
 
     cluster.restartNameNodes();
 
     // wait for the redundancy monitor to mark the file as corrupt.
-    Long corruptBlkCount;
-    do {
-      Thread.sleep(DFS_NAMENODE_REDUNDANCY_INTERVAL_SECONDS_DEFAULT * 1000);
-      corruptBlkCount = (long) Iterators.size(cluster.getNameNode()
-          .getNamesystem().getBlockManager().getCorruptReplicaBlockIterator());
-    } while (corruptBlkCount != 1L);
+    waitForCorruptBlock(1L);
 
     // Ensure path1 exist.
-    Assert.assertTrue(fs.exists(path1));
+    waitForFile(path1, true);
   }
 
   /**
@@ -215,10 +195,8 @@ public class TestLazyPersistFiles extends LazyPersistTestCase {
       threads[i].start();
     }
 
-    Thread.sleep(500);
-
     for (int i = 0; i < NUM_TASKS; i++) {
-      Uninterruptibles.joinUninterruptibly(threads[i]);
+      ThreadUtil.joinUninterruptibly(threads[i]);
     }
     Assert.assertFalse(testFailed.get());
   }
@@ -232,7 +210,7 @@ public class TestLazyPersistFiles extends LazyPersistTestCase {
    */
   @Test
   public void testConcurrentWrites()
-      throws IOException, InterruptedException {
+      throws IOException, InterruptedException, TimeoutException {
     getClusterBuilder().setRamDiskReplicaCapacity(9).build();
     final String METHOD_NAME = GenericTestUtils.getMethodName();
     final int SEED = 0xFADED;
@@ -281,11 +259,11 @@ public class TestLazyPersistFiles extends LazyPersistTestCase {
       this.seed = seed;
       this.latch = latch;
      this.bFail = bFail;
-      System.out.println("Creating Writer: " + id);
+      LOG.info("Creating Writer: {}", id);
     }
 
     public void run() {
-      System.out.println("Writer " + id + " starting... ");
+      LOG.info("Writer {} starting... ", id);
       int i = 0;
       try {
         for (i = 0; i < paths.length; i++) {
@@ -295,9 +273,8 @@ public class TestLazyPersistFiles extends LazyPersistTestCase {
         }
       } catch (IOException e) {
         bFail.set(true);
-        LOG.error("Writer exception: writer id:" + id +
-            " testfile: " + paths[i].toString() +
-            " " + e);
+        LOG.error("Writer exception: writer id:{} testfile: {}",
+            id, paths[i].toString(), e);
       } finally {
         latch.countDown();
       }
@@ -119,7 +119,7 @@ public class TestLazyPersistReplicaPlacement extends LazyPersistTestCase {
    */
   @Test
   public void testFallbackToDiskPartial()
-      throws IOException, InterruptedException {
+      throws IOException, InterruptedException, TimeoutException {
     getClusterBuilder().setMaxLockedMemory(2 * BLOCK_SIZE).build();
     final String METHOD_NAME = GenericTestUtils.getMethodName();
     Path path = new Path("/" + METHOD_NAME + ".dat");
@@ -21,6 +21,7 @@ package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.junit.Assert;
 import org.junit.Test;
@@ -156,7 +157,6 @@ public class TestLazyWriter extends LazyPersistTestCase {
     for (int i = 0; i < NUM_PATHS; ++i) {
       makeTestFile(paths[i + NUM_PATHS], BLOCK_SIZE, true);
       triggerBlockReport();
-      Thread.sleep(3000);
       ensureFileReplicasOnStorageType(paths[i + NUM_PATHS], RAM_DISK);
       ensureFileReplicasOnStorageType(paths[indexes.get(i)], DEFAULT);
       for (int j = i + 1; j < NUM_PATHS; ++j) {
@@ -183,13 +183,13 @@ public class TestLazyWriter extends LazyPersistTestCase {
       throws Exception {
     getClusterBuilder().build();
     final String METHOD_NAME = GenericTestUtils.getMethodName();
-    FsDatasetTestUtil.stopLazyWriter(cluster.getDataNodes().get(0));
+    final DataNode dn = cluster.getDataNodes().get(0);
+    FsDatasetTestUtil.stopLazyWriter(dn);
 
     Path path = new Path("/" + METHOD_NAME + ".dat");
     makeTestFile(path, BLOCK_SIZE, true);
     LocatedBlocks locatedBlocks =
        ensureFileReplicasOnStorageType(path, RAM_DISK);
 
     // Delete before persist
     client.delete(path.toString(), false);
     Assert.assertFalse(fs.exists(path));