From aa45faf0b20c922b0d147ece9fa01fb95a5b0dec Mon Sep 17 00:00:00 2001
From: Anu Engineer
Date: Thu, 1 Feb 2018 18:03:01 -0800
Subject: [PATCH] HDFS-12942. Synchronization issue in FSDataSetImpl#moveBlock. Contributed by Ajay Kumar.

---
 .../fsdataset/impl/FsDatasetImpl.java     |  95 ++++++++++++----
 .../fsdataset/impl/TestFsDatasetImpl.java | 103 +++++++++++++++++-
 2 files changed, 178 insertions(+), 20 deletions(-)

diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
index e0f780917b1..8e7884d70a1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
@@ -971,26 +971,74 @@ class FsDatasetImpl implements FsDatasetSpi {
    * @throws IOException
    */
   private ReplicaInfo moveBlock(ExtendedBlock block, ReplicaInfo replicaInfo,
-      FsVolumeReference volumeRef) throws
-      IOException {
+      FsVolumeReference volumeRef) throws IOException {
+    ReplicaInfo newReplicaInfo = copyReplicaToVolume(block, replicaInfo,
+        volumeRef);
+    finalizeNewReplica(newReplicaInfo, block);
+    removeOldReplica(replicaInfo, newReplicaInfo, block.getBlockPoolId());
+    return newReplicaInfo;
+  }
+  /**
+   * Cleans up the replicaInfo object passed in.
+   *
+   * @param bpid - block pool id
+   * @param replicaInfo - ReplicaInfo
+   */
+  private void cleanupReplica(String bpid, ReplicaInfo replicaInfo) {
+    if (replicaInfo.deleteBlockData() || !replicaInfo.blockDataExists()) {
+      FsVolumeImpl volume = (FsVolumeImpl) replicaInfo.getVolume();
+      volume.onBlockFileDeletion(bpid, replicaInfo.getBytesOnDisk());
+      if (replicaInfo.deleteMetadata() || !replicaInfo.metadataExists()) {
+        volume.onMetaFileDeletion(bpid, replicaInfo.getMetadataLength());
+      }
+    }
+  }
+
+  /**
+   * Creates a new temporary replica of the replicaInfo object in the specified volume.
+   *
+   * @param block - Extended Block
+   * @param replicaInfo - ReplicaInfo
+   * @param volumeRef - Volume Ref - Closed by caller.
+   * @return newReplicaInfo new replica object created in specified volume.
+   * @throws IOException
+   */
+  @VisibleForTesting
+  ReplicaInfo copyReplicaToVolume(ExtendedBlock block, ReplicaInfo replicaInfo,
+      FsVolumeReference volumeRef) throws IOException {
     FsVolumeImpl targetVolume = (FsVolumeImpl) volumeRef.getVolume();
     // Copy files to temp dir first
     ReplicaInfo newReplicaInfo = targetVolume.moveBlockToTmpLocation(block,
         replicaInfo, smallBufferSize, conf);
-
-    // Finalize the copied files
-    newReplicaInfo = finalizeReplica(block.getBlockPoolId(), newReplicaInfo);
-    try (AutoCloseableLock lock = datasetLock.acquire()) {
-      // Increment numBlocks here as this block moved without knowing to BPS
-      FsVolumeImpl volume = (FsVolumeImpl) newReplicaInfo.getVolume();
-      volume.incrNumBlocks(block.getBlockPoolId());
-    }
-
-    removeOldReplica(replicaInfo, newReplicaInfo, block.getBlockPoolId());
     return newReplicaInfo;
   }
 
+  /**
+   * Finalizes newReplica by calling finalizeReplica internally.
+   *
+   * @param newReplicaInfo - ReplicaInfo
+   * @param block - Extended Block
+   * @throws IOException
+   */
+  @VisibleForTesting
+  void finalizeNewReplica(ReplicaInfo newReplicaInfo,
+      ExtendedBlock block) throws IOException {
+    // Finalize the copied files
+    try {
+      String bpid = block.getBlockPoolId();
+      finalizeReplica(bpid, newReplicaInfo);
+      FsVolumeImpl volume = (FsVolumeImpl) newReplicaInfo.getVolume();
+      volume.incrNumBlocks(bpid);
+    } catch (IOException ioe) {
+      // Cleanup block data and metadata
+      // Decrement of dfsUsed and noOfBlocks for volume not required
+      newReplicaInfo.deleteBlockData();
+      newReplicaInfo.deleteMetadata();
+      throw ioe;
+    }
+  }
+
   /**
    * Moves a given block from one volume to another volume. This is used by disk
    * balancer.
@@ -1664,6 +1712,13 @@ class FsDatasetImpl implements FsDatasetSpi {
   private ReplicaInfo finalizeReplica(String bpid, ReplicaInfo replicaInfo)
       throws IOException {
     try (AutoCloseableLock lock = datasetLock.acquire()) {
+      // Compare generation stamp of old and new replica before finalizing
+      if (volumeMap.get(bpid, replicaInfo.getBlockId()).getGenerationStamp()
+          > replicaInfo.getGenerationStamp()) {
+        throw new IOException("Generation Stamp should be monotonically " +
+            "increased.");
+      }
+
       ReplicaInfo newReplicaInfo = null;
       if (replicaInfo.getState() == ReplicaState.RUR &&
           replicaInfo.getOriginalReplica().getState()
@@ -1689,6 +1744,7 @@ class FsDatasetImpl implements FsDatasetSpi {
       }
       assert newReplicaInfo.getState() == ReplicaState.FINALIZED
           : "Replica should be finalized";
 
+      volumeMap.add(bpid, newReplicaInfo);
       return newReplicaInfo;
     }
@@ -2940,6 +2996,13 @@ class FsDatasetImpl implements FsDatasetSpi {
     }
   }
 
+  /**
+   * Cleans up the old replica and notifies the NN about the new replica.
+   *
+   * @param replicaInfo - Old replica to be deleted
+   * @param newReplicaInfo - New replica object
+   * @param bpid - block pool id
+   */
   private void removeOldReplica(ReplicaInfo replicaInfo,
       ReplicaInfo newReplicaInfo, final String bpid) {
     // Before deleting the files from old storage we must notify the
@@ -2958,13 +3021,7 @@ class FsDatasetImpl implements FsDatasetSpi {
         newReplicaInfo.isOnTransientStorage());
 
     // Remove the old replicas
-    if (replicaInfo.deleteBlockData() || !replicaInfo.blockDataExists()) {
-      FsVolumeImpl volume = (FsVolumeImpl) replicaInfo.getVolume();
-      volume.onBlockFileDeletion(bpid, replicaInfo.getBytesOnDisk());
-      if (replicaInfo.deleteMetadata() || !replicaInfo.metadataExists()) {
-        volume.onMetaFileDeletion(bpid, replicaInfo.getMetadataLength());
-      }
-    }
+    cleanupReplica(bpid, replicaInfo);
 
     // If deletion failed then the directory scanner will cleanup the blocks
    // eventually.
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java
index cfae1e24320..3c4b657983a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java
@@ -56,6 +56,7 @@ import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
 import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
 import org.apache.hadoop.io.MultipleIOException;
 import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.test.LambdaTestUtils;
 import org.apache.hadoop.util.FakeTimer;
 import org.apache.hadoop.util.StringUtils;
 import org.junit.Assert;
@@ -747,4 +748,104 @@ public class TestFsDatasetImpl {
       cluster.shutdown();
     }
   }
-}
+
+  @Test(timeout = 30000)
+  public void testMoveBlockFailure() {
+    MiniDFSCluster cluster = null;
+    try {
+      cluster = new MiniDFSCluster.Builder(conf)
+          .numDataNodes(1)
+          .storageTypes(new StorageType[]{StorageType.DISK, StorageType.DISK})
+          .storagesPerDatanode(2)
+          .build();
+      FileSystem fs = cluster.getFileSystem();
+      DataNode dataNode = cluster.getDataNodes().get(0);
+
+      Path filePath = new Path("testData");
+      DFSTestUtil.createFile(fs, filePath, 100, (short) 1, 0);
+      ExtendedBlock block = DFSTestUtil.getFirstBlock(fs, filePath);
+
+      FsDatasetImpl fsDataSetImpl = (FsDatasetImpl) dataNode.getFSDataset();
+      ReplicaInfo newReplicaInfo = createNewReplicaObj(block, fsDataSetImpl);
+
+      // Append to file to update its GS
+      FSDataOutputStream out = fs.append(filePath, (short) 1);
+      out.write(100);
+      out.hflush();
+
+      // Call finalizeNewReplica
+      LOG.info("GenerationStamp of old replica: {}",
+          block.getGenerationStamp());
+      LOG.info("GenerationStamp of new replica: {}", fsDataSetImpl
+          .getReplicaInfo(block.getBlockPoolId(), newReplicaInfo.getBlockId())
+          .getGenerationStamp());
+      LambdaTestUtils.intercept(IOException.class, "Generation Stamp " +
+          "should be monotonically increased.",
+          () -> fsDataSetImpl.finalizeNewReplica(newReplicaInfo, block));
+    } catch (Exception ex) {
+      LOG.info("Exception in testMoveBlockFailure", ex);
+      fail("Exception while testing testMoveBlockFailure");
+    } finally {
+      if (cluster != null && cluster.isClusterUp()) {
+        cluster.shutdown();
+      }
+    }
+  }
+
+  @Test(timeout = 30000)
+  public void testMoveBlockSuccess() {
+    MiniDFSCluster cluster = null;
+    try {
+      cluster = new MiniDFSCluster.Builder(conf)
+          .numDataNodes(1)
+          .storageTypes(new StorageType[]{StorageType.DISK, StorageType.DISK})
+          .storagesPerDatanode(2)
+          .build();
+      FileSystem fs = cluster.getFileSystem();
+      DataNode dataNode = cluster.getDataNodes().get(0);
+
+      Path filePath = new Path("testData");
+      DFSTestUtil.createFile(fs, filePath, 100, (short) 1, 0);
+      ExtendedBlock block = DFSTestUtil.getFirstBlock(fs, filePath);
+
+      FsDatasetImpl fsDataSetImpl = (FsDatasetImpl) dataNode.getFSDataset();
+      ReplicaInfo newReplicaInfo = createNewReplicaObj(block, fsDataSetImpl);
+      fsDataSetImpl.finalizeNewReplica(newReplicaInfo, block);
+
+    } catch (Exception ex) {
+      LOG.info("Exception in testMoveBlockSuccess", ex);
+      fail("MoveBlock operation should succeed");
+    } finally {
+      if (cluster != null && cluster.isClusterUp()) {
+        cluster.shutdown();
+      }
+    }
+  }
+
+  /**
+   * Creates a new temporary replica of the replicaInfo object in another volume.
+   *
+   * @param block - Extended Block
+   * @param fsDataSetImpl - FsDatasetImpl reference
+   * @throws IOException
+   */
+  private ReplicaInfo createNewReplicaObj(ExtendedBlock block, FsDatasetImpl
+      fsDataSetImpl) throws IOException {
+    ReplicaInfo replicaInfo = fsDataSetImpl.getReplicaInfo(block);
+    FsVolumeSpi destVolume = null;
+
+    final String srcStorageId = fsDataSetImpl.getVolume(block).getStorageID();
+    try (FsVolumeReferences volumeReferences =
+        fsDataSetImpl.getFsVolumeReferences()) {
+      for (int i = 0; i < volumeReferences.size(); i++) {
+        if (!volumeReferences.get(i).getStorageID().equals(srcStorageId)) {
+          destVolume = volumeReferences.get(i);
+          break;
+        }
+      }
+    }
+    return fsDataSetImpl.copyReplicaToVolume(block, replicaInfo,
+        destVolume.obtainReference());
+  }
+
+}
\ No newline at end of file
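
Editor's note (not part of the patch above): the change splits FsDatasetImpl#moveBlock into three steps, copyReplicaToVolume, finalizeNewReplica, and removeOldReplica, and makes finalizeReplica reject, while holding the dataset lock, any replica whose generation stamp is older than the one already recorded in the volumeMap. That guard is what keeps a stale copy from overwriting a replica that a concurrent append has advanced. The Java sketch below is a minimal, self-contained illustration of that guard only; the class and method names (GenStampGuardSketch, addReplica, finalizeCopiedReplica) are hypothetical stand-ins, not Hadoop APIs.

    // Illustrative sketch only, not part of HDFS-12942: models the generation
    // stamp guard that finalizeReplica now applies before accepting a copied
    // replica. Names below are hypothetical, not Hadoop APIs.
    import java.io.IOException;
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    public class GenStampGuardSketch {

      // blockId -> generation stamp of the replica currently registered
      private final Map<Long, Long> replicaMap = new ConcurrentHashMap<>();

      /** Register or refresh a replica, e.g. after an append bumps its GS. */
      public synchronized void addReplica(long blockId, long genStamp) {
        replicaMap.put(blockId, genStamp);
      }

      /**
       * Finalize a copied replica under the lock. If a concurrent append has
       * already raised the generation stamp, the stale copy is rejected
       * instead of silently overwriting the newer replica.
       */
      public synchronized void finalizeCopiedReplica(long blockId, long copiedGs)
          throws IOException {
        Long current = replicaMap.get(blockId);
        if (current != null && current > copiedGs) {
          throw new IOException("Generation Stamp should be monotonically "
              + "increased.");
        }
        replicaMap.put(blockId, copiedGs);
      }

      public static void main(String[] args) {
        GenStampGuardSketch guard = new GenStampGuardSketch();
        guard.addReplica(1L, 1000L);   // block registered at GS 1000, then copied
        guard.addReplica(1L, 1001L);   // concurrent append advances GS to 1001
        try {
          guard.finalizeCopiedReplica(1L, 1000L);  // stale copy, must be rejected
        } catch (IOException expected) {
          System.out.println("Rejected stale replica: " + expected.getMessage());
        }
      }
    }

In the patch itself the same comparison is made against volumeMap.get(bpid, blockId).getGenerationStamp() inside finalizeReplica, under the AutoCloseableLock on the dataset, and the tests exercise both the rejection path (testMoveBlockFailure, after an append bumps the GS) and the normal path (testMoveBlockSuccess).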