HDFS-14706. Checksums are not checked if block meta file is less than 7 bytes. Contributed by Stephen O'Donnell.

Signed-off-by: Wei-Chiu Chuang <weichiu@apache.org>
(cherry picked from commit 7bebad61d9)
Stephen O'Donnell 2019-08-29 17:37:05 -07:00 committed by Wei-Chiu Chuang
parent 7c6fc964fd
commit 9c0d6e1657
3 changed files with 110 additions and 2 deletions

DFSOutputStream.java

@@ -23,6 +23,7 @@ import java.io.InterruptedIOException;
 import java.nio.ByteBuffer;
 import java.nio.channels.ClosedChannelException;
 import java.util.EnumSet;
+import java.util.Random;
 import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.hadoop.HadoopIllegalArgumentException;
@@ -126,6 +127,8 @@ public class DFSOutputStream extends FSOutputSummer
   protected final AtomicReference<CachingStrategy> cachingStrategy;
   private FileEncryptionInfo fileEncryptionInfo;
   private int writePacketSize;
+  private boolean leaseRecovered = false;
+  private boolean exceptionInClose = false; // for unit test
 
   /** Use {@link ByteArrayManager} to create buffer for non-heartbeat packets.*/
   protected DFSPacket createPacket(int packetSize, int chunksPerPkt,
@@ -832,6 +835,39 @@ public class DFSOutputStream extends FSOutputSummer
     }
   }
 
+  @VisibleForTesting
+  public void setExceptionInClose(boolean enable) {
+    exceptionInClose = enable;
+  }
+
+  private class EmulateExceptionInClose {
+    private Random rand = null;
+    private int kickedNum;
+
+    EmulateExceptionInClose(int callNum) {
+      if (exceptionInClose) {
+        rand = new Random();
+      }
+      kickedNum = callNum;
+    }
+
+    void kickRandomException() throws IOException {
+      if (exceptionInClose) {
+        if (kickedNum > 0) {
+          if (rand.nextInt(kickedNum) == 1) {
+            throw new IOException("Emulated random IOException in close");
+          }
+        }
+      }
+    }
+
+    void kickException() throws IOException {
+      if (exceptionInClose) {
+        throw new IOException("Emulated IOException in close");
+      }
+    }
+  }
+
   /**
    * Closes this output stream and releases any system
    * resources associated with this stream.
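For context on the fault injector above: rand.nextInt(kickedNum) returns a uniform value in [0, kickedNum), so the == 1 check fires with probability 1/kickedNum; with the call count of 5 used in closeImpl() below, each kickRandomException() call throws roughly 20% of the time. A minimal standalone sketch (hypothetical class name, not part of the commit) that demonstrates the odds:

    import java.util.Random;

    // Hypothetical demo, not part of the commit: measures how often
    // rand.nextInt(5) == 1 fires, which should converge to ~0.2.
    public class InjectorOdds {
      public static void main(String[] args) {
        Random rand = new Random();
        int fires = 0;
        int trials = 100_000;
        for (int i = 0; i < trials; i++) {
          if (rand.nextInt(5) == 1) {
            fires++;
          }
        }
        System.out.printf("fired %.3f of the time%n", fires / (double) trials);
      }
    }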
@@ -854,7 +890,20 @@ public class DFSOutputStream extends FSOutputSummer
   }
 
   protected synchronized void closeImpl() throws IOException {
+    boolean recoverOnCloseException = dfsClient.getConfiguration().getBoolean(
+        HdfsClientConfigKeys.Write.RECOVER_ON_CLOSE_EXCEPTION_KEY,
+        HdfsClientConfigKeys.Write.RECOVER_ON_CLOSE_EXCEPTION_DEFAULT);
+
     if (isClosed()) {
+      if (recoverOnCloseException && !leaseRecovered) {
+        try {
+          dfsClient.endFileLease(fileId);
+          dfsClient.recoverLease(src);
+          leaseRecovered = true;
+        } catch (Exception e) {
+          LOG.warn("Failed to recover lease for {}", src, e);
+        }
+      }
       LOG.debug("Closing an already closed stream. [Stream:{}, streamer:{}]",
           closed, getStreamer().streamerClosed());
       try {
@@ -871,8 +920,11 @@ public class DFSOutputStream extends FSOutputSummer
       return;
     }
 
+    EmulateExceptionInClose eei = new EmulateExceptionInClose(5);
     try {
       flushBuffer(); // flush from all upper layers
+      // for test
+      eei.kickRandomException();
 
       if (currentPacket != null) {
         enqueueCurrentPacket();
@@ -883,12 +935,28 @@ public class DFSOutputStream extends FSOutputSummer
       }
 
       try {
        flushInternal();             // flush all data to Datanodes
      } catch (IOException ioe) {
        cleanupAndRethrowIOException(ioe);
      }
+      // for test
+      eei.kickRandomException();
       completeFile();
+      // for test
+      eei.kickException();
     } catch (ClosedChannelException ignored) {
+    } catch (IOException ioe) {
+      if (recoverOnCloseException) {
+        try {
+          dfsClient.endFileLease(fileId);
+          dfsClient.recoverLease(src);
+          leaseRecovered = true;
+        } catch (Exception e) {
+          // Ignore any exception thrown by recoverLease; rethrow the
+          // original exception instead.
+        }
+      }
+      throw ioe;
     } finally {
       // Failures may happen when flushing data.
       // Streamers may keep waiting for the new block information.
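To make the new close path concrete, here is a minimal client-side sketch. It assumes fs.defaultFS points at an HDFS cluster; the class name, file path, and payload are illustrative only, not from the commit:

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;

    public class RecoverOnCloseExample {
      public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        // The feature is off by default; opt in explicitly.
        conf.setBoolean(
            HdfsClientConfigKeys.Write.RECOVER_ON_CLOSE_EXCEPTION_KEY, true);
        FileSystem fs = FileSystem.get(conf);
        FSDataOutputStream out = fs.create(new Path("/tmp/example"));
        out.write("hello".getBytes());
        try {
          out.close();
        } catch (IOException ioe) {
          // With the flag set, closeImpl() has already called endFileLease()
          // and recoverLease() before rethrowing, so the NameNode can close
          // the file even though the caller still sees the original error.
          throw ioe;
        }
      }
    }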

HdfsClientConfigKeys.java

@@ -302,6 +302,9 @@ public interface HdfsClientConfigKeys {
     String EXCLUDE_NODES_CACHE_EXPIRY_INTERVAL_KEY =
         PREFIX + "exclude.nodes.cache.expiry.interval.millis";
     long EXCLUDE_NODES_CACHE_EXPIRY_INTERVAL_DEFAULT = 10*MINUTE;
+    String RECOVER_ON_CLOSE_EXCEPTION_KEY =
+        PREFIX + "recover.on.close.exception";
+    boolean RECOVER_ON_CLOSE_EXCEPTION_DEFAULT = false;
 
     interface ByteArrayManager {
       String PREFIX = Write.PREFIX + "byte-array-manager.";
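Since RECOVER_ON_CLOSE_EXCEPTION_KEY is built from this interface's PREFIX, the resolved property name should follow the pattern of the exclude-nodes key above. A small sanity check, assuming Write.PREFIX resolves to "dfs.client.write." as the neighboring keys suggest:

    import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;

    public class KeyNameCheck {
      public static void main(String[] args) {
        // Assumption: Write.PREFIX == "dfs.client.write.", matching the
        // exclude.nodes.cache.expiry.interval.millis key defined above.
        System.out.println(
            HdfsClientConfigKeys.Write.RECOVER_ON_CLOSE_EXCEPTION_KEY);
        // Expected: dfs.client.write.recover.on.close.exception
      }
    }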

TestDFSOutputStream.java

@@ -31,6 +31,7 @@ import java.util.LinkedList;
 import java.util.Map;
 import java.util.Random;
 
+import org.apache.commons.lang3.reflect.FieldUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CreateFlag;
 import org.apache.hadoop.fs.FSDataOutputStream;
@@ -40,6 +41,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.StreamCapabilities.StreamCapability;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.DataStreamer.LastExceptionInStreamer;
+import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
 import org.apache.hadoop.hdfs.client.impl.DfsClientConf;
 import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
@@ -371,6 +373,41 @@ public class TestDFSOutputStream {
     os.close();
   }
 
+  /**
+   * If dfs.client.write.recover.on.close.exception is set and an exception
+   * happens during close, the local lease should be released and the lease
+   * on the NameNode should be recovered.
+   */
+  @Test
+  public void testExceptionInClose() throws Exception {
+    String testStr = "Test exception in close";
+    DistributedFileSystem fs = cluster.getFileSystem();
+    Path testFile = new Path("/closeexception");
+    fs.getConf().setBoolean(
+        HdfsClientConfigKeys.Write.RECOVER_ON_CLOSE_EXCEPTION_KEY, true);
+    FSDataOutputStream os = fs.create(testFile);
+    DFSOutputStream dos =
+        (DFSOutputStream) FieldUtils.readField(os, "wrappedStream", true);
+    dos.setExceptionInClose(true);
+    os.write(testStr.getBytes());
+    try {
+      dos.close();
+      // close() must throw; reaching this line is a test failure.
+      Assert.fail("Expected an IOException from close()");
+    } catch (IOException ioe) {
+      GenericTestUtils.waitFor(() -> {
+        boolean closed;
+        try {
+          closed = fs.isFileClosed(testFile);
+        } catch (IOException e) {
+          return false;
+        }
+        return closed;
+      }, 1000, 5000);
+      Assert.assertTrue(fs.isFileClosed(testFile));
+    }
+  }
+
   @AfterClass
   public static void tearDown() {
     if (cluster != null) {