HDFS-7738. Revise the exception message for recover lease; add more truncate tests such as truncate with HA setup, negative tests, truncate with other operations and multiple truncates.

Conflicts:
	hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestHAAppend.java
This commit is contained in:
Tsz-Wo Nicholas Sze 2015-02-07 15:21:16 -08:00
parent 964a3b0c3e
commit c5f18ba65b
10 changed files with 320 additions and 76 deletions

View File

@ -303,6 +303,10 @@ Release 2.7.0 - UNRELEASED
HDFS-7710. Remove dead code in BackupImage.java. (Xiaoyu Yao via aajisaka)
HDFS-7738. Revise the exception message for recover lease; add more truncate
tests such as truncate with HA setup, negative tests, truncate with other
operations and multiple truncates. (szetszwo)
OPTIMIZATIONS
HDFS-7454. Reduce memory footprint for AclEntries in NameNode.

View File

@ -231,7 +231,6 @@ import org.apache.hadoop.hdfs.server.namenode.ha.HAContext;
import org.apache.hadoop.hdfs.server.namenode.ha.StandbyCheckpointer;
import org.apache.hadoop.hdfs.server.namenode.metrics.FSNamesystemMBean;
import org.apache.hadoop.hdfs.server.namenode.metrics.NameNodeMetrics;
import org.apache.hadoop.hdfs.server.namenode.snapshot.DirectoryWithSnapshotFeature;
import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
import org.apache.hadoop.hdfs.server.namenode.snapshot.SnapshotManager;
import org.apache.hadoop.hdfs.server.namenode.startupprogress.Phase;
@ -2019,8 +2018,9 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
throw new UnsupportedOperationException(
"Cannot truncate lazy persist file " + src);
}
// Opening an existing file for write. May need lease recovery.
recoverLeaseInternal(iip, src, clientName, clientMachine, false);
// Opening an existing file for truncate. May need lease recovery.
recoverLeaseInternal(RecoverLeaseOp.TRUNCATE_FILE,
iip, src, clientName, clientMachine, false);
// Truncate length check.
long oldLength = file.computeFileSize();
if(oldLength == newLength) {
@ -2490,7 +2490,8 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
}
} else {
// If lease soft limit time is expired, recover the lease
recoverLeaseInternal(iip, src, holder, clientMachine, false);
recoverLeaseInternal(RecoverLeaseOp.CREATE_FILE,
iip, src, holder, clientMachine, false);
throw new FileAlreadyExistsException(src + " for client " +
clientMachine + " already exists");
}
@ -2613,8 +2614,9 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
throw new UnsupportedOperationException(
"Cannot append to lazy persist file " + src);
}
// Opening an existing file for write - may need to recover lease.
recoverLeaseInternal(iip, src, holder, clientMachine, false);
// Opening an existing file for append - may need to recover lease.
recoverLeaseInternal(RecoverLeaseOp.APPEND_FILE,
iip, src, holder, clientMachine, false);
final BlockInfo lastBlock = myFile.getLastBlock();
// Check that the block has at least minimum replication.
@ -2704,7 +2706,8 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
dir.checkPathAccess(pc, iip, FsAction.WRITE);
}
recoverLeaseInternal(iip, src, holder, clientMachine, true);
recoverLeaseInternal(RecoverLeaseOp.RECOVER_LEASE,
iip, src, holder, clientMachine, true);
} catch (StandbyException se) {
skipSync = true;
throw se;
@ -2719,7 +2722,20 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
return false;
}
void recoverLeaseInternal(INodesInPath iip,
private enum RecoverLeaseOp {
CREATE_FILE,
APPEND_FILE,
TRUNCATE_FILE,
RECOVER_LEASE;
private String getExceptionMessage(String src, String holder,
String clientMachine, String reason) {
return "Failed to " + this + " " + src + " for " + holder +
" on " + clientMachine + " because " + reason;
}
}
void recoverLeaseInternal(RecoverLeaseOp op, INodesInPath iip,
String src, String holder, String clientMachine, boolean force)
throws IOException {
assert hasWriteLock();
@ -2730,18 +2746,15 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
// leases. Find the appropriate lease record.
//
Lease lease = leaseManager.getLease(holder);
//
// We found the lease for this file. And surprisingly the original
// holder is trying to recreate this file. This should never occur.
//
if (!force && lease != null) {
Lease leaseFile = leaseManager.getLeaseByPath(src);
if (leaseFile != null && leaseFile.equals(lease)) {
// We found the lease for this file but the original
// holder is trying to obtain it again.
throw new AlreadyBeingCreatedException(
"failed to create file " + src + " for " + holder +
" for client " + clientMachine +
" because current leaseholder is trying to recreate file.");
op.getExceptionMessage(src, holder, clientMachine,
holder + " is already the current lease holder."));
}
}
//
@ -2752,9 +2765,8 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
lease = leaseManager.getLease(clientName);
if (lease == null) {
throw new AlreadyBeingCreatedException(
"failed to create file " + src + " for " + holder +
" for client " + clientMachine +
" because pendingCreates is non-null but no leases found.");
op.getExceptionMessage(src, holder, clientMachine,
"the file is under construction but no leases found."));
}
if (force) {
// close now: no need to wait for soft lease expiration and
@ -2776,20 +2788,21 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
boolean isClosed = internalReleaseLease(lease, src, iip, null);
if(!isClosed)
throw new RecoveryInProgressException(
"Failed to close file " + src +
". Lease recovery is in progress. Try again later.");
op.getExceptionMessage(src, holder, clientMachine,
"lease recovery is in progress. Try again later."));
} else {
final BlockInfo lastBlock = file.getLastBlock();
if (lastBlock != null
&& lastBlock.getBlockUCState() == BlockUCState.UNDER_RECOVERY) {
throw new RecoveryInProgressException("Recovery in progress, file ["
+ src + "], " + "lease owner [" + lease.getHolder() + "]");
throw new RecoveryInProgressException(
op.getExceptionMessage(src, holder, clientMachine,
"another recovery is in progress by "
+ clientName + " on " + uc.getClientMachine()));
} else {
throw new AlreadyBeingCreatedException("Failed to create file ["
+ src + "] for [" + holder + "] for client [" + clientMachine
+ "], because this file is already being created by ["
+ clientName + "] on ["
+ uc.getClientMachine() + "]");
throw new AlreadyBeingCreatedException(
op.getExceptionMessage(src, holder, clientMachine,
"this file lease is currently owned by "
+ clientName + " on " + uc.getClientMachine()));
}
}
}

View File

@ -18,9 +18,11 @@
package org.apache.hadoop.hdfs;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.io.OutputStream;
import java.util.Arrays;
import java.util.Random;
import org.apache.commons.logging.Log;
@ -80,6 +82,27 @@ public class AppendTestUtil {
return b;
}
/** @return a random file partition of length n. */
public static int[] randomFilePartition(int n, int parts) {
int[] p = new int[parts];
for(int i = 0; i < p.length; i++) {
p[i] = nextInt(n - i - 1) + 1;
}
Arrays.sort(p);
for(int i = 1; i < p.length; i++) {
if (p[i] <= p[i - 1]) {
p[i] = p[i - 1] + 1;
}
}
LOG.info("partition=" + Arrays.toString(p));
assertTrue("i=0", p[0] > 0 && p[0] < n);
for(int i = 1; i < p.length; i++) {
assertTrue("i=" + i, p[i] > p[i - 1] && p[i] < n);
}
return p;
}
static void sleep(long ms) {
try {
Thread.sleep(ms);
@ -157,6 +180,11 @@ public class AppendTestUtil {
(short) repl, BLOCK_SIZE);
}
public static void checkFullFile(FileSystem fs, Path file, int len,
final byte[] compareContent) throws IOException {
checkFullFile(fs, file, len, compareContent, file.toString());
}
/**
* Compare the content of a file created from FileSystem and Path with
* the specified byte[] buffer's content
@ -164,6 +192,18 @@ public class AppendTestUtil {
*/
public static void checkFullFile(FileSystem fs, Path name, int len,
final byte[] compareContent, String message) throws IOException {
checkFullFile(fs, name, len, compareContent, message, true);
}
public static void checkFullFile(FileSystem fs, Path name, int len,
final byte[] compareContent, String message,
boolean checkFileStatus) throws IOException {
if (checkFileStatus) {
final FileStatus status = fs.getFileStatus(name);
assertEquals("len=" + len + " but status.getLen()=" + status.getLen(),
len, status.getLen());
}
FSDataInputStream stm = fs.open(name);
byte[] actual = new byte[len];
stm.readFully(0, actual);

View File

@ -23,8 +23,8 @@ import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.base.Supplier;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.apache.commons.io.FileUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -843,14 +843,17 @@ public class DFSTestUtil {
* Get a FileSystem instance as specified user in a doAs block.
*/
static public FileSystem getFileSystemAs(UserGroupInformation ugi,
final Configuration conf) throws IOException,
InterruptedException {
return ugi.doAs(new PrivilegedExceptionAction<FileSystem>() {
@Override
public FileSystem run() throws Exception {
return FileSystem.get(conf);
}
});
final Configuration conf) throws IOException {
try {
return ugi.doAs(new PrivilegedExceptionAction<FileSystem>() {
@Override
public FileSystem run() throws Exception {
return FileSystem.get(conf);
}
});
} catch (InterruptedException e) {
throw (InterruptedIOException)new InterruptedIOException().initCause(e);
}
}
public static byte[] generateSequentialBytes(int start, int length) {

View File

@ -101,9 +101,10 @@ public class TestFileAppend{
System.arraycopy(fileContents, 0, expected, 0, expected.length);
}
// do a sanity check. Read the file
// do not check file status since the file is not yet closed.
AppendTestUtil.checkFullFile(fileSys, name,
AppendTestUtil.NUM_BLOCKS * AppendTestUtil.BLOCK_SIZE,
expected, "Read 1");
expected, "Read 1", false);
}
/**

View File

@ -64,7 +64,6 @@ import org.apache.hadoop.fs.ParentNotDirectoryException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
@ -85,7 +84,6 @@ import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
import org.apache.hadoop.io.EnumSetWritable;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.metrics2.MetricsRecordBuilder;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.Time;
@ -405,8 +403,7 @@ public class TestFileCreation {
fs2.create(p, false);
fail("Did not throw!");
} catch (IOException abce) {
GenericTestUtils.assertExceptionContains("already being created by",
abce);
GenericTestUtils.assertExceptionContains("Failed to CREATE_FILE", abce);
}
// NameNodeProxies' createNNProxyWithClientProtocol has 5 retries.
assertCounter("AlreadyBeingCreatedExceptionNumOps",

View File

@ -374,8 +374,8 @@ public class TestHFlush {
// verify that entire file is good
AppendTestUtil.checkFullFile(fs, p, 4,
fileContents, "Failed to deal with thread interruptions");
AppendTestUtil.checkFullFile(fs, p, 4, fileContents,
"Failed to deal with thread interruptions", false);
} finally {
cluster.shutdown();
}

View File

@ -52,6 +52,7 @@ import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.StringUtils;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@ -292,9 +293,13 @@ public class TestSafeMode {
try {
f.run(fs);
fail(msg);
} catch (IOException ioe) {
assertTrue(ioe.getMessage().contains("safe mode"));
}
} catch (RemoteException re) {
assertEquals(SafeModeException.class.getName(), re.getClassName());
GenericTestUtils.assertExceptionContains(
"Name node is in safe mode", re);
} catch (IOException ioe) {
fail(msg + " " + StringUtils.stringifyException(ioe));
}
}
/**
@ -341,6 +346,12 @@ public class TestSafeMode {
DFSTestUtil.appendFile(fs, file1, "new bytes");
}});
runFsFun("Truncate file while in SM", new FSRun() {
@Override
public void run(FileSystem fs) throws IOException {
fs.truncate(file1, 0);
}});
runFsFun("Delete file while in SM", new FSRun() {
@Override
public void run(FileSystem fs) throws IOException {

View File

@ -39,11 +39,14 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.ContentSummary;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FsShell;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.AppendTestUtil;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
@ -124,23 +127,85 @@ public class TestFileTruncate {
int newLength = fileLength - toTruncate;
boolean isReady = fs.truncate(p, newLength);
LOG.info("fileLength=" + fileLength + ", newLength=" + newLength
+ ", toTruncate=" + toTruncate + ", isReady=" + isReady);
if(!isReady)
assertEquals("File must be closed for zero truncate"
+ " or truncating at the block boundary",
isReady, toTruncate == 0 || newLength % BLOCK_SIZE == 0);
if (!isReady) {
checkBlockRecovery(p);
FileStatus fileStatus = fs.getFileStatus(p);
assertThat(fileStatus.getLen(), is((long) newLength));
}
ContentSummary cs = fs.getContentSummary(parent);
assertEquals("Bad disk space usage",
cs.getSpaceConsumed(), newLength * REPLICATION);
// validate the file content
AppendTestUtil.checkFullFile(fs, p, newLength, contents, p.toString());
checkFullFile(p, newLength, contents);
}
}
fs.delete(parent, true);
}
/** Truncate the same file multiple times until its size is zero. */
@Test
public void testMultipleTruncate() throws IOException {
Path dir = new Path("/testMultipleTruncate");
fs.mkdirs(dir);
final Path p = new Path(dir, "file");
final byte[] data = new byte[100 * BLOCK_SIZE];
DFSUtil.getRandom().nextBytes(data);
writeContents(data, data.length, p);
for(int n = data.length; n > 0; ) {
final int newLength = DFSUtil.getRandom().nextInt(n);
final boolean isReady = fs.truncate(p, newLength);
LOG.info("newLength=" + newLength + ", isReady=" + isReady);
assertEquals("File must be closed for truncating at the block boundary",
isReady, newLength % BLOCK_SIZE == 0);
if (!isReady) {
checkBlockRecovery(p);
}
checkFullFile(p, newLength, data);
n = newLength;
}
fs.delete(dir, true);
}
/**
* Truncate files and then run other operations such as
* rename, set replication, set permission, etc.
*/
@Test
public void testTruncateWithOtherOperations() throws IOException {
Path dir = new Path("/testTruncateOtherOperations");
fs.mkdirs(dir);
final Path p = new Path(dir, "file");
final byte[] data = new byte[2 * BLOCK_SIZE];
DFSUtil.getRandom().nextBytes(data);
writeContents(data, data.length, p);
final int newLength = data.length - 1;
boolean isReady = fs.truncate(p, newLength);
assertFalse(isReady);
fs.setReplication(p, (short)(REPLICATION - 1));
fs.setPermission(p, FsPermission.createImmutable((short)0444));
final Path q = new Path(dir, "newFile");
fs.rename(p, q);
checkBlockRecovery(q);
checkFullFile(q, newLength, data);
cluster.restartNameNode();
checkFullFile(q, newLength, data);
fs.delete(dir, true);
}
@Test
public void testSnapshotWithAppendTruncate() throws IOException {
testSnapshotWithAppendTruncate(0, 1, 2);
@ -434,15 +499,35 @@ public class TestFileTruncate {
int toTruncate = 1;
byte[] contents = AppendTestUtil.initBuffer(startingFileSize);
final Path p = new Path("/testTruncateFailure");
FSDataOutputStream out = fs.create(p, false, BLOCK_SIZE, REPLICATION,
BLOCK_SIZE);
out.write(contents, 0, startingFileSize);
try {
fs.truncate(p, 0);
fail("Truncate must fail on open file.");
} catch(IOException expected) {}
out.close();
final Path dir = new Path("/dir");
final Path p = new Path(dir, "testTruncateFailure");
{
FSDataOutputStream out = fs.create(p, false, BLOCK_SIZE, REPLICATION,
BLOCK_SIZE);
out.write(contents, 0, startingFileSize);
try {
fs.truncate(p, 0);
fail("Truncate must fail on open file.");
} catch (IOException expected) {
GenericTestUtils.assertExceptionContains(
"Failed to TRUNCATE_FILE", expected);
} finally {
out.close();
}
}
{
FSDataOutputStream out = fs.append(p);
try {
fs.truncate(p, 0);
fail("Truncate must fail for append.");
} catch (IOException expected) {
GenericTestUtils.assertExceptionContains(
"Failed to TRUNCATE_FILE", expected);
} finally {
out.close();
}
}
try {
fs.truncate(p, -1);
@ -452,6 +537,45 @@ public class TestFileTruncate {
"Cannot truncate to a negative file size", expected);
}
try {
fs.truncate(p, startingFileSize + 1);
fail("Truncate must fail for a larger new length.");
} catch (Exception expected) {
GenericTestUtils.assertExceptionContains(
"Cannot truncate to a larger file size", expected);
}
try {
fs.truncate(dir, 0);
fail("Truncate must fail for a directory.");
} catch (Exception expected) {
GenericTestUtils.assertExceptionContains(
"Path is not a file", expected);
}
try {
fs.truncate(new Path(dir, "non-existing"), 0);
fail("Truncate must fail for a non-existing file.");
} catch (Exception expected) {
GenericTestUtils.assertExceptionContains(
"File does not exist", expected);
}
fs.setPermission(p, FsPermission.createImmutable((short)0664));
{
final UserGroupInformation fooUgi =
UserGroupInformation.createUserForTesting("foo", new String[]{"foo"});
try {
final FileSystem foofs = DFSTestUtil.getFileSystemAs(fooUgi, conf);
foofs.truncate(p, 0);
fail("Truncate must fail for no WRITE permission.");
} catch (Exception expected) {
GenericTestUtils.assertExceptionContains(
"Permission denied", expected);
}
}
cluster.shutdownDataNodes();
NameNodeAdapter.getLeaseManager(cluster.getNamesystem())
.setLeasePeriod(LOW_SOFTLIMIT, LOW_HARDLIMIT);
@ -461,6 +585,16 @@ public class TestFileTruncate {
assertThat("truncate should have triggered block recovery.",
isReady, is(false));
{
try {
fs.truncate(p, 0);
fail("Truncate must fail since a trancate is already in pregress.");
} catch (IOException expected) {
GenericTestUtils.assertExceptionContains(
"Failed to TRUNCATE_FILE", expected);
}
}
boolean recoveryTriggered = false;
for(int i = 0; i < RECOVERY_ATTEMPTS; i++) {
String leaseHolder =
@ -484,9 +618,6 @@ public class TestFileTruncate {
.setLeasePeriod(HdfsConstants.LEASE_SOFTLIMIT_PERIOD,
HdfsConstants.LEASE_HARDLIMIT_PERIOD);
FileStatus fileStatus = fs.getFileStatus(p);
assertThat(fileStatus.getLen(), is((long) newLength));
checkFullFile(p, newLength, contents);
fs.delete(p, false);
}
@ -519,10 +650,6 @@ public class TestFileTruncate {
cluster.getNamesystem().recoverLease(s, holder, "");
checkBlockRecovery(p);
FileStatus fileStatus = fs.getFileStatus(p);
assertThat(fileStatus.getLen(), is((long) newLength));
checkFullFile(p, newLength, contents);
fs.delete(p, false);
}
@ -798,9 +925,14 @@ public class TestFileTruncate {
}
static void checkBlockRecovery(Path p) throws IOException {
checkBlockRecovery(p, fs);
}
public static void checkBlockRecovery(Path p, DistributedFileSystem dfs)
throws IOException {
boolean success = false;
for(int i = 0; i < SUCCESS_ATTEMPTS; i++) {
LocatedBlocks blocks = getLocatedBlocks(p);
LocatedBlocks blocks = getLocatedBlocks(p, dfs);
boolean noLastBlock = blocks.getLastLocatedBlock() == null;
if(!blocks.isUnderConstruction() &&
(noLastBlock || blocks.isLastBlockComplete())) {
@ -814,7 +946,12 @@ public class TestFileTruncate {
}
static LocatedBlocks getLocatedBlocks(Path src) throws IOException {
return fs.getClient().getLocatedBlocks(src.toString(), 0, Long.MAX_VALUE);
return getLocatedBlocks(src, fs);
}
static LocatedBlocks getLocatedBlocks(Path src, DistributedFileSystem dfs)
throws IOException {
return dfs.getClient().getLocatedBlocks(src.toString(), 0, Long.MAX_VALUE);
}
static void assertBlockExists(Block blk) {

View File

@ -19,20 +19,33 @@ package org.apache.hadoop.hdfs.server.namenode.ha;
import static org.junit.Assert.assertEquals;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.AppendTestUtil;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.MiniDFSNNTopology;
import org.apache.hadoop.hdfs.server.namenode.TestFileTruncate;
import org.apache.hadoop.hdfs.tools.DFSck;
import org.apache.hadoop.util.ToolRunner;
import org.junit.Test;
public class TestHAAppend {
static final int COUNT = 5;
static FSDataOutputStream createAndHflush(FileSystem fs, Path file,
byte[] data, int length) throws IOException{
FSDataOutputStream out = fs.create(file, false, 4096, (short)3, 1024);
out.write(data, 0, length);
out.hflush();
return out;
}
/**
* Test to verify the processing of PendingDataNodeMessageQueue in case of
* append. One block will marked as corrupt if the OP_ADD, OP_UPDATE_BLOCKS
@ -57,22 +70,37 @@ public class TestHAAppend {
fs = HATestUtil.configureFailoverFs(cluster, conf);
Path fileToAppend = new Path("/FileToAppend");
Path fileToTruncate = new Path("/FileToTruncate");
final byte[] data = new byte[1 << 16];
DFSUtil.getRandom().nextBytes(data);
final int[] appendPos = AppendTestUtil.randomFilePartition(
data.length, COUNT);
final int[] truncatePos = AppendTestUtil.randomFilePartition(
data.length, 1);
// Create file, write some data, and hflush so that the first
// block is in the edit log prior to roll.
FSDataOutputStream out = fs.create(fileToAppend);
out.writeBytes("/data");
out.hflush();
FSDataOutputStream out = createAndHflush(
fs, fileToAppend, data, appendPos[0]);
FSDataOutputStream out4Truncate = createAndHflush(
fs, fileToTruncate, data, data.length);
// Let the StandbyNode catch the creation of the file.
cluster.getNameNode(0).getRpcServer().rollEditLog();
cluster.getNameNode(1).getNamesystem().getEditLogTailer().doTailEdits();
out.close();
out4Truncate.close();
// Append and re-close a few time, so that many block entries are queued.
for (int i = 0; i < 5; i++) {
DFSTestUtil.appendFile(fs, fileToAppend, "data");
for (int i = 0; i < COUNT; i++) {
int end = i < COUNT - 1? appendPos[i + 1]: data.length;
out = fs.append(fileToAppend);
out.write(data, appendPos[i], end - appendPos[i]);
out.close();
}
boolean isTruncateReady = fs.truncate(fileToTruncate, truncatePos[0]);
// Ensure that blocks have been reported to the SBN ahead of the edits
// arriving.
@ -89,6 +117,16 @@ public class TestHAAppend {
assertEquals("CorruptBlocks should be empty.", 0, cluster.getNameNode(1)
.getNamesystem().getCorruptReplicaBlocks());
AppendTestUtil.checkFullFile(fs, fileToAppend, data.length, data,
fileToAppend.toString());
if (!isTruncateReady) {
TestFileTruncate.checkBlockRecovery(fileToTruncate,
cluster.getFileSystem(1));
}
AppendTestUtil.checkFullFile(fs, fileToTruncate, truncatePos[0], data,
fileToTruncate.toString());
} finally {
if (null != cluster) {
cluster.shutdown();
@ -98,4 +136,4 @@ public class TestHAAppend {
}
}
}
}
}