diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt index 7efaa5a21cc..0d2d4485e16 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt @@ -175,3 +175,6 @@ HDFS-7672. Handle write failure for stripping blocks and refactor the existing code in DFSStripedOutputStream and StripedDataStreamer. (szetszwo) + + HDFS-7348. Erasure Coding: DataNode reconstruct striped blocks. + (Yi Liu via Zhe Zhang) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReader.java index aa3e8ba9909..0a5511e16f0 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReader.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReader.java @@ -24,6 +24,7 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.fs.ByteBufferReadable; import org.apache.hadoop.fs.ReadOption; import org.apache.hadoop.hdfs.shortcircuit.ClientMmap; +import org.apache.hadoop.util.DataChecksum; /** * A BlockReader is responsible for reading a single block @@ -99,4 +100,9 @@ public interface BlockReader extends ByteBufferReadable { * supported. */ ClientMmap getClientMmap(EnumSet opts); + + /** + * @return The DataChecksum used by the read block + */ + DataChecksum getDataChecksum(); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java index d913f3a2835..0b2420d5451 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java @@ -738,4 +738,9 @@ class BlockReaderLocal implements BlockReader { void forceUnanchorable() { replica.getSlot().makeUnanchorable(); } + + @Override + public DataChecksum getDataChecksum() { + return checksum; + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocalLegacy.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocalLegacy.java index c16ffdf2e0e..04cf733cc01 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocalLegacy.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocalLegacy.java @@ -732,4 +732,9 @@ class BlockReaderLocalLegacy implements BlockReader { public ClientMmap getClientMmap(EnumSet opts) { return null; } + + @Override + public DataChecksum getDataChecksum() { + return checksum; + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java index 6bc005bd14b..d5d30955b5b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java @@ -369,6 +369,12 @@ public class DFSConfigKeys extends CommonConfigurationKeys { public static final int DFS_DATANODE_DIRECTORYSCAN_INTERVAL_DEFAULT = 21600; public static final String DFS_DATANODE_DIRECTORYSCAN_THREADS_KEY = "dfs.datanode.directoryscan.threads"; public static final int 
DFS_DATANODE_DIRECTORYSCAN_THREADS_DEFAULT = 1; + public static final String DFS_DATANODE_STRIPED_READ_THREADS_KEY = "dfs.datanode.stripedread.threads"; + public static final int DFS_DATANODE_STRIPED_READ_THREADS_DEFAULT = 20; + public static final String DFS_DATANODE_STRIPED_READ_BUFFER_SIZE_KEY = "dfs.datanode.stripedread.buffer.size"; + public static final int DFS_DATANODE_STRIPED_READ_BUFFER_SIZE_DEFAULT = 256 * 1024; + public static final String DFS_DATANODE_STRIPED_READ_THRESHOLD_MILLIS_KEY = "dfs.datanode.stripedread.threshold.millis"; + public static final int DFS_DATANODE_STRIPED_READ_THRESHOLD_MILLIS_DEFAULT = 5000; //5s public static final String DFS_DATANODE_DNS_INTERFACE_KEY = "dfs.datanode.dns.interface"; public static final String DFS_DATANODE_DNS_INTERFACE_DEFAULT = "default"; public static final String DFS_DATANODE_DNS_NAMESERVER_KEY = "dfs.datanode.dns.nameserver"; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSPacket.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSPacket.java index 9cd1ec1fcc3..a26e35e7c58 100755 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSPacket.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSPacket.java @@ -37,7 +37,7 @@ import org.apache.htrace.Span; ****************************************************************/ @InterfaceAudience.Private -class DFSPacket { +public class DFSPacket { public static final long HEART_BEAT_SEQNO = -1L; private static long[] EMPTY = new long[0]; private final long seqno; // sequence number of buffer in block @@ -80,7 +80,7 @@ class DFSPacket { * @param checksumSize the size of checksum * @param lastPacketInBlock if this is the last packet */ - DFSPacket(byte[] buf, int chunksPerPkt, long offsetInBlock, long seqno, + public DFSPacket(byte[] buf, int chunksPerPkt, long offsetInBlock, long seqno, int checksumSize, boolean lastPacketInBlock) { this.lastPacketInBlock = lastPacketInBlock; this.numChunks = 0; @@ -114,7 +114,7 @@ class DFSPacket { dataPos += len; } - synchronized void writeData(ByteBuffer inBuffer, int len) + public synchronized void writeData(ByteBuffer inBuffer, int len) throws ClosedChannelException { checkBuffer(); len = len > inBuffer.remaining() ? 
inBuffer.remaining() : len; @@ -135,7 +135,7 @@ class DFSPacket { * @param len the length of checksums to write * @throws ClosedChannelException */ - synchronized void writeChecksum(byte[] inarray, int off, int len) + public synchronized void writeChecksum(byte[] inarray, int off, int len) throws ClosedChannelException { checkBuffer(); if (len == 0) { @@ -154,7 +154,7 @@ class DFSPacket { * @param stm * @throws IOException */ - synchronized void writeTo(DataOutputStream stm) throws IOException { + public synchronized void writeTo(DataOutputStream stm) throws IOException { checkBuffer(); final int dataLen = dataPos - dataStart; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java index d70f41904bc..70cce7e2e5a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java @@ -505,4 +505,9 @@ public class RemoteBlockReader extends FSInputChecker implements BlockReader { public ClientMmap getClientMmap(EnumSet opts) { return null; } + + @Override + public DataChecksum getDataChecksum() { + return checksum; + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java index c368d6515f1..cce44b7f777 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java @@ -474,4 +474,9 @@ public class RemoteBlockReader2 implements BlockReader { public ClientMmap getClientMmap(EnumSet opts) { return null; } + + @Override + public DataChecksum getDataChecksum() { + return checksum; + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java index 4b7fbc3c1f8..d25642f047a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java @@ -235,6 +235,33 @@ public class DNConf { return maxLockedMemory; } + /** + * Returns true if connect to datanode via hostname + * + * @return boolean true if connect to datanode via hostname + */ + public boolean getConnectToDnViaHostname() { + return connectToDnViaHostname; + } + + /** + * Returns socket timeout + * + * @return int socket timeout + */ + public int getSocketTimeout() { + return socketTimeout; + } + + /** + * Returns socket write timeout + * + * @return int socket write timeout + */ + public int getSocketWriteTimeout() { + return socketWriteTimeout; + } + /** * Returns the SaslPropertiesResolver configured for use with * DataTransferProtocol, or null if not configured. 
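The striped-read keys added to DFSConfigKeys above (and surfaced through the new DNConf getters) are what ErasureCodingWorker consumes further down. A minimal sketch, not part of the patch, of how those keys resolve against a Configuration; the class name and main() are illustrative only:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.HdfsConfiguration;

public class StripedReadConfigSketch {
  public static void main(String[] args) {
    // Picks up hdfs-default.xml / hdfs-site.xml; the three keys added above
    // default to 20 threads, 256 KB buffers and a 5000 ms read threshold.
    Configuration conf = new HdfsConfiguration();
    int threads = conf.getInt(
        DFSConfigKeys.DFS_DATANODE_STRIPED_READ_THREADS_KEY,
        DFSConfigKeys.DFS_DATANODE_STRIPED_READ_THREADS_DEFAULT);
    int bufferSize = conf.getInt(
        DFSConfigKeys.DFS_DATANODE_STRIPED_READ_BUFFER_SIZE_KEY,
        DFSConfigKeys.DFS_DATANODE_STRIPED_READ_BUFFER_SIZE_DEFAULT);
    int thresholdMillis = conf.getInt(
        DFSConfigKeys.DFS_DATANODE_STRIPED_READ_THRESHOLD_MILLIS_KEY,
        DFSConfigKeys.DFS_DATANODE_STRIPED_READ_THRESHOLD_MILLIS_DEFAULT);
    System.out.println("stripedread: threads=" + threads
        + " buffer=" + bufferSize + " threshold(ms)=" + thresholdMillis);
  }
}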
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java index 221ba386ac1..5eca2c7e986 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java @@ -1161,7 +1161,8 @@ public class DataNode extends ReconfigurableBase saslClient = new SaslDataTransferClient(dnConf.conf, dnConf.saslPropsResolver, dnConf.trustedChannelResolver); saslServer = new SaslDataTransferServer(dnConf, blockPoolTokenSecretManager); - ecWorker = new ErasureCodingWorker(conf); // Initialize ErasureCoding worker + // Initialize ErasureCoding worker + ecWorker = new ErasureCodingWorker(conf, this); } /** @@ -1226,6 +1227,10 @@ public class DataNode extends ReconfigurableBase return UUID.randomUUID().toString(); } + public SaslDataTransferClient getSaslClient() { + return saslClient; + } + /** * Verify that the DatanodeUuid has been initialized. If this is a new * datanode then we generate a new Datanode Uuid and persist it to disk. @@ -1488,7 +1493,7 @@ public class DataNode extends ReconfigurableBase /** * Creates either NIO or regular depending on socketWriteTimeout. */ - protected Socket newSocket() throws IOException { + public Socket newSocket() throws IOException { return (dnConf.socketWriteTimeout > 0) ? SocketChannel.open().socket() : new Socket(); } @@ -2143,11 +2148,8 @@ public class DataNode extends ReconfigurableBase // // Header info // - Token accessToken = BlockTokenSecretManager.DUMMY_TOKEN; - if (isBlockTokenEnabled) { - accessToken = blockPoolTokenSecretManager.generateToken(b, - EnumSet.of(BlockTokenIdentifier.AccessMode.WRITE)); - } + Token accessToken = getBlockAccessToken(b, + EnumSet.of(BlockTokenIdentifier.AccessMode.WRITE)); long writeTimeout = dnConf.socketWriteTimeout + HdfsServerConstants.WRITE_TIMEOUT_EXTENSION * (targets.length-1); @@ -2214,6 +2216,19 @@ public class DataNode extends ReconfigurableBase } } + /*** + * Use BlockTokenSecretManager to generate block token for current user. + */ + public Token getBlockAccessToken(ExtendedBlock b, + EnumSet mode) throws IOException { + Token accessToken = + BlockTokenSecretManager.DUMMY_TOKEN; + if (isBlockTokenEnabled) { + accessToken = blockPoolTokenSecretManager.generateToken(b, mode); + } + return accessToken; + } + /** * Returns a new DataEncryptionKeyFactory that generates a key from the * BlockPoolTokenSecretManager, using the block pool ID of the given block. 
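The new DataNode#getBlockAccessToken factors block-token generation out of the transfer path so that ErasureCodingWorker below can request READ tokens when opening source block readers and WRITE tokens when setting up target streams. A hedged caller-side sketch assuming only the method shown in this hunk; the helper class and method names are illustrative:

import java.io.IOException;
import java.util.EnumSet;

import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.security.token.Token;

// Illustrative helper only; not part of the patch.
class BlockTokenUsageSketch {
  // READ token, as newBlockReader() in ErasureCodingWorker requests before
  // contacting a source datanode.
  static Token<BlockTokenIdentifier> readToken(DataNode dn, ExtendedBlock blk)
      throws IOException {
    return dn.getBlockAccessToken(blk,
        EnumSet.of(BlockTokenIdentifier.AccessMode.READ));
  }

  // WRITE token, as initTargetStreams() requests before writeBlock() to a
  // target datanode.
  static Token<BlockTokenIdentifier> writeToken(DataNode dn, ExtendedBlock blk)
      throws IOException {
    return dn.getBlockAccessToken(blk,
        EnumSet.of(BlockTokenIdentifier.AccessMode.WRITE));
  }
}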
@@ -2221,7 +2236,7 @@ public class DataNode extends ReconfigurableBase * @param block for which the factory needs to create a key * @return DataEncryptionKeyFactory for block's block pool ID */ - DataEncryptionKeyFactory getDataEncryptionKeyFactoryForBlock( + public DataEncryptionKeyFactory getDataEncryptionKeyFactoryForBlock( final ExtendedBlock block) { return new DataEncryptionKeyFactory() { @Override diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/ErasureCodingWorker.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/ErasureCodingWorker.java index 6430308e7d5..c4e568f2841 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/ErasureCodingWorker.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/ErasureCodingWorker.java @@ -17,15 +17,68 @@ */ package org.apache.hadoop.hdfs.server.datanode.erasurecode; +import java.io.BufferedOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.net.InetSocketAddress; +import java.net.Socket; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.BitSet; import java.util.Collection; +import java.util.EnumSet; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.Callable; +import java.util.concurrent.CompletionService; +import java.util.concurrent.ExecutorCompletionService; +import java.util.concurrent.Future; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.commons.logging.Log; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.CommonConfigurationKeys; +import org.apache.hadoop.fs.StorageType; +import org.apache.hadoop.hdfs.BlockReader; +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.DFSPacket; +import org.apache.hadoop.hdfs.RemoteBlockReader2; +import org.apache.hadoop.hdfs.net.Peer; +import org.apache.hadoop.hdfs.net.TcpPeerServer; +import org.apache.hadoop.hdfs.protocol.DatanodeID; +import org.apache.hadoop.hdfs.protocol.DatanodeInfo; +import org.apache.hadoop.hdfs.protocol.ExtendedBlock; +import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage; +import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair; +import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader; +import org.apache.hadoop.hdfs.protocol.datatransfer.Sender; +import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataEncryptionKeyFactory; +import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; +import org.apache.hadoop.hdfs.server.common.HdfsServerConstants; +import org.apache.hadoop.hdfs.server.datanode.CachingStrategy; +import org.apache.hadoop.hdfs.server.datanode.DataNode; import org.apache.hadoop.hdfs.server.protocol.BlockECRecoveryCommand.BlockECRecoveryInfo; -import org.apache.hadoop.io.erasurecode.coder.AbstractErasureCoder; +import org.apache.hadoop.hdfs.util.StripedBlockUtil; +import org.apache.hadoop.hdfs.util.StripedBlockUtil.StripedReadResult; +import org.apache.hadoop.io.IOUtils; +import org.apache.hadoop.io.erasurecode.ECSchema; import 
org.apache.hadoop.io.erasurecode.rawcoder.RSRawDecoder; import org.apache.hadoop.io.erasurecode.rawcoder.RSRawEncoder; -import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureCoder; +import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureDecoder; +import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureEncoder; +import org.apache.hadoop.net.NetUtils; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.util.Daemon; +import org.apache.hadoop.util.DataChecksum; + +import com.google.common.base.Preconditions; /** * ErasureCodingWorker handles the erasure coding recovery work commands. These @@ -34,41 +87,60 @@ import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureCoder; * commands. */ public final class ErasureCodingWorker { - + private final Log LOG = DataNode.LOG; + + private final DataNode datanode; private Configuration conf; - RawErasureCoder rawEncoder = null; - RawErasureCoder rawDecoder = null; - public ErasureCodingWorker(Configuration conf) { + private ThreadPoolExecutor STRIPED_READ_TRHEAD_POOL; + private final int STRIPED_READ_THRESHOLD_MILLIS; + private final int STRIPED_READ_BUFFER_SIZE; + + public ErasureCodingWorker(Configuration conf, DataNode datanode) { + this.datanode = datanode; this.conf = conf; - initialize(); + + STRIPED_READ_THRESHOLD_MILLIS = conf.getInt( + DFSConfigKeys.DFS_DATANODE_STRIPED_READ_THRESHOLD_MILLIS_KEY, + DFSConfigKeys.DFS_DATANODE_STRIPED_READ_THRESHOLD_MILLIS_DEFAULT); + initializeStripedReadThreadPool(conf.getInt( + DFSConfigKeys.DFS_DATANODE_STRIPED_READ_THREADS_KEY, + DFSConfigKeys.DFS_DATANODE_STRIPED_READ_THREADS_DEFAULT)); + STRIPED_READ_BUFFER_SIZE = conf.getInt( + DFSConfigKeys.DFS_DATANODE_STRIPED_READ_BUFFER_SIZE_KEY, + DFSConfigKeys.DFS_DATANODE_STRIPED_READ_BUFFER_SIZE_DEFAULT); } - /** - * Initializes the required resources for handling the erasure coding recovery - * work. - */ - public void initialize() { - // Right now directly used RS coder. Once other coders integration ready, we - // can load preferred codec here. 
- initializeErasureEncoder(); - initializeErasureDecoder(); + private RawErasureEncoder newEncoder() { + return new RSRawEncoder(); + } + + private RawErasureDecoder newDecoder() { + return new RSRawDecoder(); } - private void initializeErasureDecoder() { - rawDecoder = AbstractErasureCoder.createRawCoder(conf, - CommonConfigurationKeys.IO_ERASURECODE_CODEC_RS_RAWCODER_KEY, false); - if (rawDecoder == null) { - rawDecoder = new RSRawDecoder(); - } - } + private void initializeStripedReadThreadPool(int num) { + STRIPED_READ_TRHEAD_POOL = new ThreadPoolExecutor(1, num, 60, + TimeUnit.SECONDS, new SynchronousQueue(), + new Daemon.DaemonFactory() { + private final AtomicInteger threadIndex = new AtomicInteger(0); - private void initializeErasureEncoder() { - rawEncoder = AbstractErasureCoder.createRawCoder(conf, - CommonConfigurationKeys.IO_ERASURECODE_CODEC_RS_RAWCODER_KEY, true); - if (rawEncoder == null) { - rawEncoder = new RSRawEncoder(); - } + @Override + public Thread newThread(Runnable r) { + Thread t = super.newThread(r); + t.setName("stripedRead-" + threadIndex.getAndIncrement()); + return t; + } + }, new ThreadPoolExecutor.CallerRunsPolicy() { + @Override + public void rejectedExecution(Runnable runnable, ThreadPoolExecutor e) { + LOG.info("Execution for striped reading rejected, " + + "Executing in current thread"); + // will run in the current thread + super.rejectedExecution(runnable, e); + } + }); + STRIPED_READ_TRHEAD_POOL.allowCoreThreadTimeOut(true); } /** @@ -78,6 +150,765 @@ public final class ErasureCodingWorker { * BlockECRecoveryInfo */ public void processErasureCodingTasks(Collection ecTasks) { - // HDFS-7348 : Implement the actual recovery process + for (BlockECRecoveryInfo recoveryInfo : ecTasks) { + try { + new Daemon(new ReconstructAndTransferBlock(recoveryInfo)).start(); + } catch (Throwable e) { + LOG.warn("Failed to recover striped block " + + recoveryInfo.getExtendedBlock().getLocalBlock(), e); + } + } + } + + /** + * ReconstructAndTransferBlock recover one or more missed striped block in the + * striped block group, the minimum number of live striped blocks should be + * no less than data block number. + * + * | <- Striped Block Group -> | + * blk_0 blk_1 blk_2(*) blk_3 ... <- A striped block group + * | | | | + * v v v v + * +------+ +------+ +------+ +------+ + * |cell_0| |cell_1| |cell_2| |cell_3| ... + * +------+ +------+ +------+ +------+ + * |cell_4| |cell_5| |cell_6| |cell_7| ... + * +------+ +------+ +------+ +------+ + * |cell_8| |cell_9| |cell10| |cell11| ... + * +------+ +------+ +------+ +------+ + * ... ... ... ... + * + * + * We use following steps to recover striped block group, in each round, we + * recover bufferSize data until finish, the + * bufferSize is configurable and may be less or larger than + * cell size: + * step1: read bufferSize data from minimum number of sources + * required by recovery. + * step2: decode data for targets. + * step3: transfer data to targets. + * + * In step1, try to read bufferSize data from minimum number + * of sources , if there is corrupt or stale sources, read from new source + * will be scheduled. The best sources are remembered for next round and + * may be updated in each round. + * + * In step2, typically if source blocks we read are all data blocks, we + * need to call encode, and if there is one parity block, we need to call + * decode. Notice we only read once and recover all missed striped block + * if they are more than one. 
+ * + * In step3, send the recovered data to targets by constructing packet + * and send them directly. Same as continuous block replication, we + * don't check the packet ack. Since the datanode doing the recovery work + * are one of the source datanodes, so the recovered data are sent + * remotely. + * + * There are some points we can do further improvements in next phase: + * 1. we can read the block file directly on the local datanode, + * currently we use remote block reader. (Notice short-circuit is not + * a good choice, see inline comments). + * 2. We need to check the packet ack for EC recovery? Since EC recovery + * is more expensive than continuous block replication, it needs to + * read from several other datanodes, should we make sure the + * recovered result received by targets? + */ + private class ReconstructAndTransferBlock implements Runnable { + private final int dataBlkNum; + private final int parityBlkNum; + private final int cellSize; + + private RawErasureEncoder encoder; + private RawErasureDecoder decoder; + + // Striped read buffer size + private int bufferSize; + + private final ExtendedBlock blockGroup; + // position in striped block + private long positionInBlock; + + // sources + private final short[] liveIndices; + private DatanodeInfo[] sources; + + private List stripedReaders; + + // targets + private DatanodeInfo[] targets; + private StorageType[] targetStorageTypes; + + private short[] targetIndices; + private ByteBuffer[] targetBuffers; + + private Socket[] targetSockets; + private DataOutputStream[] targetOutputStreams; + private DataInputStream[] targetInputStreams; + + private long[] blockOffset4Targets; + private long[] seqNo4Targets; + + private final int WRITE_PACKET_SIZE = 64 * 1024; + private DataChecksum checksum; + private int maxChunksPerPacket; + private byte[] packetBuf; + private byte[] checksumBuf; + private int bytesPerChecksum; + private int checksumSize; + + private CachingStrategy cachingStrategy; + + private Map, Integer> futures = new HashMap<>(); + private CompletionService readService = + new ExecutorCompletionService<>(STRIPED_READ_TRHEAD_POOL); + + ReconstructAndTransferBlock(BlockECRecoveryInfo recoveryInfo) { + ECSchema schema = recoveryInfo.getECSchema(); + dataBlkNum = schema.getNumDataUnits(); + parityBlkNum = schema.getNumParityUnits(); + cellSize = schema.getChunkSize(); + + blockGroup = recoveryInfo.getExtendedBlock(); + + liveIndices = recoveryInfo.getLiveBlockIndices(); + sources = recoveryInfo.getSourceDnInfos(); + stripedReaders = new ArrayList<>(sources.length); + + Preconditions.checkArgument(liveIndices.length >= dataBlkNum, + "No enough live striped blocks."); + Preconditions.checkArgument(liveIndices.length == sources.length); + + targets = recoveryInfo.getTargetDnInfos(); + targetStorageTypes = recoveryInfo.getTargetStorageTypes(); + targetIndices = new short[targets.length]; + targetBuffers = new ByteBuffer[targets.length]; + + targetSockets = new Socket[targets.length]; + targetOutputStreams = new DataOutputStream[targets.length]; + targetInputStreams = new DataInputStream[targets.length]; + + blockOffset4Targets = new long[targets.length]; + seqNo4Targets = new long[targets.length]; + + for (int i = 0; i < targets.length; i++) { + blockOffset4Targets[i] = 0; + seqNo4Targets[i] = 0; + } + + getTargetIndices(); + cachingStrategy = CachingStrategy.newDefaultStrategy(); + } + + private ExtendedBlock getBlock(ExtendedBlock blockGroup, int i) { + return StripedBlockUtil.constructStripedBlock(blockGroup, cellSize, + 
dataBlkNum, i); + } + + private long getBlockLen(ExtendedBlock blockGroup, int i) { + return StripedBlockUtil.getStripedBlockLength(blockGroup.getNumBytes(), + cellSize, dataBlkNum, i); + } + + @Override + public void run() { + try { + // Store the indices of successfully read source + // This will be updated after doing real read. + int[] success = new int[dataBlkNum]; + + int nsuccess = 0; + for (int i = 0; i < sources.length && nsuccess < dataBlkNum; i++) { + StripedReader reader = new StripedReader(liveIndices[i]); + stripedReaders.add(reader); + + BlockReader blockReader = newBlockReader( + getBlock(blockGroup, liveIndices[i]), 0, sources[i]); + if (blockReader != null) { + initChecksumAndBufferSizeIfNeeded(blockReader); + reader.blockReader = blockReader; + reader.buffer = ByteBuffer.allocate(bufferSize); + success[nsuccess++] = i; + } + } + + if (nsuccess < dataBlkNum) { + String error = "Can't find minimum sources required by " + + "recovery, block id: " + blockGroup.getBlockId(); + LOG.warn(error); + throw new IOException(error); + } + + for (int i = 0; i < targets.length; i++) { + targetBuffers[i] = ByteBuffer.allocate(bufferSize); + } + + checksumSize = checksum.getChecksumSize(); + int chunkSize = bytesPerChecksum + checksumSize; + maxChunksPerPacket = Math.max( + (WRITE_PACKET_SIZE - PacketHeader.PKT_MAX_HEADER_LEN)/chunkSize, 1); + int maxPacketSize = chunkSize * maxChunksPerPacket + + PacketHeader.PKT_MAX_HEADER_LEN; + + packetBuf = new byte[maxPacketSize]; + checksumBuf = new byte[checksumSize * (bufferSize / bytesPerChecksum)]; + + // Store whether the target is success + boolean[] targetsStatus = new boolean[targets.length]; + if (initTargetStreams(targetsStatus) == 0) { + String error = "All targets are failed."; + LOG.warn(error); + throw new IOException(error); + } + + long firstStripedBlockLength = getBlockLen(blockGroup, 0); + while (positionInBlock < firstStripedBlockLength) { + int toRead = Math.min( + bufferSize, (int)(firstStripedBlockLength - positionInBlock)); + // step1: read minimum striped buffer size data required by recovery. + nsuccess = readMinimumStripedData4Recovery(success); + + if (nsuccess < dataBlkNum) { + String error = "Can't read data from minimum number of sources " + + "required by recovery, block id: " + blockGroup.getBlockId(); + LOG.warn(error); + throw new IOException(error); + } + + // step2: encode/decode to recover targets + long remaining = firstStripedBlockLength - positionInBlock; + int toRecoverLen = remaining < bufferSize ? + (int)remaining : bufferSize; + recoverTargets(success, targetsStatus, toRecoverLen); + + // step3: transfer data + if (transferData2Targets(targetsStatus) == 0) { + String error = "Transfer failed for all targets."; + LOG.warn(error); + throw new IOException(error); + } + + clearBuffers(); + positionInBlock += toRead; + } + + endTargetBlocks(targetsStatus); + + // Currently we don't check the acks for packets, this is similar as + // block replication. 
+ } catch (Throwable e) { + LOG.warn("Failed to recover striped block: " + blockGroup); + } finally { + // close block readers + for (StripedReader stripedReader : stripedReaders) { + closeBlockReader(stripedReader.blockReader); + } + for (int i = 0; i < targets.length; i++) { + IOUtils.closeStream(targetOutputStreams[i]); + IOUtils.closeStream(targetInputStreams[i]); + IOUtils.closeStream(targetSockets[i]); + } + } + } + + // init checksum from block reader + private void initChecksumAndBufferSizeIfNeeded(BlockReader blockReader) { + if (checksum == null) { + checksum = blockReader.getDataChecksum(); + bytesPerChecksum = checksum.getBytesPerChecksum(); + // The bufferSize is flat to divide bytesPerChecksum + int readBufferSize = STRIPED_READ_BUFFER_SIZE; + bufferSize = readBufferSize < bytesPerChecksum ? bytesPerChecksum : + readBufferSize - readBufferSize % bytesPerChecksum; + } else { + assert blockReader.getDataChecksum().equals(checksum); + } + } + + // assume liveIndices is not ordered. + private void getTargetIndices() { + BitSet bitset = new BitSet(dataBlkNum + parityBlkNum); + for (int i = 0; i < sources.length; i++) { + bitset.set(liveIndices[i]); + } + int m = 0; + for (int i = 0; i < dataBlkNum + parityBlkNum && m < targets.length; i++) { + if (!bitset.get(i)) { + targetIndices[m++] = (short)i; + } + } + } + + /** + * Read minimum striped buffer size data required by recovery. + * success list will be updated after read. + * + * Initially we only read from dataBlkNum sources, + * if timeout or failure for some source, we will try to schedule + * read from a new source. + */ + private int readMinimumStripedData4Recovery(int[] success) { + + BitSet used = new BitSet(sources.length); + for (int i = 0; i < dataBlkNum; i++) { + StripedReader reader = stripedReaders.get(success[i]); + Callable readCallable = readFromBlock( + reader.blockReader, reader.buffer); + Future f = readService.submit(readCallable); + futures.put(f, success[i]); + used.set(success[i]); + } + + int nsuccess = 0; + while (!futures.isEmpty()) { + try { + StripedReadResult result = + StripedBlockUtil.getNextCompletedStripedRead( + readService, futures, STRIPED_READ_THRESHOLD_MILLIS); + if (result.state == StripedReadResult.SUCCESSFUL) { + success[nsuccess++] = result.index; + if (nsuccess >= dataBlkNum) { + // cancel remaining reads if we read successfully from minimum + // number of sources required for recovery. + cancelReads(futures.keySet()); + futures.clear(); + break; + } + } else if (result.state == StripedReadResult.FAILED) { + // If read failed for some source, we should not use it anymore + // and schedule read from a new source. + StripedReader failedReader = stripedReaders.get(result.index); + closeBlockReader(failedReader.blockReader); + failedReader.blockReader = null; + scheduleNewRead(used); + } else if (result.state == StripedReadResult.TIMEOUT) { + // If timeout, we also schedule a new read. + scheduleNewRead(used); + } + } catch (InterruptedException e) { + LOG.info("Read data interrupted.", e); + break; + } + } + + return nsuccess; + } + + /** + * Return true if need to do encoding to recovery missed striped block. 
+ */ + private boolean shouldEncode(int[] success) { + for (int i = 0; i < success.length; i++) { + if (stripedReaders.get(success[i]).index >= dataBlkNum) { + return false; + } + } + return true; + } + + private void paddingBufferToLen(ByteBuffer buffer, int len) { + int toPadding = len - buffer.position(); + for (int i = 0; i < toPadding; i++) { + buffer.put((byte) 0); + } + } + + // Initialize encoder + private void initEncoderIfNecessary() { + if (encoder == null) { + encoder = newEncoder(); + encoder.initialize(dataBlkNum, parityBlkNum, bufferSize); + } + } + + // Initialize decoder + private void initDecoderIfNecessary() { + if (decoder == null) { + decoder = newDecoder(); + decoder.initialize(dataBlkNum, parityBlkNum, bufferSize); + } + } + + private void recoverTargets(int[] success, boolean[] targetsStatus, + int toRecoverLen) { + if (shouldEncode(success)) { + initEncoderIfNecessary(); + ByteBuffer[] dataBuffers = new ByteBuffer[dataBlkNum]; + ByteBuffer[] parityBuffers = new ByteBuffer[parityBlkNum]; + for (int i = 0; i < dataBlkNum; i++) { + StripedReader reader = stripedReaders.get(i); + ByteBuffer buffer = reader.buffer; + paddingBufferToLen(buffer, toRecoverLen); + dataBuffers[i] = (ByteBuffer)buffer.flip(); + } + for (int i = dataBlkNum; i < stripedReaders.size(); i++) { + StripedReader reader = stripedReaders.get(i); + parityBuffers[reader.index - dataBlkNum] = cleanBuffer(reader.buffer); + } + for (int i = 0; i < targets.length; i++) { + parityBuffers[targetIndices[i] - dataBlkNum] = targetBuffers[i]; + } + for (int i = 0; i < parityBlkNum; i++) { + if (parityBuffers[i] == null) { + parityBuffers[i] = ByteBuffer.allocate(toRecoverLen); + } else { + parityBuffers[i].limit(toRecoverLen); + } + } + encoder.encode(dataBuffers, parityBuffers); + } else { + /////////// TODO: wait for HADOOP-11847 ///////////// + ////////// The current decode method always try to decode parityBlkNum number of data blocks. //////////// + initDecoderIfNecessary(); + ByteBuffer[] inputs = new ByteBuffer[dataBlkNum + parityBlkNum]; + for (int i = 0; i < success.length; i++) { + StripedReader reader = stripedReaders.get(success[i]); + ByteBuffer buffer = reader.buffer; + paddingBufferToLen(buffer, toRecoverLen); + int index = reader.index < dataBlkNum ? + reader.index + parityBlkNum : reader.index - dataBlkNum; + inputs[index] = (ByteBuffer)buffer.flip(); + } + int[] indices4Decode = new int[parityBlkNum]; + int m = 0; + for (int i = 0; i < dataBlkNum + parityBlkNum; i++) { + if (inputs[i] == null) { + inputs[i] = ByteBuffer.allocate(toRecoverLen); + indices4Decode[m++] = i; + } + } + ByteBuffer[] outputs = new ByteBuffer[parityBlkNum]; + m = 0; + // targetIndices is subset of indices4Decode + for (int i = 0; i < parityBlkNum; i++) { + if (m < targetIndices.length && + (indices4Decode[i] - parityBlkNum) == targetIndices[m]) { + outputs[i] = targetBuffers[m++]; + outputs[i].limit(toRecoverLen); + } else { + outputs[i] = ByteBuffer.allocate(toRecoverLen); + } + } + + decoder.decode(inputs, indices4Decode, outputs); + + for (int i = 0; i < targets.length; i++) { + if (targetsStatus[i]) { + long blockLen = getBlockLen(blockGroup, targetIndices[i]); + long remaining = blockLen - positionInBlock; + if (remaining < 0) { + targetBuffers[i].limit(0); + } else if (remaining < toRecoverLen) { + targetBuffers[i].limit((int)remaining); + } + } + } + } + } + + /** + * Schedule read from a new source, we first try un-initial source, + * then try un-used source in this round and bypass failed source. 
+ */ + private void scheduleNewRead(BitSet used) { + StripedReader reader = null; + int m = stripedReaders.size(); + while (m < sources.length && reader == null) { + reader = new StripedReader(liveIndices[m]); + BlockReader blockReader = newBlockReader( + getBlock(blockGroup, liveIndices[m]), positionInBlock, sources[m]); + stripedReaders.add(reader); + if (blockReader != null) { + assert blockReader.getDataChecksum().equals(checksum); + reader.blockReader = blockReader; + reader.buffer = ByteBuffer.allocate(bufferSize); + } else { + m++; + reader = null; + } + } + + for (int i = 0; reader == null && i < stripedReaders.size(); i++) { + StripedReader r = stripedReaders.get(i); + if (r.blockReader != null && !used.get(i)) { + closeBlockReader(r.blockReader); + r.blockReader = newBlockReader( + getBlock(blockGroup, liveIndices[i]), positionInBlock, + sources[i]); + if (r.blockReader != null) { + m = i; + reader = r; + } + } + } + + if (reader != null) { + Callable readCallable = readFromBlock( + reader.blockReader, reader.buffer); + Future f = readService.submit(readCallable); + futures.put(f, m); + used.set(m); + } + } + + // cancel all reads. + private void cancelReads(Collection> futures) { + for (Future future : futures) { + future.cancel(true); + } + } + + private Callable readFromBlock(final BlockReader reader, + final ByteBuffer buf) { + return new Callable() { + + @Override + public Void call() throws Exception { + try { + actualReadFromBlock(reader, buf); + return null; + } catch (IOException e) { + LOG.info(e.getMessage()); + throw e; + } + } + + }; + } + + /** + * Read bytes from block + */ + private void actualReadFromBlock(BlockReader reader, ByteBuffer buf) + throws IOException { + int len = buf.remaining(); + int n = 0; + while (n < len) { + int nread = reader.read(buf); + if (nread <= 0) { + break; + } + n += nread; + } + } + + // close block reader + private void closeBlockReader(BlockReader blockReader) { + try { + if (blockReader != null) { + blockReader.close(); + } + } catch (IOException e) { + // ignore + } + } + + private InetSocketAddress getSocketAddress4Transfer(DatanodeInfo dnInfo) { + return NetUtils.createSocketAddr(dnInfo.getXferAddr( + datanode.getDnConf().getConnectToDnViaHostname())); + } + + private BlockReader newBlockReader(final ExtendedBlock block, + long startOffset, DatanodeInfo dnInfo) { + try { + InetSocketAddress dnAddr = getSocketAddress4Transfer(dnInfo); + Token blockToken = datanode.getBlockAccessToken( + block, EnumSet.of(BlockTokenIdentifier.AccessMode.READ)); + /* + * This can be further improved if the replica is local, then we can + * read directly from DN and need to check the replica is FINALIZED + * state, notice we should not use short-circuit local read which + * requires config for domain-socket in UNIX or legacy config in Windows. 
+ */ + return RemoteBlockReader2.newBlockReader( + "dummy", block, blockToken, startOffset, block.getNumBytes(), true, + "", newConnectedPeer(block, dnAddr, blockToken, dnInfo), dnInfo, + null, cachingStrategy); + } catch (IOException e) { + return null; + } + } + + private Peer newConnectedPeer(ExtendedBlock b, InetSocketAddress addr, + Token blockToken, DatanodeID datanodeId) + throws IOException { + Peer peer = null; + boolean success = false; + Socket sock = null; + final int socketTimeout = datanode.getDnConf().getSocketTimeout(); + try { + sock = NetUtils.getDefaultSocketFactory(conf).createSocket(); + NetUtils.connect(sock, addr, socketTimeout); + peer = TcpPeerServer.peerFromSocketAndKey(datanode.getSaslClient(), + sock, datanode.getDataEncryptionKeyFactoryForBlock(b), + blockToken, datanodeId); + peer.setReadTimeout(socketTimeout); + success = true; + return peer; + } finally { + if (!success) { + IOUtils.cleanup(LOG, peer); + IOUtils.closeSocket(sock); + } + } + } + + /** + * Send data to targets + */ + private int transferData2Targets(boolean[] targetsStatus) { + int nsuccess = 0; + for (int i = 0; i < targets.length; i++) { + if (targetsStatus[i]) { + boolean success = false; + try { + ByteBuffer buffer = targetBuffers[i]; + + if (buffer.remaining() == 0) { + continue; + } + + checksum.calculateChunkedSums( + buffer.array(), 0, buffer.remaining(), checksumBuf, 0); + + int ckOff = 0; + while (buffer.remaining() > 0) { + DFSPacket packet = new DFSPacket(packetBuf, maxChunksPerPacket, + blockOffset4Targets[i], seqNo4Targets[i]++, checksumSize, false); + int maxBytesToPacket = maxChunksPerPacket * bytesPerChecksum; + int toWrite = buffer.remaining() > maxBytesToPacket ? + maxBytesToPacket : buffer.remaining(); + int ckLen = ((toWrite - 1) / bytesPerChecksum + 1) * checksumSize; + packet.writeChecksum(checksumBuf, ckOff, ckLen); + ckOff += ckLen; + packet.writeData(buffer, toWrite); + + // Send packet + packet.writeTo(targetOutputStreams[i]); + + blockOffset4Targets[i] += toWrite; + nsuccess++; + success = true; + } + } catch (IOException e) { + LOG.warn(e.getMessage()); + } + targetsStatus[i] = success; + } + } + return nsuccess; + } + + /** + * clear all buffers + */ + private void clearBuffers() { + for (StripedReader stripedReader : stripedReaders) { + if (stripedReader.buffer != null) { + stripedReader.buffer.clear(); + } + } + + for (int i = 0; i < targetBuffers.length; i++) { + if (targetBuffers[i] != null) { + cleanBuffer(targetBuffers[i]); + } + } + } + + private ByteBuffer cleanBuffer(ByteBuffer buffer) { + Arrays.fill(buffer.array(), (byte) 0); + return (ByteBuffer)buffer.clear(); + } + + // send an empty packet to mark the end of the block + private void endTargetBlocks(boolean[] targetsStatus) { + for (int i = 0; i < targets.length; i++) { + if (targetsStatus[i]) { + try { + DFSPacket packet = new DFSPacket(packetBuf, 0, + blockOffset4Targets[i], seqNo4Targets[i]++, checksumSize, true); + packet.writeTo(targetOutputStreams[i]); + targetOutputStreams[i].flush(); + } catch (IOException e) { + LOG.warn(e.getMessage()); + } + } + } + } + + /** + * Initialize output/input streams for transferring data to target + * and send create block request. 
+ */ + private int initTargetStreams(boolean[] targetsStatus) { + int nsuccess = 0; + for (int i = 0; i < targets.length; i++) { + Socket socket = null; + DataOutputStream out = null; + DataInputStream in = null; + boolean success = false; + try { + InetSocketAddress targetAddr = + getSocketAddress4Transfer(targets[i]); + socket = datanode.newSocket(); + NetUtils.connect(socket, targetAddr, + datanode.getDnConf().getSocketTimeout()); + socket.setSoTimeout(datanode.getDnConf().getSocketTimeout()); + + ExtendedBlock block = getBlock(blockGroup, targetIndices[i]); + Token blockToken = + datanode.getBlockAccessToken(block, + EnumSet.of(BlockTokenIdentifier.AccessMode.WRITE)); + + long writeTimeout = datanode.getDnConf().getSocketWriteTimeout(); + OutputStream unbufOut = NetUtils.getOutputStream(socket, writeTimeout); + InputStream unbufIn = NetUtils.getInputStream(socket); + DataEncryptionKeyFactory keyFactory = + datanode.getDataEncryptionKeyFactoryForBlock(block); + IOStreamPair saslStreams = datanode.getSaslClient().socketSend( + socket, unbufOut, unbufIn, keyFactory, blockToken, targets[i]); + + unbufOut = saslStreams.out; + unbufIn = saslStreams.in; + + out = new DataOutputStream(new BufferedOutputStream(unbufOut, + HdfsServerConstants.SMALL_BUFFER_SIZE)); + in = new DataInputStream(unbufIn); + + DatanodeInfo source = new DatanodeInfo(datanode.getDatanodeId()); + new Sender(out).writeBlock(block, targetStorageTypes[i], + blockToken, "", new DatanodeInfo[]{targets[i]}, + new StorageType[]{targetStorageTypes[i]}, source, + BlockConstructionStage.PIPELINE_SETUP_CREATE, 0, 0, 0, 0, + checksum, cachingStrategy, false, false, null); + + targetSockets[i] = socket; + targetOutputStreams[i] = out; + targetInputStreams[i] = in; + nsuccess++; + success = true; + } catch (Throwable e) { + LOG.warn(e.getMessage()); + } finally { + if (!success) { + IOUtils.closeStream(out); + IOUtils.closeStream(in); + IOUtils.closeStream(socket); + } + } + targetsStatus[i] = success; + } + return nsuccess; + } + } + + private class StripedReader { + short index; + BlockReader blockReader; + ByteBuffer buffer; + + public StripedReader(short index) { + this.index = index; + } } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java index 24d4bfba844..45bbf6bf6d4 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hdfs.util; import com.google.common.annotations.VisibleForTesting; + import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.fs.StorageType; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; @@ -77,10 +78,8 @@ public class StripedBlockUtil { public static LocatedBlock constructInternalBlock(LocatedStripedBlock bg, int idxInReturnedLocs, int cellSize, int dataBlkNum, int idxInBlockGroup) { - final ExtendedBlock blk = new ExtendedBlock(bg.getBlock()); - blk.setBlockId(bg.getBlock().getBlockId() + idxInBlockGroup); - blk.setNumBytes(getInternalBlockLength(bg.getBlockSize(), - cellSize, dataBlkNum, idxInBlockGroup)); + final ExtendedBlock blk = constructInternalBlock( + bg.getBlock(), cellSize, dataBlkNum, idxInBlockGroup); return new LocatedBlock(blk, new DatanodeInfo[]{bg.getLocations()[idxInReturnedLocs]}, @@ -90,6 +89,44 @@ 
public class StripedBlockUtil { null); } + /** + * This method creates an internal {@link ExtendedBlock} at the given index + * of a block group. + */ + public static ExtendedBlock constructInternalBlock(ExtendedBlock blockGroup, + int cellSize, int dataBlkNum, int idxInBlockGroup) { + ExtendedBlock block = new ExtendedBlock(blockGroup); + block.setBlockId(blockGroup.getBlockId() + idxInBlockGroup); + block.setNumBytes(getInternalBlockLength(blockGroup.getNumBytes(), + cellSize, dataBlkNum, idxInBlockGroup)); + return block; + } + + /** + * This method creates an internal {@link ExtendedBlock} at the given index + * of a block group, for both data and parity block. + */ + public static ExtendedBlock constructStripedBlock(ExtendedBlock blockGroup, + int cellSize, int dataBlkNum, int idxInBlockGroup) { + ExtendedBlock block = new ExtendedBlock(blockGroup); + block.setBlockId(blockGroup.getBlockId() + idxInBlockGroup); + block.setNumBytes(getStripedBlockLength(blockGroup.getNumBytes(), cellSize, + dataBlkNum, idxInBlockGroup)); + return block; + } + + /** + * Returns an internal block length at the given index of a block group, + * for both data and parity block. + */ + public static long getStripedBlockLength(long numBytes, int cellSize, + int dataBlkNum, int idxInBlockGroup) { + // parity block length is the same as the first striped block length. + return StripedBlockUtil.getInternalBlockLength( + numBytes, cellSize, dataBlkNum, + idxInBlockGroup < dataBlkNum ? idxInBlockGroup : 0); + } + /** * Get the size of an internal block at the given index of a block group * @@ -208,8 +245,8 @@ public class StripedBlockUtil { * @throws InterruptedException */ public static StripedReadResult getNextCompletedStripedRead( - CompletionService readService, Map, - Integer> futures, final long threshold) throws InterruptedException { + CompletionService readService, Map, Integer> futures, + final long threshold) throws InterruptedException { Preconditions.checkArgument(!futures.isEmpty()); Preconditions.checkArgument(threshold > 0); Future future = null; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml index 7f0730b07f2..f80212844bf 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml @@ -2312,11 +2312,11 @@ - - dfs.datanode.block-pinning.enabled - false - Whether pin blocks on favored DataNode. - + + dfs.datanode.block-pinning.enabled + false + Whether pin blocks on favored DataNode. + dfs.client.block.write.locateFollowingBlock.initial.delay.ms @@ -2354,4 +2354,25 @@ + + dfs.datanode.stripedread.threshold.millis + 5000 + datanode striped read threshold in millisecond. + + + + + dfs.datanode.stripedread.threads + 20 + datanode striped read thread pool size. + + + + + dfs.datanode.stripedread.buffer.size + 262144 + datanode striped read buffer size. + + + diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestRecoverStripedFile.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestRecoverStripedFile.java new file mode 100644 index 00000000000..b4f05d46b9a --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestRecoverStripedFile.java @@ -0,0 +1,356 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.protocol.DatanodeID; +import org.apache.hadoop.hdfs.protocol.DatanodeInfo; +import org.apache.hadoop.hdfs.protocol.ExtendedBlock; +import org.apache.hadoop.hdfs.protocol.HdfsConstants; +import org.apache.hadoop.hdfs.protocol.LocatedBlocks; +import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil; +import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor; +import org.apache.hadoop.hdfs.server.datanode.DataNode; +import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter; +import org.apache.hadoop.hdfs.util.StripedBlockUtil; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.io.ByteArrayOutputStream; +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.util.BitSet; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class TestRecoverStripedFile { + public static final Log LOG = LogFactory.getLog(TestRecoverStripedFile.class); + + private static final int dataBlkNum = HdfsConstants.NUM_DATA_BLOCKS; + private static final int parityBlkNum = HdfsConstants.NUM_PARITY_BLOCKS; + private static final int cellSize = HdfsConstants.BLOCK_STRIPED_CELL_SIZE; + private static final int blockSize = cellSize * 3; + private static final int groupSize = dataBlkNum + parityBlkNum; + private static final int dnNum = groupSize + parityBlkNum; + + private MiniDFSCluster cluster; + private Configuration conf; + private DistributedFileSystem fs; + // Map: DatanodeID -> datanode index in cluster + private Map dnMap = new HashMap(); + + @Before + public void setup() throws IOException { + conf = new Configuration(); + conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize); + conf.setInt(DFSConfigKeys.DFS_DATANODE_STRIPED_READ_BUFFER_SIZE_KEY, cellSize - 1); + conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY, 1); + cluster = new MiniDFSCluster.Builder(conf).numDataNodes(dnNum).build();; + cluster.waitActive(); + + fs = cluster.getFileSystem(); + fs.getClient().createErasureCodingZone("/", null); + + List datanodes = cluster.getDataNodes(); + for (int i = 0; i < dnNum; i++) { + dnMap.put(datanodes.get(i).getDatanodeId(), i); + } + } + + @After + public void tearDown() { + if (cluster != null) { + cluster.shutdown(); + } + } + + @Test(timeout = 120000) + public void testRecoverOneParityBlock() throws 
Exception { + int fileLen = 10 * blockSize + blockSize/10; + assertFileBlocksRecovery("/testRecoverOneParityBlock", fileLen, 0, 1); + } + + @Test(timeout = 120000) + public void testRecoverThreeParityBlocks() throws Exception { + int fileLen = 3 * blockSize + blockSize/10; + assertFileBlocksRecovery("/testRecoverThreeParityBlocks", fileLen, 0, 3); + } + + @Test(timeout = 120000) + public void testRecoverThreeDataBlocks() throws Exception { + int fileLen = 3 * blockSize + blockSize/10; + assertFileBlocksRecovery("/testRecoverThreeDataBlocks", fileLen, 1, 3); + } + + @Test(timeout = 120000) + public void testRecoverOneDataBlock() throws Exception { + ////TODO: TODO: wait for HADOOP-11847 + //int fileLen = 10 * blockSize + blockSize/10; + //assertFileBlocksRecovery("/testRecoverOneDataBlock", fileLen, 1, 1); + } + + @Test(timeout = 120000) + public void testRecoverAnyBlocks() throws Exception { + ////TODO: TODO: wait for HADOOP-11847 + //int fileLen = 3 * blockSize + blockSize/10; + //assertFileBlocksRecovery("/testRecoverAnyBlocks", fileLen, 2, 2); + } + + /** + * Test the file blocks recovery. + * 1. Check the replica is recovered in the target datanode, + * and verify the block replica length, generationStamp and content. + * 2. Read the file and verify content. + */ + private void assertFileBlocksRecovery(String fileName, int fileLen, + int recovery, int toRecoverBlockNum) throws Exception { + if (recovery != 0 && recovery != 1 && recovery != 2) { + Assert.fail("Invalid recovery: 0 is to recovery parity blocks," + + "1 is to recovery data blocks, 2 is any."); + } + if (toRecoverBlockNum < 1 || toRecoverBlockNum > parityBlkNum) { + Assert.fail("toRecoverBlockNum should be between 1 ~ " + parityBlkNum); + } + + Path file = new Path(fileName); + + testCreateStripedFile(file, fileLen); + + LocatedBlocks locatedBlocks = getLocatedBlocks(file); + assertEquals(locatedBlocks.getFileLength(), fileLen); + + LocatedStripedBlock lastBlock = + (LocatedStripedBlock)locatedBlocks.getLastLocatedBlock(); + + DatanodeInfo[] storageInfos = lastBlock.getLocations(); + int[] indices = lastBlock.getBlockIndices(); + + BitSet bitset = new BitSet(dnNum); + for (DatanodeInfo storageInfo : storageInfos) { + bitset.set(dnMap.get(storageInfo)); + } + + int[] toDead = new int[toRecoverBlockNum]; + int n = 0; + for (int i = 0; i < indices.length; i++) { + if (n < toRecoverBlockNum) { + if (recovery == 0) { + if (indices[i] >= dataBlkNum) { + toDead[n++] = i; + } + } else if (recovery == 1) { + if (indices[i] < dataBlkNum) { + toDead[n++] = i; + } + } else { + toDead[n++] = i; + } + } else { + break; + } + } + + DatanodeInfo[] dataDNs = new DatanodeInfo[toRecoverBlockNum]; + int[] deadDnIndices = new int[toRecoverBlockNum]; + ExtendedBlock[] blocks = new ExtendedBlock[toRecoverBlockNum]; + File[] replicas = new File[toRecoverBlockNum]; + File[] metadatas = new File[toRecoverBlockNum]; + byte[][] replicaContents = new byte[toRecoverBlockNum][]; + for (int i = 0; i < toRecoverBlockNum; i++) { + dataDNs[i] = storageInfos[toDead[i]]; + deadDnIndices[i] = dnMap.get(dataDNs[i]); + + // Check the block replica file on deadDn before it dead. 
+ blocks[i] = StripedBlockUtil.constructStripedBlock( + lastBlock.getBlock(), cellSize, dataBlkNum, indices[toDead[i]]); + replicas[i] = cluster.getBlockFile(deadDnIndices[i], blocks[i]); + metadatas[i] = cluster.getBlockMetadataFile(deadDnIndices[i], blocks[i]); + // the block replica on the datanode should be the same as expected + assertEquals(replicas[i].length(), + StripedBlockUtil.getStripedBlockLength( + lastBlock.getBlockSize(), cellSize, dataBlkNum, indices[toDead[i]])); + assertTrue(metadatas[i].getName(). + endsWith(blocks[i].getGenerationStamp() + ".meta")); + replicaContents[i] = readReplica(replicas[i]); + } + + try { + DatanodeID[] dnIDs = new DatanodeID[toRecoverBlockNum]; + for (int i = 0; i < toRecoverBlockNum; i++) { + /* + * Kill the datanode which contains one replica + * We need to make sure it dead in namenode: clear its update time and + * trigger NN to check heartbeat. + */ + DataNode dn = cluster.getDataNodes().get(deadDnIndices[i]); + dn.shutdown(); + dnIDs[i] = dn.getDatanodeId(); + } + setDataNodesDead(dnIDs); + + + // Check the locatedBlocks of the file again + locatedBlocks = getLocatedBlocks(file); + lastBlock = (LocatedStripedBlock)locatedBlocks.getLastLocatedBlock(); + storageInfos = lastBlock.getLocations(); + assertEquals(storageInfos.length, groupSize - toRecoverBlockNum); + + int[] targetDNs = new int[dnNum - groupSize]; + n = 0; + for (int i = 0; i < dnNum; i++) { + if (!bitset.get(i)) { // not contain replica of the block. + targetDNs[n++] = i; + } + } + + waitForRecoveryFinished(file); + + targetDNs = sortTargetsByReplicas(blocks, targetDNs); + + // Check the replica on the new target node. + for (int i = 0; i < toRecoverBlockNum; i++) { + File replicaAfterRecovery = cluster.getBlockFile(targetDNs[i], blocks[i]); + File metadataAfterRecovery = + cluster.getBlockMetadataFile(targetDNs[i], blocks[i]); + assertEquals(replicaAfterRecovery.length(), replicas[i].length()); + assertTrue(metadataAfterRecovery.getName(). 
+ endsWith(blocks[i].getGenerationStamp() + ".meta")); + byte[] replicaContentAfterRecovery = readReplica(replicaAfterRecovery); + + Assert.assertArrayEquals(replicaContents[i], replicaContentAfterRecovery); + } + } finally { + for (int i = 0; i < toRecoverBlockNum; i++) { + restartDataNode(toDead[i]); + } + cluster.waitActive(); + } + fs.delete(file, true); + } + + private void setDataNodesDead(DatanodeID[] dnIDs) throws IOException { + for (DatanodeID dn : dnIDs) { + DatanodeDescriptor dnd = + NameNodeAdapter.getDatanode(cluster.getNamesystem(), dn); + DFSTestUtil.setDatanodeDead(dnd); + } + + BlockManagerTestUtil.checkHeartbeat(cluster.getNamesystem().getBlockManager()); + } + + private void restartDataNode(int dn) { + try { + cluster.restartDataNode(dn, true, true); + } catch (IOException e) { + } + } + + private int[] sortTargetsByReplicas(ExtendedBlock[] blocks, int[] targetDNs) { + int[] result = new int[blocks.length]; + for (int i = 0; i < blocks.length; i++) { + result[i] = -1; + for (int j = 0; j < targetDNs.length; j++) { + if (targetDNs[j] != -1) { + File replica = cluster.getBlockFile(targetDNs[j], blocks[i]); + if (replica != null) { + result[i] = targetDNs[j]; + targetDNs[j] = -1; + break; + } + } + } + if (result[i] == -1) { + Assert.fail("Failed to recover striped block: " + blocks[i].getBlockId()); + } + } + return result; + } + + private byte[] readReplica(File replica) throws IOException { + int length = (int)replica.length(); + ByteArrayOutputStream content = new ByteArrayOutputStream(length); + FileInputStream in = new FileInputStream(replica); + try { + byte[] buffer = new byte[1024]; + int total = 0; + while (total < length) { + int n = in.read(buffer); + if (n <= 0) { + break; + } + content.write(buffer, 0, n); + total += n; + } + if (total < length) { + Assert.fail("Failed to read all content of replica"); + } + return content.toByteArray(); + } finally { + in.close(); + } + } + + private LocatedBlocks waitForRecoveryFinished(Path file) throws Exception { + final int ATTEMPTS = 60; + for (int i = 0; i < ATTEMPTS; i++) { + LocatedBlocks locatedBlocks = getLocatedBlocks(file); + LocatedStripedBlock lastBlock = + (LocatedStripedBlock)locatedBlocks.getLastLocatedBlock(); + DatanodeInfo[] storageInfos = lastBlock.getLocations(); + if (storageInfos.length >= groupSize) { + return locatedBlocks; + } + Thread.sleep(1000); + } + throw new IOException ("Time out waiting for EC block recovery."); + } + + private LocatedBlocks getLocatedBlocks(Path file) throws IOException { + return fs.getClient().getLocatedBlocks(file.toString(), 0, Long.MAX_VALUE); + } + + private void testCreateStripedFile(Path file, int dataLen) + throws IOException { + final byte[] data = new byte[dataLen]; + DFSUtil.getRandom().nextBytes(data); + writeContents(file, data); + } + + void writeContents(Path file, byte[] contents) + throws IOException { + FSDataOutputStream out = fs.create(file); + try { + out.write(contents, 0, contents.length); + } finally { + out.close(); + } + } +}
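For reference, the packet sizing used by ReconstructAndTransferBlock.run() and transferData2Targets() above reduces to simple arithmetic over the checksum geometry. A standalone sketch under assumed values (512 bytes per checksum, 4-byte checksums, a 33-byte stand-in for PacketHeader.PKT_MAX_HEADER_LEN); only WRITE_PACKET_SIZE and the formulas themselves are taken from the patch:

// Standalone sketch of the packet sizing in ReconstructAndTransferBlock;
// every concrete number except WRITE_PACKET_SIZE is an assumed example value.
public class PacketSizingSketch {
  public static void main(String[] args) {
    final int WRITE_PACKET_SIZE = 64 * 1024; // from the patch
    final int bytesPerChecksum = 512;        // assumed; comes from the block's DataChecksum
    final int checksumSize = 4;              // assumed; width of a CRC32/CRC32C checksum
    final int pktMaxHeaderLen = 33;          // assumed stand-in for PacketHeader.PKT_MAX_HEADER_LEN
    final int bufferSize = 256 * 1024;       // dfs.datanode.stripedread.buffer.size default

    int chunkSize = bytesPerChecksum + checksumSize;
    int maxChunksPerPacket =
        Math.max((WRITE_PACKET_SIZE - pktMaxHeaderLen) / chunkSize, 1);
    int maxPacketSize = chunkSize * maxChunksPerPacket + pktMaxHeaderLen;
    int checksumBufLen = checksumSize * (bufferSize / bytesPerChecksum);

    System.out.println("chunkSize=" + chunkSize
        + " maxChunksPerPacket=" + maxChunksPerPacket
        + " maxPacketSize=" + maxPacketSize
        + " checksumBufLen=" + checksumBufLen);
  }
}

Note that initChecksumAndBufferSizeIfNeeded() rounds the configured striped-read buffer down to a multiple of bytesPerChecksum, so the bufferSize / bytesPerChecksum division in the last formula always comes out even.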