HDFS-9874. Long living DataXceiver threads cause volume shutdown to block. Contributed by Rushabh Shah.

(cherry picked from commit 63c966a3fb)
(cherry picked from commit 242c7f1fee)

Conflicts:
	hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
	hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java
This commit is contained in:
Kihwal Lee 2016-03-18 10:38:33 -05:00
parent bb5749b1e0
commit a66d2a2a85
5 changed files with 95 additions and 0 deletions

View File

@ -146,6 +146,9 @@ Release 2.7.3 - UNRELEASED
HDFS-9904. testCheckpointCancellationDuringUpload occasionally fails.
(Lin Yiqun via kihwal)
HDFS-9874. Long living DataXceiver threads cause volume shutdown to block.
(Rushabh Shah via kihwal)
Release 2.7.2 - 2016-01-25
INCOMPATIBLE CHANGES

View File

@ -173,6 +173,13 @@ public class ReplicaInPipeline extends ReplicaInfo
this.writer = writer;
}
/**
 * Interrupt the thread recorded as this replica's writer.
 *
 * The interrupt is sent only when a writer is recorded, the writer is not
 * the calling thread, and the writer is still alive.
 */
public void interruptThread() {
  // Read the field exactly once: it is assigned elsewhere in this class, so
  // re-reading it between the checks and the interrupt() call could act on a
  // different thread (or on null) than the one that was checked.
  final Thread thread = this.writer;
  if (thread != null && thread != Thread.currentThread()
      && thread.isAlive()) {
    thread.interrupt();
  }
}
@Override // Object
public boolean equals(Object o) {
return super.equals(o);

View File

@ -3048,5 +3048,18 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
s.add(blockId);
}
}
/**
 * Interrupt the writer thread of every in-pipeline replica that lives on
 * the given volume.
 *
 * Walks every block pool known to the volume map and, for each replica that
 * is a {@link ReplicaInPipeline} whose volume equals {@code volume}, calls
 * {@link ReplicaInPipeline#interruptThread()}.
 *
 * @param volume the volume whose in-pipeline replicas should be interrupted
 */
synchronized void stopAllDataxceiverThreads(FsVolumeImpl volume) {
  for (String bpid : volumeMap.getBlockPoolList()) {
    for (ReplicaInfo candidate : volumeMap.replicas(bpid)) {
      // Only replicas that are still being written are of interest.
      if (!(candidate instanceof ReplicaInPipeline)) {
        continue;
      }
      // Skip replicas stored on any other volume.
      if (!candidate.getVolume().equals(volume)) {
        continue;
      }
      ((ReplicaInPipeline) candidate).interruptThread();
    }
  }
}
}

View File

@ -229,6 +229,11 @@ public class FsVolumeImpl implements FsVolumeSpi {
Preconditions.checkState(reference.getReferenceCount() > 0);
}
/**
 * Report the current reference count held on this volume.
 *
 * @return the value returned by {@code reference.getReferenceCount()}
 */
@VisibleForTesting
int getReferenceCount() {
  final int count = reference.getReferenceCount();
  return count;
}
/**
* Close this volume and wait for all other threads to release the reference
* count on this volume.
@ -237,6 +242,7 @@ public class FsVolumeImpl implements FsVolumeSpi {
void closeAndWait() throws IOException {
try {
this.reference.setClosed();
dataset.stopAllDataxceiverThreads(this);
} catch (ClosedChannelException e) {
throw new IOException("The volume has already closed.", e);
}

View File

@ -20,14 +20,19 @@ package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
import com.google.common.collect.Lists;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileSystemTestHelper;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.common.Storage;
import org.apache.hadoop.hdfs.server.common.StorageInfo;
@ -48,6 +53,7 @@ import org.apache.hadoop.io.MultipleIOException;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.DiskChecker;
import org.apache.hadoop.util.StringUtils;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Matchers;
@ -437,4 +443,64 @@ public class TestFsDatasetImpl {
assertSame(replica,
BlockPoolSlice.selectReplicaToDelete(replicaOtherNewer, replica));
}
/**
 * Tests that all active DataXceiver threads on a volume are stopped when
 * that volume fails.
 *
 * The test keeps an output stream open on a single-datanode cluster, makes
 * the volume's finalized directory inaccessible, triggers a disk check, and
 * then verifies that the volume's reference count drops to zero and that
 * closing the stream fails with an error naming the datanode.
 *
 * @throws Exception if the cluster cannot be started or any step of the
 *           test fails unexpectedly
 */
@Test
public void testCleanShutdownOfVolume() throws Exception {
  MiniDFSCluster cluster = null;
  // Declared outside the try block so the finally block can restore its
  // permissions even when an assertion fails part-way through.
  File finalizedDir = null;
  try {
    Configuration config = new HdfsConfiguration();
    config.setLong(
        DFSConfigKeys.DFS_DATANODE_XCEIVER_STOP_TIMEOUT_MILLIS_KEY, 1000);
    config.setInt(DFSConfigKeys.DFS_DATANODE_FAILED_VOLUMES_TOLERATED_KEY, 1);

    cluster = new MiniDFSCluster.Builder(config).numDataNodes(1).build();
    cluster.waitActive();
    FileSystem fs = cluster.getFileSystem();
    DataNode dataNode = cluster.getDataNodes().get(0);
    Path filePath = new Path("test.dat");
    // Create a file and keep the output stream unclosed.
    FSDataOutputStream out = fs.create(filePath, (short) 1);
    out.write(1);
    out.hflush();

    ExtendedBlock block = DFSTestUtil.getFirstBlock(fs, filePath);
    FsVolumeImpl volume = (FsVolumeImpl) dataNode.getFSDataset().getVolume(
        block);
    finalizedDir = volume.getFinalizedDir(cluster.getNamesystem()
        .getBlockPoolId());

    if (finalizedDir.exists()) {
      // Remove write and execute access so that checkDiskErrorThread detects
      // this volume is bad.
      finalizedDir.setExecutable(false);
      finalizedDir.setWritable(false);
    }
    Assert.assertTrue("Reference count for the volume should be greater "
        + "than 0", volume.getReferenceCount() > 0);
    // Invoke the synchronous checkDiskError method
    dataNode.getFSDataset().checkDataDir();
    // Sleep for 1 second so that datanode can interrupt and cluster clean up
    Thread.sleep(1000);
    assertEquals("There are active threads still referencing volume: "
        + volume.getBasePath(), 0, volume.getReferenceCount());
    LocatedBlock lb = DFSTestUtil.getAllBlocks(fs, filePath).get(0);
    DatanodeInfo info = lb.getLocations()[0];

    try {
      out.close();
      Assert.fail("This is not a valid code path. "
          + "out.close should have thrown an exception.");
    } catch (IOException ioe) {
      Assert.assertTrue(ioe.getMessage().contains(info.toString()));
    }
  } finally {
    // Always restore access, otherwise a failed assertion would leave the
    // directory non-writable/non-executable for later cleanup and tests.
    if (finalizedDir != null) {
      finalizedDir.setWritable(true);
      finalizedDir.setExecutable(true);
    }
    // The cluster is null if the builder threw; avoid masking that failure
    // with a NullPointerException.
    if (cluster != null) {
      cluster.shutdown();
    }
  }
}
}