HDFS-9494. Parallel optimization of DFSStripedOutputStream#flushAllInternals. Contributed by Gao Rui.

This commit is contained in:
Jing Zhao 2016-02-01 13:02:58 -08:00
parent e50aa53eed
commit e30ce01ddc
3 changed files with 79 additions and 25 deletions

View File

@ -685,18 +685,7 @@ public class DFSOutputStream extends FSOutputSummer
* received from datanodes.
*/
protected void flushInternal() throws IOException {
long toWaitFor;
synchronized (this) {
dfsClient.checkOpen();
checkClosed();
//
// If there is data in the current buffer, send it across
//
getStreamer().queuePacket(currentPacket);
currentPacket = null;
toWaitFor = getStreamer().getLastQueuedSeqno();
}
long toWaitFor = flushInternalWithoutWaitingAck();
getStreamer().waitForAckedSeqno(toWaitFor);
}
@ -864,6 +853,21 @@ public class DFSOutputStream extends FSOutputSummer
return getStreamer().getBlockToken();
}
protected long flushInternalWithoutWaitingAck() throws IOException {
long toWaitFor;
synchronized (this) {
dfsClient.checkOpen();
checkClosed();
//
// If there is data in the current buffer, send it across
//
getStreamer().queuePacket(currentPacket);
currentPacket = null;
toWaitFor = getStreamer().getLastQueuedSeqno();
}
return toWaitFor;
}
@Override
public void setDropBehind(Boolean dropBehind) throws IOException {
CachingStrategy prevStrategy, nextStrategy;

View File

@ -34,6 +34,13 @@ import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.HadoopIllegalArgumentException;
@ -245,7 +252,7 @@ public class DFSStripedOutputStream extends DFSOutputStream {
private final List<StripedDataStreamer> streamers;
private final DFSPacket[] currentPackets; // current Packet of each streamer
/** Size of each striping cell, must be a multiple of bytesPerChecksum */
// Size of each striping cell, must be a multiple of bytesPerChecksum.
private final int cellSize;
private final int numAllBlocks;
private final int numDataBlocks;
@ -253,6 +260,8 @@ public class DFSStripedOutputStream extends DFSOutputStream {
private final String[] favoredNodes;
private final List<StripedDataStreamer> failedStreamers;
private final Map<Integer, Integer> corruptBlockCountMap;
private ExecutorService flushAllExecutor;
private CompletionService<Void> flushAllExecutorCompletionService;
private int blockGroupIndex;
/** Construct a new output stream for creating a file. */
@ -273,6 +282,9 @@ public class DFSStripedOutputStream extends DFSOutputStream {
this.favoredNodes = favoredNodes;
failedStreamers = new ArrayList<>();
corruptBlockCountMap = new LinkedHashMap<>();
flushAllExecutor = Executors.newFixedThreadPool(numAllBlocks);
flushAllExecutorCompletionService = new
ExecutorCompletionService<>(flushAllExecutor);
encoder = CodecUtil.createRSRawEncoder(dfsClient.getConfiguration(),
numDataBlocks, numParityBlocks);
@ -368,13 +380,19 @@ public class DFSStripedOutputStream extends DFSOutputStream {
return newFailed;
}
private void handleStreamerFailure(String err, Exception e)
private void handleCurrentStreamerFailure(String err, Exception e)
throws IOException {
LOG.warn("Failed: " + err + ", " + this, e);
getCurrentStreamer().getErrorState().setInternalError();
getCurrentStreamer().close(true);
checkStreamers();
currentPacket = null;
handleStreamerFailure(err, e, getCurrentStreamer());
}
private void handleStreamerFailure(String err, Exception e,
StripedDataStreamer streamer) throws IOException {
LOG.warn("Failed: " + err + ", " + this, e);
streamer.getErrorState().setInternalError();
streamer.close(true);
checkStreamers();
currentPackets[streamer.getIndex()] = null;
}
private void replaceFailedStreamers() {
@ -495,7 +513,7 @@ public class DFSStripedOutputStream extends DFSOutputStream {
try {
super.writeChunk(bytes, offset, len, checksum, ckoff, cklen);
} catch(Exception e) {
handleStreamerFailure("offset=" + offset + ", length=" + len, e);
handleCurrentStreamerFailure("offset=" + offset + ", length=" + len, e);
}
}
@ -804,7 +822,7 @@ public class DFSStripedOutputStream extends DFSOutputStream {
streamer.closeSocket();
} catch (Exception e) {
try {
handleStreamerFailure("force=" + force, e);
handleCurrentStreamerFailure("force=" + force, e);
} catch (IOException ioe) {
b.add(ioe);
}
@ -894,7 +912,8 @@ public class DFSStripedOutputStream extends DFSOutputStream {
getChecksumSize());
}
} catch(Exception e) {
handleStreamerFailure("oldBytes=" + oldBytes + ", len=" + len, e);
handleCurrentStreamerFailure("oldBytes=" + oldBytes + ", len=" + len,
e);
}
}
}
@ -968,6 +987,8 @@ public class DFSStripedOutputStream extends DFSOutputStream {
} catch (ClosedChannelException ignored) {
} finally {
setClosed();
// shutdown executor of flushAll tasks
flushAllExecutor.shutdownNow();
}
}
@ -980,7 +1001,7 @@ public class DFSStripedOutputStream extends DFSOutputStream {
try {
enqueueCurrentPacket();
} catch (IOException e) {
handleStreamerFailure("enqueueAllCurrentPackets, i=" + i, e);
handleCurrentStreamerFailure("enqueueAllCurrentPackets, i=" + i, e);
}
}
}
@ -988,6 +1009,8 @@ public class DFSStripedOutputStream extends DFSOutputStream {
}
void flushAllInternals() throws IOException {
Map<Future<Void>, Integer> flushAllFuturesMap = new HashMap<>();
Future<Void> future = null;
int current = getCurrentIndex();
for (int i = 0; i < numAllBlocks; i++) {
@ -995,13 +1018,37 @@ public class DFSStripedOutputStream extends DFSOutputStream {
if (s.isHealthy()) {
try {
// flush all data to Datanode
flushInternal();
} catch(Exception e) {
handleStreamerFailure("flushInternal " + s, e);
final long toWaitFor = flushInternalWithoutWaitingAck();
future = flushAllExecutorCompletionService.submit(
new Callable<Void>() {
@Override
public Void call() throws Exception {
s.waitForAckedSeqno(toWaitFor);
return null;
}
});
flushAllFuturesMap.put(future, i);
} catch (Exception e) {
handleCurrentStreamerFailure("flushInternal " + s, e);
}
}
}
setCurrentStreamer(current);
for (int i = 0; i < flushAllFuturesMap.size(); i++) {
try {
future = flushAllExecutorCompletionService.take();
future.get();
} catch (InterruptedException ie) {
throw DFSUtilClient.toInterruptedIOException(
"Interrupted during waiting all streamer flush, ", ie);
} catch (ExecutionException ee) {
LOG.warn(
"Caught ExecutionException while waiting all streamer flush, ", ee);
StripedDataStreamer s = streamers.get(flushAllFuturesMap.get(future));
handleStreamerFailure("flushInternal " + s,
(Exception) ee.getCause(), s);
}
}
}
static void sleep(long ms, String op) throws InterruptedIOException {

View File

@ -217,6 +217,9 @@ Trunk (Unreleased)
HDFS-9582. TestLeaseRecoveryStriped file missing Apache License header
and not well formatted. (umamahesh)
HDFS-9494. Parallel optimization of DFSStripedOutputStream#flushAllInternals.
(Gao Rui via jing9)
OPTIMIZATIONS
BUG FIXES