diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java index d9406f0f9fc..5caca15684c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java @@ -183,6 +183,13 @@ public void setWriter(Thread writer) { this.writer = writer; } + public void interruptThread() { + if (writer != null && writer != Thread.currentThread() + && writer.isAlive()) { + this.writer.interrupt(); + } + } + @Override // Object public boolean equals(Object o) { return super.equals(o); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java index 2e8226a2e19..d6a0df65cac 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java @@ -3112,5 +3112,18 @@ boolean reserveLockedMemory(long bytesNeeded) { public void setTimer(Timer newTimer) { this.timer = newTimer; } + + synchronized void stopAllDataxceiverThreads(FsVolumeImpl volume) { + for (String blockPoolId : volumeMap.getBlockPoolList()) { + Collection replicas = volumeMap.replicas(blockPoolId); + for (ReplicaInfo replicaInfo : replicas) { + if (replicaInfo instanceof ReplicaInPipeline + && replicaInfo.getVolume().equals(volume)) { + ReplicaInPipeline replicaInPipeline = (ReplicaInPipeline) replicaInfo; + replicaInPipeline.interruptThread(); + } + } + } + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java index 857d0ad561f..0d060f9eae0 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java @@ -240,6 +240,11 @@ private void checkReference() { Preconditions.checkState(reference.getReferenceCount() > 0); } + @VisibleForTesting + int getReferenceCount() { + return this.reference.getReferenceCount(); + } + /** * Close this volume. * @throws IOException if the volume is closed. @@ -247,6 +252,7 @@ private void checkReference() { void setClosed() throws IOException { try { this.reference.setClosed(); + dataset.stopAllDataxceiverThreads(this); } catch (ClosedChannelException e) { throw new IOException("The volume has already closed.", e); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java index b2cfe89cb86..70e93326e20 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java @@ -21,14 +21,19 @@ import org.apache.commons.io.FileUtils; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystemTestHelper; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.StorageType; import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.DFSTestUtil; import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.protocol.Block; +import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; +import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants; import org.apache.hadoop.hdfs.server.common.Storage; import org.apache.hadoop.hdfs.server.common.StorageInfo; @@ -51,6 +56,7 @@ import org.apache.hadoop.util.DiskChecker; import org.apache.hadoop.util.FakeTimer; import org.apache.hadoop.util.StringUtils; +import org.junit.Assert; import org.junit.Before; import org.junit.Test; import org.mockito.Matchers; @@ -622,4 +628,64 @@ public void run() { LOG.info("Volumes removed"); brReceivedLatch.await(); } + + /** + * Tests stopping all the active DataXceiver thread on volume failure event. + * @throws Exception + */ + @Test + public void testCleanShutdownOfVolume() throws Exception { + MiniDFSCluster cluster = null; + try { + Configuration config = new HdfsConfiguration(); + config.setLong( + DFSConfigKeys.DFS_DATANODE_XCEIVER_STOP_TIMEOUT_MILLIS_KEY, 1000); + config.setInt(DFSConfigKeys.DFS_DATANODE_FAILED_VOLUMES_TOLERATED_KEY, 1); + + cluster = new MiniDFSCluster.Builder(config).numDataNodes(1).build(); + cluster.waitActive(); + FileSystem fs = cluster.getFileSystem(); + DataNode dataNode = cluster.getDataNodes().get(0); + Path filePath = new Path("test.dat"); + // Create a file and keep the output stream unclosed. + FSDataOutputStream out = fs.create(filePath, (short) 1); + out.write(1); + out.hflush(); + + ExtendedBlock block = DFSTestUtil.getFirstBlock(fs, filePath); + FsVolumeImpl volume = (FsVolumeImpl) dataNode.getFSDataset().getVolume( + block); + File finalizedDir = volume.getFinalizedDir(cluster.getNamesystem() + .getBlockPoolId()); + + if (finalizedDir.exists()) { + // Remove write and execute access so that checkDiskErrorThread detects + // this volume is bad. + finalizedDir.setExecutable(false); + finalizedDir.setWritable(false); + } + Assert.assertTrue("Reference count for the volume should be greater " + + "than 0", volume.getReferenceCount() > 0); + // Invoke the synchronous checkDiskError method + dataNode.getFSDataset().checkDataDir(); + // Sleep for 1 second so that datanode can interrupt and cluster clean up + Thread.sleep(1000); + assertEquals("There are active threads still referencing volume: " + + volume.getBasePath(), 0, volume.getReferenceCount()); + LocatedBlock lb = DFSTestUtil.getAllBlocks(fs, filePath).get(0); + DatanodeInfo info = lb.getLocations()[0]; + + try { + out.close(); + Assert.fail("This is not a valid code path. " + + "out.close should have thrown an exception."); + } catch (IOException ioe) { + Assert.assertTrue(ioe.getMessage().contains(info.toString())); + } + finalizedDir.setWritable(true); + finalizedDir.setExecutable(true); + } finally { + cluster.shutdown(); + } + } }