HDFS-10271. Extra bytes are getting released from reservedSpace for append (Contributed by Brahma Reddy Battula)
(cherry picked from commit a9a607f8fc0d996af3fb37f7efa7591d6655900d)
This commit is contained in:
parent
8f968171bd
commit
04621537c9
@ -1164,7 +1164,8 @@ private synchronized ReplicaBeingWritten append(String bpid,
|
|||||||
// construct a RBW replica with the new GS
|
// construct a RBW replica with the new GS
|
||||||
File blkfile = replicaInfo.getBlockFile();
|
File blkfile = replicaInfo.getBlockFile();
|
||||||
FsVolumeImpl v = (FsVolumeImpl)replicaInfo.getVolume();
|
FsVolumeImpl v = (FsVolumeImpl)replicaInfo.getVolume();
|
||||||
if (v.getAvailable() < estimateBlockLen - replicaInfo.getNumBytes()) {
|
long bytesReserved = estimateBlockLen - replicaInfo.getNumBytes();
|
||||||
|
if (v.getAvailable() < bytesReserved) {
|
||||||
throw new DiskOutOfSpaceException("Insufficient space for appending to "
|
throw new DiskOutOfSpaceException("Insufficient space for appending to "
|
||||||
+ replicaInfo);
|
+ replicaInfo);
|
||||||
}
|
}
|
||||||
@ -1172,7 +1173,7 @@ private synchronized ReplicaBeingWritten append(String bpid,
|
|||||||
File oldmeta = replicaInfo.getMetaFile();
|
File oldmeta = replicaInfo.getMetaFile();
|
||||||
ReplicaBeingWritten newReplicaInfo = new ReplicaBeingWritten(
|
ReplicaBeingWritten newReplicaInfo = new ReplicaBeingWritten(
|
||||||
replicaInfo.getBlockId(), replicaInfo.getNumBytes(), newGS,
|
replicaInfo.getBlockId(), replicaInfo.getNumBytes(), newGS,
|
||||||
v, newBlkFile.getParentFile(), Thread.currentThread(), estimateBlockLen);
|
v, newBlkFile.getParentFile(), Thread.currentThread(), bytesReserved);
|
||||||
File newmeta = newReplicaInfo.getMetaFile();
|
File newmeta = newReplicaInfo.getMetaFile();
|
||||||
|
|
||||||
// rename meta file to rbw directory
|
// rename meta file to rbw directory
|
||||||
@ -1208,7 +1209,7 @@ private synchronized ReplicaBeingWritten append(String bpid,
|
|||||||
|
|
||||||
// Replace finalized replica by a RBW replica in replicas map
|
// Replace finalized replica by a RBW replica in replicas map
|
||||||
volumeMap.add(bpid, newReplicaInfo);
|
volumeMap.add(bpid, newReplicaInfo);
|
||||||
v.reserveSpaceForReplica(estimateBlockLen - replicaInfo.getNumBytes());
|
v.reserveSpaceForReplica(bytesReserved);
|
||||||
return newReplicaInfo;
|
return newReplicaInfo;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -572,4 +572,64 @@ public int getNumFailures() {
|
|||||||
return numFailures;
|
return numFailures;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test(timeout = 30000)
|
||||||
|
public void testReservedSpaceForAppend() throws Exception {
|
||||||
|
final short replication = 3;
|
||||||
|
startCluster(BLOCK_SIZE, replication, -1);
|
||||||
|
final String methodName = GenericTestUtils.getMethodName();
|
||||||
|
final Path file = new Path("/" + methodName + ".01.dat");
|
||||||
|
|
||||||
|
// Write 1 byte to the file and kill the writer.
|
||||||
|
FSDataOutputStream os = fs.create(file, replication);
|
||||||
|
os.write(new byte[1024]);
|
||||||
|
os.close();
|
||||||
|
|
||||||
|
final Path file2 = new Path("/" + methodName + ".02.dat");
|
||||||
|
|
||||||
|
// Write 1 byte to the file and keep it open.
|
||||||
|
FSDataOutputStream os2 = fs.create(file2, replication);
|
||||||
|
os2.write(new byte[1]);
|
||||||
|
os2.hflush();
|
||||||
|
int expectedFile2Reserved = BLOCK_SIZE - 1;
|
||||||
|
checkReservedSpace(expectedFile2Reserved);
|
||||||
|
|
||||||
|
// append one byte and verify reservedspace before and after closing
|
||||||
|
os = fs.append(file);
|
||||||
|
os.write(new byte[1]);
|
||||||
|
os.hflush();
|
||||||
|
int expectedFile1Reserved = BLOCK_SIZE - 1025;
|
||||||
|
checkReservedSpace(expectedFile2Reserved + expectedFile1Reserved);
|
||||||
|
os.close();
|
||||||
|
checkReservedSpace(expectedFile2Reserved);
|
||||||
|
|
||||||
|
// append one byte and verify reservedspace before and after abort
|
||||||
|
os = fs.append(file);
|
||||||
|
os.write(new byte[1]);
|
||||||
|
os.hflush();
|
||||||
|
expectedFile1Reserved--;
|
||||||
|
checkReservedSpace(expectedFile2Reserved + expectedFile1Reserved);
|
||||||
|
DFSTestUtil.abortStream(((DFSOutputStream) os.getWrappedStream()));
|
||||||
|
checkReservedSpace(expectedFile2Reserved);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void checkReservedSpace(final long expectedReserved) throws TimeoutException,
|
||||||
|
InterruptedException, IOException {
|
||||||
|
for (final DataNode dn : cluster.getDataNodes()) {
|
||||||
|
try (FsDatasetSpi.FsVolumeReferences volumes = dn.getFSDataset()
|
||||||
|
.getFsVolumeReferences()) {
|
||||||
|
final FsVolumeImpl volume = (FsVolumeImpl) volumes.get(0);
|
||||||
|
GenericTestUtils.waitFor(new Supplier<Boolean>() {
|
||||||
|
@Override
|
||||||
|
public Boolean get() {
|
||||||
|
LOG.info(
|
||||||
|
"dn " + dn.getDisplayName() + " space : " + volume
|
||||||
|
.getReservedForReplicas() + ", Expected ReservedSpace :"
|
||||||
|
+ expectedReserved);
|
||||||
|
return (volume.getReservedForReplicas() == expectedReserved);
|
||||||
|
}
|
||||||
|
}, 100, 3000);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user