HDFS-6955. DN should reserve disk space for a full block when creating tmp files (Contributed by Kanaka Kumar Avvaru)
This commit is contained in:
parent
22dee29857
commit
f40b4f4542
|
@ -117,7 +117,7 @@ class BlockReceiver implements Closeable {
|
|||
/** the block to receive */
|
||||
private final ExtendedBlock block;
|
||||
/** the replica to write */
|
||||
private final ReplicaInPipelineInterface replicaInfo;
|
||||
private ReplicaInPipelineInterface replicaInfo;
|
||||
/** pipeline stage */
|
||||
private final BlockConstructionStage stage;
|
||||
private final boolean isTransfer;
|
||||
|
@ -259,6 +259,9 @@ class BlockReceiver implements Closeable {
|
|||
} catch (ReplicaNotFoundException bne) {
|
||||
throw bne;
|
||||
} catch(IOException ioe) {
|
||||
if (replicaInfo != null) {
|
||||
replicaInfo.releaseAllBytesReserved();
|
||||
}
|
||||
IOUtils.closeStream(this);
|
||||
cleanupBlock();
|
||||
|
||||
|
|
|
@ -62,13 +62,13 @@ public interface FsVolumeSpi {
|
|||
boolean isTransientStorage();
|
||||
|
||||
/**
|
||||
* Reserve disk space for an RBW block so a writer does not run out of
|
||||
* space before the block is full.
|
||||
* Reserve disk space for a block (RBW or Re-replicating)
|
||||
* so a writer does not run out of space before the block is full.
|
||||
*/
|
||||
void reserveSpaceForRbw(long bytesToReserve);
|
||||
void reserveSpaceForReplica(long bytesToReserve);
|
||||
|
||||
/**
|
||||
* Release disk space previously reserved for RBW block.
|
||||
* Release disk space previously reserved for block opened for write.
|
||||
*/
|
||||
void releaseReservedSpace(long bytesToRelease);
|
||||
|
||||
|
|
|
@ -1162,7 +1162,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|||
|
||||
// Replace finalized replica by a RBW replica in replicas map
|
||||
volumeMap.add(bpid, newReplicaInfo);
|
||||
v.reserveSpaceForRbw(estimateBlockLen - replicaInfo.getNumBytes());
|
||||
v.reserveSpaceForReplica(estimateBlockLen - replicaInfo.getNumBytes());
|
||||
return newReplicaInfo;
|
||||
}
|
||||
|
||||
|
@ -1492,7 +1492,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|||
}
|
||||
ReplicaInPipeline newReplicaInfo =
|
||||
new ReplicaInPipeline(b.getBlockId(), b.getGenerationStamp(), v,
|
||||
f.getParentFile(), 0);
|
||||
f.getParentFile(), b.getLocalBlock().getNumBytes());
|
||||
volumeMap.add(b.getBlockPoolId(), newReplicaInfo);
|
||||
return new ReplicaHandler(newReplicaInfo, ref);
|
||||
} else {
|
||||
|
@ -1609,7 +1609,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|||
if (replicaInfo != null && replicaInfo.getState() == ReplicaState.TEMPORARY) {
|
||||
// remove from volumeMap
|
||||
volumeMap.remove(b.getBlockPoolId(), b.getLocalBlock());
|
||||
|
||||
|
||||
// delete the on-disk temp file
|
||||
if (delBlockFromDisk(replicaInfo.getBlockFile(),
|
||||
replicaInfo.getMetaFile(), b.getLocalBlock())) {
|
||||
|
@ -2559,14 +2559,15 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|||
final long usedSpace; // size of space used by HDFS
|
||||
final long freeSpace; // size of free space excluding reserved space
|
||||
final long reservedSpace; // size of space reserved for non-HDFS
|
||||
final long reservedSpaceForRBW; // size of space reserved RBW
|
||||
final long reservedSpaceForReplicas; // size of space reserved RBW or
|
||||
// re-replication
|
||||
|
||||
VolumeInfo(FsVolumeImpl v, long usedSpace, long freeSpace) {
|
||||
this.directory = v.toString();
|
||||
this.usedSpace = usedSpace;
|
||||
this.freeSpace = freeSpace;
|
||||
this.reservedSpace = v.getReserved();
|
||||
this.reservedSpaceForRBW = v.getReservedForRbw();
|
||||
this.reservedSpaceForReplicas = v.getReservedForReplicas();
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -2600,7 +2601,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|||
innerInfo.put("usedSpace", v.usedSpace);
|
||||
innerInfo.put("freeSpace", v.freeSpace);
|
||||
innerInfo.put("reservedSpace", v.reservedSpace);
|
||||
innerInfo.put("reservedSpaceForRBW", v.reservedSpaceForRBW);
|
||||
innerInfo.put("reservedSpaceForReplicas", v.reservedSpaceForReplicas);
|
||||
info.put(v.directory, innerInfo);
|
||||
}
|
||||
return info;
|
||||
|
|
|
@ -22,8 +22,8 @@ import java.io.File;
|
|||
import java.io.FileOutputStream;
|
||||
import java.io.FilenameFilter;
|
||||
import java.io.IOException;
|
||||
import java.nio.channels.ClosedChannelException;
|
||||
import java.io.OutputStreamWriter;
|
||||
import java.nio.channels.ClosedChannelException;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Paths;
|
||||
import java.nio.file.StandardCopyOption;
|
||||
|
@ -40,9 +40,6 @@ import java.util.concurrent.ThreadPoolExecutor;
|
|||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.google.common.base.Joiner;
|
||||
import com.google.common.base.Preconditions;
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.DF;
|
||||
|
@ -54,21 +51,24 @@ import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
|
|||
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
||||
import org.apache.hadoop.hdfs.server.datanode.DataStorage;
|
||||
import org.apache.hadoop.hdfs.server.datanode.DatanodeUtil;
|
||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference;
|
||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
|
||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference;
|
||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
|
||||
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
|
||||
import org.apache.hadoop.util.CloseableReferenceCount;
|
||||
import org.apache.hadoop.io.IOUtils;
|
||||
import org.apache.hadoop.util.CloseableReferenceCount;
|
||||
import org.apache.hadoop.util.DiskChecker.DiskErrorException;
|
||||
|
||||
import com.google.common.util.concurrent.ThreadFactoryBuilder;
|
||||
import org.apache.hadoop.util.Time;
|
||||
import org.codehaus.jackson.annotate.JsonProperty;
|
||||
import org.codehaus.jackson.map.ObjectMapper;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.google.common.base.Joiner;
|
||||
import com.google.common.base.Preconditions;
|
||||
import com.google.common.util.concurrent.ThreadFactoryBuilder;
|
||||
|
||||
/**
|
||||
* The underlying volume used to store replica.
|
||||
*
|
||||
|
@ -90,8 +90,9 @@ public class FsVolumeImpl implements FsVolumeSpi {
|
|||
private final long reserved;
|
||||
private CloseableReferenceCount reference = new CloseableReferenceCount();
|
||||
|
||||
// Disk space reserved for open blocks.
|
||||
private AtomicLong reservedForRbw;
|
||||
// Disk space reserved for blocks (RBW or Re-replicating) open for write.
|
||||
private AtomicLong reservedForReplicas;
|
||||
private long recentReserved = 0;
|
||||
|
||||
// Capacity configured. This is useful when we want to
|
||||
// limit the visible capacity for tests. If negative, then we just
|
||||
|
@ -113,8 +114,8 @@ public class FsVolumeImpl implements FsVolumeSpi {
|
|||
this.reserved = conf.getLong(
|
||||
DFSConfigKeys.DFS_DATANODE_DU_RESERVED_KEY,
|
||||
DFSConfigKeys.DFS_DATANODE_DU_RESERVED_DEFAULT);
|
||||
this.reservedForRbw = new AtomicLong(0L);
|
||||
this.currentDir = currentDir;
|
||||
this.reservedForReplicas = new AtomicLong(0L);
|
||||
this.currentDir = currentDir;
|
||||
File parent = currentDir.getParentFile();
|
||||
this.usage = new DF(parent, conf);
|
||||
this.storageType = storageType;
|
||||
|
@ -353,8 +354,9 @@ public class FsVolumeImpl implements FsVolumeSpi {
|
|||
*/
|
||||
@Override
|
||||
public long getAvailable() throws IOException {
|
||||
long remaining = getCapacity() - getDfsUsed() - reservedForRbw.get();
|
||||
long available = usage.getAvailable() - reserved - reservedForRbw.get();
|
||||
long remaining = getCapacity() - getDfsUsed() - reservedForReplicas.get();
|
||||
long available = usage.getAvailable() - reserved
|
||||
- reservedForReplicas.get();
|
||||
if (remaining > available) {
|
||||
remaining = available;
|
||||
}
|
||||
|
@ -362,10 +364,15 @@ public class FsVolumeImpl implements FsVolumeSpi {
|
|||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public long getReservedForRbw() {
|
||||
return reservedForRbw.get();
|
||||
public long getReservedForReplicas() {
|
||||
return reservedForReplicas.get();
|
||||
}
|
||||
|
||||
|
||||
@VisibleForTesting
|
||||
long getRecentReserved() {
|
||||
return recentReserved;
|
||||
}
|
||||
|
||||
long getReserved(){
|
||||
return reserved;
|
||||
}
|
||||
|
@ -412,13 +419,20 @@ public class FsVolumeImpl implements FsVolumeSpi {
|
|||
*/
|
||||
File createTmpFile(String bpid, Block b) throws IOException {
|
||||
checkReference();
|
||||
return getBlockPoolSlice(bpid).createTmpFile(b);
|
||||
reserveSpaceForReplica(b.getNumBytes());
|
||||
try {
|
||||
return getBlockPoolSlice(bpid).createTmpFile(b);
|
||||
} catch (IOException exception) {
|
||||
releaseReservedSpace(b.getNumBytes());
|
||||
throw exception;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void reserveSpaceForRbw(long bytesToReserve) {
|
||||
public void reserveSpaceForReplica(long bytesToReserve) {
|
||||
if (bytesToReserve != 0) {
|
||||
reservedForRbw.addAndGet(bytesToReserve);
|
||||
reservedForReplicas.addAndGet(bytesToReserve);
|
||||
recentReserved = bytesToReserve;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -428,14 +442,15 @@ public class FsVolumeImpl implements FsVolumeSpi {
|
|||
|
||||
long oldReservation, newReservation;
|
||||
do {
|
||||
oldReservation = reservedForRbw.get();
|
||||
oldReservation = reservedForReplicas.get();
|
||||
newReservation = oldReservation - bytesToRelease;
|
||||
if (newReservation < 0) {
|
||||
// Failsafe, this should never occur in practice, but if it does we don't
|
||||
// want to start advertising more space than we have available.
|
||||
// Failsafe, this should never occur in practice, but if it does we
|
||||
// don't want to start advertising more space than we have available.
|
||||
newReservation = 0;
|
||||
}
|
||||
} while (!reservedForRbw.compareAndSet(oldReservation, newReservation));
|
||||
} while (!reservedForReplicas.compareAndSet(oldReservation,
|
||||
newReservation));
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -779,7 +794,7 @@ public class FsVolumeImpl implements FsVolumeSpi {
|
|||
*/
|
||||
File createRbwFile(String bpid, Block b) throws IOException {
|
||||
checkReference();
|
||||
reserveSpaceForRbw(b.getNumBytes());
|
||||
reserveSpaceForReplica(b.getNumBytes());
|
||||
try {
|
||||
return getBlockPoolSlice(bpid).createRbwFile(b);
|
||||
} catch (IOException exception) {
|
||||
|
@ -790,16 +805,15 @@ public class FsVolumeImpl implements FsVolumeSpi {
|
|||
|
||||
/**
|
||||
*
|
||||
* @param bytesReservedForRbw Space that was reserved during
|
||||
* @param bytesReserved Space that was reserved during
|
||||
* block creation. Now that the block is being finalized we
|
||||
* can free up this space.
|
||||
* @return
|
||||
* @throws IOException
|
||||
*/
|
||||
File addFinalizedBlock(String bpid, Block b,
|
||||
File f, long bytesReservedForRbw)
|
||||
File addFinalizedBlock(String bpid, Block b, File f, long bytesReserved)
|
||||
throws IOException {
|
||||
releaseReservedSpace(bytesReservedForRbw);
|
||||
releaseReservedSpace(bytesReserved);
|
||||
return getBlockPoolSlice(bpid).addBlock(b, f);
|
||||
}
|
||||
|
||||
|
|
|
@ -493,7 +493,7 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
|
|||
}
|
||||
|
||||
@Override
|
||||
public void reserveSpaceForRbw(long bytesToReserve) {
|
||||
public void reserveSpaceForReplica(long bytesToReserve) {
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -607,7 +607,7 @@ public class TestDirectoryScanner {
|
|||
}
|
||||
|
||||
@Override
|
||||
public void reserveSpaceForRbw(long bytesToReserve) {
|
||||
public void reserveSpaceForReplica(long bytesToReserve) {
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -75,7 +75,7 @@ public class ExternalVolumeImpl implements FsVolumeSpi {
|
|||
}
|
||||
|
||||
@Override
|
||||
public void reserveSpaceForRbw(long bytesToReserve) {
|
||||
public void reserveSpaceForReplica(long bytesToReserve) {
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -28,8 +28,10 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.*;
|
|||
import static org.hamcrest.core.Is.is;
|
||||
import static org.junit.Assert.assertThat;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.fail;
|
||||
|
||||
import org.apache.hadoop.fs.BlockLocation;
|
||||
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hdfs.*;
|
||||
|
@ -60,10 +62,10 @@ import javax.management.ObjectName;
|
|||
|
||||
/**
|
||||
* Ensure that the DN reserves disk space equivalent to a full block for
|
||||
* replica being written (RBW).
|
||||
* replica being written (RBW) & Replica being copied from another DN.
|
||||
*/
|
||||
public class TestRbwSpaceReservation {
|
||||
static final Log LOG = LogFactory.getLog(TestRbwSpaceReservation.class);
|
||||
public class TestSpaceReservation {
|
||||
static final Log LOG = LogFactory.getLog(TestSpaceReservation.class);
|
||||
|
||||
private static final int DU_REFRESH_INTERVAL_MSEC = 500;
|
||||
private static final int STORAGES_PER_DATANODE = 1;
|
||||
|
@ -165,14 +167,14 @@ public class TestRbwSpaceReservation {
|
|||
int bytesWritten = buffer.length;
|
||||
|
||||
// Check that space was reserved for a full block minus the bytesWritten.
|
||||
assertThat(singletonVolume.getReservedForRbw(),
|
||||
assertThat(singletonVolume.getReservedForReplicas(),
|
||||
is((long) fileBlockSize - bytesWritten));
|
||||
out.close();
|
||||
out = null;
|
||||
|
||||
// Check that the reserved space has been released since we closed the
|
||||
// file.
|
||||
assertThat(singletonVolume.getReservedForRbw(), is(0L));
|
||||
assertThat(singletonVolume.getReservedForReplicas(), is(0L));
|
||||
|
||||
// Reopen the file for appends and write 1 more byte.
|
||||
out = fs.append(path);
|
||||
|
@ -182,7 +184,7 @@ public class TestRbwSpaceReservation {
|
|||
|
||||
// Check that space was again reserved for a full block minus the
|
||||
// bytesWritten so far.
|
||||
assertThat(singletonVolume.getReservedForRbw(),
|
||||
assertThat(singletonVolume.getReservedForReplicas(),
|
||||
is((long) fileBlockSize - bytesWritten));
|
||||
|
||||
// Write once again and again verify the available space. This ensures
|
||||
|
@ -191,7 +193,7 @@ public class TestRbwSpaceReservation {
|
|||
out.write(buffer);
|
||||
out.hsync();
|
||||
bytesWritten += buffer.length;
|
||||
assertThat(singletonVolume.getReservedForRbw(),
|
||||
assertThat(singletonVolume.getReservedForReplicas(),
|
||||
is((long) fileBlockSize - bytesWritten));
|
||||
} finally {
|
||||
if (out != null) {
|
||||
|
@ -282,7 +284,7 @@ public class TestRbwSpaceReservation {
|
|||
GenericTestUtils.waitFor(new Supplier<Boolean>() {
|
||||
@Override
|
||||
public Boolean get() {
|
||||
return (volume.getReservedForRbw() == 0);
|
||||
return (volume.getReservedForReplicas() == 0);
|
||||
}
|
||||
}, 500, Integer.MAX_VALUE); // Wait until the test times out.
|
||||
}
|
||||
|
@ -324,12 +326,30 @@ public class TestRbwSpaceReservation {
|
|||
}
|
||||
|
||||
// Ensure RBW space reserved is released
|
||||
assertTrue("Expected ZERO but got " + fsVolumeImpl.getReservedForRbw(),
|
||||
fsVolumeImpl.getReservedForRbw() == 0);
|
||||
assertTrue(
|
||||
"Expected ZERO but got " + fsVolumeImpl.getReservedForReplicas(),
|
||||
fsVolumeImpl.getReservedForReplicas() == 0);
|
||||
|
||||
// Reserve some bytes to verify double clearing space should't happen
|
||||
fsVolumeImpl.reserveSpaceForReplica(1000);
|
||||
try {
|
||||
// Write 1 byte to the file
|
||||
FSDataOutputStream os = fs.create(new Path("/" + methodName + ".02.dat"),
|
||||
replication);
|
||||
os.write(new byte[1]);
|
||||
os.hsync();
|
||||
os.close();
|
||||
fail("Expecting IOException file creation failure");
|
||||
} catch (IOException e) {
|
||||
// Exception can be ignored (expected)
|
||||
}
|
||||
|
||||
// Ensure RBW space reserved is released only once
|
||||
assertTrue(fsVolumeImpl.getReservedForReplicas() == 1000);
|
||||
}
|
||||
|
||||
@Test(timeout = 30000)
|
||||
public void testRBWInJMXBean() throws Exception {
|
||||
public void testReservedSpaceInJMXBean() throws Exception {
|
||||
|
||||
final short replication = 1;
|
||||
startCluster(BLOCK_SIZE, replication, -1);
|
||||
|
@ -348,7 +368,111 @@ public class TestRbwSpaceReservation {
|
|||
final String volumeInfo = (String) mbs.getAttribute(mxbeanName,
|
||||
"VolumeInfo");
|
||||
|
||||
assertTrue(volumeInfo.contains("reservedSpaceForRBW"));
|
||||
// verify reserved space for Replicas in JMX bean volume info
|
||||
assertTrue(volumeInfo.contains("reservedSpaceForReplicas"));
|
||||
}
|
||||
}
|
||||
|
||||
@Test(timeout = 300000)
|
||||
public void testTmpSpaceReserve() throws Exception {
|
||||
|
||||
final short replication = 2;
|
||||
startCluster(BLOCK_SIZE, replication, -1);
|
||||
final int byteCount1 = 100;
|
||||
final int byteCount2 = 200;
|
||||
|
||||
final String methodName = GenericTestUtils.getMethodName();
|
||||
|
||||
// Test positive scenario
|
||||
{
|
||||
final Path file = new Path("/" + methodName + ".01.dat");
|
||||
|
||||
try (FSDataOutputStream os = fs.create(file, (short) 1)) {
|
||||
// Write test data to the file
|
||||
os.write(new byte[byteCount1]);
|
||||
os.hsync();
|
||||
}
|
||||
|
||||
BlockLocation[] blockLocations = fs.getFileBlockLocations(file, 0, 10);
|
||||
String firstReplicaNode = blockLocations[0].getNames()[0];
|
||||
|
||||
int newReplicaDNIndex = 0;
|
||||
if (firstReplicaNode.equals(cluster.getDataNodes().get(0)
|
||||
.getDisplayName())) {
|
||||
newReplicaDNIndex = 1;
|
||||
}
|
||||
|
||||
FsVolumeImpl fsVolumeImpl = (FsVolumeImpl) cluster.getDataNodes()
|
||||
.get(newReplicaDNIndex).getFSDataset().getFsVolumeReferences().get(0);
|
||||
|
||||
performReReplication(file, true);
|
||||
|
||||
assertEquals("Wrong reserve space for Tmp ", byteCount1,
|
||||
fsVolumeImpl.getRecentReserved());
|
||||
|
||||
assertEquals("Reserved Tmp space is not released", 0,
|
||||
fsVolumeImpl.getReservedForReplicas());
|
||||
}
|
||||
|
||||
// Test when file creation fails
|
||||
{
|
||||
final Path file = new Path("/" + methodName + ".01.dat");
|
||||
|
||||
try (FSDataOutputStream os = fs.create(file, (short) 1)) {
|
||||
// Write test data to the file
|
||||
os.write(new byte[byteCount2]);
|
||||
os.hsync();
|
||||
}
|
||||
|
||||
BlockLocation[] blockLocations = fs.getFileBlockLocations(file, 0, 10);
|
||||
String firstReplicaNode = blockLocations[0].getNames()[0];
|
||||
|
||||
int newReplicaDNIndex = 0;
|
||||
if (firstReplicaNode.equals(cluster.getDataNodes().get(0)
|
||||
.getDisplayName())) {
|
||||
newReplicaDNIndex = 1;
|
||||
}
|
||||
|
||||
BlockPoolSlice blockPoolSlice = Mockito.mock(BlockPoolSlice.class);
|
||||
Mockito.when(blockPoolSlice.createTmpFile((Block) Mockito.any()))
|
||||
.thenThrow(new IOException("Synthetic IO Exception Throgh MOCK"));
|
||||
|
||||
final FsVolumeImpl fsVolumeImpl = (FsVolumeImpl) cluster.getDataNodes()
|
||||
.get(newReplicaDNIndex).getFSDataset().getFsVolumeReferences().get(0);
|
||||
|
||||
// Reserve some bytes to verify double clearing space should't happen
|
||||
fsVolumeImpl.reserveSpaceForReplica(1000);
|
||||
|
||||
Field field = FsVolumeImpl.class.getDeclaredField("bpSlices");
|
||||
field.setAccessible(true);
|
||||
@SuppressWarnings("unchecked")
|
||||
Map<String, BlockPoolSlice> bpSlices = (Map<String, BlockPoolSlice>) field
|
||||
.get(fsVolumeImpl);
|
||||
bpSlices.put(fsVolumeImpl.getBlockPoolList()[0], blockPoolSlice);
|
||||
|
||||
performReReplication(file, false);
|
||||
|
||||
assertEquals("Wrong reserve space for Tmp ", byteCount2,
|
||||
fsVolumeImpl.getRecentReserved());
|
||||
|
||||
assertEquals("Tmp space is not released OR released twice", 1000,
|
||||
fsVolumeImpl.getReservedForReplicas());
|
||||
}
|
||||
}
|
||||
|
||||
private void performReReplication(Path filePath, boolean waitForSuccess)
|
||||
throws Exception {
|
||||
fs.setReplication(filePath, (short) 2);
|
||||
|
||||
Thread.sleep(4000);
|
||||
BlockLocation[] blockLocations = fs.getFileBlockLocations(filePath, 0, 10);
|
||||
|
||||
if (waitForSuccess) {
|
||||
// Wait for the re replication
|
||||
while (blockLocations[0].getNames().length < 2) {
|
||||
Thread.sleep(2000);
|
||||
blockLocations = fs.getFileBlockLocations(filePath, 0, 10);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -387,7 +511,7 @@ public class TestRbwSpaceReservation {
|
|||
" files and hit " + numFailures + " failures");
|
||||
|
||||
// Check no space was leaked.
|
||||
assertThat(singletonVolume.getReservedForRbw(), is(0L));
|
||||
assertThat(singletonVolume.getReservedForReplicas(), is(0L));
|
||||
}
|
||||
|
||||
private static class Writer extends Daemon {
|
Loading…
Reference in New Issue