HDFS-12316. Verify HDFS snapshot deletion doesn't crash the ongoing file writes.
commit 4230872dd6
parent b298948897
@@ -23,7 +23,11 @@ import java.util.EnumSet;
import java.util.HashSet;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
@@ -38,12 +42,15 @@ import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
import org.apache.hadoop.util.Time;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

public class TestOpenFilesWithSnapshot {
  private static final Log LOG =
      LogFactory.getLog(TestOpenFilesWithSnapshot.class.getName());
  private final Configuration conf = new Configuration();
  MiniDFSCluster cluster = null;
  DistributedFileSystem fs = null;
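The @Before fixture that initializes the cluster and fs fields is outside this diff's context. A minimal sketch of such a setup, assuming a small MiniDFSCluster and the open-file capture flag from HDFS-11402 (the class's actual fixture may differ):

  @Before
  public void setup() throws Exception {
    // Assumption: snapshots must capture open files for this test to be
    // meaningful (dfs.namenode.snapshot.capture.openfiles, HDFS-11402).
    conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_SNAPSHOT_CAPTURE_OPENFILES, true);
    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
    cluster.waitActive();
    fs = cluster.getFileSystem();
  }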
@@ -622,6 +629,108 @@ public class TestOpenFilesWithSnapshot {
    hbaseOutputStream.close();
  }

  /**
   * Test that clients writing to open files are not interrupted when
   * snapshots that captured the open files get deleted.
   */
  @Test (timeout = 240000)
  public void testOpenFileWritingAcrossSnapDeletion() throws Exception {
    final Path snapRootDir = new Path("/level_0_A");
    final String flumeFileName = "flume.log";
    final String hbaseFileName = "hbase.log";
    final String snap1Name = "snap_1";
    final String snap2Name = "snap_2";
    final String snap3Name = "snap_3";

    // Create the files, then close them; the writer thread below re-opens
    // both for append.
    final Path flumeFile = new Path(snapRootDir, flumeFileName);
    FSDataOutputStream flumeOut = fs.create(flumeFile, false,
        8000, (short)3, 1048576);
    flumeOut.close();
    final Path hbaseFile = new Path(snapRootDir, hbaseFileName);
    FSDataOutputStream hbaseOut = fs.create(hbaseFile, false,
        8000, (short)3, 1048576);
    hbaseOut.close();

    final AtomicBoolean writerError = new AtomicBoolean(false);
    final CountDownLatch startLatch = new CountDownLatch(1);
    final CountDownLatch deleteLatch = new CountDownLatch(1);
    Thread t = new Thread(new Runnable() {
      @Override
      public void run() {
        try {
          FSDataOutputStream flumeOutputStream = fs.append(flumeFile, 8000);
          FSDataOutputStream hbaseOutputStream = fs.append(hbaseFile, 8000);
          byte[] bytes = new byte[(int) (1024 * 0.2)]; // ~200-byte writes
          Random r = new Random(Time.now());

          for (int i = 0; i < 200000; i++) {
            r.nextBytes(bytes);
            flumeOutputStream.write(bytes);
            if (hbaseOutputStream != null) {
              hbaseOutputStream.write(bytes);
            }
            if (i == 50000) {
              // Writes are well under way; let the main thread take snap_1.
              startLatch.countDown();
            } else if (i == 100000) {
              // Enough data captured; let the main thread delete snapshots.
              deleteLatch.countDown();
            } else if (i == 150000) {
              // Delete the hbase file while its stream is still open.
              hbaseOutputStream.hsync();
              fs.delete(hbaseFile, true);
              try {
                hbaseOutputStream.close();
              } catch (Exception e) {
                // Since the file is deleted before the open stream is
                // closed, this might throw FileNotFoundException. Ignore
                // the expected exception.
              }
              hbaseOutputStream = null;
            } else if (i % 5000 == 0) {
              LOG.info("Write pos: " + flumeOutputStream.getPos()
                  + ", size: " + fs.getFileStatus(flumeFile).getLen()
                  + ", loop: " + (i + 1));
            }
          }
        } catch (Exception e) {
          LOG.warn("Writer error: " + e);
          writerError.set(true);
        }
      }
    });
    t.start();

    startLatch.await();
    final Path snap1Dir = SnapshotTestHelper.createSnapshot(
        fs, snapRootDir, snap1Name);
    final Path flumeS1Path = new Path(snap1Dir, flumeFileName);
    LOG.info("Snap1 file status: " + fs.getFileStatus(flumeS1Path));
    LOG.info("Current file status: " + fs.getFileStatus(flumeFile));

    deleteLatch.await();
    LOG.info("Snap1 file status: " + fs.getFileStatus(flumeS1Path));
    LOG.info("Current file status: " + fs.getFileStatus(flumeFile));

    // Verify that deleting the snapshot which captured the file under
    // construction does not truncate the file, and that the thread writing
    // to it does not crash on newer block allocations.
    LOG.info("Deleting " + snap1Name);
    fs.deleteSnapshot(snapRootDir, snap1Name);

    // Verify that creating and deleting snapshots newer than the oldest
    // snapshot do not crash the thread writing to the file under
    // construction.
    SnapshotTestHelper.createSnapshot(fs, snapRootDir, snap2Name);
    SnapshotTestHelper.createSnapshot(fs, snapRootDir, snap3Name);
    fs.deleteSnapshot(snapRootDir, snap3Name);
    fs.deleteSnapshot(snapRootDir, snap2Name);
    SnapshotTestHelper.createSnapshot(fs, snapRootDir, "test");

    t.join();
    Assert.assertFalse("Client encountered writing error!", writerError.get());

    restartNameNode();
    cluster.waitActive();
  }

  private void restartNameNode() throws Exception {
    cluster.triggerBlockReports();
    NameNode nameNode = cluster.getNameNode();
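The hunk ends mid-method here. Judging from the NameNodeAdapter import above, the remainder presumably checkpoints the namespace and restarts the NameNode; a sketch under that assumption, not the verbatim commit:

    // Assumed continuation: enter safe mode, save a checkpoint, restart.
    NameNodeAdapter.enterSafeMode(nameNode, false);
    NameNodeAdapter.saveNamespace(nameNode);
    NameNodeAdapter.leaveSafeMode(nameNode);
    cluster.restartNameNode(true);
  }

With a full Hadoop source tree, the new test can be run from the hadoop-hdfs module via Maven Surefire: mvn test -Dtest=TestOpenFilesWithSnapshot#testOpenFileWritingAcrossSnapDeletion.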