diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index 6e5d3f5f791..2987e5184a9 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -592,6 +592,9 @@ Release 2.5.0 - UNRELEASED HDFS-6443. Fix MiniQJMHACluster related test failures. (Zesheng Wu via Arpit Agarwal) + HDFS-6227. ShortCircuitCache#unref should purge ShortCircuitReplicas whose + streams have been closed by java interrupts. (Colin Patrick McCabe via jing9) + Release 2.4.1 - UNRELEASED INCOMPATIBLE CHANGES diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/ShortCircuitCache.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/ShortCircuitCache.java index 1e7245df44b..7df340ac2a6 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/ShortCircuitCache.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/ShortCircuitCache.java @@ -437,11 +437,22 @@ public class ShortCircuitCache implements Closeable { void unref(ShortCircuitReplica replica) { lock.lock(); try { - // If the replica is stale, but we haven't purged it yet, let's do that. - // It would be a shame to evict a non-stale replica so that we could put - // a stale one into the cache. - if ((!replica.purged) && replica.isStale()) { - purge(replica); + // If the replica is stale or unusable, but we haven't purged it yet, + // let's do that. It would be a shame to evict a non-stale replica so + // that we could put a stale or unusable one into the cache. + if (!replica.purged) { + String purgeReason = null; + if (!replica.getDataStream().getChannel().isOpen()) { + purgeReason = "purging replica because its data channel is closed."; + } else if (!replica.getMetaStream().getChannel().isOpen()) { + purgeReason = "purging replica because its meta channel is closed."; + } else if (replica.isStale()) { + purgeReason = "purging replica because it is stale."; + } + if (purgeReason != null) { + LOG.debug(this + ": " + purgeReason); + purge(replica); + } } String addedString = ""; boolean shouldTrimEvictionMaps = false; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderFactory.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderFactory.java index 09d4353d7f1..a7dcbc1168c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderFactory.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderFactory.java @@ -28,12 +28,15 @@ import static org.hamcrest.CoreMatchers.equalTo; import java.io.File; import java.io.IOException; +import java.nio.channels.ClosedByInterruptException; import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -409,4 +412,121 @@ public class TestBlockReaderFactory { getDomainSocketWatcher().isClosed()); cluster.shutdown(); } + + /** + * When an InterruptedException is sent to a thread calling + * FileChannel#read, the FileChannel is immediately closed and the + * thread gets an exception. This effectively means that we might have + * someone asynchronously calling close() on the file descriptors we use + * in BlockReaderLocal. So when unreferencing a ShortCircuitReplica in + * ShortCircuitCache#unref, we should check if the FileChannel objects + * are still open. If not, we should purge the replica to avoid giving + * it out to any future readers. + * + * This is a regression test for HDFS-6227: Short circuit read failed + * due to ClosedChannelException. + * + * Note that you may still get ClosedChannelException errors if two threads + * are reading from the same replica and an InterruptedException is delivered + * to one of them. + */ + @Test(timeout=120000) + public void testPurgingClosedReplicas() throws Exception { + BlockReaderTestUtil.enableBlockReaderFactoryTracing(); + final AtomicInteger replicasCreated = new AtomicInteger(0); + final AtomicBoolean testFailed = new AtomicBoolean(false); + DFSInputStream.tcpReadsDisabledForTesting = true; + BlockReaderFactory.createShortCircuitReplicaInfoCallback = + new ShortCircuitCache.ShortCircuitReplicaCreator() { + @Override + public ShortCircuitReplicaInfo createShortCircuitReplicaInfo() { + replicasCreated.incrementAndGet(); + return null; + } + }; + TemporarySocketDirectory sockDir = new TemporarySocketDirectory(); + Configuration conf = createShortCircuitConf( + "testPurgingClosedReplicas", sockDir); + final MiniDFSCluster cluster = + new MiniDFSCluster.Builder(conf).numDataNodes(1).build(); + cluster.waitActive(); + final DistributedFileSystem dfs = cluster.getFileSystem(); + final String TEST_FILE = "/test_file"; + final int TEST_FILE_LEN = 4095; + final int SEED = 0xFADE0; + final DistributedFileSystem fs = + (DistributedFileSystem)FileSystem.get(cluster.getURI(0), conf); + DFSTestUtil.createFile(fs, new Path(TEST_FILE), TEST_FILE_LEN, + (short)1, SEED); + + final Semaphore sem = new Semaphore(0); + final List locatedBlocks = + cluster.getNameNode().getRpcServer().getBlockLocations( + TEST_FILE, 0, TEST_FILE_LEN).getLocatedBlocks(); + final LocatedBlock lblock = locatedBlocks.get(0); // first block + final byte[] buf = new byte[TEST_FILE_LEN]; + Runnable readerRunnable = new Runnable() { + @Override + public void run() { + try { + while (true) { + BlockReader blockReader = null; + try { + blockReader = BlockReaderTestUtil. + getBlockReader(cluster, lblock, 0, TEST_FILE_LEN); + sem.release(); + try { + blockReader.readAll(buf, 0, TEST_FILE_LEN); + } finally { + sem.acquireUninterruptibly(); + } + } catch (ClosedByInterruptException e) { + LOG.info("got the expected ClosedByInterruptException", e); + sem.release(); + break; + } finally { + if (blockReader != null) blockReader.close(); + } + LOG.info("read another " + TEST_FILE_LEN + " bytes."); + } + } catch (Throwable t) { + LOG.error("getBlockReader failure", t); + testFailed.set(true); + sem.release(); + } + } + }; + Thread thread = new Thread(readerRunnable); + thread.start(); + + // While the thread is reading, send it interrupts. + // These should trigger a ClosedChannelException. + while (thread.isAlive()) { + sem.acquireUninterruptibly(); + thread.interrupt(); + sem.release(); + } + Assert.assertFalse(testFailed.get()); + + // We should be able to read from the file without + // getting a ClosedChannelException. + BlockReader blockReader = null; + try { + blockReader = BlockReaderTestUtil. + getBlockReader(cluster, lblock, 0, TEST_FILE_LEN); + blockReader.readFully(buf, 0, TEST_FILE_LEN); + } finally { + if (blockReader != null) blockReader.close(); + } + byte expected[] = DFSTestUtil. + calculateFileContentsFromSeed(SEED, TEST_FILE_LEN); + Assert.assertTrue(Arrays.equals(buf, expected)); + + // Another ShortCircuitReplica object should have been created. + Assert.assertEquals(2, replicasCreated.get()); + + dfs.close(); + cluster.shutdown(); + sockDir.close(); + } }