HDFS-11882. Precisely calculate acked length of striped block groups in updatePipeline.
This commit is contained in:
parent
0ba8ff4b77
commit
ccd2ac60ec
|
@ -641,7 +641,7 @@ public class DFSStripedOutputStream extends DFSOutputStream
|
||||||
// wait till all the healthy streamers to
|
// wait till all the healthy streamers to
|
||||||
// 1) get the updated block info
|
// 1) get the updated block info
|
||||||
// 2) create new block outputstream
|
// 2) create new block outputstream
|
||||||
newFailed = waitCreatingNewStreams(healthySet);
|
newFailed = waitCreatingStreamers(healthySet);
|
||||||
if (newFailed.size() + failedStreamers.size() >
|
if (newFailed.size() + failedStreamers.size() >
|
||||||
numAllBlocks - numDataBlocks) {
|
numAllBlocks - numDataBlocks) {
|
||||||
throw new IOException(
|
throw new IOException(
|
||||||
|
@ -668,6 +668,14 @@ public class DFSStripedOutputStream extends DFSOutputStream
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Check if the streamers were successfully updated, adding failed streamers
|
||||||
|
* in the <i>failed</i> return parameter.
|
||||||
|
* @param failed Return parameter containing failed streamers from
|
||||||
|
* <i>streamers</i>.
|
||||||
|
* @param streamers Set of streamers that are being updated
|
||||||
|
* @return total number of successful updates and failures
|
||||||
|
*/
|
||||||
private int checkStreamerUpdates(Set<StripedDataStreamer> failed,
|
private int checkStreamerUpdates(Set<StripedDataStreamer> failed,
|
||||||
Set<StripedDataStreamer> streamers) {
|
Set<StripedDataStreamer> streamers) {
|
||||||
for (StripedDataStreamer streamer : streamers) {
|
for (StripedDataStreamer streamer : streamers) {
|
||||||
|
@ -682,7 +690,15 @@ public class DFSStripedOutputStream extends DFSOutputStream
|
||||||
return coordinator.updateStreamerMap.size() + failed.size();
|
return coordinator.updateStreamerMap.size() + failed.size();
|
||||||
}
|
}
|
||||||
|
|
||||||
private Set<StripedDataStreamer> waitCreatingNewStreams(
|
/**
|
||||||
|
* Waits for streamers to be created.
|
||||||
|
*
|
||||||
|
* @param healthyStreamers Set of healthy streamers
|
||||||
|
* @return Set of streamers that failed.
|
||||||
|
*
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
private Set<StripedDataStreamer> waitCreatingStreamers(
|
||||||
Set<StripedDataStreamer> healthyStreamers) throws IOException {
|
Set<StripedDataStreamer> healthyStreamers) throws IOException {
|
||||||
Set<StripedDataStreamer> failed = new HashSet<>();
|
Set<StripedDataStreamer> failed = new HashSet<>();
|
||||||
final int expectedNum = healthyStreamers.size();
|
final int expectedNum = healthyStreamers.size();
|
||||||
|
@ -773,9 +789,10 @@ public class DFSStripedOutputStream extends DFSOutputStream
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// should update the block group length based on the acked length
|
// Update the NameNode with the acked length of the block group
|
||||||
|
// Save and restore the unacked length
|
||||||
final long sentBytes = currentBlockGroup.getNumBytes();
|
final long sentBytes = currentBlockGroup.getNumBytes();
|
||||||
final long ackedBytes = getNumAckedStripes() * cellSize * numDataBlocks;
|
final long ackedBytes = getAckedLength();
|
||||||
Preconditions.checkState(ackedBytes <= sentBytes,
|
Preconditions.checkState(ackedBytes <= sentBytes,
|
||||||
"Acked:" + ackedBytes + ", Sent:" + sentBytes);
|
"Acked:" + ackedBytes + ", Sent:" + sentBytes);
|
||||||
currentBlockGroup.setNumBytes(ackedBytes);
|
currentBlockGroup.setNumBytes(ackedBytes);
|
||||||
|
@ -787,23 +804,140 @@ public class DFSStripedOutputStream extends DFSOutputStream
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Get the number of acked stripes. An acked stripe means at least data block
|
* Return the length of each block in the block group.
|
||||||
* number size cells of the stripe were acked.
|
* Unhealthy blocks have a length of -1.
|
||||||
|
*
|
||||||
|
* @return List of block lengths.
|
||||||
*/
|
*/
|
||||||
private long getNumAckedStripes() {
|
private List<Long> getBlockLengths() {
|
||||||
int minStripeNum = Integer.MAX_VALUE;
|
List<Long> blockLengths = new ArrayList<>(numAllBlocks);
|
||||||
for (int i = 0; i < numAllBlocks; i++) {
|
for (int i = 0; i < numAllBlocks; i++) {
|
||||||
final StripedDataStreamer streamer = getStripedDataStreamer(i);
|
final StripedDataStreamer streamer = getStripedDataStreamer(i);
|
||||||
|
long numBytes = -1;
|
||||||
if (streamer.isHealthy()) {
|
if (streamer.isHealthy()) {
|
||||||
int curStripeNum = 0;
|
|
||||||
if (streamer.getBlock() != null) {
|
if (streamer.getBlock() != null) {
|
||||||
curStripeNum = (int) (streamer.getBlock().getNumBytes() / cellSize);
|
numBytes = streamer.getBlock().getNumBytes();
|
||||||
}
|
|
||||||
minStripeNum = Math.min(curStripeNum, minStripeNum);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
assert minStripeNum != Integer.MAX_VALUE;
|
blockLengths.add(numBytes);
|
||||||
return minStripeNum;
|
}
|
||||||
|
return blockLengths;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get the length of acked bytes in the block group.
|
||||||
|
*
|
||||||
|
* <p>
|
||||||
|
* A full stripe is acked when at least numDataBlocks streamers have
|
||||||
|
* the corresponding cells of the stripe, and all previous full stripes are
|
||||||
|
* also acked. This enforces the constraint that there is at most one
|
||||||
|
* partial stripe.
|
||||||
|
* </p>
|
||||||
|
* <p>
|
||||||
|
* Partial stripes write all parity cells. Empty data cells are not written.
|
||||||
|
* Parity cells are the length of the longest data cell(s). For example,
|
||||||
|
* with RS(3,2), if we have data cells with lengths [1MB, 64KB, 0], the
|
||||||
|
* parity blocks will be length [1MB, 1MB].
|
||||||
|
* </p>
|
||||||
|
* <p>
|
||||||
|
* To be considered acked, a partial stripe needs at least numDataBlocks
|
||||||
|
* empty or written cells.
|
||||||
|
* </p>
|
||||||
|
* <p>
|
||||||
|
* Currently, partial stripes can only happen when closing the file at a
|
||||||
|
* non-stripe boundary, but this could also happen during (currently
|
||||||
|
* unimplemented) hflush/hsync support.
|
||||||
|
* </p>
|
||||||
|
*/
|
||||||
|
private long getAckedLength() {
|
||||||
|
// Determine the number of full stripes that are sufficiently durable
|
||||||
|
final long sentBytes = currentBlockGroup.getNumBytes();
|
||||||
|
final long numFullStripes = sentBytes / numDataBlocks / cellSize;
|
||||||
|
final long fullStripeLength = numFullStripes * numDataBlocks * cellSize;
|
||||||
|
assert fullStripeLength <= sentBytes : "Full stripe length can't be " +
|
||||||
|
"greater than the block group length";
|
||||||
|
|
||||||
|
long ackedLength = 0;
|
||||||
|
|
||||||
|
// Determine the length contained by at least `numDataBlocks` blocks.
|
||||||
|
// Since it's sorted, all the blocks after `offset` are at least as long,
|
||||||
|
// and there are at least `numDataBlocks` at or after `offset`.
|
||||||
|
List<Long> blockLengths = Collections.unmodifiableList(getBlockLengths());
|
||||||
|
List<Long> sortedBlockLengths = new ArrayList<>(blockLengths);
|
||||||
|
Collections.sort(sortedBlockLengths);
|
||||||
|
if (numFullStripes > 0) {
|
||||||
|
final int offset = sortedBlockLengths.size() - numDataBlocks;
|
||||||
|
ackedLength = sortedBlockLengths.get(offset) * numDataBlocks;
|
||||||
|
}
|
||||||
|
|
||||||
|
// If the acked length is less than the expected full stripe length, then
|
||||||
|
// we're missing a full stripe. Return the acked length.
|
||||||
|
if (ackedLength < fullStripeLength) {
|
||||||
|
return ackedLength;
|
||||||
|
}
|
||||||
|
// If the expected length is exactly a stripe boundary, then we're also done
|
||||||
|
if (ackedLength == sentBytes) {
|
||||||
|
return ackedLength;
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
Otherwise, we're potentially dealing with a partial stripe.
|
||||||
|
The partial stripe is laid out as follows:
|
||||||
|
|
||||||
|
0 or more full data cells, `cellSize` in length.
|
||||||
|
0 or 1 partial data cells.
|
||||||
|
0 or more empty data cells.
|
||||||
|
`numParityBlocks` parity cells, the length of the longest data cell.
|
||||||
|
|
||||||
|
If the partial stripe is sufficiently acked, we'll update the ackedLength.
|
||||||
|
*/
|
||||||
|
|
||||||
|
// How many full and empty data cells do we expect?
|
||||||
|
final int numFullDataCells = (int)
|
||||||
|
((sentBytes - fullStripeLength) / cellSize);
|
||||||
|
final int partialLength = (int) (sentBytes - fullStripeLength) % cellSize;
|
||||||
|
final int numPartialDataCells = partialLength == 0 ? 0 : 1;
|
||||||
|
final int numEmptyDataCells = numDataBlocks - numFullDataCells -
|
||||||
|
numPartialDataCells;
|
||||||
|
// Calculate the expected length of the parity blocks.
|
||||||
|
final int parityLength = numFullDataCells > 0 ? cellSize : partialLength;
|
||||||
|
|
||||||
|
final long fullStripeBlockOffset = fullStripeLength / numDataBlocks;
|
||||||
|
|
||||||
|
// Iterate through each type of streamers, checking the expected length.
|
||||||
|
long[] expectedBlockLengths = new long[numAllBlocks];
|
||||||
|
int idx = 0;
|
||||||
|
// Full cells
|
||||||
|
for (; idx < numFullDataCells; idx++) {
|
||||||
|
expectedBlockLengths[idx] = fullStripeBlockOffset + cellSize;
|
||||||
|
}
|
||||||
|
// Partial cell
|
||||||
|
for (; idx < numFullDataCells + numPartialDataCells; idx++) {
|
||||||
|
expectedBlockLengths[idx] = fullStripeBlockOffset + partialLength;
|
||||||
|
}
|
||||||
|
// Empty cells
|
||||||
|
for (; idx < numFullDataCells + numPartialDataCells + numEmptyDataCells;
|
||||||
|
idx++) {
|
||||||
|
expectedBlockLengths[idx] = fullStripeBlockOffset;
|
||||||
|
}
|
||||||
|
// Parity cells
|
||||||
|
for (; idx < numAllBlocks; idx++) {
|
||||||
|
expectedBlockLengths[idx] = fullStripeBlockOffset + parityLength;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check expected lengths against actual streamer lengths.
|
||||||
|
// Update if we have sufficient durability.
|
||||||
|
int numBlocksWithCorrectLength = 0;
|
||||||
|
for (int i = 0; i < numAllBlocks; i++) {
|
||||||
|
if (blockLengths.get(i) == expectedBlockLengths[i]) {
|
||||||
|
numBlocksWithCorrectLength++;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (numBlocksWithCorrectLength >= numDataBlocks) {
|
||||||
|
ackedLength = sentBytes;
|
||||||
|
}
|
||||||
|
|
||||||
|
return ackedLength;
|
||||||
}
|
}
|
||||||
|
|
||||||
private int stripeDataSize() {
|
private int stripeDataSize() {
|
||||||
|
|
|
@ -45,7 +45,6 @@ import org.apache.hadoop.test.GenericTestUtils;
|
||||||
import org.apache.hadoop.util.StringUtils;
|
import org.apache.hadoop.util.StringUtils;
|
||||||
import org.apache.log4j.Level;
|
import org.apache.log4j.Level;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
import org.junit.Assume;
|
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
|
@ -61,6 +60,7 @@ import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.Assert.assertTrue;
|
||||||
|
import static org.junit.Assume.assumeTrue;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Test striped file write operation with data node failures.
|
* Test striped file write operation with data node failures.
|
||||||
|
@ -390,6 +390,79 @@ public class TestDFSStripedOutputStreamWithFailure {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* When the two DataNodes with partial data blocks fail.
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void runTestWithDifferentLengths() throws Exception {
|
||||||
|
assumeTrue("Skip this test case in the subclasses. Once is enough.",
|
||||||
|
this.getClass().equals(TestDFSStripedOutputStreamWithFailure.class));
|
||||||
|
|
||||||
|
final HdfsConfiguration conf = newHdfsConfiguration();
|
||||||
|
|
||||||
|
final int[] fileLengths = {
|
||||||
|
// Full stripe then partial on cell boundary
|
||||||
|
cellSize * (dataBlocks * 2 - 2),
|
||||||
|
// Full stripe and a partial on non-cell boundary
|
||||||
|
(cellSize * dataBlocks) + 123,
|
||||||
|
};
|
||||||
|
try {
|
||||||
|
for (int length: fileLengths) {
|
||||||
|
// select the two DNs with partial block to kill
|
||||||
|
final int[] dnIndex = {dataBlocks - 2, dataBlocks - 1};
|
||||||
|
final int[] killPos = getKillPositions(length, dnIndex.length);
|
||||||
|
try {
|
||||||
|
LOG.info("runTestWithMultipleFailure2: length==" + length
|
||||||
|
+ ", killPos=" + Arrays.toString(killPos)
|
||||||
|
+ ", dnIndex=" + Arrays.toString(dnIndex));
|
||||||
|
setup(conf);
|
||||||
|
runTest(length, killPos, dnIndex, false);
|
||||||
|
} catch (Throwable e) {
|
||||||
|
final String err = "failed, killPos=" + Arrays.toString(killPos)
|
||||||
|
+ ", dnIndex=" + Arrays.toString(dnIndex) + ", length=" + length;
|
||||||
|
LOG.error(err);
|
||||||
|
throw e;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
tearDown();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test writing very short EC files with many failures.
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void runTestWithShortStripe() throws Exception {
|
||||||
|
assumeTrue("Skip this test case in the subclasses. Once is enough.",
|
||||||
|
this.getClass().equals(TestDFSStripedOutputStreamWithFailure.class));
|
||||||
|
|
||||||
|
final HdfsConfiguration conf = newHdfsConfiguration();
|
||||||
|
// Write a file with a 1 cell partial stripe
|
||||||
|
final int length = cellSize - 123;
|
||||||
|
// Kill all but one DN
|
||||||
|
final int[] dnIndex = new int[dataBlocks + parityBlocks - 1];
|
||||||
|
for (int i = 0; i < dnIndex.length; i++) {
|
||||||
|
dnIndex[i] = i;
|
||||||
|
}
|
||||||
|
final int[] killPos = getKillPositions(length, dnIndex.length);
|
||||||
|
|
||||||
|
try {
|
||||||
|
LOG.info("runTestWithShortStripe: length==" + length + ", killPos="
|
||||||
|
+ Arrays.toString(killPos) + ", dnIndex="
|
||||||
|
+ Arrays.toString(dnIndex));
|
||||||
|
setup(conf);
|
||||||
|
runTest(length, killPos, dnIndex, false);
|
||||||
|
} catch (Throwable e) {
|
||||||
|
final String err = "failed, killPos=" + Arrays.toString(killPos)
|
||||||
|
+ ", dnIndex=" + Arrays.toString(dnIndex) + ", length=" + length;
|
||||||
|
LOG.error(err);
|
||||||
|
throw e;
|
||||||
|
} finally {
|
||||||
|
tearDown();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* runTest implementation.
|
* runTest implementation.
|
||||||
* @param length file length
|
* @param length file length
|
||||||
|
@ -558,7 +631,7 @@ public class TestDFSStripedOutputStreamWithFailure {
|
||||||
|
|
||||||
private void run(int offset) {
|
private void run(int offset) {
|
||||||
int base = getBase();
|
int base = getBase();
|
||||||
Assume.assumeTrue(base >= 0);
|
assumeTrue(base >= 0);
|
||||||
final int i = offset + base;
|
final int i = offset + base;
|
||||||
final Integer length = getLength(i);
|
final Integer length = getLength(i);
|
||||||
if (length == null) {
|
if (length == null) {
|
||||||
|
|
Loading…
Reference in New Issue