HDFS-11541. Call RawErasureEncoder and RawErasureDecoder release() methods. Contributed by SammiChen.
This commit is contained in:
parent
0e6f8e4bc6
commit
84d787b9d5
|
@ -177,14 +177,18 @@ public class DFSStripedInputStream extends DFSInputStream {
|
|||
|
||||
@Override
|
||||
public synchronized void close() throws IOException {
|
||||
super.close();
|
||||
if (curStripeBuf != null) {
|
||||
BUFFER_POOL.putBuffer(curStripeBuf);
|
||||
curStripeBuf = null;
|
||||
}
|
||||
if (parityBuf != null) {
|
||||
BUFFER_POOL.putBuffer(parityBuf);
|
||||
parityBuf = null;
|
||||
try {
|
||||
super.close();
|
||||
} finally {
|
||||
if (curStripeBuf != null) {
|
||||
BUFFER_POOL.putBuffer(curStripeBuf);
|
||||
curStripeBuf = null;
|
||||
}
|
||||
if (parityBuf != null) {
|
||||
BUFFER_POOL.putBuffer(parityBuf);
|
||||
parityBuf = null;
|
||||
}
|
||||
decoder.release();
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -1033,6 +1033,7 @@ public class DFSStripedOutputStream extends DFSOutputStream {
|
|||
setClosed();
|
||||
// shutdown executor of flushAll tasks
|
||||
flushAllExecutor.shutdownNow();
|
||||
encoder.release();
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -75,29 +75,33 @@ public class StripedBlockChecksumReconstructor extends StripedReconstructor {
|
|||
public void reconstruct() throws IOException {
|
||||
MessageDigest digester = MD5Hash.getDigester();
|
||||
long maxTargetLength = getMaxTargetLength();
|
||||
while (requestedLen > 0 && getPositionInBlock() < maxTargetLength) {
|
||||
long remaining = maxTargetLength - getPositionInBlock();
|
||||
final int toReconstructLen = (int) Math
|
||||
.min(getStripedReader().getBufferSize(), remaining);
|
||||
// step1: read from minimum source DNs required for reconstruction.
|
||||
// The returned success list is the source DNs we do real read from
|
||||
getStripedReader().readMinimumSources(toReconstructLen);
|
||||
try {
|
||||
while (requestedLen > 0 && getPositionInBlock() < maxTargetLength) {
|
||||
long remaining = maxTargetLength - getPositionInBlock();
|
||||
final int toReconstructLen = (int) Math
|
||||
.min(getStripedReader().getBufferSize(), remaining);
|
||||
// step1: read from minimum source DNs required for reconstruction.
|
||||
// The returned success list is the source DNs we do real read from
|
||||
getStripedReader().readMinimumSources(toReconstructLen);
|
||||
|
||||
// step2: decode to reconstruct targets
|
||||
reconstructTargets(toReconstructLen);
|
||||
// step2: decode to reconstruct targets
|
||||
reconstructTargets(toReconstructLen);
|
||||
|
||||
// step3: calculate checksum
|
||||
checksumDataLen += checksumWithTargetOutput(targetBuffer.array(),
|
||||
toReconstructLen, digester);
|
||||
// step3: calculate checksum
|
||||
checksumDataLen += checksumWithTargetOutput(targetBuffer.array(),
|
||||
toReconstructLen, digester);
|
||||
|
||||
updatePositionInBlock(toReconstructLen);
|
||||
requestedLen -= toReconstructLen;
|
||||
clearBuffers();
|
||||
updatePositionInBlock(toReconstructLen);
|
||||
requestedLen -= toReconstructLen;
|
||||
clearBuffers();
|
||||
}
|
||||
|
||||
byte[] digest = digester.digest();
|
||||
md5 = new MD5Hash(digest);
|
||||
md5.write(checksumWriter);
|
||||
} finally {
|
||||
cleanup();
|
||||
}
|
||||
|
||||
byte[] digest = digester.digest();
|
||||
md5 = new MD5Hash(digest);
|
||||
md5.write(checksumWriter);
|
||||
}
|
||||
|
||||
private long checksumWithTargetOutput(byte[] outputData, int toReconstructLen,
|
||||
|
|
|
@ -74,6 +74,7 @@ class StripedBlockReconstructor extends StripedReconstructor
|
|||
metrics.incrECReconstructionBytesWritten(getBytesWritten());
|
||||
getStripedReader().close();
|
||||
stripedWriter.close();
|
||||
cleanup();
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -253,6 +253,12 @@ abstract class StripedReconstructor {
|
|||
return decoder;
|
||||
}
|
||||
|
||||
void cleanup() {
|
||||
if (decoder != null) {
|
||||
decoder.release();
|
||||
}
|
||||
}
|
||||
|
||||
StripedReader getStripedReader() {
|
||||
return stripedReader;
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue