HDFS-8438. Erasure Coding: Allow concat striped files if they have the same ErasureCodingPolicy. Contributed by Walter Su.

commit da16c9b3b4
parent 84cbd72afd
Author: Jing Zhao
Date:   2015-10-13 11:03:37 -07:00

3 changed files with 95 additions and 34 deletions

CHANGES.txt

@@ -832,6 +832,9 @@ Trunk (Unreleased)
     HDFS-9209. Erasure coding: Add apache license header in
     TestFileStatusWithECPolicy.java. (Surendra Singh Lilhore via jing9)
 
+    HDFS-8438. Erasure Coding: Allow concat striped files if they have the same
+    ErasureCodingPolicy. (Walter Su via jing9)
+
 Release 2.8.0 - UNRELEASED
 
   NEW FEATURES

FSDirConcatOp.java

@@ -154,10 +154,11 @@ class FSDirConcatOp {
             + " which is greater than the target file's preferred block size "
             + targetINode.getPreferredBlockSize());
       }
-      // TODO currently we do not support concatenating EC files
-      if (srcINodeFile.isStriped()) {
-        throw new HadoopIllegalArgumentException("concat: the src file " + src
-            + " is with striped blocks");
+      if(srcINodeFile.getErasureCodingPolicyID() !=
+          targetINode.getErasureCodingPolicyID()) {
+        throw new HadoopIllegalArgumentException("Source file " + src
+            + " and target file " + targetIIP.getPath()
+            + " have different erasure coding policy");
       }
       si.add(srcINodeFile);
     }

TestWriteReadStripedFile.java

@@ -28,14 +28,18 @@ import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicy;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.web.WebHdfsConstants;
 import org.apache.hadoop.hdfs.web.WebHdfsTestUtil;
+import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.log4j.Level;
 import org.junit.After;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Random;
 
 import static org.apache.hadoop.hdfs.StripedFileTestUtil.blockSize;
 import static org.apache.hadoop.hdfs.StripedFileTestUtil.numDNs;
@@ -61,8 +65,9 @@ public class TestWriteReadStripedFile {
   public void setup() throws IOException {
     conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize);
     cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDNs).build();
-    cluster.getFileSystem().getClient().setErasureCodingPolicy("/", null);
     fs = cluster.getFileSystem();
+    fs.mkdirs(new Path("/ec"));
+    cluster.getFileSystem().getClient().setErasureCodingPolicy("/ec", null);
   }
 
   @After
@@ -74,108 +79,110 @@ public class TestWriteReadStripedFile {
   @Test
   public void testFileEmpty() throws Exception {
-    testOneFileUsingDFSStripedInputStream("/EmptyFile", 0);
-    testOneFileUsingDFSStripedInputStream("/EmptyFile2", 0, true);
+    testOneFileUsingDFSStripedInputStream("/ec/EmptyFile", 0);
+    testOneFileUsingDFSStripedInputStream("/ec/EmptyFile2", 0, true);
   }
 
   @Test
   public void testFileSmallerThanOneCell1() throws Exception {
-    testOneFileUsingDFSStripedInputStream("/SmallerThanOneCell", 1);
-    testOneFileUsingDFSStripedInputStream("/SmallerThanOneCell2", 1, true);
+    testOneFileUsingDFSStripedInputStream("/ec/SmallerThanOneCell", 1);
+    testOneFileUsingDFSStripedInputStream("/ec/SmallerThanOneCell2", 1, true);
   }
 
   @Test
   public void testFileSmallerThanOneCell2() throws Exception {
-    testOneFileUsingDFSStripedInputStream("/SmallerThanOneCell", cellSize - 1);
-    testOneFileUsingDFSStripedInputStream("/SmallerThanOneCell2", cellSize - 1,
-        true);
+    testOneFileUsingDFSStripedInputStream("/ec/SmallerThanOneCell",
+        cellSize - 1);
+    testOneFileUsingDFSStripedInputStream("/ec/SmallerThanOneCell2",
+        cellSize - 1, true);
   }
 
   @Test
   public void testFileEqualsWithOneCell() throws Exception {
-    testOneFileUsingDFSStripedInputStream("/EqualsWithOneCell", cellSize);
-    testOneFileUsingDFSStripedInputStream("/EqualsWithOneCell2", cellSize, true);
+    testOneFileUsingDFSStripedInputStream("/ec/EqualsWithOneCell", cellSize);
+    testOneFileUsingDFSStripedInputStream("/ec/EqualsWithOneCell2",
+        cellSize, true);
   }
 
   @Test
   public void testFileSmallerThanOneStripe1() throws Exception {
-    testOneFileUsingDFSStripedInputStream("/SmallerThanOneStripe",
+    testOneFileUsingDFSStripedInputStream("/ec/SmallerThanOneStripe",
         cellSize * dataBlocks - 1);
-    testOneFileUsingDFSStripedInputStream("/SmallerThanOneStripe2",
+    testOneFileUsingDFSStripedInputStream("/ec/SmallerThanOneStripe2",
         cellSize * dataBlocks - 1, true);
   }
 
   @Test
   public void testFileSmallerThanOneStripe2() throws Exception {
-    testOneFileUsingDFSStripedInputStream("/SmallerThanOneStripe",
+    testOneFileUsingDFSStripedInputStream("/ec/SmallerThanOneStripe",
         cellSize + 123);
-    testOneFileUsingDFSStripedInputStream("/SmallerThanOneStripe2",
+    testOneFileUsingDFSStripedInputStream("/ec/SmallerThanOneStripe2",
         cellSize + 123, true);
   }
 
   @Test
   public void testFileEqualsWithOneStripe() throws Exception {
-    testOneFileUsingDFSStripedInputStream("/EqualsWithOneStripe",
+    testOneFileUsingDFSStripedInputStream("/ec/EqualsWithOneStripe",
         cellSize * dataBlocks);
-    testOneFileUsingDFSStripedInputStream("/EqualsWithOneStripe2",
+    testOneFileUsingDFSStripedInputStream("/ec/EqualsWithOneStripe2",
         cellSize * dataBlocks, true);
   }
 
   @Test
   public void testFileMoreThanOneStripe1() throws Exception {
-    testOneFileUsingDFSStripedInputStream("/MoreThanOneStripe1",
+    testOneFileUsingDFSStripedInputStream("/ec/MoreThanOneStripe1",
         cellSize * dataBlocks + 123);
-    testOneFileUsingDFSStripedInputStream("/MoreThanOneStripe12",
+    testOneFileUsingDFSStripedInputStream("/ec/MoreThanOneStripe12",
         cellSize * dataBlocks + 123, true);
   }
 
   @Test
   public void testFileMoreThanOneStripe2() throws Exception {
-    testOneFileUsingDFSStripedInputStream("/MoreThanOneStripe2",
+    testOneFileUsingDFSStripedInputStream("/ec/MoreThanOneStripe2",
         cellSize * dataBlocks + cellSize * dataBlocks + 123);
-    testOneFileUsingDFSStripedInputStream("/MoreThanOneStripe22",
+    testOneFileUsingDFSStripedInputStream("/ec/MoreThanOneStripe22",
         cellSize * dataBlocks + cellSize * dataBlocks + 123, true);
   }
 
   @Test
   public void testLessThanFullBlockGroup() throws Exception {
-    testOneFileUsingDFSStripedInputStream("/LessThanFullBlockGroup",
+    testOneFileUsingDFSStripedInputStream("/ec/LessThanFullBlockGroup",
         cellSize * dataBlocks * (stripesPerBlock - 1) + cellSize);
-    testOneFileUsingDFSStripedInputStream("/LessThanFullBlockGroup2",
+    testOneFileUsingDFSStripedInputStream("/ec/LessThanFullBlockGroup2",
         cellSize * dataBlocks * (stripesPerBlock - 1) + cellSize, true);
   }
 
   @Test
   public void testFileFullBlockGroup() throws Exception {
-    testOneFileUsingDFSStripedInputStream("/FullBlockGroup",
+    testOneFileUsingDFSStripedInputStream("/ec/FullBlockGroup",
         blockSize * dataBlocks);
-    testOneFileUsingDFSStripedInputStream("/FullBlockGroup2",
+    testOneFileUsingDFSStripedInputStream("/ec/FullBlockGroup2",
         blockSize * dataBlocks, true);
   }
 
   @Test
   public void testFileMoreThanABlockGroup1() throws Exception {
-    testOneFileUsingDFSStripedInputStream("/MoreThanABlockGroup1",
+    testOneFileUsingDFSStripedInputStream("/ec/MoreThanABlockGroup1",
         blockSize * dataBlocks + 123);
-    testOneFileUsingDFSStripedInputStream("/MoreThanABlockGroup12",
+    testOneFileUsingDFSStripedInputStream("/ec/MoreThanABlockGroup12",
         blockSize * dataBlocks + 123, true);
   }
 
   @Test
   public void testFileMoreThanABlockGroup2() throws Exception {
-    testOneFileUsingDFSStripedInputStream("/MoreThanABlockGroup2",
+    testOneFileUsingDFSStripedInputStream("/ec/MoreThanABlockGroup2",
         blockSize * dataBlocks + cellSize + 123);
-    testOneFileUsingDFSStripedInputStream("/MoreThanABlockGroup22",
+    testOneFileUsingDFSStripedInputStream("/ec/MoreThanABlockGroup22",
         blockSize * dataBlocks + cellSize + 123, true);
   }
 
   @Test
   public void testFileMoreThanABlockGroup3() throws Exception {
-    testOneFileUsingDFSStripedInputStream("/MoreThanABlockGroup3",
+    testOneFileUsingDFSStripedInputStream("/ec/MoreThanABlockGroup3",
         blockSize * dataBlocks * 3 + cellSize * dataBlocks
             + cellSize + 123);
-    testOneFileUsingDFSStripedInputStream("/MoreThanABlockGroup32",
+    testOneFileUsingDFSStripedInputStream("/ec/MoreThanABlockGroup32",
         blockSize * dataBlocks * 3 + cellSize * dataBlocks
             + cellSize + 123, true);
   }
@@ -252,4 +259,54 @@ public class TestWriteReadStripedFile {
     StripedFileTestUtil.verifyStatefulRead(fs, srcPath, fileLength, expected, smallBuf);
     // webhdfs doesn't support bytebuffer read
   }
+
+  @Test
+  public void testConcat() throws Exception {
+    final byte[] data =
+        StripedFileTestUtil.generateBytes(blockSize * dataBlocks * 10 + 234);
+    int totalLength = 0;
+    Random r = new Random();
+    Path target = new Path("/ec/testConcat_target");
+    DFSTestUtil.writeFile(fs, target, Arrays.copyOfRange(data, 0, 123));
+    totalLength += 123;
+    int numFiles = 5;
+    Path[] srcs = new Path[numFiles];
+    for (int i = 0; i < numFiles; i++) {
+      srcs[i] = new Path("/ec/testConcat_src_file_" + i);
+      int srcLength = r.nextInt(blockSize * dataBlocks * 2) + 1;
+      DFSTestUtil.writeFile(fs, srcs[i],
+          Arrays.copyOfRange(data, totalLength, totalLength + srcLength));
+      totalLength += srcLength;
+    }
+    fs.concat(target, srcs);
+    StripedFileTestUtil.verifyStatefulRead(fs, target, totalLength,
+        Arrays.copyOfRange(data, 0, totalLength), new byte[1024]);
+  }
+
+  @Test
+  public void testConcatWithDifferentECPolicy() throws Exception {
+    final byte[] data =
+        StripedFileTestUtil.generateBytes(blockSize * dataBlocks);
+    Path nonECFile = new Path("/non_ec_file");
+    DFSTestUtil.writeFile(fs, nonECFile, data);
+    Path target = new Path("/ec/non_ec_file");
+    fs.rename(nonECFile, target);
+    int numFiles = 2;
+    Path[] srcs = new Path[numFiles];
+    for (int i = 0; i < numFiles; i++) {
+      srcs[i] = new Path("/ec/testConcat_src_file_" + i);
+      DFSTestUtil.writeFile(fs, srcs[i], data);
+    }
+    try {
+      fs.concat(target, srcs);
+      Assert.fail("non-ec file shouldn't concat with ec file");
+    } catch (RemoteException e){
+      Assert.assertTrue(e.getMessage()
+          .contains("have different erasure coding policy"));
+    }
+  }
 }