HDFS-6227. ShortCircuitCache#unref should purge ShortCircuitReplicas whose streams have been closed by java interrupts. Contributed by Colin Patrick McCabe.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1597829 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Jing Zhao 2014-05-27 17:21:41 +00:00
parent 1228f8f6fb
commit 8d9e8cec9f
3 changed files with 139 additions and 5 deletions

View File

@ -592,6 +592,9 @@ Release 2.5.0 - UNRELEASED
HDFS-6443. Fix MiniQJMHACluster related test failures. (Zesheng Wu via HDFS-6443. Fix MiniQJMHACluster related test failures. (Zesheng Wu via
Arpit Agarwal) Arpit Agarwal)
HDFS-6227. ShortCircuitCache#unref should purge ShortCircuitReplicas whose
streams have been closed by java interrupts. (Colin Patrick McCabe via jing9)
Release 2.4.1 - UNRELEASED Release 2.4.1 - UNRELEASED
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -437,12 +437,23 @@ public class ShortCircuitCache implements Closeable {
void unref(ShortCircuitReplica replica) { void unref(ShortCircuitReplica replica) {
lock.lock(); lock.lock();
try { try {
// If the replica is stale, but we haven't purged it yet, let's do that. // If the replica is stale or unusable, but we haven't purged it yet,
// It would be a shame to evict a non-stale replica so that we could put // let's do that. It would be a shame to evict a non-stale replica so
// a stale one into the cache. // that we could put a stale or unusable one into the cache.
if ((!replica.purged) && replica.isStale()) { if (!replica.purged) {
String purgeReason = null;
if (!replica.getDataStream().getChannel().isOpen()) {
purgeReason = "purging replica because its data channel is closed.";
} else if (!replica.getMetaStream().getChannel().isOpen()) {
purgeReason = "purging replica because its meta channel is closed.";
} else if (replica.isStale()) {
purgeReason = "purging replica because it is stale.";
}
if (purgeReason != null) {
LOG.debug(this + ": " + purgeReason);
purge(replica); purge(replica);
} }
}
String addedString = ""; String addedString = "";
boolean shouldTrimEvictionMaps = false; boolean shouldTrimEvictionMaps = false;
int newRefCount = --replica.refCount; int newRefCount = --replica.refCount;

View File

@ -28,12 +28,15 @@ import static org.hamcrest.CoreMatchers.equalTo;
import java.io.File; import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.nio.channels.ClosedByInterruptException;
import java.util.Arrays; import java.util.Arrays;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
@ -409,4 +412,121 @@ public class TestBlockReaderFactory {
getDomainSocketWatcher().isClosed()); getDomainSocketWatcher().isClosed());
cluster.shutdown(); cluster.shutdown();
} }
/**
* When an InterruptedException is sent to a thread calling
* FileChannel#read, the FileChannel is immediately closed and the
* thread gets an exception. This effectively means that we might have
* someone asynchronously calling close() on the file descriptors we use
* in BlockReaderLocal. So when unreferencing a ShortCircuitReplica in
* ShortCircuitCache#unref, we should check if the FileChannel objects
* are still open. If not, we should purge the replica to avoid giving
* it out to any future readers.
*
* This is a regression test for HDFS-6227: Short circuit read failed
* due to ClosedChannelException.
*
* Note that you may still get ClosedChannelException errors if two threads
* are reading from the same replica and an InterruptedException is delivered
* to one of them.
*/
@Test(timeout=120000)
public void testPurgingClosedReplicas() throws Exception {
BlockReaderTestUtil.enableBlockReaderFactoryTracing();
final AtomicInteger replicasCreated = new AtomicInteger(0);
final AtomicBoolean testFailed = new AtomicBoolean(false);
DFSInputStream.tcpReadsDisabledForTesting = true;
BlockReaderFactory.createShortCircuitReplicaInfoCallback =
new ShortCircuitCache.ShortCircuitReplicaCreator() {
@Override
public ShortCircuitReplicaInfo createShortCircuitReplicaInfo() {
replicasCreated.incrementAndGet();
return null;
}
};
TemporarySocketDirectory sockDir = new TemporarySocketDirectory();
Configuration conf = createShortCircuitConf(
"testPurgingClosedReplicas", sockDir);
final MiniDFSCluster cluster =
new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
cluster.waitActive();
final DistributedFileSystem dfs = cluster.getFileSystem();
final String TEST_FILE = "/test_file";
final int TEST_FILE_LEN = 4095;
final int SEED = 0xFADE0;
final DistributedFileSystem fs =
(DistributedFileSystem)FileSystem.get(cluster.getURI(0), conf);
DFSTestUtil.createFile(fs, new Path(TEST_FILE), TEST_FILE_LEN,
(short)1, SEED);
final Semaphore sem = new Semaphore(0);
final List<LocatedBlock> locatedBlocks =
cluster.getNameNode().getRpcServer().getBlockLocations(
TEST_FILE, 0, TEST_FILE_LEN).getLocatedBlocks();
final LocatedBlock lblock = locatedBlocks.get(0); // first block
final byte[] buf = new byte[TEST_FILE_LEN];
Runnable readerRunnable = new Runnable() {
@Override
public void run() {
try {
while (true) {
BlockReader blockReader = null;
try {
blockReader = BlockReaderTestUtil.
getBlockReader(cluster, lblock, 0, TEST_FILE_LEN);
sem.release();
try {
blockReader.readAll(buf, 0, TEST_FILE_LEN);
} finally {
sem.acquireUninterruptibly();
}
} catch (ClosedByInterruptException e) {
LOG.info("got the expected ClosedByInterruptException", e);
sem.release();
break;
} finally {
if (blockReader != null) blockReader.close();
}
LOG.info("read another " + TEST_FILE_LEN + " bytes.");
}
} catch (Throwable t) {
LOG.error("getBlockReader failure", t);
testFailed.set(true);
sem.release();
}
}
};
Thread thread = new Thread(readerRunnable);
thread.start();
// While the thread is reading, send it interrupts.
// These should trigger a ClosedChannelException.
while (thread.isAlive()) {
sem.acquireUninterruptibly();
thread.interrupt();
sem.release();
}
Assert.assertFalse(testFailed.get());
// We should be able to read from the file without
// getting a ClosedChannelException.
BlockReader blockReader = null;
try {
blockReader = BlockReaderTestUtil.
getBlockReader(cluster, lblock, 0, TEST_FILE_LEN);
blockReader.readFully(buf, 0, TEST_FILE_LEN);
} finally {
if (blockReader != null) blockReader.close();
}
byte expected[] = DFSTestUtil.
calculateFileContentsFromSeed(SEED, TEST_FILE_LEN);
Assert.assertTrue(Arrays.equals(buf, expected));
// Another ShortCircuitReplica object should have been created.
Assert.assertEquals(2, replicasCreated.get());
dfs.close();
cluster.shutdown();
sockDir.close();
}
} }