HDFS-8734. Erasure Coding: fix one cell need two packets. Contributed by Walter Su.

This commit is contained in:
Jing Zhao 2015-07-14 13:10:51 -07:00
parent 6ff957be88
commit 0a93712f3b
4 changed files with 41 additions and 55 deletions

View File

@ -350,3 +350,6 @@
HDFS-8702. Erasure coding: update BlockManager.blockHasEnoughRacks(..) logic
for striped block. (Kai Sasaki via jing9)
HDFS-8734. Erasure Coding: fix one cell need two packets. (Walter Su via
jing9)

View File

@ -419,7 +419,7 @@ public class DFSOutputStream extends FSOutputSummer
currentPacket.writeChecksum(checksum, ckoff, cklen);
currentPacket.writeData(b, offset, len);
currentPacket.incNumChunks(1);
currentPacket.incNumChunks();
streamer.incBytesCurBlock(len);
// If packet is full, enqueue it for transmission

View File

@ -259,10 +259,10 @@ public class DFSPacket {
}
/**
* increase the number of chunks by n
* increase the number of chunks by one
*/
synchronized void incNumChunks(int n) {
numChunks += n;
synchronized void incNumChunks() {
numChunks++;
}
/**

View File

@ -254,6 +254,7 @@ public class DFSStripedOutputStream extends DFSOutputStream {
private final CellBuffers cellBuffers;
private final RawErasureEncoder encoder;
private final List<StripedDataStreamer> streamers;
private final DFSPacket[] currentPackets; // current Packet of each streamer
/** Size of each striping cell, must be a multiple of bytesPerChecksum */
private final int cellSize;
@ -301,6 +302,7 @@ public class DFSStripedOutputStream extends DFSOutputStream {
s.add(streamer);
}
streamers = Collections.unmodifiableList(s);
currentPackets = new DFSPacket[streamers.size()];
setCurrentStreamer(0);
}
@ -316,9 +318,18 @@ public class DFSStripedOutputStream extends DFSOutputStream {
return (StripedDataStreamer)streamer;
}
private synchronized StripedDataStreamer setCurrentStreamer(int i) {
streamer = streamers.get(i);
private synchronized StripedDataStreamer setCurrentStreamer(int newIdx)
throws IOException {
// backup currentPacket for current streamer
int oldIdx = streamers.indexOf(streamer);
if (oldIdx >= 0) {
currentPackets[oldIdx] = currentPacket;
}
streamer = streamers.get(newIdx);
currentPacket = currentPackets[newIdx];
adjustChunkBoundary();
return getCurrentStreamer();
}
@ -366,41 +377,6 @@ public class DFSStripedOutputStream extends DFSOutputStream {
currentPacket = null;
}
/**
* Generate packets from a given buffer. This is only used for streamers
* writing parity blocks.
*
* @param byteBuffer the given buffer to generate packets
* @param checksumBuf the checksum buffer
* @return packets generated
* @throws IOException
*/
private List<DFSPacket> generatePackets(
ByteBuffer byteBuffer, byte[] checksumBuf) throws IOException{
List<DFSPacket> packets = new ArrayList<>();
assert byteBuffer.hasArray();
getDataChecksum().calculateChunkedSums(byteBuffer.array(), 0,
byteBuffer.remaining(), checksumBuf, 0);
int ckOff = 0;
while (byteBuffer.remaining() > 0) {
DFSPacket p = createPacket(packetSize, chunksPerPacket,
getCurrentStreamer().getBytesCurBlock(),
getCurrentStreamer().getAndIncCurrentSeqno(), false);
int maxBytesToPacket = p.getMaxChunks() * bytesPerChecksum;
int toWrite = byteBuffer.remaining() > maxBytesToPacket ?
maxBytesToPacket: byteBuffer.remaining();
int chunks = (toWrite - 1) / bytesPerChecksum + 1;
int ckLen = chunks * getChecksumSize();
p.writeChecksum(checksumBuf, ckOff, ckLen);
ckOff += ckLen;
p.writeData(byteBuffer, toWrite);
getCurrentStreamer().incBytesCurBlock(toWrite);
p.incNumChunks(chunks);
packets.add(p);
}
return packets;
}
@Override
protected synchronized void writeChunk(byte[] bytes, int offset, int len,
byte[] checksum, int ckoff, int cklen) throws IOException {
@ -413,11 +389,6 @@ public class DFSStripedOutputStream extends DFSOutputStream {
if (!current.isFailed()) {
try {
super.writeChunk(bytes, offset, len, checksum, ckoff, cklen);
// cell is full and current packet has not been enqueued,
if (cellFull && currentPacket != null) {
enqueueCurrentPacketFull();
}
} catch(Exception e) {
handleStreamerFailure("offset=" + offset + ", length=" + len, e);
}
@ -581,10 +552,14 @@ public class DFSStripedOutputStream extends DFSOutputStream {
final long oldBytes = current.getBytesCurBlock();
if (!current.isFailed()) {
try {
for (DFSPacket p : generatePackets(buffer, checksumBuf)) {
getCurrentStreamer().waitAndQueuePacket(p);
DataChecksum sum = getDataChecksum();
sum.calculateChunkedSums(buffer.array(), 0, len, checksumBuf, 0);
for (int i = 0; i < len; i += sum.getBytesPerChecksum()) {
int chunkLen = Math.min(sum.getBytesPerChecksum(), len - i);
int ckOffset = i / sum.getBytesPerChecksum() * getChecksumSize();
super.writeChunk(buffer.array(), i, chunkLen, checksumBuf, ckOffset,
getChecksumSize());
}
endBlock();
} catch(Exception e) {
handleStreamerFailure("oldBytes=" + oldBytes + ", len=" + len, e);
}
@ -628,16 +603,13 @@ public class DFSStripedOutputStream extends DFSOutputStream {
// flush from all upper layers
try {
flushBuffer();
if (currentPacket != null) {
enqueueCurrentPacket();
}
// if the last stripe is incomplete, generate and write parity cells
writeParityCellsForLastStripe();
enqueueAllCurrentPackets();
} catch(Exception e) {
handleStreamerFailure("closeImpl", e);
}
// if the last stripe is incomplete, generate and write parity cells
writeParityCellsForLastStripe();
for (int i = 0; i < numAllBlocks; i++) {
final StripedDataStreamer s = setCurrentStreamer(i);
if (!s.isFailed()) {
@ -667,4 +639,15 @@ public class DFSStripedOutputStream extends DFSOutputStream {
setClosed();
}
}
private void enqueueAllCurrentPackets() throws IOException {
int idx = streamers.indexOf(getCurrentStreamer());
for(int i = 0; i < streamers.size(); i++) {
setCurrentStreamer(i);
if (currentPacket != null) {
enqueueCurrentPacket();
}
}
setCurrentStreamer(idx);
}
}