HDFS-8228. Erasure Coding: SequentialBlockGroupIdGenerator#nextValue may cause block id conflicts. Contributed by Jing Zhao.

commit f5d4a95ef5 (parent 3f2c6938f1)

CHANGES-HDFS-EC-7285.txt
@@ -128,3 +128,6 @@
 
     HDFS-8223. Should calculate checksum for parity blocks in DFSStripedOutputStream.
    (Yi Liu via jing9)
+
+    HDFS-8228. Erasure Coding: SequentialBlockGroupIdGenerator#nextValue may cause
+    block id conflicts (Jing Zhao via Zhe Zhang)

SequentialBlockGroupIdGenerator.java
@@ -19,9 +19,11 @@ package org.apache.hadoop.hdfs.server.blockmanagement;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.hdfs.protocol.Block;
-import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.util.SequentialNumber;
 
+import static org.apache.hadoop.hdfs.protocol.HdfsConstants.BLOCK_GROUP_INDEX_MASK;
+import static org.apache.hadoop.hdfs.protocol.HdfsConstants.MAX_BLOCKS_IN_GROUP;
+
 /**
  * Generate the next valid block group ID by incrementing the maximum block
  * group ID allocated so far, with the first 2^10 block group IDs reserved.
@@ -34,6 +36,9 @@ import org.apache.hadoop.util.SequentialNumber;
  * bits (n+2) to (64-m) represent the ID of its block group, while the last m
  * bits represent its index of the group. The value m is determined by the
  * maximum number of blocks in a group (MAX_BLOCKS_IN_GROUP).
+ *
+ * Note that the {@link #nextValue()} method requires an external lock to
+ * guarantee IDs have no conflicts.
  */
 @InterfaceAudience.Private
 public class SequentialBlockGroupIdGenerator extends SequentialNumber {
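
The layout documented in this Javadoc is easiest to see with concrete numbers. The standalone sketch below is illustrative only, not part of the patch; it assumes the HDFS-7285 branch values MAX_BLOCKS_IN_GROUP = 16 and BLOCK_GROUP_INDEX_MASK = 0xF, so the last m = 4 bits of a block ID index the block within its group:

// Illustrative sketch, not HDFS source. Assumes the branch's constants:
// MAX_BLOCKS_IN_GROUP = 16, BLOCK_GROUP_INDEX_MASK = 0xF.
public class BlockGroupIdLayout {
  static final long MAX_BLOCKS_IN_GROUP = 16;
  static final long BLOCK_GROUP_INDEX_MASK = MAX_BLOCKS_IN_GROUP - 1;

  public static void main(String[] args) {
    // A group-aligned (multiple-of-16), negative block group ID:
    long groupId = Long.MIN_VALUE + (1L << 10) * MAX_BLOCKS_IN_GROUP;
    // Internal block i of the group has ID groupId + i:
    long internalBlock = groupId + 2;
    // Clearing the low index bits recovers the group ID, even for
    // negative IDs (two's complement keeps the low bits intact):
    System.out.println((internalBlock & ~BLOCK_GROUP_INDEX_MASK) == groupId); // true
    // Adding MAX_BLOCKS_IN_GROUP then yields the next aligned group ID,
    // which is exactly the skipTo() arithmetic used by the fixed nextValue():
    long next = (internalBlock & ~BLOCK_GROUP_INDEX_MASK) + MAX_BLOCKS_IN_GROUP;
    System.out.println(next == groupId + MAX_BLOCKS_IN_GROUP); // true
  }
}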

@@ -47,32 +52,30 @@ public class SequentialBlockGroupIdGenerator extends SequentialNumber {
 
   @Override // NumberGenerator
   public long nextValue() {
-    // Skip to next legitimate block group ID based on the naming protocol
-    while (super.getCurrentValue() % HdfsConstants.MAX_BLOCKS_IN_GROUP > 0) {
-      super.nextValue();
-    }
+    skipTo((getCurrentValue() & ~BLOCK_GROUP_INDEX_MASK) + MAX_BLOCKS_IN_GROUP);
     // Make sure there's no conflict with existing random block IDs
-    while (hasValidBlockInRange(super.getCurrentValue())) {
-      super.skipTo(super.getCurrentValue() +
-          HdfsConstants.MAX_BLOCKS_IN_GROUP);
+    final Block b = new Block(getCurrentValue());
+    while (hasValidBlockInRange(b)) {
+      skipTo(getCurrentValue() + MAX_BLOCKS_IN_GROUP);
+      b.setBlockId(getCurrentValue());
     }
-    if (super.getCurrentValue() >= 0) {
-      BlockManager.LOG.warn("All negative block group IDs are used, " +
-          "growing into positive IDs, " +
-          "which might conflict with non-erasure coded blocks.");
+    if (b.getBlockId() >= 0) {
+      throw new IllegalStateException("All negative block group IDs are used, "
+          + "growing into positive IDs, "
+          + "which might conflict with non-erasure coded blocks.");
     }
-    return super.getCurrentValue();
+    return getCurrentValue();
   }
 
   /**
-   *
-   * @param id The starting ID of the range
+   * @param b A block object whose id is set to the starting point for check
    * @return true if any ID in the range
    *      {id, id+HdfsConstants.MAX_BLOCKS_IN_GROUP} is pointed-to by a file
    */
-  private boolean hasValidBlockInRange(long id) {
-    for (int i = 0; i < HdfsConstants.MAX_BLOCKS_IN_GROUP; i++) {
-      Block b = new Block(id + i);
+  private boolean hasValidBlockInRange(Block b) {
+    final long id = b.getBlockId();
+    for (int i = 0; i < MAX_BLOCKS_IN_GROUP; i++) {
+      b.setBlockId(id + i);
       if (blockManager.getBlockCollection(b) != null) {
         return true;
       }
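
The root cause the hunk above fixes: block group IDs are negative, and Java's remainder operator yields values less than or equal to zero for a negative dividend, so the old guard getCurrentValue() % MAX_BLOCKS_IN_GROUP > 0 was never true. The old loop therefore never advanced the counter, and successive calls could return the same (or a misaligned) group ID. A minimal demonstration with illustrative values, not HDFS source:

// Sketch of the failure mode; assumes MAX_BLOCKS_IN_GROUP = 16.
public class NegativeRemainderDemo {
  public static void main(String[] args) {
    long aligned = -1008;              // negative and a multiple of 16
    // Old guard "cur % MAX_BLOCKS_IN_GROUP > 0" is never true for negatives:
    System.out.println(aligned % 16);  // 0   -> loop body never runs
    System.out.println(-1005 % 16);    // -13 -> still not > 0, no skip either
    // So the old nextValue() could return `aligned` again on the next call.
    // The fix advances unconditionally before the conflict check:
    long next = (aligned & ~15L) + 16; // next aligned group ID: -992
    System.out.println(next);
  }
}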

SequentialBlockIdGenerator.java
@@ -54,7 +54,7 @@ public class SequentialBlockIdGenerator extends SequentialNumber {
       b.setBlockId(super.nextValue());
     }
     if (b.getBlockId() < 0) {
-      BlockManager.LOG.warn("All positive block IDs are used, " +
+      throw new IllegalStateException("All positive block IDs are used, " +
           "wrapping to negative IDs, " +
           "which might conflict with erasure coded block groups.");
     }

TestDFSStripedInputStream.java
@@ -22,10 +22,8 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 
-import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Assert;
-import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
@@ -40,18 +38,15 @@ public class TestDFSStripedInputStream {
   private final static int cellSize = HdfsConstants.BLOCK_STRIPED_CELL_SIZE;
   private final static int stripesPerBlock = 4;
   static int blockSize = cellSize * stripesPerBlock;
-  private int mod = 29;
   static int numDNs = dataBlocks + parityBlocks + 2;
 
   private static MiniDFSCluster cluster;
-  private static Configuration conf;
 
   @BeforeClass
   public static void setup() throws IOException {
-    conf = new Configuration();
+    Configuration conf = new Configuration();
     conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize);
-    cluster
-        = new MiniDFSCluster.Builder(conf).numDataNodes(numDNs).build();;
+    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDNs).build();
     cluster.getFileSystem().getClient().createErasureCodingZone("/", null);
     fs = cluster.getFileSystem();
   }
@@ -85,43 +80,56 @@
 
   @Test
   public void testFileSmallerThanOneStripe1() throws IOException {
-    testOneFileUsingDFSStripedInputStream("/SmallerThanOneStripe", cellSize * dataBlocks - 1);
+    testOneFileUsingDFSStripedInputStream("/SmallerThanOneStripe",
+        cellSize * dataBlocks - 1);
   }
 
   @Test
   public void testFileSmallerThanOneStripe2() throws IOException {
-    testOneFileUsingDFSStripedInputStream("/SmallerThanOneStripe", cellSize + 123);
+    testOneFileUsingDFSStripedInputStream("/SmallerThanOneStripe",
+        cellSize + 123);
   }
 
   @Test
   public void testFileEqualsWithOneStripe() throws IOException {
-    testOneFileUsingDFSStripedInputStream("/EqualsWithOneStripe", cellSize * dataBlocks);
+    testOneFileUsingDFSStripedInputStream("/EqualsWithOneStripe",
+        cellSize * dataBlocks);
   }
 
   @Test
   public void testFileMoreThanOneStripe1() throws IOException {
-    testOneFileUsingDFSStripedInputStream("/MoreThanOneStripe1", cellSize * dataBlocks + 123);
+    testOneFileUsingDFSStripedInputStream("/MoreThanOneStripe1",
+        cellSize * dataBlocks + 123);
   }
 
   @Test
   public void testFileMoreThanOneStripe2() throws IOException {
-    testOneFileUsingDFSStripedInputStream("/MoreThanOneStripe2", cellSize * dataBlocks
-        + cellSize * dataBlocks + 123);
+    testOneFileUsingDFSStripedInputStream("/MoreThanOneStripe2",
+        cellSize * dataBlocks + cellSize * dataBlocks + 123);
+  }
+
+  @Test
+  public void testLessThanFullBlockGroup() throws IOException {
+    testOneFileUsingDFSStripedInputStream("/LessThanFullBlockGroup",
+        cellSize * dataBlocks * (stripesPerBlock - 1) + cellSize);
   }
 
   @Test
   public void testFileFullBlockGroup() throws IOException {
-    testOneFileUsingDFSStripedInputStream("/FullBlockGroup", blockSize * dataBlocks);
+    testOneFileUsingDFSStripedInputStream("/FullBlockGroup",
+        blockSize * dataBlocks);
   }
 
   @Test
   public void testFileMoreThanABlockGroup1() throws IOException {
-    testOneFileUsingDFSStripedInputStream("/MoreThanABlockGroup1", blockSize * dataBlocks + 123);
+    testOneFileUsingDFSStripedInputStream("/MoreThanABlockGroup1",
+        blockSize * dataBlocks + 123);
   }
 
   @Test
   public void testFileMoreThanABlockGroup2() throws IOException {
-    testOneFileUsingDFSStripedInputStream("/MoreThanABlockGroup2", blockSize * dataBlocks + cellSize+ 123);
+    testOneFileUsingDFSStripedInputStream("/MoreThanABlockGroup2",
+        blockSize * dataBlocks + cellSize + 123);
   }
 
 
@@ -141,35 +149,32 @@ public class TestDFSStripedInputStream {
   }
 
   private byte getByte(long pos) {
+    final int mod = 29;
     return (byte) (pos % mod + 1);
   }
 
   private void testOneFileUsingDFSStripedInputStream(String src, int writeBytes)
       throws IOException {
-    Path TestPath = new Path(src);
+    Path testPath = new Path(src);
     byte[] bytes = generateBytes(writeBytes);
-    DFSTestUtil.writeFile(fs, TestPath, new String(bytes));
+    DFSTestUtil.writeFile(fs, testPath, new String(bytes));
 
     //check file length
-    FileStatus status = fs.getFileStatus(TestPath);
+    FileStatus status = fs.getFileStatus(testPath);
     long fileLength = status.getLen();
     Assert.assertEquals("File length should be the same",
         writeBytes, fileLength);
 
-    DFSStripedInputStream dis = new DFSStripedInputStream(
-        fs.getClient(), src, true);
-    try {
+    try (DFSStripedInputStream dis =
+             new DFSStripedInputStream(fs.getClient(), src, true)) {
       byte[] buf = new byte[writeBytes + 100];
       int readLen = dis.read(0, buf, 0, buf.length);
       readLen = readLen >= 0 ? readLen : 0;
       Assert.assertEquals("The length of file should be the same to write size",
           writeBytes, readLen);
       for (int i = 0; i < writeBytes; i++) {
-        Assert.assertEquals("Byte at i should be the same",
-            getByte(i), buf[i]);
+        Assert.assertEquals("Byte at i should be the same", getByte(i), buf[i]);
       }
-    } finally {
-      dis.close();
     }
   }
 }

TestAddStripedBlocks.java
@@ -82,6 +82,27 @@ public class TestAddStripedBlocks {
     }
   }
 
+  /**
+   * Make sure the IDs of striped blocks do not conflict
+   */
+  @Test
+  public void testAllocateBlockId() throws Exception {
+    Path testPath = new Path("/testfile");
+    // create a file, which allocates a new block
+    DFSTestUtil.writeFile(dfs, testPath, "hello, world!");
+    LocatedBlocks lb = dfs.getClient().getLocatedBlocks(testPath.toString(), 0);
+    final long firstId = lb.get(0).getBlock().getBlockId();
+    // delete the file
+    dfs.delete(testPath, true);
+
+    // allocate a new block, and make sure the new block's id does not conflict
+    // with the previous one
+    DFSTestUtil.writeFile(dfs, testPath, "hello again");
+    lb = dfs.getClient().getLocatedBlocks(testPath.toString(), 0);
+    final long secondId = lb.get(0).getBlock().getBlockId();
+    Assert.assertEquals(firstId + HdfsConstants.MAX_BLOCKS_IN_GROUP, secondId);
+  }
+
   @Test
   public void testAddStripedBlock() throws Exception {
     final Path file = new Path("/file1");