HDFS-15963. Unreleased volume references cause an infinite loop. (#2941) Contributed by Shuyan Zhang.
Reviewed-by: Wei-Chiu Chuang <weichiu@apache.org> Reviewed-by: He Xiaoqiao <hexiaoqiao@apache.org>
This commit is contained in:
parent
29414871bd
commit
9f2db2c9fd
|
@ -431,6 +431,7 @@ class BlockSender implements java.io.Closeable {
|
||||||
ris = new ReplicaInputStreams(
|
ris = new ReplicaInputStreams(
|
||||||
blockIn, checksumIn, volumeRef, fileIoProvider);
|
blockIn, checksumIn, volumeRef, fileIoProvider);
|
||||||
} catch (IOException ioe) {
|
} catch (IOException ioe) {
|
||||||
|
IOUtils.cleanupWithLogger(null, volumeRef);
|
||||||
IOUtils.closeStream(this);
|
IOUtils.closeStream(this);
|
||||||
org.apache.commons.io.IOUtils.closeQuietly(blockIn);
|
org.apache.commons.io.IOUtils.closeQuietly(blockIn);
|
||||||
org.apache.commons.io.IOUtils.closeQuietly(checksumIn);
|
org.apache.commons.io.IOUtils.closeQuietly(checksumIn);
|
||||||
|
|
|
@ -167,18 +167,26 @@ class FsDatasetAsyncDiskService {
|
||||||
* Execute the task sometime in the future, using ThreadPools.
|
* Execute the task sometime in the future, using ThreadPools.
|
||||||
*/
|
*/
|
||||||
synchronized void execute(FsVolumeImpl volume, Runnable task) {
|
synchronized void execute(FsVolumeImpl volume, Runnable task) {
|
||||||
if (executors == null) {
|
try {
|
||||||
throw new RuntimeException("AsyncDiskService is already shutdown");
|
if (executors == null) {
|
||||||
}
|
throw new RuntimeException("AsyncDiskService is already shutdown");
|
||||||
if (volume == null) {
|
}
|
||||||
throw new RuntimeException("A null volume does not have a executor");
|
if (volume == null) {
|
||||||
}
|
throw new RuntimeException("A null volume does not have a executor");
|
||||||
ThreadPoolExecutor executor = executors.get(volume.getStorageID());
|
}
|
||||||
if (executor == null) {
|
ThreadPoolExecutor executor = executors.get(volume.getStorageID());
|
||||||
throw new RuntimeException("Cannot find volume " + volume
|
if (executor == null) {
|
||||||
+ " for execution of task " + task);
|
throw new RuntimeException("Cannot find volume " + volume
|
||||||
} else {
|
+ " for execution of task " + task);
|
||||||
executor.execute(task);
|
} else {
|
||||||
|
executor.execute(task);
|
||||||
|
}
|
||||||
|
} catch (RuntimeException re) {
|
||||||
|
if (task instanceof ReplicaFileDeleteTask) {
|
||||||
|
IOUtils.cleanupWithLogger(null,
|
||||||
|
((ReplicaFileDeleteTask) task).volumeRef);
|
||||||
|
}
|
||||||
|
throw re;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -314,28 +322,31 @@ class FsDatasetAsyncDiskService {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void run() {
|
public void run() {
|
||||||
final long blockLength = replicaToDelete.getBlockDataLength();
|
try {
|
||||||
final long metaLength = replicaToDelete.getMetadataLength();
|
final long blockLength = replicaToDelete.getBlockDataLength();
|
||||||
boolean result;
|
final long metaLength = replicaToDelete.getMetadataLength();
|
||||||
|
boolean result;
|
||||||
|
|
||||||
result = (trashDirectory == null) ? deleteFiles() : moveFiles();
|
result = (trashDirectory == null) ? deleteFiles() : moveFiles();
|
||||||
|
|
||||||
if (!result) {
|
if (!result) {
|
||||||
LOG.warn("Unexpected error trying to "
|
LOG.warn("Unexpected error trying to "
|
||||||
+ (trashDirectory == null ? "delete" : "move")
|
+ (trashDirectory == null ? "delete" : "move")
|
||||||
+ " block " + block.getBlockPoolId() + " " + block.getLocalBlock()
|
+ " block " + block.getBlockPoolId() + " " + block.getLocalBlock()
|
||||||
+ " at file " + replicaToDelete.getBlockURI() + ". Ignored.");
|
+ " at file " + replicaToDelete.getBlockURI() + ". Ignored.");
|
||||||
} else {
|
} else {
|
||||||
if(block.getLocalBlock().getNumBytes() != BlockCommand.NO_ACK){
|
if (block.getLocalBlock().getNumBytes() != BlockCommand.NO_ACK) {
|
||||||
datanode.notifyNamenodeDeletedBlock(block, volume.getStorageID());
|
datanode.notifyNamenodeDeletedBlock(block, volume.getStorageID());
|
||||||
|
}
|
||||||
|
volume.onBlockFileDeletion(block.getBlockPoolId(), blockLength);
|
||||||
|
volume.onMetaFileDeletion(block.getBlockPoolId(), metaLength);
|
||||||
|
LOG.info("Deleted " + block.getBlockPoolId() + " " +
|
||||||
|
block.getLocalBlock() + " URI " + replicaToDelete.getBlockURI());
|
||||||
}
|
}
|
||||||
volume.onBlockFileDeletion(block.getBlockPoolId(), blockLength);
|
updateDeletedBlockId(block);
|
||||||
volume.onMetaFileDeletion(block.getBlockPoolId(), metaLength);
|
} finally {
|
||||||
LOG.info("Deleted " + block.getBlockPoolId() + " "
|
IOUtils.cleanupWithLogger(null, this.volumeRef);
|
||||||
+ block.getLocalBlock() + " URI " + replicaToDelete.getBlockURI());
|
|
||||||
}
|
}
|
||||||
updateDeletedBlockId(block);
|
|
||||||
IOUtils.cleanupWithLogger(null, volumeRef);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -304,7 +304,7 @@ public class FsVolumeImpl implements FsVolumeSpi {
|
||||||
}
|
}
|
||||||
|
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
int getReferenceCount() {
|
public int getReferenceCount() {
|
||||||
return this.reference.getReferenceCount();
|
return this.reference.getReferenceCount();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -18,6 +18,7 @@
|
||||||
|
|
||||||
package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
|
package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
|
||||||
|
|
||||||
|
import org.apache.hadoop.io.IOUtils;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
@ -153,16 +154,24 @@ class RamDiskAsyncLazyPersistService {
|
||||||
* Execute the task sometime in the future, using ThreadPools.
|
* Execute the task sometime in the future, using ThreadPools.
|
||||||
*/
|
*/
|
||||||
synchronized void execute(String storageId, Runnable task) {
|
synchronized void execute(String storageId, Runnable task) {
|
||||||
if (executors == null) {
|
try {
|
||||||
throw new RuntimeException(
|
if (executors == null) {
|
||||||
"AsyncLazyPersistService is already shutdown");
|
throw new RuntimeException(
|
||||||
}
|
"AsyncLazyPersistService is already shutdown");
|
||||||
ThreadPoolExecutor executor = executors.get(storageId);
|
}
|
||||||
if (executor == null) {
|
ThreadPoolExecutor executor = executors.get(storageId);
|
||||||
throw new RuntimeException("Cannot find root storage volume with id " +
|
if (executor == null) {
|
||||||
storageId + " for execution of task " + task);
|
throw new RuntimeException("Cannot find root storage volume with id " +
|
||||||
} else {
|
storageId + " for execution of task " + task);
|
||||||
executor.execute(task);
|
} else {
|
||||||
|
executor.execute(task);
|
||||||
|
}
|
||||||
|
} catch (RuntimeException re) {
|
||||||
|
if (task instanceof ReplicaLazyPersistTask) {
|
||||||
|
IOUtils.cleanupWithLogger(null,
|
||||||
|
((ReplicaLazyPersistTask) task).targetVolume);
|
||||||
|
}
|
||||||
|
throw re;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -33,6 +33,9 @@ import java.net.Socket;
|
||||||
import java.nio.ByteBuffer;
|
import java.nio.ByteBuffer;
|
||||||
import java.util.Random;
|
import java.util.Random;
|
||||||
|
|
||||||
|
import org.apache.hadoop.hdfs.server.datanode.DataNode;
|
||||||
|
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
|
||||||
|
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsVolumeImpl;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
@ -562,4 +565,56 @@ public class TestDataTransferProtocol {
|
||||||
checksum, CachingStrategy.newDefaultStrategy(), false, false,
|
checksum, CachingStrategy.newDefaultStrategy(), false, false,
|
||||||
null, null, new String[0]);
|
null, null, new String[0]);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test(timeout = 30000)
|
||||||
|
public void testReleaseVolumeRefIfExceptionThrown()
|
||||||
|
throws IOException, InterruptedException {
|
||||||
|
Path file = new Path("dataprotocol.dat");
|
||||||
|
int numDataNodes = 1;
|
||||||
|
|
||||||
|
Configuration conf = new HdfsConfiguration();
|
||||||
|
conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, numDataNodes);
|
||||||
|
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(
|
||||||
|
numDataNodes).build();
|
||||||
|
try {
|
||||||
|
cluster.waitActive();
|
||||||
|
datanode = cluster.getFileSystem().getDataNodeStats(
|
||||||
|
DatanodeReportType.LIVE)[0];
|
||||||
|
dnAddr = NetUtils.createSocketAddr(datanode.getXferAddr());
|
||||||
|
FileSystem fileSys = cluster.getFileSystem();
|
||||||
|
|
||||||
|
int fileLen = Math.min(
|
||||||
|
conf.getInt(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 4096), 4096);
|
||||||
|
|
||||||
|
DFSTestUtil.createFile(fileSys, file, fileLen, fileLen,
|
||||||
|
fileSys.getDefaultBlockSize(file),
|
||||||
|
fileSys.getDefaultReplication(file), 0L);
|
||||||
|
|
||||||
|
// Get the first blockid for the file.
|
||||||
|
final ExtendedBlock firstBlock = DFSTestUtil.getFirstBlock(fileSys, file);
|
||||||
|
|
||||||
|
String bpid = cluster.getNamesystem().getBlockPoolId();
|
||||||
|
ExtendedBlock blk = new ExtendedBlock(bpid, firstBlock.getLocalBlock());
|
||||||
|
sendBuf.reset();
|
||||||
|
recvBuf.reset();
|
||||||
|
|
||||||
|
// Delete the meta file to create a exception in BlockSender constructor.
|
||||||
|
DataNode dn = cluster.getDataNodes().get(0);
|
||||||
|
cluster.getMaterializedReplica(0, blk).deleteMeta();
|
||||||
|
|
||||||
|
FsVolumeImpl volume = (FsVolumeImpl) DataNodeTestUtils.getFSDataset(
|
||||||
|
dn).getVolume(blk);
|
||||||
|
int beforeCnt = volume.getReferenceCount();
|
||||||
|
|
||||||
|
sender.copyBlock(blk, BlockTokenSecretManager.DUMMY_TOKEN);
|
||||||
|
sendRecvData("Copy a block.", false);
|
||||||
|
Thread.sleep(3000);
|
||||||
|
|
||||||
|
int afterCnt = volume.getReferenceCount();
|
||||||
|
assertEquals(beforeCnt, afterCnt);
|
||||||
|
|
||||||
|
} finally {
|
||||||
|
cluster.shutdown();
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -18,6 +18,7 @@
|
||||||
package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
|
package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
|
||||||
|
|
||||||
import java.util.function.Supplier;
|
import java.util.function.Supplier;
|
||||||
|
import org.apache.hadoop.hdfs.server.datanode.LocalReplica;
|
||||||
import org.apache.hadoop.thirdparty.com.google.common.collect.Lists;
|
import org.apache.hadoop.thirdparty.com.google.common.collect.Lists;
|
||||||
|
|
||||||
import java.io.OutputStream;
|
import java.io.OutputStream;
|
||||||
|
@ -1241,4 +1242,37 @@ public class TestFsDatasetImpl {
|
||||||
assertTrue(blockDir.delete());
|
assertTrue(blockDir.delete());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test(timeout = 20000)
|
||||||
|
public void testReleaseVolumeRefIfExceptionThrown() throws IOException {
|
||||||
|
MiniDFSCluster cluster = new MiniDFSCluster.Builder(
|
||||||
|
new HdfsConfiguration()).build();
|
||||||
|
cluster.waitActive();
|
||||||
|
FsVolumeImpl vol = (FsVolumeImpl) dataset.getFsVolumeReferences().get(0);
|
||||||
|
ExtendedBlock eb;
|
||||||
|
ReplicaInfo info;
|
||||||
|
int beforeCnt = 0;
|
||||||
|
try {
|
||||||
|
List<Block> blockList = new ArrayList<Block>();
|
||||||
|
eb = new ExtendedBlock(BLOCKPOOL, 1, 1, 1001);
|
||||||
|
info = new FinalizedReplica(
|
||||||
|
eb.getLocalBlock(), vol, vol.getCurrentDir().getParentFile());
|
||||||
|
dataset.volumeMap.add(BLOCKPOOL, info);
|
||||||
|
((LocalReplica) info).getBlockFile().createNewFile();
|
||||||
|
((LocalReplica) info).getMetaFile().createNewFile();
|
||||||
|
blockList.add(info);
|
||||||
|
|
||||||
|
// Create a runtime exception.
|
||||||
|
dataset.asyncDiskService.shutdown();
|
||||||
|
|
||||||
|
beforeCnt = vol.getReferenceCount();
|
||||||
|
dataset.invalidate(BLOCKPOOL, blockList.toArray(new Block[0]));
|
||||||
|
|
||||||
|
} catch (RuntimeException re) {
|
||||||
|
int afterCnt = vol.getReferenceCount();
|
||||||
|
assertEquals(beforeCnt, afterCnt);
|
||||||
|
} finally {
|
||||||
|
cluster.shutdown();
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -18,6 +18,9 @@
|
||||||
package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
|
package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
|
||||||
import org.apache.hadoop.fs.CreateFlag;
|
import org.apache.hadoop.fs.CreateFlag;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
|
import org.apache.hadoop.hdfs.server.datanode.DataNode;
|
||||||
|
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
|
||||||
|
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
|
||||||
import org.apache.hadoop.test.GenericTestUtils;
|
import org.apache.hadoop.test.GenericTestUtils;
|
||||||
import org.apache.hadoop.util.ThreadUtil;
|
import org.apache.hadoop.util.ThreadUtil;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
|
@ -280,4 +283,38 @@ public class TestLazyPersistFiles extends LazyPersistTestCase {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test(timeout = 20000)
|
||||||
|
public void testReleaseVolumeRefIfExceptionThrown()
|
||||||
|
throws IOException, InterruptedException {
|
||||||
|
getClusterBuilder().setRamDiskReplicaCapacity(2).build();
|
||||||
|
final String methodName = GenericTestUtils.getMethodName();
|
||||||
|
final int seed = 0xFADED;
|
||||||
|
Path path = new Path("/" + methodName + ".Writer.File.dat");
|
||||||
|
|
||||||
|
DataNode dn = cluster.getDataNodes().get(0);
|
||||||
|
FsDatasetSpi.FsVolumeReferences volumes =
|
||||||
|
DataNodeTestUtils.getFSDataset(dn).getFsVolumeReferences();
|
||||||
|
int[] beforeCnts = new int[volumes.size()];
|
||||||
|
FsDatasetImpl ds = (FsDatasetImpl) DataNodeTestUtils.getFSDataset(dn);
|
||||||
|
|
||||||
|
// Create a runtime exception.
|
||||||
|
ds.asyncLazyPersistService.shutdown();
|
||||||
|
for (int i = 0; i < volumes.size(); ++i) {
|
||||||
|
beforeCnts[i] = ((FsVolumeImpl) volumes.get(i)).getReferenceCount();
|
||||||
|
}
|
||||||
|
|
||||||
|
makeRandomTestFile(path, BLOCK_SIZE, true, seed);
|
||||||
|
Thread.sleep(3 * LAZY_WRITER_INTERVAL_SEC * 1000);
|
||||||
|
|
||||||
|
for (int i = 0; i < volumes.size(); ++i) {
|
||||||
|
int afterCnt = ((FsVolumeImpl) volumes.get(i)).getReferenceCount();
|
||||||
|
// LazyWriter keeps trying to save copies even if
|
||||||
|
// asyncLazyPersistService is already shutdown.
|
||||||
|
// If we do not release references, the number of
|
||||||
|
// references will increase infinitely.
|
||||||
|
Assert.assertTrue(
|
||||||
|
beforeCnts[i] == afterCnt || beforeCnts[i] == (afterCnt - 1));
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue