HDFS-14373. EC : Decoding is failing when block group last incomplete cell fall in to AlignedStripe. Contributed by Surendra Singh Lilhore.
(cherry picked from commit 382967be51
)
This commit is contained in:
parent
5704f15589
commit
a0cdebc409
|
@ -248,6 +248,8 @@ abstract class StripeReader {
|
||||||
DFSClient.LOG.warn("Found Checksum error for "
|
DFSClient.LOG.warn("Found Checksum error for "
|
||||||
+ currentBlock + " from " + currentNode
|
+ currentBlock + " from " + currentNode
|
||||||
+ " at " + ce.getPos());
|
+ " at " + ce.getPos());
|
||||||
|
//Clear buffer to make next decode success
|
||||||
|
strategy.getReadBuffer().clear();
|
||||||
// we want to remember which block replicas we have tried
|
// we want to remember which block replicas we have tried
|
||||||
corruptedBlocks.addCorruptedBlock(currentBlock, currentNode);
|
corruptedBlocks.addCorruptedBlock(currentBlock, currentNode);
|
||||||
throw ce;
|
throw ce;
|
||||||
|
@ -255,6 +257,8 @@ abstract class StripeReader {
|
||||||
DFSClient.LOG.warn("Exception while reading from "
|
DFSClient.LOG.warn("Exception while reading from "
|
||||||
+ currentBlock + " of " + dfsStripedInputStream.getSrc() + " from "
|
+ currentBlock + " of " + dfsStripedInputStream.getSrc() + " from "
|
||||||
+ currentNode, e);
|
+ currentNode, e);
|
||||||
|
//Clear buffer to make next decode success
|
||||||
|
strategy.getReadBuffer().clear();
|
||||||
throw e;
|
throw e;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -356,7 +356,8 @@ public class StripedBlockUtil {
|
||||||
cells);
|
cells);
|
||||||
|
|
||||||
// Step 3: merge into stripes
|
// Step 3: merge into stripes
|
||||||
AlignedStripe[] stripes = mergeRangesForInternalBlocks(ecPolicy, ranges);
|
AlignedStripe[] stripes = mergeRangesForInternalBlocks(ecPolicy, ranges,
|
||||||
|
blockGroup, cellSize);
|
||||||
|
|
||||||
// Step 4: calculate each chunk's position in destination buffer. Since the
|
// Step 4: calculate each chunk's position in destination buffer. Since the
|
||||||
// whole read range is within a single stripe, the logic is simpler here.
|
// whole read range is within a single stripe, the logic is simpler here.
|
||||||
|
@ -417,7 +418,8 @@ public class StripedBlockUtil {
|
||||||
cells);
|
cells);
|
||||||
|
|
||||||
// Step 3: merge into at most 5 stripes
|
// Step 3: merge into at most 5 stripes
|
||||||
AlignedStripe[] stripes = mergeRangesForInternalBlocks(ecPolicy, ranges);
|
AlignedStripe[] stripes = mergeRangesForInternalBlocks(ecPolicy, ranges,
|
||||||
|
blockGroup, cellSize);
|
||||||
|
|
||||||
// Step 4: calculate each chunk's position in destination buffer
|
// Step 4: calculate each chunk's position in destination buffer
|
||||||
calcualteChunkPositionsInBuf(cellSize, stripes, cells, buf);
|
calcualteChunkPositionsInBuf(cellSize, stripes, cells, buf);
|
||||||
|
@ -513,7 +515,8 @@ public class StripedBlockUtil {
|
||||||
* {@link AlignedStripe} instances.
|
* {@link AlignedStripe} instances.
|
||||||
*/
|
*/
|
||||||
private static AlignedStripe[] mergeRangesForInternalBlocks(
|
private static AlignedStripe[] mergeRangesForInternalBlocks(
|
||||||
ErasureCodingPolicy ecPolicy, VerticalRange[] ranges) {
|
ErasureCodingPolicy ecPolicy, VerticalRange[] ranges,
|
||||||
|
LocatedStripedBlock blockGroup, int cellSize) {
|
||||||
int dataBlkNum = ecPolicy.getNumDataUnits();
|
int dataBlkNum = ecPolicy.getNumDataUnits();
|
||||||
int parityBlkNum = ecPolicy.getNumParityUnits();
|
int parityBlkNum = ecPolicy.getNumParityUnits();
|
||||||
List<AlignedStripe> stripes = new ArrayList<>();
|
List<AlignedStripe> stripes = new ArrayList<>();
|
||||||
|
@ -525,6 +528,17 @@ public class StripedBlockUtil {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Add block group last cell offset in stripePoints if it is fall in to read
|
||||||
|
// offset range.
|
||||||
|
int lastCellIdxInBG = (int) (blockGroup.getBlockSize() / cellSize);
|
||||||
|
int idxInInternalBlk = lastCellIdxInBG / ecPolicy.getNumDataUnits();
|
||||||
|
long lastCellEndOffset = (idxInInternalBlk * (long)cellSize)
|
||||||
|
+ (blockGroup.getBlockSize() % cellSize);
|
||||||
|
if (stripePoints.first() < lastCellEndOffset
|
||||||
|
&& stripePoints.last() > lastCellEndOffset) {
|
||||||
|
stripePoints.add(lastCellEndOffset);
|
||||||
|
}
|
||||||
|
|
||||||
long prev = -1;
|
long prev = -1;
|
||||||
for (long point : stripePoints) {
|
for (long point : stripePoints) {
|
||||||
if (prev >= 0) {
|
if (prev >= 0) {
|
||||||
|
|
|
@ -19,8 +19,11 @@ package org.apache.hadoop.hdfs;
|
||||||
|
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
import org.apache.hadoop.HadoopIllegalArgumentException;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.fs.FSDataInputStream;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
|
import org.apache.hadoop.hdfs.MiniDFSCluster.DataNodeProperties;
|
||||||
import org.apache.hadoop.hdfs.protocol.Block;
|
import org.apache.hadoop.hdfs.protocol.Block;
|
||||||
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
|
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
|
||||||
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
||||||
|
@ -561,6 +564,50 @@ public class TestDFSStripedInputStream {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testReadWhenLastIncompleteCellComeInToDecodeAlignedStripe()
|
||||||
|
throws IOException {
|
||||||
|
DataNodeProperties stopDataNode = null;
|
||||||
|
try {
|
||||||
|
cluster.waitActive();
|
||||||
|
ErasureCodingPolicy policy = getEcPolicy();
|
||||||
|
DistributedFileSystem filesystem = cluster.getFileSystem();
|
||||||
|
filesystem.enableErasureCodingPolicy(policy.getName());
|
||||||
|
Path dir = new Path("/tmp");
|
||||||
|
filesystem.mkdirs(dir);
|
||||||
|
filesystem.getClient().setErasureCodingPolicy(dir.toString(),
|
||||||
|
policy.getName());
|
||||||
|
Path f = new Path(dir, "file");
|
||||||
|
|
||||||
|
//1. File with one stripe, last data cell should be half filed.
|
||||||
|
long fileLength = (policy.getCellSize() * policy.getNumDataUnits())
|
||||||
|
- (policy.getCellSize() / 2);
|
||||||
|
DFSTestUtil.createFile(filesystem, f, fileLength, (short) 1, 0);
|
||||||
|
|
||||||
|
//2. Stop first DN from stripe.
|
||||||
|
LocatedBlocks lbs = cluster.getNameNodeRpc().getBlockLocations(
|
||||||
|
f.toString(), 0, fileLength);
|
||||||
|
LocatedStripedBlock bg = (LocatedStripedBlock) (lbs.get(0));
|
||||||
|
final LocatedBlock[] blocks = StripedBlockUtil.parseStripedBlockGroup(bg,
|
||||||
|
cellSize, dataBlocks, parityBlocks);
|
||||||
|
cluster.stopDataNode(blocks[0].getLocations()[0].getName());
|
||||||
|
|
||||||
|
//3. Do pread for fist cell, reconstruction should happen
|
||||||
|
try (FSDataInputStream in = filesystem.open(f)) {
|
||||||
|
DFSStripedInputStream stripedIn = (DFSStripedInputStream) in
|
||||||
|
.getWrappedStream();
|
||||||
|
byte[] b = new byte[policy.getCellSize()];
|
||||||
|
stripedIn.read(0, b, 0, policy.getCellSize());
|
||||||
|
}
|
||||||
|
} catch (HadoopIllegalArgumentException e) {
|
||||||
|
fail(e.getMessage());
|
||||||
|
} finally {
|
||||||
|
if (stopDataNode != null) {
|
||||||
|
cluster.restartDataNode(stopDataNode, true);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Empties the pool for the specified buffer type, for the current ecPolicy.
|
* Empties the pool for the specified buffer type, for the current ecPolicy.
|
||||||
* <p>
|
* <p>
|
||||||
|
|
Loading…
Reference in New Issue