HDFS-8378. Erasure Coding: Few improvements for the erasure coding worker. Contributed by Rakesh R.

This commit is contained in:
Walter Su 2015-05-19 14:59:23 +08:00 committed by Zhe Zhang
parent 8c95673db4
commit 914580934c
3 changed files with 35 additions and 30 deletions

View File

@ -218,6 +218,8 @@
HDFS-8367. BlockInfoStriped uses EC schema. (Kai Sasaki via Kai Zheng) HDFS-8367. BlockInfoStriped uses EC schema. (Kai Sasaki via Kai Zheng)
HDFS-8352. Erasure Coding: test webhdfs read write stripe file. (waltersu4549)
HDFS-8417. Erasure Coding: Pread failed to read data starting from not-first stripe. HDFS-8417. Erasure Coding: Pread failed to read data starting from not-first stripe.
(Walter Su via jing9) (Walter Su via jing9)
@ -228,3 +230,6 @@
HDFS-8366. Erasure Coding: Make the timeout parameter of polling blocking queue HDFS-8366. Erasure Coding: Make the timeout parameter of polling blocking queue
configurable in DFSStripedOutputStream. (Li Bo) configurable in DFSStripedOutputStream. (Li Bo)
HDFS-8378. Erasure Coding: Few improvements for the erasure coding worker.
(Rakesh R via waltersu4549)

View File

@ -728,6 +728,7 @@ class BPOfferService {
LOG.info("DatanodeCommand action: DNA_ERASURE_CODING_RECOVERY"); LOG.info("DatanodeCommand action: DNA_ERASURE_CODING_RECOVERY");
Collection<BlockECRecoveryInfo> ecTasks = ((BlockECRecoveryCommand) cmd).getECTasks(); Collection<BlockECRecoveryInfo> ecTasks = ((BlockECRecoveryCommand) cmd).getECTasks();
dn.getErasureCodingWorker().processErasureCodingTasks(ecTasks); dn.getErasureCodingWorker().processErasureCodingTasks(ecTasks);
break;
default: default:
LOG.warn("Unknown DatanodeCommand action: " + cmd.getAction()); LOG.warn("Unknown DatanodeCommand action: " + cmd.getAction());
} }

View File

@ -62,7 +62,6 @@ import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
import org.apache.hadoop.hdfs.protocol.datatransfer.Sender; import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataEncryptionKeyFactory; import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataEncryptionKeyFactory;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.datanode.CachingStrategy; import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
import org.apache.hadoop.hdfs.server.datanode.DataNode; import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.protocol.BlockECRecoveryCommand.BlockECRecoveryInfo; import org.apache.hadoop.hdfs.server.protocol.BlockECRecoveryCommand.BlockECRecoveryInfo;
@ -88,12 +87,12 @@ import com.google.common.base.Preconditions;
* commands. * commands.
*/ */
public final class ErasureCodingWorker { public final class ErasureCodingWorker {
private final Log LOG = DataNode.LOG; private static final Log LOG = DataNode.LOG;
private final DataNode datanode; private final DataNode datanode;
private Configuration conf; private final Configuration conf;
private ThreadPoolExecutor STRIPED_READ_TRHEAD_POOL; private ThreadPoolExecutor STRIPED_READ_THREAD_POOL;
private final int STRIPED_READ_THRESHOLD_MILLIS; private final int STRIPED_READ_THRESHOLD_MILLIS;
private final int STRIPED_READ_BUFFER_SIZE; private final int STRIPED_READ_BUFFER_SIZE;
@ -121,7 +120,10 @@ public final class ErasureCodingWorker {
} }
private void initializeStripedReadThreadPool(int num) { private void initializeStripedReadThreadPool(int num) {
STRIPED_READ_TRHEAD_POOL = new ThreadPoolExecutor(1, num, 60, if (LOG.isDebugEnabled()) {
LOG.debug("Using striped reads; pool threads=" + num);
}
STRIPED_READ_THREAD_POOL = new ThreadPoolExecutor(1, num, 60,
TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), TimeUnit.SECONDS, new SynchronousQueue<Runnable>(),
new Daemon.DaemonFactory() { new Daemon.DaemonFactory() {
private final AtomicInteger threadIndex = new AtomicInteger(0); private final AtomicInteger threadIndex = new AtomicInteger(0);
@ -141,7 +143,7 @@ public final class ErasureCodingWorker {
super.rejectedExecution(runnable, e); super.rejectedExecution(runnable, e);
} }
}); });
STRIPED_READ_TRHEAD_POOL.allowCoreThreadTimeOut(true); STRIPED_READ_THREAD_POOL.allowCoreThreadTimeOut(true);
} }
/** /**
@ -231,23 +233,23 @@ public final class ErasureCodingWorker {
// sources // sources
private final short[] liveIndices; private final short[] liveIndices;
private DatanodeInfo[] sources; private final DatanodeInfo[] sources;
private List<StripedReader> stripedReaders; private final List<StripedReader> stripedReaders;
// targets // targets
private DatanodeInfo[] targets; private final DatanodeInfo[] targets;
private StorageType[] targetStorageTypes; private final StorageType[] targetStorageTypes;
private short[] targetIndices; private final short[] targetIndices;
private ByteBuffer[] targetBuffers; private final ByteBuffer[] targetBuffers;
private Socket[] targetSockets; private final Socket[] targetSockets;
private DataOutputStream[] targetOutputStreams; private final DataOutputStream[] targetOutputStreams;
private DataInputStream[] targetInputStreams; private final DataInputStream[] targetInputStreams;
private long[] blockOffset4Targets; private final long[] blockOffset4Targets;
private long[] seqNo4Targets; private final long[] seqNo4Targets;
private final int WRITE_PACKET_SIZE = 64 * 1024; private final int WRITE_PACKET_SIZE = 64 * 1024;
private DataChecksum checksum; private DataChecksum checksum;
@ -257,11 +259,11 @@ public final class ErasureCodingWorker {
private int bytesPerChecksum; private int bytesPerChecksum;
private int checksumSize; private int checksumSize;
private CachingStrategy cachingStrategy; private final CachingStrategy cachingStrategy;
private Map<Future<Void>, Integer> futures = new HashMap<>(); private final Map<Future<Void>, Integer> futures = new HashMap<>();
private CompletionService<Void> readService = private final CompletionService<Void> readService =
new ExecutorCompletionService<>(STRIPED_READ_TRHEAD_POOL); new ExecutorCompletionService<>(STRIPED_READ_THREAD_POOL);
ReconstructAndTransferBlock(BlockECRecoveryInfo recoveryInfo) { ReconstructAndTransferBlock(BlockECRecoveryInfo recoveryInfo) {
ECSchema schema = recoveryInfo.getECSchema(); ECSchema schema = recoveryInfo.getECSchema();
@ -277,7 +279,8 @@ public final class ErasureCodingWorker {
Preconditions.checkArgument(liveIndices.length >= dataBlkNum, Preconditions.checkArgument(liveIndices.length >= dataBlkNum,
"No enough live striped blocks."); "No enough live striped blocks.");
Preconditions.checkArgument(liveIndices.length == sources.length); Preconditions.checkArgument(liveIndices.length == sources.length,
"liveBlockIndices and source dns should match");
targets = recoveryInfo.getTargetDnInfos(); targets = recoveryInfo.getTargetDnInfos();
targetStorageTypes = recoveryInfo.getTargetStorageTypes(); targetStorageTypes = recoveryInfo.getTargetStorageTypes();
@ -336,7 +339,6 @@ public final class ErasureCodingWorker {
if (nsuccess < dataBlkNum) { if (nsuccess < dataBlkNum) {
String error = "Can't find minimum sources required by " String error = "Can't find minimum sources required by "
+ "recovery, block id: " + blockGroup.getBlockId(); + "recovery, block id: " + blockGroup.getBlockId();
LOG.warn(error);
throw new IOException(error); throw new IOException(error);
} }
@ -358,7 +360,6 @@ public final class ErasureCodingWorker {
boolean[] targetsStatus = new boolean[targets.length]; boolean[] targetsStatus = new boolean[targets.length];
if (initTargetStreams(targetsStatus) == 0) { if (initTargetStreams(targetsStatus) == 0) {
String error = "All targets are failed."; String error = "All targets are failed.";
LOG.warn(error);
throw new IOException(error); throw new IOException(error);
} }
@ -372,7 +373,6 @@ public final class ErasureCodingWorker {
if (nsuccess < dataBlkNum) { if (nsuccess < dataBlkNum) {
String error = "Can't read data from minimum number of sources " String error = "Can't read data from minimum number of sources "
+ "required by recovery, block id: " + blockGroup.getBlockId(); + "required by recovery, block id: " + blockGroup.getBlockId();
LOG.warn(error);
throw new IOException(error); throw new IOException(error);
} }
@ -385,7 +385,6 @@ public final class ErasureCodingWorker {
// step3: transfer data // step3: transfer data
if (transferData2Targets(targetsStatus) == 0) { if (transferData2Targets(targetsStatus) == 0) {
String error = "Transfer failed for all targets."; String error = "Transfer failed for all targets.";
LOG.warn(error);
throw new IOException(error); throw new IOException(error);
} }
@ -906,11 +905,11 @@ public final class ErasureCodingWorker {
} }
private class StripedReader { private class StripedReader {
short index; private final short index;
BlockReader blockReader; private BlockReader blockReader;
ByteBuffer buffer; private ByteBuffer buffer;
public StripedReader(short index) { private StripedReader(short index) {
this.index = index; this.index = index;
} }
} }