HDFS-9874. Long living DataXceiver threads cause volume shutdown to block. Contributed by Rushabh Shah.
commit 63c966a3fb (parent dc951e606f)

ReplicaInPipeline.java
@@ -183,6 +183,13 @@ public class ReplicaInPipeline extends ReplicaInfo
     this.writer = writer;
   }
 
+  public void interruptThread() {
+    if (writer != null && writer != Thread.currentThread()
+        && writer.isAlive()) {
+      this.writer.interrupt();
+    }
+  }
+
   @Override // Object
   public boolean equals(Object o) {
     return super.equals(o);
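
Note: interruptThread() only fires when the writer is a different, live thread,
so a DataXceiver that is itself tearing down never self-interrupts. Below is a
minimal standalone sketch of the same guard; WriterHolder, attach(), and the
main() driver are illustrative names, not part of this patch.

    public class WriterHolder {
      // volatile: interruptThread() may run on a different thread than attach().
      private volatile Thread writer;

      void attach(Thread t) { this.writer = t; }

      void interruptThread() {
        Thread w = writer;
        // Same guard as the patch: non-null, not ourselves, still running.
        if (w != null && w != Thread.currentThread() && w.isAlive()) {
          w.interrupt();
        }
      }

      public static void main(String[] args) throws InterruptedException {
        WriterHolder holder = new WriterHolder();
        Thread t = new Thread(() -> {
          try {
            Thread.sleep(60_000); // stand-in for a DataXceiver blocked on I/O
          } catch (InterruptedException e) {
            System.out.println("writer interrupted, releasing volume reference");
          }
        });
        holder.attach(t);
        t.start();
        holder.interruptThread(); // called from a different thread, so it fires
        t.join();
      }
    }
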
FsDatasetImpl.java
@@ -3112,5 +3112,18 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
   public void setTimer(Timer newTimer) {
     this.timer = newTimer;
   }
+
+  synchronized void stopAllDataxceiverThreads(FsVolumeImpl volume) {
+    for (String blockPoolId : volumeMap.getBlockPoolList()) {
+      Collection<ReplicaInfo> replicas = volumeMap.replicas(blockPoolId);
+      for (ReplicaInfo replicaInfo : replicas) {
+        if (replicaInfo instanceof ReplicaInPipeline
+            && replicaInfo.getVolume().equals(volume)) {
+          ReplicaInPipeline replicaInPipeline = (ReplicaInPipeline) replicaInfo;
+          replicaInPipeline.interruptThread();
+        }
+      }
+    }
+  }
 }
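
Note: stopAllDataxceiverThreads() is synchronized on the dataset, so the
replica map cannot change underneath the scan. It visits every block pool but
interrupts only ReplicaInPipeline instances that live on the failed volume,
since only replicas still in the write pipeline have an active writer thread
to interrupt; finalized replicas have none.
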
FsVolumeImpl.java
@@ -240,6 +240,11 @@ public class FsVolumeImpl implements FsVolumeSpi {
     Preconditions.checkState(reference.getReferenceCount() > 0);
   }
 
+  @VisibleForTesting
+  int getReferenceCount() {
+    return this.reference.getReferenceCount();
+  }
+
   /**
    * Close this volume.
    * @throws IOException if the volume is closed.

@@ -247,6 +252,7 @@ public class FsVolumeImpl implements FsVolumeSpi {
   void setClosed() throws IOException {
     try {
       this.reference.setClosed();
+      dataset.stopAllDataxceiverThreads(this);
     } catch (ClosedChannelException e) {
       throw new IOException("The volume has already closed.", e);
     }
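
Note: this one-line hook is the core of the fix. Previously setClosed() only
flipped the reference state and waited for every thread holding a volume
reference to finish on its own, so a long-living DataXceiver blocked on a
write kept the reference count above zero and volume shutdown blocked
indefinitely (the symptom in HDFS-9874). Interrupting the writers lets the
count drain to zero and the volume close promptly.
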
TestFsDatasetImpl.java
@@ -21,14 +21,19 @@ import com.google.common.collect.Lists;
 
 import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileSystemTestHelper;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
 import org.apache.hadoop.hdfs.server.common.Storage;
 import org.apache.hadoop.hdfs.server.common.StorageInfo;

@@ -51,6 +56,7 @@ import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.util.DiskChecker;
 import org.apache.hadoop.util.FakeTimer;
 import org.apache.hadoop.util.StringUtils;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 import org.mockito.Matchers;

@@ -622,4 +628,64 @@ public class TestFsDatasetImpl {
     LOG.info("Volumes removed");
     brReceivedLatch.await();
   }
+
+  /**
+   * Tests stopping all the active DataXceiver thread on volume failure event.
+   * @throws Exception
+   */
+  @Test
+  public void testCleanShutdownOfVolume() throws Exception {
+    MiniDFSCluster cluster = null;
+    try {
+      Configuration config = new HdfsConfiguration();
+      config.setLong(
+          DFSConfigKeys.DFS_DATANODE_XCEIVER_STOP_TIMEOUT_MILLIS_KEY, 1000);
+      config.setInt(DFSConfigKeys.DFS_DATANODE_FAILED_VOLUMES_TOLERATED_KEY, 1);
+
+      cluster = new MiniDFSCluster.Builder(config).numDataNodes(1).build();
+      cluster.waitActive();
+      FileSystem fs = cluster.getFileSystem();
+      DataNode dataNode = cluster.getDataNodes().get(0);
+      Path filePath = new Path("test.dat");
+      // Create a file and keep the output stream unclosed.
+      FSDataOutputStream out = fs.create(filePath, (short) 1);
+      out.write(1);
+      out.hflush();
+
+      ExtendedBlock block = DFSTestUtil.getFirstBlock(fs, filePath);
+      FsVolumeImpl volume = (FsVolumeImpl) dataNode.getFSDataset().getVolume(
+          block);
+      File finalizedDir = volume.getFinalizedDir(cluster.getNamesystem()
+          .getBlockPoolId());
+
+      if (finalizedDir.exists()) {
+        // Remove write and execute access so that checkDiskErrorThread detects
+        // this volume is bad.
+        finalizedDir.setExecutable(false);
+        finalizedDir.setWritable(false);
+      }
+      Assert.assertTrue("Reference count for the volume should be greater "
+          + "than 0", volume.getReferenceCount() > 0);
+      // Invoke the synchronous checkDiskError method
+      dataNode.getFSDataset().checkDataDir();
+      // Sleep for 1 second so that datanode can interrupt and cluster clean up
+      Thread.sleep(1000);
+      assertEquals("There are active threads still referencing volume: "
+          + volume.getBasePath(), 0, volume.getReferenceCount());
+      LocatedBlock lb = DFSTestUtil.getAllBlocks(fs, filePath).get(0);
+      DatanodeInfo info = lb.getLocations()[0];
+
+      try {
+        out.close();
+        Assert.fail("This is not a valid code path. "
+            + "out.close should have thrown an exception.");
+      } catch (IOException ioe) {
+        Assert.assertTrue(ioe.getMessage().contains(info.toString()));
+      }
+      finalizedDir.setWritable(true);
+      finalizedDir.setExecutable(true);
+    } finally {
+      cluster.shutdown();
+    }
+  }
 }
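
Note: the test ties the pieces together. It keeps an output stream open so a
DataXceiver holds a volume reference, strips write/execute permission from the
finalized directory so the synchronous checkDataDir() call fails the volume,
and then asserts that the reference count drains to zero once the writer
threads are interrupted. Closing the dangling stream afterwards is expected to
fail with an IOException naming the failed datanode.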