diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index 8a2c1157a07..09d09db0fc9 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -341,6 +341,9 @@ Release 2.7.0 - UNRELEASED HDFS-6425. Large postponedMisreplicatedBlocks has impact on blockReport latency. (Ming Ma via kihwal) + HDFS-7494. Checking of closed in DFSInputStream#pread() should be protected + by synchronization (Ted Yu via Colin P. McCabe) + Release 2.6.1 - UNRELEASED INCOMPATIBLE CHANGES diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java index b8b1d905e59..ed46b16d340 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java @@ -41,6 +41,7 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorCompletionService; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.commons.io.IOUtils; import org.apache.hadoop.classification.InterfaceAudience; @@ -90,7 +91,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead, public static boolean tcpReadsDisabledForTesting = false; private long hedgedReadOpsLoopNumForTesting = 0; private final DFSClient dfsClient; - private boolean closed = false; + private AtomicBoolean closed = new AtomicBoolean(false); private final String src; private final boolean verifyChecksum; @@ -661,7 +662,8 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead, */ @Override public synchronized void close() throws IOException { - if (closed) { + if (!closed.compareAndSet(false, true)) { + DFSClient.LOG.warn("DFSInputStream has been closed already"); return; } dfsClient.checkOpen(); @@ -685,7 +687,6 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead, blockReader = null; } super.close(); - closed = true; } @Override @@ -822,7 +823,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead, private synchronized int readWithStrategy(ReaderStrategy strategy, int off, int len) throws IOException { dfsClient.checkOpen(); - if (closed) { + if (closed.get()) { throw new IOException("Stream closed"); } Map> corruptedBlockMap @@ -1375,7 +1376,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead, throws IOException { // sanity checks dfsClient.checkOpen(); - if (closed) { + if (closed.get()) { throw new IOException("Stream closed"); } failures = 0; @@ -1484,7 +1485,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead, if (targetPos < 0) { throw new EOFException("Cannot seek to negative offset"); } - if (closed) { + if (closed.get()) { throw new IOException("Stream is closed!"); } boolean done = false; @@ -1571,7 +1572,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead, */ @Override public synchronized int available() throws IOException { - if (closed) { + if (closed.get()) { throw new IOException("Stream closed"); }