diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
index 4a83a53bc5b..fb8d207ae6e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
@@ -19,8 +19,10 @@ package org.apache.hadoop.hdfs;
 
 import java.io.EOFException;
 import java.io.IOException;
+import java.io.InterruptedIOException;
 import java.net.InetSocketAddress;
 import java.nio.ByteBuffer;
+import java.nio.channels.ClosedByInterruptException;
 import java.util.AbstractMap;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -307,7 +309,7 @@ public class DFSInputStream extends FSInputStream
       try {
         Thread.sleep(waitTime);
       } catch (InterruptedException e) {
-        throw new IOException(
+        throw new InterruptedIOException(
             "Interrupted while getting the last block length.");
       }
     }
@@ -382,6 +384,7 @@ public class DFSInputStream extends FSInputStream
           return n;
         }
       } catch (IOException ioe) {
+        checkInterrupted(ioe);
         if (ioe instanceof RemoteException) {
           if (((RemoteException) ioe).unwrapRemoteException() instanceof
               ReplicaNotFoundException) {
@@ -417,7 +420,8 @@ public class DFSInputStream extends FSInputStream
       try {
         Thread.sleep(500); // delay between retries.
       } catch (InterruptedException e) {
-        throw new IOException("Interrupted while getting the length.");
+        throw new InterruptedIOException(
+            "Interrupted while getting the length.");
       }
     }
 
@@ -663,6 +667,7 @@ public class DFSInputStream extends FSInputStream
       }
       return chosenNode;
     } catch (IOException ex) {
+      checkInterrupted(ex);
       if (ex instanceof InvalidEncryptionKeyException && refetchEncryptionKey > 0) {
         DFSClient.LOG.info("Will fetch a new encryption key and retry, "
             + "encryption key was invalid when connecting to " + targetAddr
@@ -684,6 +689,15 @@ public class DFSInputStream extends FSInputStream
     }
   }
 
+  private void checkInterrupted(IOException e) throws IOException {
+    if (Thread.currentThread().isInterrupted() &&
+        (e instanceof ClosedByInterruptException ||
+         e instanceof InterruptedIOException)) {
+      DFSClient.LOG.debug("The reading thread has been interrupted.", e);
+      throw e;
+    }
+  }
+
   protected BlockReader getBlockReader(LocatedBlock targetBlock,
       long offsetInBlock, long length, InetSocketAddress targetAddr,
       StorageType storageType, DatanodeInfo datanode) throws IOException {
@@ -950,6 +964,7 @@ public class DFSInputStream extends FSInputStream
       } catch (ChecksumException ce) {
         throw ce;
       } catch (IOException e) {
+        checkInterrupted(e);
         if (retries == 1) {
           DFSClient.LOG.warn("DFS Read", e);
         }
@@ -1064,9 +1079,12 @@ public class DFSInputStream extends FSInputStream
               // expanding time window for each failure
               timeWindow * (failures + 1) *
               ThreadLocalRandom.current().nextDouble();
-          DFSClient.LOG.warn("DFS chooseDataNode: got # " + (failures + 1) + " IOException, will wait for " + waitTime + " msec.");
+          DFSClient.LOG.warn("DFS chooseDataNode: got # " + (failures + 1) +
+              " IOException, will wait for " + waitTime + " msec.");
           Thread.sleep((long)waitTime);
-        } catch (InterruptedException ignored) {
+        } catch (InterruptedException e) {
+          throw new InterruptedIOException(
+              "Interrupted while choosing DataNode for read.");
         }
         deadNodes.clear(); //2nd option is to remove only nodes[blockId]
         openInfo(true);
@@ -1152,7 +1170,8 @@ public class DFSInputStream extends FSInputStream
             buf, offset, corruptedBlockMap);
         return;
       } catch (IOException e) {
-        // Ignore. Already processed inside the function.
+        checkInterrupted(e); // check if the read has been interrupted
+        // Ignore other IOException. Already processed inside the function.
         // Loop through to try the next node.
       }
     }
@@ -1249,6 +1268,7 @@ public class DFSInputStream extends FSInputStream
         addToDeadNodes(datanode.info);
         throw new IOException(msg);
       } catch (IOException e) {
+        checkInterrupted(e);
         if (e instanceof InvalidEncryptionKeyException && refetchEncryptionKey > 0) {
           DFSClient.LOG.info("Will fetch a new encryption key and retry, "
               + "encryption key was invalid when connecting to " + datanode.addr
@@ -1356,8 +1376,11 @@ public class DFSInputStream extends FSInputStream
           ignored.add(chosenNode.info);
           dfsClient.getHedgedReadMetrics().incHedgedReadOps();
           // continue; no need to refresh block locations
-        } catch (InterruptedException | ExecutionException e) {
+        } catch (ExecutionException e) {
           // Ignore
+        } catch (InterruptedException e) {
+          throw new InterruptedIOException(
+              "Interrupted while waiting for reading task");
         }
       } else {
         // We are starting up a 'hedged' read. We have a read already
@@ -1630,6 +1653,7 @@ public class DFSInputStream extends FSInputStream
         } catch (IOException e) {//make following read to retry
           DFSClient.LOG.debug("Exception while seek to {} from {} of {} from "
               + "{}", targetPos, getCurrentBlock(), src, currentNode, e);
+          checkInterrupted(e);
         }
       }
     }
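Note (illustration, not part of the patch): the DFSInputStream changes above make a blocked or retrying read fail fast with an InterruptedIOException (or ClosedByInterruptException) once the reading thread is interrupted, instead of swallowing the interrupt and staying in the retry loop. A minimal sketch of a caller cancelling a stuck read via Future.cancel(true); the FileSystem setup and the path /foo are assumptions for the example:

    import java.util.concurrent.Callable;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.TimeoutException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class CancellableReadSketch {
      public static void main(String[] args) throws Exception {
        final FileSystem fs = FileSystem.get(new Configuration());
        ExecutorService pool = Executors.newSingleThreadExecutor();
        Future<Integer> pending = pool.submit(new Callable<Integer>() {
          @Override
          public Integer call() throws Exception {
            FSDataInputStream in = fs.open(new Path("/foo"));
            try {
              byte[] buf = new byte[4096];
              return in.read(buf, 0, buf.length); // may block on a slow DataNode
            } finally {
              in.close();
            }
          }
        });
        try {
          pending.get(30, TimeUnit.SECONDS);
        } catch (TimeoutException te) {
          // cancel(true) interrupts the reader thread; with this patch the
          // read surfaces an InterruptedIOException subtype promptly instead
          // of looping through further retries.
          pending.cancel(true);
        }
        pool.shutdown();
      }
    }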
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestRead.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestRead.java
index 9d38fd77c19..ab8b44b74f8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestRead.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestRead.java
@@ -19,9 +19,19 @@ package org.apache.hadoop.hdfs;
 
 import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.io.InputStream;
+import java.io.InterruptedIOException;
 import java.nio.ByteBuffer;
+import java.nio.channels.ClosedByInterruptException;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.datanode.DataStorage;
+import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
+import org.apache.hadoop.io.IOUtils;
 import org.junit.Assert;
 
 import org.apache.hadoop.conf.Configuration;
@@ -103,4 +113,81 @@ public class TestRead {
       cluster.shutdown();
     }
   }
+
+  @Test(timeout=60000)
+  public void testInterruptReader() throws Exception {
+    final Configuration conf = new HdfsConfiguration();
+    conf.set(DFSConfigKeys.DFS_DATANODE_FSDATASET_FACTORY_KEY,
+        DelayedSimulatedFSDataset.Factory.class.getName());
+
+    final MiniDFSCluster cluster = new MiniDFSCluster
+        .Builder(conf).numDataNodes(1).build();
+    final DistributedFileSystem fs = cluster.getFileSystem();
+    try {
+      cluster.waitActive();
+      final Path file = new Path("/foo");
+      DFSTestUtil.createFile(fs, file, 1024, (short) 1, 0L);
+
+      final FSDataInputStream in = fs.open(file);
+      final AtomicBoolean readInterrupted = new AtomicBoolean(false);
+      final Thread reader = new Thread(new Runnable() {
+        @Override
+        public void run() {
+          try {
+            in.read(new byte[1024], 0, 1024);
+          } catch (IOException e) {
+            if (e instanceof ClosedByInterruptException ||
+                e instanceof InterruptedIOException) {
+              readInterrupted.set(true);
+            }
+          }
+        }
+      });
+
+      reader.start();
+      Thread.sleep(1000);
+      reader.interrupt();
+      reader.join();
+
+      Assert.assertTrue(readInterrupted.get());
+    } finally {
+      cluster.shutdown();
+    }
+  }
+
+  private static class DelayedSimulatedFSDataset extends SimulatedFSDataset {
+    private volatile boolean isDelayed = true;
+
+    DelayedSimulatedFSDataset(DataNode datanode, DataStorage storage,
+        Configuration conf) {
+      super(datanode, storage, conf);
+    }
+
+    @Override
+    public synchronized InputStream getBlockInputStream(ExtendedBlock b,
+        long seekOffset) throws IOException {
+      while (isDelayed) {
+        try {
+          this.wait();
+        } catch (InterruptedException ignored) {
+        }
+      }
+      InputStream result = super.getBlockInputStream(b);
+      IOUtils.skipFully(result, seekOffset);
+      return result;
+    }
+
+    static class Factory extends FsDatasetSpi.Factory<DelayedSimulatedFSDataset> {
+      @Override
+      public DelayedSimulatedFSDataset newInstance(DataNode datanode,
+          DataStorage storage, Configuration conf) throws IOException {
+        return new DelayedSimulatedFSDataset(datanode, storage, conf);
+      }
+
+      @Override
+      public boolean isSimulated() {
+        return true;
+      }
+    }
+  }
 }
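Note (test design): DelayedSimulatedFSDataset parks the DataNode-side getBlockInputStream() in wait(), so the client read is reliably blocked on the wire when reader.interrupt() fires; the test never wakes the waiting thread and relies on cluster.shutdown() for cleanup. If a variant of the test needed to let the transfer proceed after the interrupt has been asserted, a small releasing helper on the dataset would be the natural shape (hypothetical, not part of the patch):

    // Hypothetical helper for DelayedSimulatedFSDataset, not in the patch:
    // lets a test resume the DataNode thread blocked in getBlockInputStream().
    synchronized void release() {
      isDelayed = false;
      this.notifyAll(); // wakes the thread parked in this.wait()
    }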
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
index a2ae209adf3..a222c0f53e0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
@@ -960,8 +960,8 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
     return new ReplicaHandler(binfo, null);
   }
 
-  synchronized InputStream getBlockInputStream(ExtendedBlock b
-      ) throws IOException {
+  protected synchronized InputStream getBlockInputStream(ExtendedBlock b)
+      throws IOException {
     final Map<Block, BInfo> map = getMap(b.getBlockPoolId());
     BInfo binfo = map.get(b.getLocalBlock());
     if (binfo == null) {
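Note (why the visibility change is needed): DelayedSimulatedFSDataset lives in org.apache.hadoop.hdfs while SimulatedFSDataset lives in org.apache.hadoop.hdfs.server.datanode, so the previously package-private one-argument getBlockInputStream(ExtendedBlock) was invisible to the subclass; widening it to protected makes the super call in the two-argument override legal. A compile-level sketch, with an illustrative class name:

    import java.io.IOException;
    import java.io.InputStream;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
    import org.apache.hadoop.hdfs.server.datanode.DataNode;
    import org.apache.hadoop.hdfs.server.datanode.DataStorage;
    import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;

    // Illustrative subclass outside server.datanode: the super call below
    // compiles only now that getBlockInputStream(ExtendedBlock) is protected.
    class DatasetOutsidePackage extends SimulatedFSDataset {
      DatasetOutsidePackage(DataNode datanode, DataStorage storage,
          Configuration conf) {
        super(datanode, storage, conf);
      }

      InputStream openWholeBlock(ExtendedBlock b) throws IOException {
        return super.getBlockInputStream(b);
      }
    }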