HDFS-10271. Extra bytes are getting released from reservedSpace for append (Contributed by Brahma Reddy Battula)
(cherry picked from commita9a607f8fc
) (cherry picked from commit04621537c9
)
This commit is contained in:
parent
e2b8bdbe0a
commit
b804b20843
|
@ -1162,7 +1162,8 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
||||||
// construct a RBW replica with the new GS
|
// construct a RBW replica with the new GS
|
||||||
File blkfile = replicaInfo.getBlockFile();
|
File blkfile = replicaInfo.getBlockFile();
|
||||||
FsVolumeImpl v = (FsVolumeImpl)replicaInfo.getVolume();
|
FsVolumeImpl v = (FsVolumeImpl)replicaInfo.getVolume();
|
||||||
if (v.getAvailable() < estimateBlockLen - replicaInfo.getNumBytes()) {
|
long bytesReserved = estimateBlockLen - replicaInfo.getNumBytes();
|
||||||
|
if (v.getAvailable() < bytesReserved) {
|
||||||
throw new DiskOutOfSpaceException("Insufficient space for appending to "
|
throw new DiskOutOfSpaceException("Insufficient space for appending to "
|
||||||
+ replicaInfo);
|
+ replicaInfo);
|
||||||
}
|
}
|
||||||
|
@ -1170,7 +1171,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
||||||
File oldmeta = replicaInfo.getMetaFile();
|
File oldmeta = replicaInfo.getMetaFile();
|
||||||
ReplicaBeingWritten newReplicaInfo = new ReplicaBeingWritten(
|
ReplicaBeingWritten newReplicaInfo = new ReplicaBeingWritten(
|
||||||
replicaInfo.getBlockId(), replicaInfo.getNumBytes(), newGS,
|
replicaInfo.getBlockId(), replicaInfo.getNumBytes(), newGS,
|
||||||
v, newBlkFile.getParentFile(), Thread.currentThread(), estimateBlockLen);
|
v, newBlkFile.getParentFile(), Thread.currentThread(), bytesReserved);
|
||||||
File newmeta = newReplicaInfo.getMetaFile();
|
File newmeta = newReplicaInfo.getMetaFile();
|
||||||
|
|
||||||
// rename meta file to rbw directory
|
// rename meta file to rbw directory
|
||||||
|
@ -1206,7 +1207,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
||||||
|
|
||||||
// Replace finalized replica by a RBW replica in replicas map
|
// Replace finalized replica by a RBW replica in replicas map
|
||||||
volumeMap.add(bpid, newReplicaInfo);
|
volumeMap.add(bpid, newReplicaInfo);
|
||||||
v.reserveSpaceForReplica(estimateBlockLen - replicaInfo.getNumBytes());
|
v.reserveSpaceForReplica(bytesReserved);
|
||||||
return newReplicaInfo;
|
return newReplicaInfo;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -572,4 +572,64 @@ public class TestSpaceReservation {
|
||||||
return numFailures;
|
return numFailures;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test(timeout = 30000)
|
||||||
|
public void testReservedSpaceForAppend() throws Exception {
|
||||||
|
final short replication = 3;
|
||||||
|
startCluster(BLOCK_SIZE, replication, -1);
|
||||||
|
final String methodName = GenericTestUtils.getMethodName();
|
||||||
|
final Path file = new Path("/" + methodName + ".01.dat");
|
||||||
|
|
||||||
|
// Write 1 byte to the file and kill the writer.
|
||||||
|
FSDataOutputStream os = fs.create(file, replication);
|
||||||
|
os.write(new byte[1024]);
|
||||||
|
os.close();
|
||||||
|
|
||||||
|
final Path file2 = new Path("/" + methodName + ".02.dat");
|
||||||
|
|
||||||
|
// Write 1 byte to the file and keep it open.
|
||||||
|
FSDataOutputStream os2 = fs.create(file2, replication);
|
||||||
|
os2.write(new byte[1]);
|
||||||
|
os2.hflush();
|
||||||
|
int expectedFile2Reserved = BLOCK_SIZE - 1;
|
||||||
|
checkReservedSpace(expectedFile2Reserved);
|
||||||
|
|
||||||
|
// append one byte and verify reservedspace before and after closing
|
||||||
|
os = fs.append(file);
|
||||||
|
os.write(new byte[1]);
|
||||||
|
os.hflush();
|
||||||
|
int expectedFile1Reserved = BLOCK_SIZE - 1025;
|
||||||
|
checkReservedSpace(expectedFile2Reserved + expectedFile1Reserved);
|
||||||
|
os.close();
|
||||||
|
checkReservedSpace(expectedFile2Reserved);
|
||||||
|
|
||||||
|
// append one byte and verify reservedspace before and after abort
|
||||||
|
os = fs.append(file);
|
||||||
|
os.write(new byte[1]);
|
||||||
|
os.hflush();
|
||||||
|
expectedFile1Reserved--;
|
||||||
|
checkReservedSpace(expectedFile2Reserved + expectedFile1Reserved);
|
||||||
|
DFSTestUtil.abortStream(((DFSOutputStream) os.getWrappedStream()));
|
||||||
|
checkReservedSpace(expectedFile2Reserved);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void checkReservedSpace(final long expectedReserved) throws TimeoutException,
|
||||||
|
InterruptedException, IOException {
|
||||||
|
for (final DataNode dn : cluster.getDataNodes()) {
|
||||||
|
try (FsDatasetSpi.FsVolumeReferences volumes = dn.getFSDataset()
|
||||||
|
.getFsVolumeReferences()) {
|
||||||
|
final FsVolumeImpl volume = (FsVolumeImpl) volumes.get(0);
|
||||||
|
GenericTestUtils.waitFor(new Supplier<Boolean>() {
|
||||||
|
@Override
|
||||||
|
public Boolean get() {
|
||||||
|
LOG.info(
|
||||||
|
"dn " + dn.getDisplayName() + " space : " + volume
|
||||||
|
.getReservedForReplicas() + ", Expected ReservedSpace :"
|
||||||
|
+ expectedReserved);
|
||||||
|
return (volume.getReservedForReplicas() == expectedReserved);
|
||||||
|
}
|
||||||
|
}, 100, 3000);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue