HDFS-15211. EC: File write hangs during close in case of Exception during updatePipeline. Contributed by Ayush Saxena.

Surendra Singh Lilhore 2020-03-15 20:44:32 +05:30
parent 74780c22eb
commit 1d772dc542
1 changed file with 35 additions and 25 deletions
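For context, the hang named in the title is reached from an ordinary write-then-close path on an erasure-coded file: once enough streamers have failed (for example after an exception during updatePipeline), the pre-fix close() could be left waiting on the dead streamers. The sketch below is illustrative only; the path, EC setup, and data size are assumptions, not taken from this commit.

// Illustrative caller only (not part of this commit): a client writing to a
// directory that has an erasure coding policy set. For EC paths, fs.create()
// returns a stream backed by DFSStripedOutputStream, and close() is where
// the pre-fix hang could occur once too many streamers had failed.
import java.io.OutputStream;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class EcWriteCloseSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    try (FileSystem fs = FileSystem.get(conf);
         OutputStream out = fs.create(new Path("/ec-dir/demo-file"))) {
      out.write(new byte[8 * 1024 * 1024]); // some striped data
    } // try-with-resources calls close(); this is the call the fix keeps from hanging
  }
}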


@@ -404,6 +404,7 @@ public class DFSStripedOutputStream extends DFSOutputStream
LOG.debug("newly failed streamers: " + newFailed);
}
if (failCount > (numAllBlocks - numDataBlocks)) {
closeAllStreamers();
throw new IOException("Failed: the number of failed blocks = "
+ failCount + " > the number of parity blocks = "
+ (numAllBlocks - numDataBlocks));
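The new closeAllStreamers() call sits behind the existing tolerance check: a striped write can lose at most (numAllBlocks - numDataBlocks) streamers, i.e. as many streamers as there are parity blocks. A minimal standalone illustration of that arithmetic, using the common RS-6-3 layout as an assumed example (6 data + 3 parity = 9 blocks); this is not HDFS code itself:

// Standalone illustration of the failure-tolerance condition in the hunk
// above, assuming an RS-6-3 layout (6 data blocks, 3 parity blocks).
public class ParityToleranceSketch {
  public static void main(String[] args) {
    int numDataBlocks = 6;
    int numAllBlocks = 9;
    int parityBlocks = numAllBlocks - numDataBlocks; // 3

    for (int failCount = 0; failCount <= numAllBlocks; failCount++) {
      // Same condition as above: more failures than parity blocks means the
      // stripe can no longer be reconstructed.
      boolean unrecoverable = failCount > parityBlocks;
      System.out.println(failCount + " failed streamer(s) -> "
          + (unrecoverable ? "fail the write (and close all streamers)"
                           : "still recoverable"));
    }
  }
}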
@@ -411,6 +412,13 @@ public class DFSStripedOutputStream extends DFSOutputStream
return newFailed;
}
private void closeAllStreamers() {
// The write has failed, Close all the streamers.
for (StripedDataStreamer streamer : streamers) {
streamer.close(true);
}
}
private void handleCurrentStreamerFailure(String err, Exception e)
throws IOException {
currentPacket = null;
@@ -670,6 +678,8 @@ public class DFSStripedOutputStream extends DFSOutputStream
newFailed = waitCreatingStreamers(healthySet);
if (newFailed.size() + failedStreamers.size() >
numAllBlocks - numDataBlocks) {
// The write has failed, Close all the streamers.
closeAllStreamers();
throw new IOException(
"Data streamers failed while creating new block streams: "
+ newFailed + ". There are not enough healthy streamers.");
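Both call sites above apply the same pattern: once the failure count exceeds what parity can cover, every streamer is force-closed before the IOException is thrown, so a later close() on the stream (where the JIRA reports the hang) is not left waiting on dead-but-running streamers. A generic sketch of that pattern follows; the Streamer interface and FailFastWriter class are hypothetical stand-ins for illustration, not the HDFS types.

// Generic sketch of the "close everything, then throw" pattern used in the
// two hunks above. Streamer and FailFastWriter are hypothetical stand-ins.
import java.io.IOException;
import java.util.List;

interface Streamer {
  // force=true mirrors streamer.close(true) above: shut the worker down even
  // if it is still waiting for acks or new block locations.
  void close(boolean force);
}

class FailFastWriter {
  private final List<Streamer> streamers;
  private final int parityBlocks;

  FailFastWriter(List<Streamer> streamers, int parityBlocks) {
    this.streamers = streamers;
    this.parityBlocks = parityBlocks;
  }

  void checkFailures(int failCount) throws IOException {
    if (failCount > parityBlocks) {
      // Release the workers first so a subsequent close() cannot block on them.
      for (Streamer s : streamers) {
        s.close(true);
      }
      throw new IOException("Unrecoverable: " + failCount + " failed streamers");
    }
  }
}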
@@ -1169,32 +1179,32 @@ public class DFSStripedOutputStream extends DFSOutputStream
@Override
protected synchronized void closeImpl() throws IOException {
if (isClosed()) {
exceptionLastSeen.check(true);
// Writing to at least {dataUnits} replicas can be considered as success,
// and the rest of data can be recovered.
final int minReplication = ecPolicy.getNumDataUnits();
int goodStreamers = 0;
final MultipleIOException.Builder b = new MultipleIOException.Builder();
for (final StripedDataStreamer si : streamers) {
try {
si.getLastException().check(true);
goodStreamers++;
} catch (IOException e) {
b.add(e);
}
}
if (goodStreamers < minReplication) {
final IOException ioe = b.build();
if (ioe != null) {
throw ioe;
}
}
return;
}
try {
if (isClosed()) {
exceptionLastSeen.check(true);
// Writing to at least {dataUnits} replicas can be considered as
// success, and the rest of data can be recovered.
final int minReplication = ecPolicy.getNumDataUnits();
int goodStreamers = 0;
final MultipleIOException.Builder b = new MultipleIOException.Builder();
for (final StripedDataStreamer si : streamers) {
try {
si.getLastException().check(true);
goodStreamers++;
} catch (IOException e) {
b.add(e);
}
}
if (goodStreamers < minReplication) {
final IOException ioe = b.build();
if (ioe != null) {
throw ioe;
}
}
return;
}
try {
// flush from all upper layers
flushBuffer();