HDFS-7738. Revise the exception message for recover lease; add more truncate tests such as truncate with HA setup, negative tests, truncate with other operations and multiple truncates.
parent cfb829ecd5
commit 8f7d4bb09f
CHANGES.txt
@@ -584,6 +584,10 @@ Release 2.7.0 - UNRELEASED
     HDFS-7710. Remove dead code in BackupImage.java. (Xiaoyu Yao via aajisaka)
 
+    HDFS-7738. Revise the exception message for recover lease; add more truncate
+    tests such as truncate with HA setup, negative tests, truncate with other
+    operations and multiple truncates. (szetszwo)
+
   OPTIMIZATIONS
 
     HDFS-7454. Reduce memory footprint for AclEntries in NameNode.
FSNamesystem.java
@@ -227,7 +227,6 @@ import org.apache.hadoop.hdfs.server.namenode.ha.HAContext;
 import org.apache.hadoop.hdfs.server.namenode.ha.StandbyCheckpointer;
 import org.apache.hadoop.hdfs.server.namenode.metrics.FSNamesystemMBean;
 import org.apache.hadoop.hdfs.server.namenode.metrics.NameNodeMetrics;
-import org.apache.hadoop.hdfs.server.namenode.snapshot.DirectoryWithSnapshotFeature;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.SnapshotManager;
 import org.apache.hadoop.hdfs.server.namenode.startupprogress.Phase;
@@ -1966,8 +1965,9 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
       throw new UnsupportedOperationException(
           "Cannot truncate lazy persist file " + src);
     }
-    // Opening an existing file for write. May need lease recovery.
-    recoverLeaseInternal(iip, src, clientName, clientMachine, false);
+    // Opening an existing file for truncate. May need lease recovery.
+    recoverLeaseInternal(RecoverLeaseOp.TRUNCATE_FILE,
+        iip, src, clientName, clientMachine, false);
     // Truncate length check.
     long oldLength = file.computeFileSize();
     if(oldLength == newLength) {
@@ -2498,7 +2498,8 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
       }
     } else {
       // If lease soft limit time is expired, recover the lease
-      recoverLeaseInternal(iip, src, holder, clientMachine, false);
+      recoverLeaseInternal(RecoverLeaseOp.CREATE_FILE,
+          iip, src, holder, clientMachine, false);
       throw new FileAlreadyExistsException(src + " for client " +
           clientMachine + " already exists");
     }
@@ -2620,8 +2621,9 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
       throw new UnsupportedOperationException(
           "Cannot append to lazy persist file " + src);
     }
-    // Opening an existing file for write - may need to recover lease.
-    recoverLeaseInternal(iip, src, holder, clientMachine, false);
+    // Opening an existing file for append - may need to recover lease.
+    recoverLeaseInternal(RecoverLeaseOp.APPEND_FILE,
+        iip, src, holder, clientMachine, false);
 
     final BlockInfo lastBlock = myFile.getLastBlock();
     // Check that the block has at least minimum replication.
@@ -2720,7 +2722,8 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
         dir.checkPathAccess(pc, iip, FsAction.WRITE);
       }
 
-      recoverLeaseInternal(iip, src, holder, clientMachine, true);
+      recoverLeaseInternal(RecoverLeaseOp.RECOVER_LEASE,
+          iip, src, holder, clientMachine, true);
     } catch (StandbyException se) {
       skipSync = true;
       throw se;
@@ -2735,7 +2738,20 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     return false;
   }
 
-  void recoverLeaseInternal(INodesInPath iip,
+  private enum RecoverLeaseOp {
+    CREATE_FILE,
+    APPEND_FILE,
+    TRUNCATE_FILE,
+    RECOVER_LEASE;
+
+    private String getExceptionMessage(String src, String holder,
+        String clientMachine, String reason) {
+      return "Failed to " + this + " " + src + " for " + holder +
+          " on " + clientMachine + " because " + reason;
+    }
+  }
+
+  void recoverLeaseInternal(RecoverLeaseOp op, INodesInPath iip,
       String src, String holder, String clientMachine, boolean force)
       throws IOException {
     assert hasWriteLock();
@@ -2746,18 +2762,15 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     // leases. Find the appropriate lease record.
     //
     Lease lease = leaseManager.getLease(holder);
-    //
-    // We found the lease for this file. And surprisingly the original
-    // holder is trying to recreate this file. This should never occur.
-    //
+
     if (!force && lease != null) {
       Lease leaseFile = leaseManager.getLeaseByPath(src);
       if (leaseFile != null && leaseFile.equals(lease)) {
+        // We found the lease for this file but the original
+        // holder is trying to obtain it again.
         throw new AlreadyBeingCreatedException(
-            "failed to create file " + src + " for " + holder +
-            " for client " + clientMachine +
-            " because current leaseholder is trying to recreate file.");
+            op.getExceptionMessage(src, holder, clientMachine,
+                holder + " is already the current lease holder."));
       }
     }
     //
@@ -2768,9 +2781,8 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
       lease = leaseManager.getLease(clientName);
       if (lease == null) {
         throw new AlreadyBeingCreatedException(
-            "failed to create file " + src + " for " + holder +
-            " for client " + clientMachine +
-            " because pendingCreates is non-null but no leases found.");
+            op.getExceptionMessage(src, holder, clientMachine,
+                "the file is under construction but no leases found."));
       }
       if (force) {
         // close now: no need to wait for soft lease expiration and
@@ -2792,20 +2804,21 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
         boolean isClosed = internalReleaseLease(lease, src, iip, null);
         if(!isClosed)
           throw new RecoveryInProgressException(
-              "Failed to close file " + src +
-              ". Lease recovery is in progress. Try again later.");
+              op.getExceptionMessage(src, holder, clientMachine,
+                  "lease recovery is in progress. Try again later."));
       } else {
         final BlockInfo lastBlock = file.getLastBlock();
         if (lastBlock != null
             && lastBlock.getBlockUCState() == BlockUCState.UNDER_RECOVERY) {
-          throw new RecoveryInProgressException("Recovery in progress, file ["
-              + src + "], " + "lease owner [" + lease.getHolder() + "]");
+          throw new RecoveryInProgressException(
+              op.getExceptionMessage(src, holder, clientMachine,
+                  "another recovery is in progress by "
+                      + clientName + " on " + uc.getClientMachine()));
         } else {
-          throw new AlreadyBeingCreatedException("Failed to create file ["
-              + src + "] for [" + holder + "] for client [" + clientMachine
-              + "], because this file is already being created by ["
-              + clientName + "] on ["
-              + uc.getClientMachine() + "]");
+          throw new AlreadyBeingCreatedException(
+              op.getExceptionMessage(src, holder, clientMachine,
+                  "this file lease is currently owned by "
+                      + clientName + " on " + uc.getClientMachine()));
        }
      }
    }
  }
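All of the messages above now flow through RecoverLeaseOp.getExceptionMessage, so every lease-related failure reads "Failed to <op> <src> for <holder> on <clientMachine> because <reason>". As a rough standalone illustration (the real enum is private to FSNamesystem; the mirror class and sample values below are hypothetical):

    // Hypothetical mirror of FSNamesystem.RecoverLeaseOp, for illustration only.
    enum RecoverLeaseOpSketch {
      CREATE_FILE, APPEND_FILE, TRUNCATE_FILE, RECOVER_LEASE;

      String getExceptionMessage(String src, String holder,
          String clientMachine, String reason) {
        return "Failed to " + this + " " + src + " for " + holder +
            " on " + clientMachine + " because " + reason;
      }

      public static void main(String[] args) {
        // Prints: Failed to TRUNCATE_FILE /dir/f for DFSClient_1 on 127.0.0.1
        // because lease recovery is in progress. Try again later.
        System.out.println(TRUNCATE_FILE.getExceptionMessage("/dir/f",
            "DFSClient_1", "127.0.0.1",
            "lease recovery is in progress. Try again later."));
      }
    }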
AppendTestUtil.java
@@ -18,9 +18,11 @@
 package org.apache.hadoop.hdfs;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 
 import java.io.IOException;
 import java.io.OutputStream;
+import java.util.Arrays;
 import java.util.Random;
 
 import org.apache.commons.logging.Log;
@@ -80,6 +82,27 @@ public class AppendTestUtil {
     return b;
   }
 
+  /** @return a random file partition of length n. */
+  public static int[] randomFilePartition(int n, int parts) {
+    int[] p = new int[parts];
+    for(int i = 0; i < p.length; i++) {
+      p[i] = nextInt(n - i - 1) + 1;
+    }
+    Arrays.sort(p);
+    for(int i = 1; i < p.length; i++) {
+      if (p[i] <= p[i - 1]) {
+        p[i] = p[i - 1] + 1;
+      }
+    }
+
+    LOG.info("partition=" + Arrays.toString(p));
+    assertTrue("i=0", p[0] > 0 && p[0] < n);
+    for(int i = 1; i < p.length; i++) {
+      assertTrue("i=" + i, p[i] > p[i - 1] && p[i] < n);
+    }
+    return p;
+  }
+
   static void sleep(long ms) {
     try {
       Thread.sleep(ms);
@@ -173,6 +196,11 @@ public class AppendTestUtil {
         (short) repl, BLOCK_SIZE);
   }
 
+  public static void checkFullFile(FileSystem fs, Path file, int len,
+      final byte[] compareContent) throws IOException {
+    checkFullFile(fs, file, len, compareContent, file.toString());
+  }
+
   /**
    * Compare the content of a file created from FileSystem and Path with
    * the specified byte[] buffer's content
@@ -180,6 +208,18 @@
    */
   public static void checkFullFile(FileSystem fs, Path name, int len,
       final byte[] compareContent, String message) throws IOException {
+    checkFullFile(fs, name, len, compareContent, message, true);
+  }
+
+  public static void checkFullFile(FileSystem fs, Path name, int len,
+      final byte[] compareContent, String message,
+      boolean checkFileStatus) throws IOException {
+    if (checkFileStatus) {
+      final FileStatus status = fs.getFileStatus(name);
+      assertEquals("len=" + len + " but status.getLen()=" + status.getLen(),
+          len, status.getLen());
+    }
+
     FSDataInputStream stm = fs.open(name);
     byte[] actual = new byte[len];
     stm.readFully(0, actual);
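The new randomFilePartition helper picks `parts` strictly increasing offsets inside (0, n); the tests use them as slice boundaries for appends and truncates. A hedged usage sketch (the wrapper class and printed values are illustrative):

    import org.apache.hadoop.hdfs.AppendTestUtil;

    public class RandomFilePartitionSketch {
      public static void main(String[] args) {
        // Illustrative only: partition a 64 KB buffer into 5 append slices.
        final int n = 1 << 16;
        final int[] p = AppendTestUtil.randomFilePartition(n, 5);
        // The helper asserts 0 < p[0] < ... < p[4] < n, so a writer can
        // create the file with [0, p[0]) and append each later slice.
        for (int boundary : p) {
          System.out.println("boundary=" + boundary);
        }
      }
    }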
DFSTestUtil.java
@@ -23,8 +23,8 @@ import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Supplier;
 import com.google.common.collect.Lists;
-
 import com.google.common.collect.Maps;
+
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -819,14 +819,17 @@ public class DFSTestUtil {
    * Get a FileSystem instance as specified user in a doAs block.
    */
   static public FileSystem getFileSystemAs(UserGroupInformation ugi,
-      final Configuration conf) throws IOException,
-      InterruptedException {
+      final Configuration conf) throws IOException {
+    try {
       return ugi.doAs(new PrivilegedExceptionAction<FileSystem>() {
         @Override
         public FileSystem run() throws Exception {
           return FileSystem.get(conf);
         }
       });
+    } catch (InterruptedException e) {
+      throw (InterruptedIOException)new InterruptedIOException().initCause(e);
+    }
   }
 
   public static byte[] generateSequentialBytes(int start, int length) {
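With this change an interrupt inside the doAs block surfaces as an InterruptedIOException, so callers of getFileSystemAs only have to declare IOException. A hedged usage sketch (user and group names are illustrative):

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.hdfs.DFSTestUtil;
    import org.apache.hadoop.security.UserGroupInformation;

    public class GetFileSystemAsSketch {
      // Only IOException to handle now; an interrupt arrives as an
      // InterruptedIOException, which is a subclass of IOException.
      static FileSystem openAsFoo(Configuration conf) throws IOException {
        UserGroupInformation foo = UserGroupInformation
            .createUserForTesting("foo", new String[] {"foo"});
        return DFSTestUtil.getFileSystemAs(foo, conf);
      }
    }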
TestFileAppend.java
@@ -103,9 +103,10 @@ public class TestFileAppend{
       System.arraycopy(fileContents, 0, expected, 0, expected.length);
     }
     // do a sanity check. Read the file
+    // do not check file status since the file is not yet closed.
     AppendTestUtil.checkFullFile(fileSys, name,
         AppendTestUtil.NUM_BLOCKS * AppendTestUtil.BLOCK_SIZE,
-        expected, "Read 1");
+        expected, "Read 1", false);
   }
 
   /**
TestFileCreation.java
@@ -64,7 +64,6 @@ import org.apache.hadoop.fs.ParentNotDirectoryException;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
-import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
@@ -85,7 +84,6 @@ import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
 import org.apache.hadoop.io.EnumSetWritable;
 import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.metrics2.MetricsRecordBuilder;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.util.Time;
@@ -405,8 +403,7 @@ public class TestFileCreation {
       fs2.create(p, false);
       fail("Did not throw!");
     } catch (IOException abce) {
-      GenericTestUtils.assertExceptionContains("already being created by",
-          abce);
+      GenericTestUtils.assertExceptionContains("Failed to CREATE_FILE", abce);
     }
     // NameNodeProxies' createNNProxyWithClientProtocol has 5 retries.
     assertCounter("AlreadyBeingCreatedExceptionNumOps",
TestHFlush.java
@@ -490,8 +490,8 @@ public class TestHFlush {
 
 
       // verify that entire file is good
-      AppendTestUtil.checkFullFile(fs, p, 4,
-          fileContents, "Failed to deal with thread interruptions");
+      AppendTestUtil.checkFullFile(fs, p, 4, fileContents,
+          "Failed to deal with thread interruptions", false);
     } finally {
       cluster.shutdown();
     }
TestSafeMode.java
@@ -52,6 +52,7 @@ import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.util.StringUtils;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -292,9 +293,13 @@ public class TestSafeMode {
     try {
       f.run(fs);
       fail(msg);
-    } catch (IOException ioe) {
-      assertTrue(ioe.getMessage().contains("safe mode"));
-    }
+    } catch (RemoteException re) {
+      assertEquals(SafeModeException.class.getName(), re.getClassName());
+      GenericTestUtils.assertExceptionContains(
+          "Name node is in safe mode", re);
+    } catch (IOException ioe) {
+      fail(msg + " " + StringUtils.stringifyException(ioe));
+    }
   }
 
   /**
@@ -341,6 +346,12 @@ public class TestSafeMode {
         DFSTestUtil.appendFile(fs, file1, "new bytes");
       }});
 
+    runFsFun("Truncate file while in SM", new FSRun() {
+      @Override
+      public void run(FileSystem fs) throws IOException {
+        fs.truncate(file1, 0);
+      }});
+
     runFsFun("Delete file while in SM", new FSRun() {
       @Override
       public void run(FileSystem fs) throws IOException {
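The reworked assertion distinguishes the expected RPC failure from any other IOException: a safe-mode rejection travels as a RemoteException that names SafeModeException. A hedged sketch of the same check outside the test harness (cluster setup omitted; the path is illustrative):

    import java.io.IOException;

    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.ipc.RemoteException;

    public class SafeModeRejectionSketch {
      // Expects the NameNode to be in safe mode; truncate must be rejected.
      static void expectSafeModeRejection(FileSystem fs) throws IOException {
        try {
          fs.truncate(new Path("/file1"), 0);
          throw new AssertionError("truncate should have been rejected");
        } catch (RemoteException re) {
          // The server-side exception class name rides in the RemoteException.
          if (!re.getClassName().endsWith("SafeModeException")
              || !re.getMessage().contains("Name node is in safe mode")) {
            throw new AssertionError("unexpected rejection: " + re, re);
          }
        }
      }
    }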
TestFileTruncate.java
@@ -39,11 +39,14 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.ContentSummary;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FsShell;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.AppendTestUtil;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
@@ -124,23 +127,85 @@ public class TestFileTruncate {
 
       int newLength = fileLength - toTruncate;
       boolean isReady = fs.truncate(p, newLength);
+      LOG.info("fileLength=" + fileLength + ", newLength=" + newLength
+          + ", toTruncate=" + toTruncate + ", isReady=" + isReady);
+
-      if(!isReady)
+      assertEquals("File must be closed for zero truncate"
+          + " or truncating at the block boundary",
+          isReady, toTruncate == 0 || newLength % BLOCK_SIZE == 0);
+      if (!isReady) {
         checkBlockRecovery(p);
-      FileStatus fileStatus = fs.getFileStatus(p);
-      assertThat(fileStatus.getLen(), is((long) newLength));
+      }
 
       ContentSummary cs = fs.getContentSummary(parent);
       assertEquals("Bad disk space usage",
           cs.getSpaceConsumed(), newLength * REPLICATION);
       // validate the file content
-      AppendTestUtil.checkFullFile(fs, p, newLength, contents, p.toString());
+      checkFullFile(p, newLength, contents);
       }
     }
     fs.delete(parent, true);
   }
+
+  /** Truncate the same file multiple times until its size is zero. */
+  @Test
+  public void testMultipleTruncate() throws IOException {
+    Path dir = new Path("/testMultipleTruncate");
+    fs.mkdirs(dir);
+    final Path p = new Path(dir, "file");
+    final byte[] data = new byte[100 * BLOCK_SIZE];
+    DFSUtil.getRandom().nextBytes(data);
+    writeContents(data, data.length, p);
+
+    for(int n = data.length; n > 0; ) {
+      final int newLength = DFSUtil.getRandom().nextInt(n);
+      final boolean isReady = fs.truncate(p, newLength);
+      LOG.info("newLength=" + newLength + ", isReady=" + isReady);
+      assertEquals("File must be closed for truncating at the block boundary",
+          isReady, newLength % BLOCK_SIZE == 0);
+      if (!isReady) {
+        checkBlockRecovery(p);
+      }
+      checkFullFile(p, newLength, data);
+      n = newLength;
+    }
+
+    fs.delete(dir, true);
+  }
+
+  /**
+   * Truncate files and then run other operations such as
+   * rename, set replication, set permission, etc.
+   */
+  @Test
+  public void testTruncateWithOtherOperations() throws IOException {
+    Path dir = new Path("/testTruncateOtherOperations");
+    fs.mkdirs(dir);
+    final Path p = new Path(dir, "file");
+    final byte[] data = new byte[2 * BLOCK_SIZE];
+
+    DFSUtil.getRandom().nextBytes(data);
+    writeContents(data, data.length, p);
+
+    final int newLength = data.length - 1;
+    boolean isReady = fs.truncate(p, newLength);
+    assertFalse(isReady);
+
+    fs.setReplication(p, (short)(REPLICATION - 1));
+    fs.setPermission(p, FsPermission.createImmutable((short)0444));
+
+    final Path q = new Path(dir, "newFile");
+    fs.rename(p, q);
+
+    checkBlockRecovery(q);
+    checkFullFile(q, newLength, data);
+
+    cluster.restartNameNode();
+    checkFullFile(q, newLength, data);
+
+    fs.delete(dir, true);
+  }
+
   @Test
   public void testSnapshotWithAppendTruncate() throws IOException {
     testSnapshotWithAppendTruncate(0, 1, 2);
@@ -434,15 +499,35 @@
     int toTruncate = 1;
 
     byte[] contents = AppendTestUtil.initBuffer(startingFileSize);
-    final Path p = new Path("/testTruncateFailure");
-    FSDataOutputStream out = fs.create(p, false, BLOCK_SIZE, REPLICATION,
-        BLOCK_SIZE);
-    out.write(contents, 0, startingFileSize);
-    try {
-      fs.truncate(p, 0);
-      fail("Truncate must fail on open file.");
-    } catch(IOException expected) {}
-    out.close();
+    final Path dir = new Path("/dir");
+    final Path p = new Path(dir, "testTruncateFailure");
+    {
+      FSDataOutputStream out = fs.create(p, false, BLOCK_SIZE, REPLICATION,
+          BLOCK_SIZE);
+      out.write(contents, 0, startingFileSize);
+      try {
+        fs.truncate(p, 0);
+        fail("Truncate must fail on open file.");
+      } catch (IOException expected) {
+        GenericTestUtils.assertExceptionContains(
+            "Failed to TRUNCATE_FILE", expected);
+      } finally {
+        out.close();
+      }
+    }
+
+    {
+      FSDataOutputStream out = fs.append(p);
+      try {
+        fs.truncate(p, 0);
+        fail("Truncate must fail for append.");
+      } catch (IOException expected) {
+        GenericTestUtils.assertExceptionContains(
+            "Failed to TRUNCATE_FILE", expected);
+      } finally {
+        out.close();
+      }
+    }
+
     try {
       fs.truncate(p, -1);
@@ -452,6 +537,45 @@
           "Cannot truncate to a negative file size", expected);
     }
 
+    try {
+      fs.truncate(p, startingFileSize + 1);
+      fail("Truncate must fail for a larger new length.");
+    } catch (Exception expected) {
+      GenericTestUtils.assertExceptionContains(
+          "Cannot truncate to a larger file size", expected);
+    }
+
+    try {
+      fs.truncate(dir, 0);
+      fail("Truncate must fail for a directory.");
+    } catch (Exception expected) {
+      GenericTestUtils.assertExceptionContains(
+          "Path is not a file", expected);
+    }
+
+    try {
+      fs.truncate(new Path(dir, "non-existing"), 0);
+      fail("Truncate must fail for a non-existing file.");
+    } catch (Exception expected) {
+      GenericTestUtils.assertExceptionContains(
+          "File does not exist", expected);
+    }
+
+
+    fs.setPermission(p, FsPermission.createImmutable((short)0664));
+    {
+      final UserGroupInformation fooUgi =
+          UserGroupInformation.createUserForTesting("foo", new String[]{"foo"});
+      try {
+        final FileSystem foofs = DFSTestUtil.getFileSystemAs(fooUgi, conf);
+        foofs.truncate(p, 0);
+        fail("Truncate must fail for no WRITE permission.");
+      } catch (Exception expected) {
+        GenericTestUtils.assertExceptionContains(
+            "Permission denied", expected);
+      }
+    }
+
     cluster.shutdownDataNodes();
     NameNodeAdapter.getLeaseManager(cluster.getNamesystem())
         .setLeasePeriod(LOW_SOFTLIMIT, LOW_HARDLIMIT);
@@ -461,6 +585,16 @@
     assertThat("truncate should have triggered block recovery.",
         isReady, is(false));
 
+    {
+      try {
+        fs.truncate(p, 0);
+        fail("Truncate must fail since a truncate is already in progress.");
+      } catch (IOException expected) {
+        GenericTestUtils.assertExceptionContains(
+            "Failed to TRUNCATE_FILE", expected);
+      }
+    }
+
     boolean recoveryTriggered = false;
     for(int i = 0; i < RECOVERY_ATTEMPTS; i++) {
       String leaseHolder =
@@ -484,9 +618,6 @@
         .setLeasePeriod(HdfsConstants.LEASE_SOFTLIMIT_PERIOD,
             HdfsConstants.LEASE_HARDLIMIT_PERIOD);
 
-    FileStatus fileStatus = fs.getFileStatus(p);
-    assertThat(fileStatus.getLen(), is((long) newLength));
-
     checkFullFile(p, newLength, contents);
     fs.delete(p, false);
   }
@@ -519,10 +650,6 @@
     cluster.getNamesystem().recoverLease(s, holder, "");
 
     checkBlockRecovery(p);
-
-    FileStatus fileStatus = fs.getFileStatus(p);
-    assertThat(fileStatus.getLen(), is((long) newLength));
-
     checkFullFile(p, newLength, contents);
     fs.delete(p, false);
   }
@@ -798,9 +925,14 @@
   }
 
   static void checkBlockRecovery(Path p) throws IOException {
+    checkBlockRecovery(p, fs);
+  }
+
+  public static void checkBlockRecovery(Path p, DistributedFileSystem dfs)
+      throws IOException {
     boolean success = false;
     for(int i = 0; i < SUCCESS_ATTEMPTS; i++) {
-      LocatedBlocks blocks = getLocatedBlocks(p);
+      LocatedBlocks blocks = getLocatedBlocks(p, dfs);
       boolean noLastBlock = blocks.getLastLocatedBlock() == null;
       if(!blocks.isUnderConstruction() &&
           (noLastBlock || blocks.isLastBlockComplete())) {
@@ -814,7 +946,12 @@
   }
 
   static LocatedBlocks getLocatedBlocks(Path src) throws IOException {
-    return fs.getClient().getLocatedBlocks(src.toString(), 0, Long.MAX_VALUE);
+    return getLocatedBlocks(src, fs);
+  }
+
+  static LocatedBlocks getLocatedBlocks(Path src, DistributedFileSystem dfs)
+      throws IOException {
+    return dfs.getClient().getLocatedBlocks(src.toString(), 0, Long.MAX_VALUE);
   }
 
   static void assertBlockExists(Block blk) {
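All of these tests lean on the truncate readiness contract checked above: fs.truncate returns true when the file can be closed immediately (truncating by zero bytes or to a block boundary) and false when the last block must first go through recovery, which checkBlockRecovery polls for. A hedged client-side sketch of the same wait (the poll interval is illustrative):

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hdfs.DistributedFileSystem;

    public class TruncateWaitSketch {
      // Truncate and, if the last block needs recovery, poll until the
      // file leaves the under-construction state.
      static void truncateAndWait(DistributedFileSystem dfs, Path p,
          long newLength) throws Exception {
        final boolean isReady = dfs.truncate(p, newLength);
        while (!isReady && dfs.getClient()
            .getLocatedBlocks(p.toString(), 0, Long.MAX_VALUE)
            .isUnderConstruction()) {
          Thread.sleep(100); // illustrative poll interval
        }
      }
    }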
TestHAAppend.java
@@ -19,21 +19,33 @@ package org.apache.hadoop.hdfs.server.namenode.ha;
 
 import static org.junit.Assert.assertEquals;
 
+import java.io.IOException;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.AppendTestUtil;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
-import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.MiniDFSNNTopology;
-import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil;
+import org.apache.hadoop.hdfs.server.namenode.TestFileTruncate;
 import org.apache.hadoop.hdfs.tools.DFSck;
 import org.apache.hadoop.util.ToolRunner;
 import org.junit.Test;
 
 public class TestHAAppend {
+  static final int COUNT = 5;
+
+  static FSDataOutputStream createAndHflush(FileSystem fs, Path file,
+      byte[] data, int length) throws IOException{
+    FSDataOutputStream out = fs.create(file, false, 4096, (short)3, 1024);
+    out.write(data, 0, length);
+    out.hflush();
+    return out;
+  }
+
   /**
    * Test to verify the processing of PendingDataNodeMessageQueue in case of
    * append. One block will marked as corrupt if the OP_ADD, OP_UPDATE_BLOCKS
@@ -58,22 +70,37 @@
       fs = HATestUtil.configureFailoverFs(cluster, conf);
 
       Path fileToAppend = new Path("/FileToAppend");
+      Path fileToTruncate = new Path("/FileToTruncate");
+
+      final byte[] data = new byte[1 << 16];
+      DFSUtil.getRandom().nextBytes(data);
+      final int[] appendPos = AppendTestUtil.randomFilePartition(
+          data.length, COUNT);
+      final int[] truncatePos = AppendTestUtil.randomFilePartition(
+          data.length, 1);
 
       // Create file, write some data, and hflush so that the first
       // block is in the edit log prior to roll.
-      FSDataOutputStream out = fs.create(fileToAppend);
-      out.writeBytes("/data");
-      out.hflush();
+      FSDataOutputStream out = createAndHflush(
+          fs, fileToAppend, data, appendPos[0]);
+
+      FSDataOutputStream out4Truncate = createAndHflush(
+          fs, fileToTruncate, data, data.length);
 
       // Let the StandbyNode catch the creation of the file.
       cluster.getNameNode(0).getRpcServer().rollEditLog();
       cluster.getNameNode(1).getNamesystem().getEditLogTailer().doTailEdits();
       out.close();
+      out4Truncate.close();
 
       // Append and re-close a few time, so that many block entries are queued.
-      for (int i = 0; i < 5; i++) {
-        DFSTestUtil.appendFile(fs, fileToAppend, "data");
+      for (int i = 0; i < COUNT; i++) {
+        int end = i < COUNT - 1? appendPos[i + 1]: data.length;
+        out = fs.append(fileToAppend);
+        out.write(data, appendPos[i], end - appendPos[i]);
+        out.close();
       }
+      boolean isTruncateReady = fs.truncate(fileToTruncate, truncatePos[0]);
 
       // Ensure that blocks have been reported to the SBN ahead of the edits
       // arriving.
@@ -90,6 +117,16 @@
 
       assertEquals("CorruptBlocks should be empty.", 0, cluster.getNameNode(1)
           .getNamesystem().getCorruptReplicaBlocks());
+
+      AppendTestUtil.checkFullFile(fs, fileToAppend, data.length, data,
+          fileToAppend.toString());
+
+      if (!isTruncateReady) {
+        TestFileTruncate.checkBlockRecovery(fileToTruncate,
+            cluster.getFileSystem(1));
+      }
+      AppendTestUtil.checkFullFile(fs, fileToTruncate, truncatePos[0], data,
+          fileToTruncate.toString());
     } finally {
       if (null != cluster) {
         cluster.shutdown();
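Both files stay open across the edit-log roll because hflush makes the written bytes durable at the DataNodes without finalizing the last block, which is exactly the in-progress state the pending-message scenario needs; only close() completes the block. A hedged sketch of that distinction (paths and sizes illustrative):

    import java.io.IOException;

    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class HflushVsCloseSketch {
      static void demo(FileSystem fs, byte[] data) throws IOException {
        FSDataOutputStream out = fs.create(new Path("/demo"));
        out.write(data, 0, data.length / 2);
        // Bytes reach the DataNodes; the file stays under construction, so
        // an edit-log roll here captures an in-progress block.
        out.hflush();
        out.write(data, data.length / 2, data.length - data.length / 2);
        out.close(); // only close() finalizes the last block
      }
    }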