From f40b4f45425f509fceef222e5cacd946bfcc9751 Mon Sep 17 00:00:00 2001 From: Vinayakumar B Date: Fri, 18 Sep 2015 16:42:22 +0530 Subject: [PATCH] HDFS-6955. DN should reserve disk space for a full block when creating tmp files (Contributed by Kanaka Kumar Avvaru) --- .../hdfs/server/datanode/BlockReceiver.java | 5 +- .../datanode/fsdataset/FsVolumeSpi.java | 8 +- .../fsdataset/impl/FsDatasetImpl.java | 13 +- .../datanode/fsdataset/impl/FsVolumeImpl.java | 72 +++++---- .../server/datanode/SimulatedFSDataset.java | 2 +- .../server/datanode/TestDirectoryScanner.java | 2 +- .../extdataset/ExternalVolumeImpl.java | 2 +- ...rvation.java => TestSpaceReservation.java} | 150 ++++++++++++++++-- 8 files changed, 198 insertions(+), 56 deletions(-) rename hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/{TestRbwSpaceReservation.java => TestSpaceReservation.java} (73%) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java index bc5396f94d8..957b2c72f42 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java @@ -117,7 +117,7 @@ class BlockReceiver implements Closeable { /** the block to receive */ private final ExtendedBlock block; /** the replica to write */ - private final ReplicaInPipelineInterface replicaInfo; + private ReplicaInPipelineInterface replicaInfo; /** pipeline stage */ private final BlockConstructionStage stage; private final boolean isTransfer; @@ -259,6 +259,9 @@ class BlockReceiver implements Closeable { } catch (ReplicaNotFoundException bne) { throw bne; } catch(IOException ioe) { + if (replicaInfo != null) { + replicaInfo.releaseAllBytesReserved(); + } IOUtils.closeStream(this); cleanupBlock(); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsVolumeSpi.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsVolumeSpi.java index ee0192404ce..9e161214c86 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsVolumeSpi.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsVolumeSpi.java @@ -62,13 +62,13 @@ public interface FsVolumeSpi { boolean isTransientStorage(); /** - * Reserve disk space for an RBW block so a writer does not run out of - * space before the block is full. + * Reserve disk space for a block (RBW or Re-replicating) + * so a writer does not run out of space before the block is full. */ - void reserveSpaceForRbw(long bytesToReserve); + void reserveSpaceForReplica(long bytesToReserve); /** - * Release disk space previously reserved for RBW block. + * Release disk space previously reserved for block opened for write. */ void releaseReservedSpace(long bytesToRelease); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java index 7c77ad8a4d0..0d931c40561 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java @@ -1162,7 +1162,7 @@ class FsDatasetImpl implements FsDatasetSpi { // Replace finalized replica by a RBW replica in replicas map volumeMap.add(bpid, newReplicaInfo); - v.reserveSpaceForRbw(estimateBlockLen - replicaInfo.getNumBytes()); + v.reserveSpaceForReplica(estimateBlockLen - replicaInfo.getNumBytes()); return newReplicaInfo; } @@ -1492,7 +1492,7 @@ class FsDatasetImpl implements FsDatasetSpi { } ReplicaInPipeline newReplicaInfo = new ReplicaInPipeline(b.getBlockId(), b.getGenerationStamp(), v, - f.getParentFile(), 0); + f.getParentFile(), b.getLocalBlock().getNumBytes()); volumeMap.add(b.getBlockPoolId(), newReplicaInfo); return new ReplicaHandler(newReplicaInfo, ref); } else { @@ -1609,7 +1609,7 @@ class FsDatasetImpl implements FsDatasetSpi { if (replicaInfo != null && replicaInfo.getState() == ReplicaState.TEMPORARY) { // remove from volumeMap volumeMap.remove(b.getBlockPoolId(), b.getLocalBlock()); - + // delete the on-disk temp file if (delBlockFromDisk(replicaInfo.getBlockFile(), replicaInfo.getMetaFile(), b.getLocalBlock())) { @@ -2559,14 +2559,15 @@ class FsDatasetImpl implements FsDatasetSpi { final long usedSpace; // size of space used by HDFS final long freeSpace; // size of free space excluding reserved space final long reservedSpace; // size of space reserved for non-HDFS - final long reservedSpaceForRBW; // size of space reserved RBW + final long reservedSpaceForReplicas; // size of space reserved RBW or + // re-replication VolumeInfo(FsVolumeImpl v, long usedSpace, long freeSpace) { this.directory = v.toString(); this.usedSpace = usedSpace; this.freeSpace = freeSpace; this.reservedSpace = v.getReserved(); - this.reservedSpaceForRBW = v.getReservedForRbw(); + this.reservedSpaceForReplicas = v.getReservedForReplicas(); } } @@ -2600,7 +2601,7 @@ class FsDatasetImpl implements FsDatasetSpi { innerInfo.put("usedSpace", v.usedSpace); innerInfo.put("freeSpace", v.freeSpace); innerInfo.put("reservedSpace", v.reservedSpace); - innerInfo.put("reservedSpaceForRBW", v.reservedSpaceForRBW); + innerInfo.put("reservedSpaceForReplicas", v.reservedSpaceForReplicas); info.put(v.directory, innerInfo); } return info; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java index 182eed40647..126f6672758 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java @@ -22,8 +22,8 @@ import java.io.File; import java.io.FileOutputStream; import java.io.FilenameFilter; import java.io.IOException; -import java.nio.channels.ClosedChannelException; import java.io.OutputStreamWriter; +import java.nio.channels.ClosedChannelException; import java.nio.file.Files; import java.nio.file.Paths; import java.nio.file.StandardCopyOption; @@ -40,9 +40,6 @@ import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Joiner; -import com.google.common.base.Preconditions; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.DF; @@ -54,21 +51,24 @@ import org.apache.hadoop.hdfs.protocol.BlockListAsLongs; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.server.datanode.DataStorage; import org.apache.hadoop.hdfs.server.datanode.DatanodeUtil; -import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi; +import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi; import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage; -import org.apache.hadoop.util.CloseableReferenceCount; import org.apache.hadoop.io.IOUtils; +import org.apache.hadoop.util.CloseableReferenceCount; import org.apache.hadoop.util.DiskChecker.DiskErrorException; - -import com.google.common.util.concurrent.ThreadFactoryBuilder; import org.apache.hadoop.util.Time; import org.codehaus.jackson.annotate.JsonProperty; import org.codehaus.jackson.map.ObjectMapper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Joiner; +import com.google.common.base.Preconditions; +import com.google.common.util.concurrent.ThreadFactoryBuilder; + /** * The underlying volume used to store replica. * @@ -90,8 +90,9 @@ public class FsVolumeImpl implements FsVolumeSpi { private final long reserved; private CloseableReferenceCount reference = new CloseableReferenceCount(); - // Disk space reserved for open blocks. - private AtomicLong reservedForRbw; + // Disk space reserved for blocks (RBW or Re-replicating) open for write. + private AtomicLong reservedForReplicas; + private long recentReserved = 0; // Capacity configured. This is useful when we want to // limit the visible capacity for tests. If negative, then we just @@ -113,8 +114,8 @@ public class FsVolumeImpl implements FsVolumeSpi { this.reserved = conf.getLong( DFSConfigKeys.DFS_DATANODE_DU_RESERVED_KEY, DFSConfigKeys.DFS_DATANODE_DU_RESERVED_DEFAULT); - this.reservedForRbw = new AtomicLong(0L); - this.currentDir = currentDir; + this.reservedForReplicas = new AtomicLong(0L); + this.currentDir = currentDir; File parent = currentDir.getParentFile(); this.usage = new DF(parent, conf); this.storageType = storageType; @@ -353,8 +354,9 @@ public class FsVolumeImpl implements FsVolumeSpi { */ @Override public long getAvailable() throws IOException { - long remaining = getCapacity() - getDfsUsed() - reservedForRbw.get(); - long available = usage.getAvailable() - reserved - reservedForRbw.get(); + long remaining = getCapacity() - getDfsUsed() - reservedForReplicas.get(); + long available = usage.getAvailable() - reserved + - reservedForReplicas.get(); if (remaining > available) { remaining = available; } @@ -362,10 +364,15 @@ public class FsVolumeImpl implements FsVolumeSpi { } @VisibleForTesting - public long getReservedForRbw() { - return reservedForRbw.get(); + public long getReservedForReplicas() { + return reservedForReplicas.get(); } - + + @VisibleForTesting + long getRecentReserved() { + return recentReserved; + } + long getReserved(){ return reserved; } @@ -412,13 +419,20 @@ public class FsVolumeImpl implements FsVolumeSpi { */ File createTmpFile(String bpid, Block b) throws IOException { checkReference(); - return getBlockPoolSlice(bpid).createTmpFile(b); + reserveSpaceForReplica(b.getNumBytes()); + try { + return getBlockPoolSlice(bpid).createTmpFile(b); + } catch (IOException exception) { + releaseReservedSpace(b.getNumBytes()); + throw exception; + } } @Override - public void reserveSpaceForRbw(long bytesToReserve) { + public void reserveSpaceForReplica(long bytesToReserve) { if (bytesToReserve != 0) { - reservedForRbw.addAndGet(bytesToReserve); + reservedForReplicas.addAndGet(bytesToReserve); + recentReserved = bytesToReserve; } } @@ -428,14 +442,15 @@ public class FsVolumeImpl implements FsVolumeSpi { long oldReservation, newReservation; do { - oldReservation = reservedForRbw.get(); + oldReservation = reservedForReplicas.get(); newReservation = oldReservation - bytesToRelease; if (newReservation < 0) { - // Failsafe, this should never occur in practice, but if it does we don't - // want to start advertising more space than we have available. + // Failsafe, this should never occur in practice, but if it does we + // don't want to start advertising more space than we have available. newReservation = 0; } - } while (!reservedForRbw.compareAndSet(oldReservation, newReservation)); + } while (!reservedForReplicas.compareAndSet(oldReservation, + newReservation)); } } @@ -779,7 +794,7 @@ public class FsVolumeImpl implements FsVolumeSpi { */ File createRbwFile(String bpid, Block b) throws IOException { checkReference(); - reserveSpaceForRbw(b.getNumBytes()); + reserveSpaceForReplica(b.getNumBytes()); try { return getBlockPoolSlice(bpid).createRbwFile(b); } catch (IOException exception) { @@ -790,16 +805,15 @@ public class FsVolumeImpl implements FsVolumeSpi { /** * - * @param bytesReservedForRbw Space that was reserved during + * @param bytesReserved Space that was reserved during * block creation. Now that the block is being finalized we * can free up this space. * @return * @throws IOException */ - File addFinalizedBlock(String bpid, Block b, - File f, long bytesReservedForRbw) + File addFinalizedBlock(String bpid, Block b, File f, long bytesReserved) throws IOException { - releaseReservedSpace(bytesReservedForRbw); + releaseReservedSpace(bytesReserved); return getBlockPoolSlice(bpid).addBlock(b, f); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java index 6e5f07ca0e0..4de1be7e307 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java @@ -493,7 +493,7 @@ public class SimulatedFSDataset implements FsDatasetSpi { } @Override - public void reserveSpaceForRbw(long bytesToReserve) { + public void reserveSpaceForReplica(long bytesToReserve) { } @Override diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java index 68152fbfb39..b6ecb81e6c1 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java @@ -607,7 +607,7 @@ public class TestDirectoryScanner { } @Override - public void reserveSpaceForRbw(long bytesToReserve) { + public void reserveSpaceForReplica(long bytesToReserve) { } @Override diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalVolumeImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalVolumeImpl.java index 8f6eb5a1507..8af54db4cf6 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalVolumeImpl.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalVolumeImpl.java @@ -75,7 +75,7 @@ public class ExternalVolumeImpl implements FsVolumeSpi { } @Override - public void reserveSpaceForRbw(long bytesToReserve) { + public void reserveSpaceForReplica(long bytesToReserve) { } @Override diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestRbwSpaceReservation.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestSpaceReservation.java similarity index 73% rename from hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestRbwSpaceReservation.java rename to hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestSpaceReservation.java index a647d9660ca..c4942888eca 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestRbwSpaceReservation.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestSpaceReservation.java @@ -28,8 +28,10 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.*; import static org.hamcrest.core.Is.is; import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.fail; +import org.apache.hadoop.fs.BlockLocation; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hdfs.*; @@ -60,10 +62,10 @@ import javax.management.ObjectName; /** * Ensure that the DN reserves disk space equivalent to a full block for - * replica being written (RBW). + * replica being written (RBW) & Replica being copied from another DN. */ -public class TestRbwSpaceReservation { - static final Log LOG = LogFactory.getLog(TestRbwSpaceReservation.class); +public class TestSpaceReservation { + static final Log LOG = LogFactory.getLog(TestSpaceReservation.class); private static final int DU_REFRESH_INTERVAL_MSEC = 500; private static final int STORAGES_PER_DATANODE = 1; @@ -165,14 +167,14 @@ public class TestRbwSpaceReservation { int bytesWritten = buffer.length; // Check that space was reserved for a full block minus the bytesWritten. - assertThat(singletonVolume.getReservedForRbw(), + assertThat(singletonVolume.getReservedForReplicas(), is((long) fileBlockSize - bytesWritten)); out.close(); out = null; // Check that the reserved space has been released since we closed the // file. - assertThat(singletonVolume.getReservedForRbw(), is(0L)); + assertThat(singletonVolume.getReservedForReplicas(), is(0L)); // Reopen the file for appends and write 1 more byte. out = fs.append(path); @@ -182,7 +184,7 @@ public class TestRbwSpaceReservation { // Check that space was again reserved for a full block minus the // bytesWritten so far. - assertThat(singletonVolume.getReservedForRbw(), + assertThat(singletonVolume.getReservedForReplicas(), is((long) fileBlockSize - bytesWritten)); // Write once again and again verify the available space. This ensures @@ -191,7 +193,7 @@ public class TestRbwSpaceReservation { out.write(buffer); out.hsync(); bytesWritten += buffer.length; - assertThat(singletonVolume.getReservedForRbw(), + assertThat(singletonVolume.getReservedForReplicas(), is((long) fileBlockSize - bytesWritten)); } finally { if (out != null) { @@ -282,7 +284,7 @@ public class TestRbwSpaceReservation { GenericTestUtils.waitFor(new Supplier() { @Override public Boolean get() { - return (volume.getReservedForRbw() == 0); + return (volume.getReservedForReplicas() == 0); } }, 500, Integer.MAX_VALUE); // Wait until the test times out. } @@ -324,12 +326,30 @@ public class TestRbwSpaceReservation { } // Ensure RBW space reserved is released - assertTrue("Expected ZERO but got " + fsVolumeImpl.getReservedForRbw(), - fsVolumeImpl.getReservedForRbw() == 0); + assertTrue( + "Expected ZERO but got " + fsVolumeImpl.getReservedForReplicas(), + fsVolumeImpl.getReservedForReplicas() == 0); + + // Reserve some bytes to verify double clearing space should't happen + fsVolumeImpl.reserveSpaceForReplica(1000); + try { + // Write 1 byte to the file + FSDataOutputStream os = fs.create(new Path("/" + methodName + ".02.dat"), + replication); + os.write(new byte[1]); + os.hsync(); + os.close(); + fail("Expecting IOException file creation failure"); + } catch (IOException e) { + // Exception can be ignored (expected) + } + + // Ensure RBW space reserved is released only once + assertTrue(fsVolumeImpl.getReservedForReplicas() == 1000); } @Test(timeout = 30000) - public void testRBWInJMXBean() throws Exception { + public void testReservedSpaceInJMXBean() throws Exception { final short replication = 1; startCluster(BLOCK_SIZE, replication, -1); @@ -348,7 +368,111 @@ public class TestRbwSpaceReservation { final String volumeInfo = (String) mbs.getAttribute(mxbeanName, "VolumeInfo"); - assertTrue(volumeInfo.contains("reservedSpaceForRBW")); + // verify reserved space for Replicas in JMX bean volume info + assertTrue(volumeInfo.contains("reservedSpaceForReplicas")); + } + } + + @Test(timeout = 300000) + public void testTmpSpaceReserve() throws Exception { + + final short replication = 2; + startCluster(BLOCK_SIZE, replication, -1); + final int byteCount1 = 100; + final int byteCount2 = 200; + + final String methodName = GenericTestUtils.getMethodName(); + + // Test positive scenario + { + final Path file = new Path("/" + methodName + ".01.dat"); + + try (FSDataOutputStream os = fs.create(file, (short) 1)) { + // Write test data to the file + os.write(new byte[byteCount1]); + os.hsync(); + } + + BlockLocation[] blockLocations = fs.getFileBlockLocations(file, 0, 10); + String firstReplicaNode = blockLocations[0].getNames()[0]; + + int newReplicaDNIndex = 0; + if (firstReplicaNode.equals(cluster.getDataNodes().get(0) + .getDisplayName())) { + newReplicaDNIndex = 1; + } + + FsVolumeImpl fsVolumeImpl = (FsVolumeImpl) cluster.getDataNodes() + .get(newReplicaDNIndex).getFSDataset().getFsVolumeReferences().get(0); + + performReReplication(file, true); + + assertEquals("Wrong reserve space for Tmp ", byteCount1, + fsVolumeImpl.getRecentReserved()); + + assertEquals("Reserved Tmp space is not released", 0, + fsVolumeImpl.getReservedForReplicas()); + } + + // Test when file creation fails + { + final Path file = new Path("/" + methodName + ".01.dat"); + + try (FSDataOutputStream os = fs.create(file, (short) 1)) { + // Write test data to the file + os.write(new byte[byteCount2]); + os.hsync(); + } + + BlockLocation[] blockLocations = fs.getFileBlockLocations(file, 0, 10); + String firstReplicaNode = blockLocations[0].getNames()[0]; + + int newReplicaDNIndex = 0; + if (firstReplicaNode.equals(cluster.getDataNodes().get(0) + .getDisplayName())) { + newReplicaDNIndex = 1; + } + + BlockPoolSlice blockPoolSlice = Mockito.mock(BlockPoolSlice.class); + Mockito.when(blockPoolSlice.createTmpFile((Block) Mockito.any())) + .thenThrow(new IOException("Synthetic IO Exception Throgh MOCK")); + + final FsVolumeImpl fsVolumeImpl = (FsVolumeImpl) cluster.getDataNodes() + .get(newReplicaDNIndex).getFSDataset().getFsVolumeReferences().get(0); + + // Reserve some bytes to verify double clearing space should't happen + fsVolumeImpl.reserveSpaceForReplica(1000); + + Field field = FsVolumeImpl.class.getDeclaredField("bpSlices"); + field.setAccessible(true); + @SuppressWarnings("unchecked") + Map bpSlices = (Map) field + .get(fsVolumeImpl); + bpSlices.put(fsVolumeImpl.getBlockPoolList()[0], blockPoolSlice); + + performReReplication(file, false); + + assertEquals("Wrong reserve space for Tmp ", byteCount2, + fsVolumeImpl.getRecentReserved()); + + assertEquals("Tmp space is not released OR released twice", 1000, + fsVolumeImpl.getReservedForReplicas()); + } + } + + private void performReReplication(Path filePath, boolean waitForSuccess) + throws Exception { + fs.setReplication(filePath, (short) 2); + + Thread.sleep(4000); + BlockLocation[] blockLocations = fs.getFileBlockLocations(filePath, 0, 10); + + if (waitForSuccess) { + // Wait for the re replication + while (blockLocations[0].getNames().length < 2) { + Thread.sleep(2000); + blockLocations = fs.getFileBlockLocations(filePath, 0, 10); + } } } @@ -387,7 +511,7 @@ public class TestRbwSpaceReservation { " files and hit " + numFailures + " failures"); // Check no space was leaked. - assertThat(singletonVolume.getReservedForRbw(), is(0L)); + assertThat(singletonVolume.getReservedForReplicas(), is(0L)); } private static class Writer extends Daemon {