HDFS-10468. HDFS read ends up ignoring an interrupt. Contributed by Jing Zhao

(cherry picked from commit be34e85e68)
This commit is contained in:
Jing Zhao 2016-06-07 10:48:21 -07:00
parent 19eb997f67
commit 8b34040cb9
3 changed files with 119 additions and 8 deletions

View File

@ -19,8 +19,10 @@ package org.apache.hadoop.hdfs;
import java.io.EOFException; import java.io.EOFException;
import java.io.IOException; import java.io.IOException;
import java.io.InterruptedIOException;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.nio.channels.ClosedByInterruptException;
import java.util.AbstractMap; import java.util.AbstractMap;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
@ -307,7 +309,7 @@ public class DFSInputStream extends FSInputStream
try { try {
Thread.sleep(waitTime); Thread.sleep(waitTime);
} catch (InterruptedException e) { } catch (InterruptedException e) {
throw new IOException( throw new InterruptedIOException(
"Interrupted while getting the last block length."); "Interrupted while getting the last block length.");
} }
} }
@ -382,6 +384,7 @@ public class DFSInputStream extends FSInputStream
return n; return n;
} }
} catch (IOException ioe) { } catch (IOException ioe) {
checkInterrupted(ioe);
if (ioe instanceof RemoteException) { if (ioe instanceof RemoteException) {
if (((RemoteException) ioe).unwrapRemoteException() instanceof if (((RemoteException) ioe).unwrapRemoteException() instanceof
ReplicaNotFoundException) { ReplicaNotFoundException) {
@ -417,7 +420,8 @@ public class DFSInputStream extends FSInputStream
try { try {
Thread.sleep(500); // delay between retries. Thread.sleep(500); // delay between retries.
} catch (InterruptedException e) { } catch (InterruptedException e) {
throw new IOException("Interrupted while getting the length."); throw new InterruptedIOException(
"Interrupted while getting the length.");
} }
} }
@ -663,6 +667,7 @@ public class DFSInputStream extends FSInputStream
} }
return chosenNode; return chosenNode;
} catch (IOException ex) { } catch (IOException ex) {
checkInterrupted(ex);
if (ex instanceof InvalidEncryptionKeyException && refetchEncryptionKey > 0) { if (ex instanceof InvalidEncryptionKeyException && refetchEncryptionKey > 0) {
DFSClient.LOG.info("Will fetch a new encryption key and retry, " DFSClient.LOG.info("Will fetch a new encryption key and retry, "
+ "encryption key was invalid when connecting to " + targetAddr + "encryption key was invalid when connecting to " + targetAddr
@ -684,6 +689,15 @@ public class DFSInputStream extends FSInputStream
} }
} }
private void checkInterrupted(IOException e) throws IOException {
if (Thread.currentThread().isInterrupted() &&
(e instanceof ClosedByInterruptException ||
e instanceof InterruptedIOException)) {
DFSClient.LOG.debug("The reading thread has been interrupted.", e);
throw e;
}
}
protected BlockReader getBlockReader(LocatedBlock targetBlock, protected BlockReader getBlockReader(LocatedBlock targetBlock,
long offsetInBlock, long length, InetSocketAddress targetAddr, long offsetInBlock, long length, InetSocketAddress targetAddr,
StorageType storageType, DatanodeInfo datanode) throws IOException { StorageType storageType, DatanodeInfo datanode) throws IOException {
@ -950,6 +964,7 @@ public class DFSInputStream extends FSInputStream
} catch (ChecksumException ce) { } catch (ChecksumException ce) {
throw ce; throw ce;
} catch (IOException e) { } catch (IOException e) {
checkInterrupted(e);
if (retries == 1) { if (retries == 1) {
DFSClient.LOG.warn("DFS Read", e); DFSClient.LOG.warn("DFS Read", e);
} }
@ -1064,9 +1079,12 @@ public class DFSInputStream extends FSInputStream
// expanding time window for each failure // expanding time window for each failure
timeWindow * (failures + 1) * timeWindow * (failures + 1) *
ThreadLocalRandom.current().nextDouble(); ThreadLocalRandom.current().nextDouble();
DFSClient.LOG.warn("DFS chooseDataNode: got # " + (failures + 1) + " IOException, will wait for " + waitTime + " msec."); DFSClient.LOG.warn("DFS chooseDataNode: got # " + (failures + 1) +
" IOException, will wait for " + waitTime + " msec.");
Thread.sleep((long)waitTime); Thread.sleep((long)waitTime);
} catch (InterruptedException ignored) { } catch (InterruptedException e) {
throw new InterruptedIOException(
"Interrupted while choosing DataNode for read.");
} }
deadNodes.clear(); //2nd option is to remove only nodes[blockId] deadNodes.clear(); //2nd option is to remove only nodes[blockId]
openInfo(true); openInfo(true);
@ -1152,7 +1170,8 @@ public class DFSInputStream extends FSInputStream
buf, offset, corruptedBlockMap); buf, offset, corruptedBlockMap);
return; return;
} catch (IOException e) { } catch (IOException e) {
// Ignore. Already processed inside the function. checkInterrupted(e); // check if the read has been interrupted
// Ignore other IOException. Already processed inside the function.
// Loop through to try the next node. // Loop through to try the next node.
} }
} }
@ -1249,6 +1268,7 @@ public class DFSInputStream extends FSInputStream
addToDeadNodes(datanode.info); addToDeadNodes(datanode.info);
throw new IOException(msg); throw new IOException(msg);
} catch (IOException e) { } catch (IOException e) {
checkInterrupted(e);
if (e instanceof InvalidEncryptionKeyException && refetchEncryptionKey > 0) { if (e instanceof InvalidEncryptionKeyException && refetchEncryptionKey > 0) {
DFSClient.LOG.info("Will fetch a new encryption key and retry, " DFSClient.LOG.info("Will fetch a new encryption key and retry, "
+ "encryption key was invalid when connecting to " + datanode.addr + "encryption key was invalid when connecting to " + datanode.addr
@ -1356,8 +1376,11 @@ public class DFSInputStream extends FSInputStream
ignored.add(chosenNode.info); ignored.add(chosenNode.info);
dfsClient.getHedgedReadMetrics().incHedgedReadOps(); dfsClient.getHedgedReadMetrics().incHedgedReadOps();
// continue; no need to refresh block locations // continue; no need to refresh block locations
} catch (InterruptedException | ExecutionException e) { } catch (ExecutionException e) {
// Ignore // Ignore
} catch (InterruptedException e) {
throw new InterruptedIOException(
"Interrupted while waiting for reading task");
} }
} else { } else {
// We are starting up a 'hedged' read. We have a read already // We are starting up a 'hedged' read. We have a read already
@ -1630,6 +1653,7 @@ public class DFSInputStream extends FSInputStream
} catch (IOException e) {//make following read to retry } catch (IOException e) {//make following read to retry
DFSClient.LOG.debug("Exception while seek to {} from {} of {} from " DFSClient.LOG.debug("Exception while seek to {} from {} of {} from "
+ "{}", targetPos, getCurrentBlock(), src, currentNode, e); + "{}", targetPos, getCurrentBlock(), src, currentNode, e);
checkInterrupted(e);
} }
} }
} }

View File

@ -19,9 +19,19 @@ package org.apache.hadoop.hdfs;
import java.io.FileNotFoundException; import java.io.FileNotFoundException;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream;
import java.io.InterruptedIOException;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.nio.channels.ClosedByInterruptException;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys; import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DataStorage;
import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.io.IOUtils;
import org.junit.Assert; import org.junit.Assert;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
@ -103,4 +113,81 @@ public class TestRead {
cluster.shutdown(); cluster.shutdown();
} }
} }
@Test(timeout=60000)
public void testInterruptReader() throws Exception {
final Configuration conf = new HdfsConfiguration();
conf.set(DFSConfigKeys.DFS_DATANODE_FSDATASET_FACTORY_KEY,
DelayedSimulatedFSDataset.Factory.class.getName());
final MiniDFSCluster cluster = new MiniDFSCluster
.Builder(conf).numDataNodes(1).build();
final DistributedFileSystem fs = cluster.getFileSystem();
try {
cluster.waitActive();
final Path file = new Path("/foo");
DFSTestUtil.createFile(fs, file, 1024, (short) 1, 0L);
final FSDataInputStream in = fs.open(file);
final AtomicBoolean readInterrupted = new AtomicBoolean(false);
final Thread reader = new Thread(new Runnable() {
@Override
public void run() {
try {
in.read(new byte[1024], 0, 1024);
} catch (IOException e) {
if (e instanceof ClosedByInterruptException ||
e instanceof InterruptedIOException) {
readInterrupted.set(true);
}
}
}
});
reader.start();
Thread.sleep(1000);
reader.interrupt();
reader.join();
Assert.assertTrue(readInterrupted.get());
} finally {
cluster.shutdown();
}
}
private static class DelayedSimulatedFSDataset extends SimulatedFSDataset {
private volatile boolean isDelayed = true;
DelayedSimulatedFSDataset(DataNode datanode, DataStorage storage,
Configuration conf) {
super(datanode, storage, conf);
}
@Override
public synchronized InputStream getBlockInputStream(ExtendedBlock b,
long seekOffset) throws IOException {
while (isDelayed) {
try {
this.wait();
} catch (InterruptedException ignored) {
}
}
InputStream result = super.getBlockInputStream(b);
IOUtils.skipFully(result, seekOffset);
return result;
}
static class Factory extends FsDatasetSpi.Factory<DelayedSimulatedFSDataset> {
@Override
public DelayedSimulatedFSDataset newInstance(DataNode datanode,
DataStorage storage, Configuration conf) throws IOException {
return new DelayedSimulatedFSDataset(datanode, storage, conf);
}
@Override
public boolean isSimulated() {
return true;
}
}
}
} }

View File

@ -960,8 +960,8 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
return new ReplicaHandler(binfo, null); return new ReplicaHandler(binfo, null);
} }
synchronized InputStream getBlockInputStream(ExtendedBlock b protected synchronized InputStream getBlockInputStream(ExtendedBlock b)
) throws IOException { throws IOException {
final Map<Block, BInfo> map = getMap(b.getBlockPoolId()); final Map<Block, BInfo> map = getMap(b.getBlockPoolId());
BInfo binfo = map.get(b.getLocalBlock()); BInfo binfo = map.get(b.getLocalBlock());
if (binfo == null) { if (binfo == null) {