Revert "HDFS-14706. Checksums are not checked if block meta file is less than 7 bytes. Contributed by Stephen O'Donnell."
This reverts commit 7bebad61d9
.
This commit is contained in:
parent
040f6e93bb
commit
d207aba026
|
@ -23,7 +23,6 @@ import java.io.InterruptedIOException;
|
||||||
import java.nio.ByteBuffer;
|
import java.nio.ByteBuffer;
|
||||||
import java.nio.channels.ClosedChannelException;
|
import java.nio.channels.ClosedChannelException;
|
||||||
import java.util.EnumSet;
|
import java.util.EnumSet;
|
||||||
import java.util.Random;
|
|
||||||
import java.util.concurrent.atomic.AtomicReference;
|
import java.util.concurrent.atomic.AtomicReference;
|
||||||
|
|
||||||
import org.apache.hadoop.HadoopIllegalArgumentException;
|
import org.apache.hadoop.HadoopIllegalArgumentException;
|
||||||
|
@ -127,8 +126,6 @@ public class DFSOutputStream extends FSOutputSummer
|
||||||
protected final AtomicReference<CachingStrategy> cachingStrategy;
|
protected final AtomicReference<CachingStrategy> cachingStrategy;
|
||||||
private FileEncryptionInfo fileEncryptionInfo;
|
private FileEncryptionInfo fileEncryptionInfo;
|
||||||
private int writePacketSize;
|
private int writePacketSize;
|
||||||
private boolean leaseRecovered = false;
|
|
||||||
private boolean exceptionInClose = false; //for unit test
|
|
||||||
|
|
||||||
/** Use {@link ByteArrayManager} to create buffer for non-heartbeat packets.*/
|
/** Use {@link ByteArrayManager} to create buffer for non-heartbeat packets.*/
|
||||||
protected DFSPacket createPacket(int packetSize, int chunksPerPkt,
|
protected DFSPacket createPacket(int packetSize, int chunksPerPkt,
|
||||||
|
@ -839,39 +836,6 @@ public class DFSOutputStream extends FSOutputSummer
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@VisibleForTesting
|
|
||||||
public void setExceptionInClose(boolean enable) {
|
|
||||||
exceptionInClose = enable;
|
|
||||||
}
|
|
||||||
|
|
||||||
private class EmulateExceptionInClose {
|
|
||||||
private Random rand = null;
|
|
||||||
private int kickedNum;
|
|
||||||
|
|
||||||
EmulateExceptionInClose(int callNum) {
|
|
||||||
if (exceptionInClose) {
|
|
||||||
rand = new Random();
|
|
||||||
}
|
|
||||||
kickedNum = callNum;
|
|
||||||
}
|
|
||||||
|
|
||||||
void kickRandomException() throws IOException {
|
|
||||||
if (exceptionInClose) {
|
|
||||||
if (kickedNum > 0) {
|
|
||||||
if (rand.nextInt(kickedNum) == 1) {
|
|
||||||
throw new IOException("Emulated random IOException in close");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
void kickException() throws IOException {
|
|
||||||
if (exceptionInClose) {
|
|
||||||
throw new IOException("Emulated IOException in close");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Closes this output stream and releases any system
|
* Closes this output stream and releases any system
|
||||||
* resources associated with this stream.
|
* resources associated with this stream.
|
||||||
|
@ -894,20 +858,7 @@ public class DFSOutputStream extends FSOutputSummer
|
||||||
}
|
}
|
||||||
|
|
||||||
protected synchronized void closeImpl() throws IOException {
|
protected synchronized void closeImpl() throws IOException {
|
||||||
boolean recoverOnCloseException = dfsClient.getConfiguration().getBoolean(
|
|
||||||
HdfsClientConfigKeys.Write.RECOVER_ON_CLOSE_EXCEPTION_KEY,
|
|
||||||
HdfsClientConfigKeys.Write.RECOVER_ON_CLOSE_EXCEPTION_DEFAULT);
|
|
||||||
if (isClosed()) {
|
if (isClosed()) {
|
||||||
if (recoverOnCloseException && !leaseRecovered) {
|
|
||||||
try {
|
|
||||||
dfsClient.endFileLease(fileId);
|
|
||||||
dfsClient.recoverLease(src);
|
|
||||||
leaseRecovered = true;
|
|
||||||
} catch (Exception e) {
|
|
||||||
LOG.warn("Fail to recover lease for {}", src, e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
LOG.debug("Closing an already closed stream. [Stream:{}, streamer:{}]",
|
LOG.debug("Closing an already closed stream. [Stream:{}, streamer:{}]",
|
||||||
closed, getStreamer().streamerClosed());
|
closed, getStreamer().streamerClosed());
|
||||||
try {
|
try {
|
||||||
|
@ -924,11 +875,8 @@ public class DFSOutputStream extends FSOutputSummer
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
EmulateExceptionInClose eei = new EmulateExceptionInClose(5);
|
|
||||||
try {
|
try {
|
||||||
flushBuffer(); // flush from all upper layers
|
flushBuffer(); // flush from all upper layers
|
||||||
// for test
|
|
||||||
eei.kickRandomException();
|
|
||||||
|
|
||||||
if (currentPacket != null) {
|
if (currentPacket != null) {
|
||||||
enqueueCurrentPacket();
|
enqueueCurrentPacket();
|
||||||
|
@ -939,28 +887,12 @@ public class DFSOutputStream extends FSOutputSummer
|
||||||
}
|
}
|
||||||
|
|
||||||
try {
|
try {
|
||||||
flushInternal(); // flush all data to Datanodes
|
flushInternal(); // flush all data to Datanodes
|
||||||
} catch (IOException ioe) {
|
} catch (IOException ioe) {
|
||||||
cleanupAndRethrowIOException(ioe);
|
cleanupAndRethrowIOException(ioe);
|
||||||
}
|
}
|
||||||
// for test
|
|
||||||
eei.kickRandomException();
|
|
||||||
completeFile();
|
completeFile();
|
||||||
// for test
|
|
||||||
eei.kickException();
|
|
||||||
} catch (ClosedChannelException ignored) {
|
} catch (ClosedChannelException ignored) {
|
||||||
} catch (IOException ioe) {
|
|
||||||
if (recoverOnCloseException) {
|
|
||||||
try {
|
|
||||||
dfsClient.endFileLease(fileId);
|
|
||||||
dfsClient.recoverLease(src);
|
|
||||||
leaseRecovered = true;
|
|
||||||
} catch (Exception e) {
|
|
||||||
// Ignore exception rendered by recoverLease. Throw original
|
|
||||||
// exception
|
|
||||||
}
|
|
||||||
}
|
|
||||||
throw ioe;
|
|
||||||
} finally {
|
} finally {
|
||||||
// Failures may happen when flushing data.
|
// Failures may happen when flushing data.
|
||||||
// Streamers may keep waiting for the new block information.
|
// Streamers may keep waiting for the new block information.
|
||||||
|
|
|
@ -307,9 +307,6 @@ public interface HdfsClientConfigKeys {
|
||||||
String EXCLUDE_NODES_CACHE_EXPIRY_INTERVAL_KEY =
|
String EXCLUDE_NODES_CACHE_EXPIRY_INTERVAL_KEY =
|
||||||
PREFIX + "exclude.nodes.cache.expiry.interval.millis";
|
PREFIX + "exclude.nodes.cache.expiry.interval.millis";
|
||||||
long EXCLUDE_NODES_CACHE_EXPIRY_INTERVAL_DEFAULT = 10*MINUTE;
|
long EXCLUDE_NODES_CACHE_EXPIRY_INTERVAL_DEFAULT = 10*MINUTE;
|
||||||
String RECOVER_ON_CLOSE_EXCEPTION_KEY =
|
|
||||||
PREFIX + "recover.on.close.exception";
|
|
||||||
boolean RECOVER_ON_CLOSE_EXCEPTION_DEFAULT = false;
|
|
||||||
|
|
||||||
interface ByteArrayManager {
|
interface ByteArrayManager {
|
||||||
String PREFIX = Write.PREFIX + "byte-array-manager.";
|
String PREFIX = Write.PREFIX + "byte-array-manager.";
|
||||||
|
|
|
@ -31,7 +31,6 @@ import java.util.LinkedList;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Random;
|
import java.util.Random;
|
||||||
|
|
||||||
import org.apache.commons.lang3.reflect.FieldUtils;
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.CreateFlag;
|
import org.apache.hadoop.fs.CreateFlag;
|
||||||
import org.apache.hadoop.fs.FSDataOutputStream;
|
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||||
|
@ -41,7 +40,6 @@ import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.fs.StreamCapabilities.StreamCapability;
|
import org.apache.hadoop.fs.StreamCapabilities.StreamCapability;
|
||||||
import org.apache.hadoop.fs.permission.FsPermission;
|
import org.apache.hadoop.fs.permission.FsPermission;
|
||||||
import org.apache.hadoop.hdfs.DataStreamer.LastExceptionInStreamer;
|
import org.apache.hadoop.hdfs.DataStreamer.LastExceptionInStreamer;
|
||||||
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
|
|
||||||
import org.apache.hadoop.hdfs.client.impl.DfsClientConf;
|
import org.apache.hadoop.hdfs.client.impl.DfsClientConf;
|
||||||
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
|
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
|
||||||
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
||||||
|
@ -373,41 +371,6 @@ public class TestDFSOutputStream {
|
||||||
os.close();
|
os.close();
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* If dfs.client.recover-on-close-exception.enable is set and exception
|
|
||||||
* happens in close, the local lease should be closed and lease in namenode
|
|
||||||
* should be recovered.
|
|
||||||
*/
|
|
||||||
@Test
|
|
||||||
public void testExceptionInClose() throws Exception {
|
|
||||||
String testStr = "Test exception in close";
|
|
||||||
DistributedFileSystem fs = cluster.getFileSystem();
|
|
||||||
Path testFile = new Path("/closeexception");
|
|
||||||
fs.getConf().setBoolean(
|
|
||||||
HdfsClientConfigKeys.Write.RECOVER_ON_CLOSE_EXCEPTION_KEY, true);
|
|
||||||
FSDataOutputStream os = fs.create(testFile);
|
|
||||||
DFSOutputStream dos =
|
|
||||||
(DFSOutputStream) FieldUtils.readField(os, "wrappedStream", true);
|
|
||||||
dos.setExceptionInClose(true);
|
|
||||||
os.write(testStr.getBytes());
|
|
||||||
try {
|
|
||||||
dos.close();
|
|
||||||
// There should be exception
|
|
||||||
Assert.assertTrue(false);
|
|
||||||
} catch (IOException ioe) {
|
|
||||||
GenericTestUtils.waitFor(() -> {
|
|
||||||
boolean closed;
|
|
||||||
try {
|
|
||||||
closed = fs.isFileClosed(testFile);
|
|
||||||
} catch (IOException e) {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
return closed;
|
|
||||||
}, 1000, 5000);
|
|
||||||
Assert.assertTrue(fs.isFileClosed(testFile));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@AfterClass
|
@AfterClass
|
||||||
public static void tearDown() {
|
public static void tearDown() {
|
||||||
if (cluster != null) {
|
if (cluster != null) {
|
||||||
|
|
Loading…
Reference in New Issue