HDFS-7348. Erasure Coding: DataNode reconstruct striped blocks. Contributed by Yi Liu.

This commit is contained in:
Zhe Zhang 2015-05-05 16:33:56 -07:00 committed by Zhe Zhang
parent 220ca960bc
commit 6616de24cb
14 changed files with 1377 additions and 55 deletions

View File

@ -175,3 +175,6 @@
HDFS-7672. Handle write failure for stripping blocks and refactor the
existing code in DFSStripedOutputStream and StripedDataStreamer. (szetszwo)
HDFS-7348. Erasure Coding: DataNode reconstruct striped blocks.
(Yi Liu via Zhe Zhang)

View File

@ -24,6 +24,7 @@ import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.fs.ByteBufferReadable;
import org.apache.hadoop.fs.ReadOption;
import org.apache.hadoop.hdfs.shortcircuit.ClientMmap;
import org.apache.hadoop.util.DataChecksum;
/**
* A BlockReader is responsible for reading a single block
@ -99,4 +100,9 @@ public interface BlockReader extends ByteBufferReadable {
* supported.
*/
ClientMmap getClientMmap(EnumSet<ReadOption> opts);
/**
* @return The DataChecksum used by the read block
*/
DataChecksum getDataChecksum();
}

View File

@ -738,4 +738,9 @@ class BlockReaderLocal implements BlockReader {
void forceUnanchorable() {
replica.getSlot().makeUnanchorable();
}
@Override
public DataChecksum getDataChecksum() {
return checksum;
}
}

View File

@ -732,4 +732,9 @@ class BlockReaderLocalLegacy implements BlockReader {
public ClientMmap getClientMmap(EnumSet<ReadOption> opts) {
return null;
}
@Override
public DataChecksum getDataChecksum() {
return checksum;
}
}

View File

@ -369,6 +369,12 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
public static final int DFS_DATANODE_DIRECTORYSCAN_INTERVAL_DEFAULT = 21600;
public static final String DFS_DATANODE_DIRECTORYSCAN_THREADS_KEY = "dfs.datanode.directoryscan.threads";
public static final int DFS_DATANODE_DIRECTORYSCAN_THREADS_DEFAULT = 1;
public static final String DFS_DATANODE_STRIPED_READ_THREADS_KEY = "dfs.datanode.stripedread.threads";
public static final int DFS_DATANODE_STRIPED_READ_THREADS_DEFAULT = 20;
public static final String DFS_DATANODE_STRIPED_READ_BUFFER_SIZE_KEY = "dfs.datanode.stripedread.buffer.size";
public static final int DFS_DATANODE_STRIPED_READ_BUFFER_SIZE_DEFAULT = 256 * 1024;
public static final String DFS_DATANODE_STRIPED_READ_THRESHOLD_MILLIS_KEY = "dfs.datanode.stripedread.threshold.millis";
public static final int DFS_DATANODE_STRIPED_READ_THRESHOLD_MILLIS_DEFAULT = 5000; //5s
public static final String DFS_DATANODE_DNS_INTERFACE_KEY = "dfs.datanode.dns.interface";
public static final String DFS_DATANODE_DNS_INTERFACE_DEFAULT = "default";
public static final String DFS_DATANODE_DNS_NAMESERVER_KEY = "dfs.datanode.dns.nameserver";

View File

@ -37,7 +37,7 @@ import org.apache.htrace.Span;
****************************************************************/
@InterfaceAudience.Private
class DFSPacket {
public class DFSPacket {
public static final long HEART_BEAT_SEQNO = -1L;
private static long[] EMPTY = new long[0];
private final long seqno; // sequence number of buffer in block
@ -80,7 +80,7 @@ class DFSPacket {
* @param checksumSize the size of checksum
* @param lastPacketInBlock if this is the last packet
*/
DFSPacket(byte[] buf, int chunksPerPkt, long offsetInBlock, long seqno,
public DFSPacket(byte[] buf, int chunksPerPkt, long offsetInBlock, long seqno,
int checksumSize, boolean lastPacketInBlock) {
this.lastPacketInBlock = lastPacketInBlock;
this.numChunks = 0;
@ -114,7 +114,7 @@ class DFSPacket {
dataPos += len;
}
synchronized void writeData(ByteBuffer inBuffer, int len)
public synchronized void writeData(ByteBuffer inBuffer, int len)
throws ClosedChannelException {
checkBuffer();
len = len > inBuffer.remaining() ? inBuffer.remaining() : len;
@ -135,7 +135,7 @@ class DFSPacket {
* @param len the length of checksums to write
* @throws ClosedChannelException
*/
synchronized void writeChecksum(byte[] inarray, int off, int len)
public synchronized void writeChecksum(byte[] inarray, int off, int len)
throws ClosedChannelException {
checkBuffer();
if (len == 0) {
@ -154,7 +154,7 @@ class DFSPacket {
* @param stm
* @throws IOException
*/
synchronized void writeTo(DataOutputStream stm) throws IOException {
public synchronized void writeTo(DataOutputStream stm) throws IOException {
checkBuffer();
final int dataLen = dataPos - dataStart;

View File

@ -505,4 +505,9 @@ public class RemoteBlockReader extends FSInputChecker implements BlockReader {
public ClientMmap getClientMmap(EnumSet<ReadOption> opts) {
return null;
}
@Override
public DataChecksum getDataChecksum() {
return checksum;
}
}

View File

@ -474,4 +474,9 @@ public class RemoteBlockReader2 implements BlockReader {
public ClientMmap getClientMmap(EnumSet<ReadOption> opts) {
return null;
}
@Override
public DataChecksum getDataChecksum() {
return checksum;
}
}

View File

@ -235,6 +235,33 @@ public class DNConf {
return maxLockedMemory;
}
/**
* Returns true if connect to datanode via hostname
*
* @return boolean true if connect to datanode via hostname
*/
public boolean getConnectToDnViaHostname() {
return connectToDnViaHostname;
}
/**
* Returns socket timeout
*
* @return int socket timeout
*/
public int getSocketTimeout() {
return socketTimeout;
}
/**
* Returns socket write timeout
*
* @return int socket write timeout
*/
public int getSocketWriteTimeout() {
return socketWriteTimeout;
}
/**
* Returns the SaslPropertiesResolver configured for use with
* DataTransferProtocol, or null if not configured.

View File

@ -1161,7 +1161,8 @@ public class DataNode extends ReconfigurableBase
saslClient = new SaslDataTransferClient(dnConf.conf,
dnConf.saslPropsResolver, dnConf.trustedChannelResolver);
saslServer = new SaslDataTransferServer(dnConf, blockPoolTokenSecretManager);
ecWorker = new ErasureCodingWorker(conf); // Initialize ErasureCoding worker
// Initialize ErasureCoding worker
ecWorker = new ErasureCodingWorker(conf, this);
}
/**
@ -1226,6 +1227,10 @@ public class DataNode extends ReconfigurableBase
return UUID.randomUUID().toString();
}
public SaslDataTransferClient getSaslClient() {
return saslClient;
}
/**
* Verify that the DatanodeUuid has been initialized. If this is a new
* datanode then we generate a new Datanode Uuid and persist it to disk.
@ -1488,7 +1493,7 @@ public class DataNode extends ReconfigurableBase
/**
* Creates either NIO or regular depending on socketWriteTimeout.
*/
protected Socket newSocket() throws IOException {
public Socket newSocket() throws IOException {
return (dnConf.socketWriteTimeout > 0) ?
SocketChannel.open().socket() : new Socket();
}
@ -2143,11 +2148,8 @@ public class DataNode extends ReconfigurableBase
//
// Header info
//
Token<BlockTokenIdentifier> accessToken = BlockTokenSecretManager.DUMMY_TOKEN;
if (isBlockTokenEnabled) {
accessToken = blockPoolTokenSecretManager.generateToken(b,
EnumSet.of(BlockTokenIdentifier.AccessMode.WRITE));
}
Token<BlockTokenIdentifier> accessToken = getBlockAccessToken(b,
EnumSet.of(BlockTokenIdentifier.AccessMode.WRITE));
long writeTimeout = dnConf.socketWriteTimeout +
HdfsServerConstants.WRITE_TIMEOUT_EXTENSION * (targets.length-1);
@ -2214,6 +2216,19 @@ public class DataNode extends ReconfigurableBase
}
}
/***
* Use BlockTokenSecretManager to generate block token for current user.
*/
public Token<BlockTokenIdentifier> getBlockAccessToken(ExtendedBlock b,
EnumSet<AccessMode> mode) throws IOException {
Token<BlockTokenIdentifier> accessToken =
BlockTokenSecretManager.DUMMY_TOKEN;
if (isBlockTokenEnabled) {
accessToken = blockPoolTokenSecretManager.generateToken(b, mode);
}
return accessToken;
}
/**
* Returns a new DataEncryptionKeyFactory that generates a key from the
* BlockPoolTokenSecretManager, using the block pool ID of the given block.
@ -2221,7 +2236,7 @@ public class DataNode extends ReconfigurableBase
* @param block for which the factory needs to create a key
* @return DataEncryptionKeyFactory for block's block pool ID
*/
DataEncryptionKeyFactory getDataEncryptionKeyFactoryForBlock(
public DataEncryptionKeyFactory getDataEncryptionKeyFactoryForBlock(
final ExtendedBlock block) {
return new DataEncryptionKeyFactory() {
@Override

View File

@ -17,15 +17,68 @@
*/
package org.apache.hadoop.hdfs.server.datanode.erasurecode;
import java.io.BufferedOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.BitSet;
import java.util.Collection;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.Future;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.logging.Log;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.BlockReader;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSPacket;
import org.apache.hadoop.hdfs.RemoteBlockReader2;
import org.apache.hadoop.hdfs.net.Peer;
import org.apache.hadoop.hdfs.net.TcpPeerServer;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage;
import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataEncryptionKeyFactory;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.protocol.BlockECRecoveryCommand.BlockECRecoveryInfo;
import org.apache.hadoop.io.erasurecode.coder.AbstractErasureCoder;
import org.apache.hadoop.hdfs.util.StripedBlockUtil;
import org.apache.hadoop.hdfs.util.StripedBlockUtil.StripedReadResult;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.erasurecode.ECSchema;
import org.apache.hadoop.io.erasurecode.rawcoder.RSRawDecoder;
import org.apache.hadoop.io.erasurecode.rawcoder.RSRawEncoder;
import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureCoder;
import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureDecoder;
import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureEncoder;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.Daemon;
import org.apache.hadoop.util.DataChecksum;
import com.google.common.base.Preconditions;
/**
* ErasureCodingWorker handles the erasure coding recovery work commands. These
@ -34,41 +87,60 @@ import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureCoder;
* commands.
*/
public final class ErasureCodingWorker {
private final Log LOG = DataNode.LOG;
private final DataNode datanode;
private Configuration conf;
RawErasureCoder rawEncoder = null;
RawErasureCoder rawDecoder = null;
public ErasureCodingWorker(Configuration conf) {
private ThreadPoolExecutor STRIPED_READ_TRHEAD_POOL;
private final int STRIPED_READ_THRESHOLD_MILLIS;
private final int STRIPED_READ_BUFFER_SIZE;
public ErasureCodingWorker(Configuration conf, DataNode datanode) {
this.datanode = datanode;
this.conf = conf;
initialize();
STRIPED_READ_THRESHOLD_MILLIS = conf.getInt(
DFSConfigKeys.DFS_DATANODE_STRIPED_READ_THRESHOLD_MILLIS_KEY,
DFSConfigKeys.DFS_DATANODE_STRIPED_READ_THRESHOLD_MILLIS_DEFAULT);
initializeStripedReadThreadPool(conf.getInt(
DFSConfigKeys.DFS_DATANODE_STRIPED_READ_THREADS_KEY,
DFSConfigKeys.DFS_DATANODE_STRIPED_READ_THREADS_DEFAULT));
STRIPED_READ_BUFFER_SIZE = conf.getInt(
DFSConfigKeys.DFS_DATANODE_STRIPED_READ_BUFFER_SIZE_KEY,
DFSConfigKeys.DFS_DATANODE_STRIPED_READ_BUFFER_SIZE_DEFAULT);
}
/**
* Initializes the required resources for handling the erasure coding recovery
* work.
*/
public void initialize() {
// Right now directly used RS coder. Once other coders integration ready, we
// can load preferred codec here.
initializeErasureEncoder();
initializeErasureDecoder();
private RawErasureEncoder newEncoder() {
return new RSRawEncoder();
}
private RawErasureDecoder newDecoder() {
return new RSRawDecoder();
}
private void initializeErasureDecoder() {
rawDecoder = AbstractErasureCoder.createRawCoder(conf,
CommonConfigurationKeys.IO_ERASURECODE_CODEC_RS_RAWCODER_KEY, false);
if (rawDecoder == null) {
rawDecoder = new RSRawDecoder();
}
}
private void initializeStripedReadThreadPool(int num) {
STRIPED_READ_TRHEAD_POOL = new ThreadPoolExecutor(1, num, 60,
TimeUnit.SECONDS, new SynchronousQueue<Runnable>(),
new Daemon.DaemonFactory() {
private final AtomicInteger threadIndex = new AtomicInteger(0);
private void initializeErasureEncoder() {
rawEncoder = AbstractErasureCoder.createRawCoder(conf,
CommonConfigurationKeys.IO_ERASURECODE_CODEC_RS_RAWCODER_KEY, true);
if (rawEncoder == null) {
rawEncoder = new RSRawEncoder();
}
@Override
public Thread newThread(Runnable r) {
Thread t = super.newThread(r);
t.setName("stripedRead-" + threadIndex.getAndIncrement());
return t;
}
}, new ThreadPoolExecutor.CallerRunsPolicy() {
@Override
public void rejectedExecution(Runnable runnable, ThreadPoolExecutor e) {
LOG.info("Execution for striped reading rejected, "
+ "Executing in current thread");
// will run in the current thread
super.rejectedExecution(runnable, e);
}
});
STRIPED_READ_TRHEAD_POOL.allowCoreThreadTimeOut(true);
}
/**
@ -78,6 +150,765 @@ public final class ErasureCodingWorker {
* BlockECRecoveryInfo
*/
public void processErasureCodingTasks(Collection<BlockECRecoveryInfo> ecTasks) {
// HDFS-7348 : Implement the actual recovery process
for (BlockECRecoveryInfo recoveryInfo : ecTasks) {
try {
new Daemon(new ReconstructAndTransferBlock(recoveryInfo)).start();
} catch (Throwable e) {
LOG.warn("Failed to recover striped block " +
recoveryInfo.getExtendedBlock().getLocalBlock(), e);
}
}
}
/**
* ReconstructAndTransferBlock recover one or more missed striped block in the
* striped block group, the minimum number of live striped blocks should be
* no less than data block number.
*
* | <- Striped Block Group -> |
* blk_0 blk_1 blk_2(*) blk_3 ... <- A striped block group
* | | | |
* v v v v
* +------+ +------+ +------+ +------+
* |cell_0| |cell_1| |cell_2| |cell_3| ...
* +------+ +------+ +------+ +------+
* |cell_4| |cell_5| |cell_6| |cell_7| ...
* +------+ +------+ +------+ +------+
* |cell_8| |cell_9| |cell10| |cell11| ...
* +------+ +------+ +------+ +------+
* ... ... ... ...
*
*
* We use following steps to recover striped block group, in each round, we
* recover <code>bufferSize</code> data until finish, the
* <code>bufferSize</code> is configurable and may be less or larger than
* cell size:
* step1: read <code>bufferSize</code> data from minimum number of sources
* required by recovery.
* step2: decode data for targets.
* step3: transfer data to targets.
*
* In step1, try to read <code>bufferSize</code> data from minimum number
* of sources , if there is corrupt or stale sources, read from new source
* will be scheduled. The best sources are remembered for next round and
* may be updated in each round.
*
* In step2, typically if source blocks we read are all data blocks, we
* need to call encode, and if there is one parity block, we need to call
* decode. Notice we only read once and recover all missed striped block
* if they are more than one.
*
* In step3, send the recovered data to targets by constructing packet
* and send them directly. Same as continuous block replication, we
* don't check the packet ack. Since the datanode doing the recovery work
* are one of the source datanodes, so the recovered data are sent
* remotely.
*
* There are some points we can do further improvements in next phase:
* 1. we can read the block file directly on the local datanode,
* currently we use remote block reader. (Notice short-circuit is not
* a good choice, see inline comments).
* 2. We need to check the packet ack for EC recovery? Since EC recovery
* is more expensive than continuous block replication, it needs to
* read from several other datanodes, should we make sure the
* recovered result received by targets?
*/
private class ReconstructAndTransferBlock implements Runnable {
private final int dataBlkNum;
private final int parityBlkNum;
private final int cellSize;
private RawErasureEncoder encoder;
private RawErasureDecoder decoder;
// Striped read buffer size
private int bufferSize;
private final ExtendedBlock blockGroup;
// position in striped block
private long positionInBlock;
// sources
private final short[] liveIndices;
private DatanodeInfo[] sources;
private List<StripedReader> stripedReaders;
// targets
private DatanodeInfo[] targets;
private StorageType[] targetStorageTypes;
private short[] targetIndices;
private ByteBuffer[] targetBuffers;
private Socket[] targetSockets;
private DataOutputStream[] targetOutputStreams;
private DataInputStream[] targetInputStreams;
private long[] blockOffset4Targets;
private long[] seqNo4Targets;
private final int WRITE_PACKET_SIZE = 64 * 1024;
private DataChecksum checksum;
private int maxChunksPerPacket;
private byte[] packetBuf;
private byte[] checksumBuf;
private int bytesPerChecksum;
private int checksumSize;
private CachingStrategy cachingStrategy;
private Map<Future<Void>, Integer> futures = new HashMap<>();
private CompletionService<Void> readService =
new ExecutorCompletionService<>(STRIPED_READ_TRHEAD_POOL);
ReconstructAndTransferBlock(BlockECRecoveryInfo recoveryInfo) {
ECSchema schema = recoveryInfo.getECSchema();
dataBlkNum = schema.getNumDataUnits();
parityBlkNum = schema.getNumParityUnits();
cellSize = schema.getChunkSize();
blockGroup = recoveryInfo.getExtendedBlock();
liveIndices = recoveryInfo.getLiveBlockIndices();
sources = recoveryInfo.getSourceDnInfos();
stripedReaders = new ArrayList<>(sources.length);
Preconditions.checkArgument(liveIndices.length >= dataBlkNum,
"No enough live striped blocks.");
Preconditions.checkArgument(liveIndices.length == sources.length);
targets = recoveryInfo.getTargetDnInfos();
targetStorageTypes = recoveryInfo.getTargetStorageTypes();
targetIndices = new short[targets.length];
targetBuffers = new ByteBuffer[targets.length];
targetSockets = new Socket[targets.length];
targetOutputStreams = new DataOutputStream[targets.length];
targetInputStreams = new DataInputStream[targets.length];
blockOffset4Targets = new long[targets.length];
seqNo4Targets = new long[targets.length];
for (int i = 0; i < targets.length; i++) {
blockOffset4Targets[i] = 0;
seqNo4Targets[i] = 0;
}
getTargetIndices();
cachingStrategy = CachingStrategy.newDefaultStrategy();
}
private ExtendedBlock getBlock(ExtendedBlock blockGroup, int i) {
return StripedBlockUtil.constructStripedBlock(blockGroup, cellSize,
dataBlkNum, i);
}
private long getBlockLen(ExtendedBlock blockGroup, int i) {
return StripedBlockUtil.getStripedBlockLength(blockGroup.getNumBytes(),
cellSize, dataBlkNum, i);
}
@Override
public void run() {
try {
// Store the indices of successfully read source
// This will be updated after doing real read.
int[] success = new int[dataBlkNum];
int nsuccess = 0;
for (int i = 0; i < sources.length && nsuccess < dataBlkNum; i++) {
StripedReader reader = new StripedReader(liveIndices[i]);
stripedReaders.add(reader);
BlockReader blockReader = newBlockReader(
getBlock(blockGroup, liveIndices[i]), 0, sources[i]);
if (blockReader != null) {
initChecksumAndBufferSizeIfNeeded(blockReader);
reader.blockReader = blockReader;
reader.buffer = ByteBuffer.allocate(bufferSize);
success[nsuccess++] = i;
}
}
if (nsuccess < dataBlkNum) {
String error = "Can't find minimum sources required by "
+ "recovery, block id: " + blockGroup.getBlockId();
LOG.warn(error);
throw new IOException(error);
}
for (int i = 0; i < targets.length; i++) {
targetBuffers[i] = ByteBuffer.allocate(bufferSize);
}
checksumSize = checksum.getChecksumSize();
int chunkSize = bytesPerChecksum + checksumSize;
maxChunksPerPacket = Math.max(
(WRITE_PACKET_SIZE - PacketHeader.PKT_MAX_HEADER_LEN)/chunkSize, 1);
int maxPacketSize = chunkSize * maxChunksPerPacket
+ PacketHeader.PKT_MAX_HEADER_LEN;
packetBuf = new byte[maxPacketSize];
checksumBuf = new byte[checksumSize * (bufferSize / bytesPerChecksum)];
// Store whether the target is success
boolean[] targetsStatus = new boolean[targets.length];
if (initTargetStreams(targetsStatus) == 0) {
String error = "All targets are failed.";
LOG.warn(error);
throw new IOException(error);
}
long firstStripedBlockLength = getBlockLen(blockGroup, 0);
while (positionInBlock < firstStripedBlockLength) {
int toRead = Math.min(
bufferSize, (int)(firstStripedBlockLength - positionInBlock));
// step1: read minimum striped buffer size data required by recovery.
nsuccess = readMinimumStripedData4Recovery(success);
if (nsuccess < dataBlkNum) {
String error = "Can't read data from minimum number of sources "
+ "required by recovery, block id: " + blockGroup.getBlockId();
LOG.warn(error);
throw new IOException(error);
}
// step2: encode/decode to recover targets
long remaining = firstStripedBlockLength - positionInBlock;
int toRecoverLen = remaining < bufferSize ?
(int)remaining : bufferSize;
recoverTargets(success, targetsStatus, toRecoverLen);
// step3: transfer data
if (transferData2Targets(targetsStatus) == 0) {
String error = "Transfer failed for all targets.";
LOG.warn(error);
throw new IOException(error);
}
clearBuffers();
positionInBlock += toRead;
}
endTargetBlocks(targetsStatus);
// Currently we don't check the acks for packets, this is similar as
// block replication.
} catch (Throwable e) {
LOG.warn("Failed to recover striped block: " + blockGroup);
} finally {
// close block readers
for (StripedReader stripedReader : stripedReaders) {
closeBlockReader(stripedReader.blockReader);
}
for (int i = 0; i < targets.length; i++) {
IOUtils.closeStream(targetOutputStreams[i]);
IOUtils.closeStream(targetInputStreams[i]);
IOUtils.closeStream(targetSockets[i]);
}
}
}
// init checksum from block reader
private void initChecksumAndBufferSizeIfNeeded(BlockReader blockReader) {
if (checksum == null) {
checksum = blockReader.getDataChecksum();
bytesPerChecksum = checksum.getBytesPerChecksum();
// The bufferSize is flat to divide bytesPerChecksum
int readBufferSize = STRIPED_READ_BUFFER_SIZE;
bufferSize = readBufferSize < bytesPerChecksum ? bytesPerChecksum :
readBufferSize - readBufferSize % bytesPerChecksum;
} else {
assert blockReader.getDataChecksum().equals(checksum);
}
}
// assume liveIndices is not ordered.
private void getTargetIndices() {
BitSet bitset = new BitSet(dataBlkNum + parityBlkNum);
for (int i = 0; i < sources.length; i++) {
bitset.set(liveIndices[i]);
}
int m = 0;
for (int i = 0; i < dataBlkNum + parityBlkNum && m < targets.length; i++) {
if (!bitset.get(i)) {
targetIndices[m++] = (short)i;
}
}
}
/**
* Read minimum striped buffer size data required by recovery.
* <code>success</code> list will be updated after read.
*
* Initially we only read from <code>dataBlkNum</code> sources,
* if timeout or failure for some source, we will try to schedule
* read from a new source.
*/
private int readMinimumStripedData4Recovery(int[] success) {
BitSet used = new BitSet(sources.length);
for (int i = 0; i < dataBlkNum; i++) {
StripedReader reader = stripedReaders.get(success[i]);
Callable<Void> readCallable = readFromBlock(
reader.blockReader, reader.buffer);
Future<Void> f = readService.submit(readCallable);
futures.put(f, success[i]);
used.set(success[i]);
}
int nsuccess = 0;
while (!futures.isEmpty()) {
try {
StripedReadResult result =
StripedBlockUtil.getNextCompletedStripedRead(
readService, futures, STRIPED_READ_THRESHOLD_MILLIS);
if (result.state == StripedReadResult.SUCCESSFUL) {
success[nsuccess++] = result.index;
if (nsuccess >= dataBlkNum) {
// cancel remaining reads if we read successfully from minimum
// number of sources required for recovery.
cancelReads(futures.keySet());
futures.clear();
break;
}
} else if (result.state == StripedReadResult.FAILED) {
// If read failed for some source, we should not use it anymore
// and schedule read from a new source.
StripedReader failedReader = stripedReaders.get(result.index);
closeBlockReader(failedReader.blockReader);
failedReader.blockReader = null;
scheduleNewRead(used);
} else if (result.state == StripedReadResult.TIMEOUT) {
// If timeout, we also schedule a new read.
scheduleNewRead(used);
}
} catch (InterruptedException e) {
LOG.info("Read data interrupted.", e);
break;
}
}
return nsuccess;
}
/**
* Return true if need to do encoding to recovery missed striped block.
*/
private boolean shouldEncode(int[] success) {
for (int i = 0; i < success.length; i++) {
if (stripedReaders.get(success[i]).index >= dataBlkNum) {
return false;
}
}
return true;
}
private void paddingBufferToLen(ByteBuffer buffer, int len) {
int toPadding = len - buffer.position();
for (int i = 0; i < toPadding; i++) {
buffer.put((byte) 0);
}
}
// Initialize encoder
private void initEncoderIfNecessary() {
if (encoder == null) {
encoder = newEncoder();
encoder.initialize(dataBlkNum, parityBlkNum, bufferSize);
}
}
// Initialize decoder
private void initDecoderIfNecessary() {
if (decoder == null) {
decoder = newDecoder();
decoder.initialize(dataBlkNum, parityBlkNum, bufferSize);
}
}
private void recoverTargets(int[] success, boolean[] targetsStatus,
int toRecoverLen) {
if (shouldEncode(success)) {
initEncoderIfNecessary();
ByteBuffer[] dataBuffers = new ByteBuffer[dataBlkNum];
ByteBuffer[] parityBuffers = new ByteBuffer[parityBlkNum];
for (int i = 0; i < dataBlkNum; i++) {
StripedReader reader = stripedReaders.get(i);
ByteBuffer buffer = reader.buffer;
paddingBufferToLen(buffer, toRecoverLen);
dataBuffers[i] = (ByteBuffer)buffer.flip();
}
for (int i = dataBlkNum; i < stripedReaders.size(); i++) {
StripedReader reader = stripedReaders.get(i);
parityBuffers[reader.index - dataBlkNum] = cleanBuffer(reader.buffer);
}
for (int i = 0; i < targets.length; i++) {
parityBuffers[targetIndices[i] - dataBlkNum] = targetBuffers[i];
}
for (int i = 0; i < parityBlkNum; i++) {
if (parityBuffers[i] == null) {
parityBuffers[i] = ByteBuffer.allocate(toRecoverLen);
} else {
parityBuffers[i].limit(toRecoverLen);
}
}
encoder.encode(dataBuffers, parityBuffers);
} else {
/////////// TODO: wait for HADOOP-11847 /////////////
////////// The current decode method always try to decode parityBlkNum number of data blocks. ////////////
initDecoderIfNecessary();
ByteBuffer[] inputs = new ByteBuffer[dataBlkNum + parityBlkNum];
for (int i = 0; i < success.length; i++) {
StripedReader reader = stripedReaders.get(success[i]);
ByteBuffer buffer = reader.buffer;
paddingBufferToLen(buffer, toRecoverLen);
int index = reader.index < dataBlkNum ?
reader.index + parityBlkNum : reader.index - dataBlkNum;
inputs[index] = (ByteBuffer)buffer.flip();
}
int[] indices4Decode = new int[parityBlkNum];
int m = 0;
for (int i = 0; i < dataBlkNum + parityBlkNum; i++) {
if (inputs[i] == null) {
inputs[i] = ByteBuffer.allocate(toRecoverLen);
indices4Decode[m++] = i;
}
}
ByteBuffer[] outputs = new ByteBuffer[parityBlkNum];
m = 0;
// targetIndices is subset of indices4Decode
for (int i = 0; i < parityBlkNum; i++) {
if (m < targetIndices.length &&
(indices4Decode[i] - parityBlkNum) == targetIndices[m]) {
outputs[i] = targetBuffers[m++];
outputs[i].limit(toRecoverLen);
} else {
outputs[i] = ByteBuffer.allocate(toRecoverLen);
}
}
decoder.decode(inputs, indices4Decode, outputs);
for (int i = 0; i < targets.length; i++) {
if (targetsStatus[i]) {
long blockLen = getBlockLen(blockGroup, targetIndices[i]);
long remaining = blockLen - positionInBlock;
if (remaining < 0) {
targetBuffers[i].limit(0);
} else if (remaining < toRecoverLen) {
targetBuffers[i].limit((int)remaining);
}
}
}
}
}
/**
* Schedule read from a new source, we first try un-initial source,
* then try un-used source in this round and bypass failed source.
*/
private void scheduleNewRead(BitSet used) {
StripedReader reader = null;
int m = stripedReaders.size();
while (m < sources.length && reader == null) {
reader = new StripedReader(liveIndices[m]);
BlockReader blockReader = newBlockReader(
getBlock(blockGroup, liveIndices[m]), positionInBlock, sources[m]);
stripedReaders.add(reader);
if (blockReader != null) {
assert blockReader.getDataChecksum().equals(checksum);
reader.blockReader = blockReader;
reader.buffer = ByteBuffer.allocate(bufferSize);
} else {
m++;
reader = null;
}
}
for (int i = 0; reader == null && i < stripedReaders.size(); i++) {
StripedReader r = stripedReaders.get(i);
if (r.blockReader != null && !used.get(i)) {
closeBlockReader(r.blockReader);
r.blockReader = newBlockReader(
getBlock(blockGroup, liveIndices[i]), positionInBlock,
sources[i]);
if (r.blockReader != null) {
m = i;
reader = r;
}
}
}
if (reader != null) {
Callable<Void> readCallable = readFromBlock(
reader.blockReader, reader.buffer);
Future<Void> f = readService.submit(readCallable);
futures.put(f, m);
used.set(m);
}
}
// cancel all reads.
private void cancelReads(Collection<Future<Void>> futures) {
for (Future<Void> future : futures) {
future.cancel(true);
}
}
private Callable<Void> readFromBlock(final BlockReader reader,
final ByteBuffer buf) {
return new Callable<Void>() {
@Override
public Void call() throws Exception {
try {
actualReadFromBlock(reader, buf);
return null;
} catch (IOException e) {
LOG.info(e.getMessage());
throw e;
}
}
};
}
/**
* Read bytes from block
*/
private void actualReadFromBlock(BlockReader reader, ByteBuffer buf)
throws IOException {
int len = buf.remaining();
int n = 0;
while (n < len) {
int nread = reader.read(buf);
if (nread <= 0) {
break;
}
n += nread;
}
}
// close block reader
private void closeBlockReader(BlockReader blockReader) {
try {
if (blockReader != null) {
blockReader.close();
}
} catch (IOException e) {
// ignore
}
}
private InetSocketAddress getSocketAddress4Transfer(DatanodeInfo dnInfo) {
return NetUtils.createSocketAddr(dnInfo.getXferAddr(
datanode.getDnConf().getConnectToDnViaHostname()));
}
private BlockReader newBlockReader(final ExtendedBlock block,
long startOffset, DatanodeInfo dnInfo) {
try {
InetSocketAddress dnAddr = getSocketAddress4Transfer(dnInfo);
Token<BlockTokenIdentifier> blockToken = datanode.getBlockAccessToken(
block, EnumSet.of(BlockTokenIdentifier.AccessMode.READ));
/*
* This can be further improved if the replica is local, then we can
* read directly from DN and need to check the replica is FINALIZED
* state, notice we should not use short-circuit local read which
* requires config for domain-socket in UNIX or legacy config in Windows.
*/
return RemoteBlockReader2.newBlockReader(
"dummy", block, blockToken, startOffset, block.getNumBytes(), true,
"", newConnectedPeer(block, dnAddr, blockToken, dnInfo), dnInfo,
null, cachingStrategy);
} catch (IOException e) {
return null;
}
}
private Peer newConnectedPeer(ExtendedBlock b, InetSocketAddress addr,
Token<BlockTokenIdentifier> blockToken, DatanodeID datanodeId)
throws IOException {
Peer peer = null;
boolean success = false;
Socket sock = null;
final int socketTimeout = datanode.getDnConf().getSocketTimeout();
try {
sock = NetUtils.getDefaultSocketFactory(conf).createSocket();
NetUtils.connect(sock, addr, socketTimeout);
peer = TcpPeerServer.peerFromSocketAndKey(datanode.getSaslClient(),
sock, datanode.getDataEncryptionKeyFactoryForBlock(b),
blockToken, datanodeId);
peer.setReadTimeout(socketTimeout);
success = true;
return peer;
} finally {
if (!success) {
IOUtils.cleanup(LOG, peer);
IOUtils.closeSocket(sock);
}
}
}
/**
* Send data to targets
*/
private int transferData2Targets(boolean[] targetsStatus) {
int nsuccess = 0;
for (int i = 0; i < targets.length; i++) {
if (targetsStatus[i]) {
boolean success = false;
try {
ByteBuffer buffer = targetBuffers[i];
if (buffer.remaining() == 0) {
continue;
}
checksum.calculateChunkedSums(
buffer.array(), 0, buffer.remaining(), checksumBuf, 0);
int ckOff = 0;
while (buffer.remaining() > 0) {
DFSPacket packet = new DFSPacket(packetBuf, maxChunksPerPacket,
blockOffset4Targets[i], seqNo4Targets[i]++, checksumSize, false);
int maxBytesToPacket = maxChunksPerPacket * bytesPerChecksum;
int toWrite = buffer.remaining() > maxBytesToPacket ?
maxBytesToPacket : buffer.remaining();
int ckLen = ((toWrite - 1) / bytesPerChecksum + 1) * checksumSize;
packet.writeChecksum(checksumBuf, ckOff, ckLen);
ckOff += ckLen;
packet.writeData(buffer, toWrite);
// Send packet
packet.writeTo(targetOutputStreams[i]);
blockOffset4Targets[i] += toWrite;
nsuccess++;
success = true;
}
} catch (IOException e) {
LOG.warn(e.getMessage());
}
targetsStatus[i] = success;
}
}
return nsuccess;
}
/**
* clear all buffers
*/
private void clearBuffers() {
for (StripedReader stripedReader : stripedReaders) {
if (stripedReader.buffer != null) {
stripedReader.buffer.clear();
}
}
for (int i = 0; i < targetBuffers.length; i++) {
if (targetBuffers[i] != null) {
cleanBuffer(targetBuffers[i]);
}
}
}
private ByteBuffer cleanBuffer(ByteBuffer buffer) {
Arrays.fill(buffer.array(), (byte) 0);
return (ByteBuffer)buffer.clear();
}
// send an empty packet to mark the end of the block
private void endTargetBlocks(boolean[] targetsStatus) {
for (int i = 0; i < targets.length; i++) {
if (targetsStatus[i]) {
try {
DFSPacket packet = new DFSPacket(packetBuf, 0,
blockOffset4Targets[i], seqNo4Targets[i]++, checksumSize, true);
packet.writeTo(targetOutputStreams[i]);
targetOutputStreams[i].flush();
} catch (IOException e) {
LOG.warn(e.getMessage());
}
}
}
}
/**
* Initialize output/input streams for transferring data to target
* and send create block request.
*/
private int initTargetStreams(boolean[] targetsStatus) {
int nsuccess = 0;
for (int i = 0; i < targets.length; i++) {
Socket socket = null;
DataOutputStream out = null;
DataInputStream in = null;
boolean success = false;
try {
InetSocketAddress targetAddr =
getSocketAddress4Transfer(targets[i]);
socket = datanode.newSocket();
NetUtils.connect(socket, targetAddr,
datanode.getDnConf().getSocketTimeout());
socket.setSoTimeout(datanode.getDnConf().getSocketTimeout());
ExtendedBlock block = getBlock(blockGroup, targetIndices[i]);
Token<BlockTokenIdentifier> blockToken =
datanode.getBlockAccessToken(block,
EnumSet.of(BlockTokenIdentifier.AccessMode.WRITE));
long writeTimeout = datanode.getDnConf().getSocketWriteTimeout();
OutputStream unbufOut = NetUtils.getOutputStream(socket, writeTimeout);
InputStream unbufIn = NetUtils.getInputStream(socket);
DataEncryptionKeyFactory keyFactory =
datanode.getDataEncryptionKeyFactoryForBlock(block);
IOStreamPair saslStreams = datanode.getSaslClient().socketSend(
socket, unbufOut, unbufIn, keyFactory, blockToken, targets[i]);
unbufOut = saslStreams.out;
unbufIn = saslStreams.in;
out = new DataOutputStream(new BufferedOutputStream(unbufOut,
HdfsServerConstants.SMALL_BUFFER_SIZE));
in = new DataInputStream(unbufIn);
DatanodeInfo source = new DatanodeInfo(datanode.getDatanodeId());
new Sender(out).writeBlock(block, targetStorageTypes[i],
blockToken, "", new DatanodeInfo[]{targets[i]},
new StorageType[]{targetStorageTypes[i]}, source,
BlockConstructionStage.PIPELINE_SETUP_CREATE, 0, 0, 0, 0,
checksum, cachingStrategy, false, false, null);
targetSockets[i] = socket;
targetOutputStreams[i] = out;
targetInputStreams[i] = in;
nsuccess++;
success = true;
} catch (Throwable e) {
LOG.warn(e.getMessage());
} finally {
if (!success) {
IOUtils.closeStream(out);
IOUtils.closeStream(in);
IOUtils.closeStream(socket);
}
}
targetsStatus[i] = success;
}
return nsuccess;
}
}
private class StripedReader {
short index;
BlockReader blockReader;
ByteBuffer buffer;
public StripedReader(short index) {
this.index = index;
}
}
}

View File

@ -19,6 +19,7 @@
package org.apache.hadoop.hdfs.util;
import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
@ -77,10 +78,8 @@ public class StripedBlockUtil {
public static LocatedBlock constructInternalBlock(LocatedStripedBlock bg,
int idxInReturnedLocs, int cellSize, int dataBlkNum,
int idxInBlockGroup) {
final ExtendedBlock blk = new ExtendedBlock(bg.getBlock());
blk.setBlockId(bg.getBlock().getBlockId() + idxInBlockGroup);
blk.setNumBytes(getInternalBlockLength(bg.getBlockSize(),
cellSize, dataBlkNum, idxInBlockGroup));
final ExtendedBlock blk = constructInternalBlock(
bg.getBlock(), cellSize, dataBlkNum, idxInBlockGroup);
return new LocatedBlock(blk,
new DatanodeInfo[]{bg.getLocations()[idxInReturnedLocs]},
@ -90,6 +89,44 @@ public class StripedBlockUtil {
null);
}
/**
* This method creates an internal {@link ExtendedBlock} at the given index
* of a block group.
*/
public static ExtendedBlock constructInternalBlock(ExtendedBlock blockGroup,
int cellSize, int dataBlkNum, int idxInBlockGroup) {
ExtendedBlock block = new ExtendedBlock(blockGroup);
block.setBlockId(blockGroup.getBlockId() + idxInBlockGroup);
block.setNumBytes(getInternalBlockLength(blockGroup.getNumBytes(),
cellSize, dataBlkNum, idxInBlockGroup));
return block;
}
/**
* This method creates an internal {@link ExtendedBlock} at the given index
* of a block group, for both data and parity block.
*/
public static ExtendedBlock constructStripedBlock(ExtendedBlock blockGroup,
int cellSize, int dataBlkNum, int idxInBlockGroup) {
ExtendedBlock block = new ExtendedBlock(blockGroup);
block.setBlockId(blockGroup.getBlockId() + idxInBlockGroup);
block.setNumBytes(getStripedBlockLength(blockGroup.getNumBytes(), cellSize,
dataBlkNum, idxInBlockGroup));
return block;
}
/**
* Returns an internal block length at the given index of a block group,
* for both data and parity block.
*/
public static long getStripedBlockLength(long numBytes, int cellSize,
int dataBlkNum, int idxInBlockGroup) {
// parity block length is the same as the first striped block length.
return StripedBlockUtil.getInternalBlockLength(
numBytes, cellSize, dataBlkNum,
idxInBlockGroup < dataBlkNum ? idxInBlockGroup : 0);
}
/**
* Get the size of an internal block at the given index of a block group
*
@ -208,8 +245,8 @@ public class StripedBlockUtil {
* @throws InterruptedException
*/
public static StripedReadResult getNextCompletedStripedRead(
CompletionService<Void> readService, Map<Future<Void>,
Integer> futures, final long threshold) throws InterruptedException {
CompletionService<Void> readService, Map<Future<Void>, Integer> futures,
final long threshold) throws InterruptedException {
Preconditions.checkArgument(!futures.isEmpty());
Preconditions.checkArgument(threshold > 0);
Future<Void> future = null;

View File

@ -2312,11 +2312,11 @@
</description>
</property>
<property>
<name>dfs.datanode.block-pinning.enabled</name>
<value>false</value>
<description>Whether pin blocks on favored DataNode.</description>
</property>
<property>
<name>dfs.datanode.block-pinning.enabled</name>
<value>false</value>
<description>Whether pin blocks on favored DataNode.</description>
</property>
<property>
<name>dfs.client.block.write.locateFollowingBlock.initial.delay.ms</name>
@ -2354,4 +2354,25 @@
</description>
</property>
<property>
<name>dfs.datanode.stripedread.threshold.millis</name>
<value>5000</value>
<description>datanode striped read threshold in millisecond.
</description>
</property>
<property>
<name>dfs.datanode.stripedread.threads</name>
<value>20</value>
<description>datanode striped read thread pool size.
</description>
</property>
<property>
<name>dfs.datanode.stripedread.buffer.size</name>
<value>262144</value>
<description>datanode striped read buffer size.
</description>
</property>
</configuration>

View File

@ -0,0 +1,356 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
import org.apache.hadoop.hdfs.util.StripedBlockUtil;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.util.BitSet;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class TestRecoverStripedFile {
public static final Log LOG = LogFactory.getLog(TestRecoverStripedFile.class);
private static final int dataBlkNum = HdfsConstants.NUM_DATA_BLOCKS;
private static final int parityBlkNum = HdfsConstants.NUM_PARITY_BLOCKS;
private static final int cellSize = HdfsConstants.BLOCK_STRIPED_CELL_SIZE;
private static final int blockSize = cellSize * 3;
private static final int groupSize = dataBlkNum + parityBlkNum;
private static final int dnNum = groupSize + parityBlkNum;
private MiniDFSCluster cluster;
private Configuration conf;
private DistributedFileSystem fs;
// Map: DatanodeID -> datanode index in cluster
private Map<DatanodeID, Integer> dnMap = new HashMap<DatanodeID, Integer>();
@Before
public void setup() throws IOException {
conf = new Configuration();
conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize);
conf.setInt(DFSConfigKeys.DFS_DATANODE_STRIPED_READ_BUFFER_SIZE_KEY, cellSize - 1);
conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY, 1);
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(dnNum).build();;
cluster.waitActive();
fs = cluster.getFileSystem();
fs.getClient().createErasureCodingZone("/", null);
List<DataNode> datanodes = cluster.getDataNodes();
for (int i = 0; i < dnNum; i++) {
dnMap.put(datanodes.get(i).getDatanodeId(), i);
}
}
@After
public void tearDown() {
if (cluster != null) {
cluster.shutdown();
}
}
@Test(timeout = 120000)
public void testRecoverOneParityBlock() throws Exception {
int fileLen = 10 * blockSize + blockSize/10;
assertFileBlocksRecovery("/testRecoverOneParityBlock", fileLen, 0, 1);
}
@Test(timeout = 120000)
public void testRecoverThreeParityBlocks() throws Exception {
int fileLen = 3 * blockSize + blockSize/10;
assertFileBlocksRecovery("/testRecoverThreeParityBlocks", fileLen, 0, 3);
}
@Test(timeout = 120000)
public void testRecoverThreeDataBlocks() throws Exception {
int fileLen = 3 * blockSize + blockSize/10;
assertFileBlocksRecovery("/testRecoverThreeDataBlocks", fileLen, 1, 3);
}
@Test(timeout = 120000)
public void testRecoverOneDataBlock() throws Exception {
////TODO: TODO: wait for HADOOP-11847
//int fileLen = 10 * blockSize + blockSize/10;
//assertFileBlocksRecovery("/testRecoverOneDataBlock", fileLen, 1, 1);
}
@Test(timeout = 120000)
public void testRecoverAnyBlocks() throws Exception {
////TODO: TODO: wait for HADOOP-11847
//int fileLen = 3 * blockSize + blockSize/10;
//assertFileBlocksRecovery("/testRecoverAnyBlocks", fileLen, 2, 2);
}
/**
* Test the file blocks recovery.
* 1. Check the replica is recovered in the target datanode,
* and verify the block replica length, generationStamp and content.
* 2. Read the file and verify content.
*/
private void assertFileBlocksRecovery(String fileName, int fileLen,
int recovery, int toRecoverBlockNum) throws Exception {
if (recovery != 0 && recovery != 1 && recovery != 2) {
Assert.fail("Invalid recovery: 0 is to recovery parity blocks,"
+ "1 is to recovery data blocks, 2 is any.");
}
if (toRecoverBlockNum < 1 || toRecoverBlockNum > parityBlkNum) {
Assert.fail("toRecoverBlockNum should be between 1 ~ " + parityBlkNum);
}
Path file = new Path(fileName);
testCreateStripedFile(file, fileLen);
LocatedBlocks locatedBlocks = getLocatedBlocks(file);
assertEquals(locatedBlocks.getFileLength(), fileLen);
LocatedStripedBlock lastBlock =
(LocatedStripedBlock)locatedBlocks.getLastLocatedBlock();
DatanodeInfo[] storageInfos = lastBlock.getLocations();
int[] indices = lastBlock.getBlockIndices();
BitSet bitset = new BitSet(dnNum);
for (DatanodeInfo storageInfo : storageInfos) {
bitset.set(dnMap.get(storageInfo));
}
int[] toDead = new int[toRecoverBlockNum];
int n = 0;
for (int i = 0; i < indices.length; i++) {
if (n < toRecoverBlockNum) {
if (recovery == 0) {
if (indices[i] >= dataBlkNum) {
toDead[n++] = i;
}
} else if (recovery == 1) {
if (indices[i] < dataBlkNum) {
toDead[n++] = i;
}
} else {
toDead[n++] = i;
}
} else {
break;
}
}
DatanodeInfo[] dataDNs = new DatanodeInfo[toRecoverBlockNum];
int[] deadDnIndices = new int[toRecoverBlockNum];
ExtendedBlock[] blocks = new ExtendedBlock[toRecoverBlockNum];
File[] replicas = new File[toRecoverBlockNum];
File[] metadatas = new File[toRecoverBlockNum];
byte[][] replicaContents = new byte[toRecoverBlockNum][];
for (int i = 0; i < toRecoverBlockNum; i++) {
dataDNs[i] = storageInfos[toDead[i]];
deadDnIndices[i] = dnMap.get(dataDNs[i]);
// Check the block replica file on deadDn before it dead.
blocks[i] = StripedBlockUtil.constructStripedBlock(
lastBlock.getBlock(), cellSize, dataBlkNum, indices[toDead[i]]);
replicas[i] = cluster.getBlockFile(deadDnIndices[i], blocks[i]);
metadatas[i] = cluster.getBlockMetadataFile(deadDnIndices[i], blocks[i]);
// the block replica on the datanode should be the same as expected
assertEquals(replicas[i].length(),
StripedBlockUtil.getStripedBlockLength(
lastBlock.getBlockSize(), cellSize, dataBlkNum, indices[toDead[i]]));
assertTrue(metadatas[i].getName().
endsWith(blocks[i].getGenerationStamp() + ".meta"));
replicaContents[i] = readReplica(replicas[i]);
}
try {
DatanodeID[] dnIDs = new DatanodeID[toRecoverBlockNum];
for (int i = 0; i < toRecoverBlockNum; i++) {
/*
* Kill the datanode which contains one replica
* We need to make sure it dead in namenode: clear its update time and
* trigger NN to check heartbeat.
*/
DataNode dn = cluster.getDataNodes().get(deadDnIndices[i]);
dn.shutdown();
dnIDs[i] = dn.getDatanodeId();
}
setDataNodesDead(dnIDs);
// Check the locatedBlocks of the file again
locatedBlocks = getLocatedBlocks(file);
lastBlock = (LocatedStripedBlock)locatedBlocks.getLastLocatedBlock();
storageInfos = lastBlock.getLocations();
assertEquals(storageInfos.length, groupSize - toRecoverBlockNum);
int[] targetDNs = new int[dnNum - groupSize];
n = 0;
for (int i = 0; i < dnNum; i++) {
if (!bitset.get(i)) { // not contain replica of the block.
targetDNs[n++] = i;
}
}
waitForRecoveryFinished(file);
targetDNs = sortTargetsByReplicas(blocks, targetDNs);
// Check the replica on the new target node.
for (int i = 0; i < toRecoverBlockNum; i++) {
File replicaAfterRecovery = cluster.getBlockFile(targetDNs[i], blocks[i]);
File metadataAfterRecovery =
cluster.getBlockMetadataFile(targetDNs[i], blocks[i]);
assertEquals(replicaAfterRecovery.length(), replicas[i].length());
assertTrue(metadataAfterRecovery.getName().
endsWith(blocks[i].getGenerationStamp() + ".meta"));
byte[] replicaContentAfterRecovery = readReplica(replicaAfterRecovery);
Assert.assertArrayEquals(replicaContents[i], replicaContentAfterRecovery);
}
} finally {
for (int i = 0; i < toRecoverBlockNum; i++) {
restartDataNode(toDead[i]);
}
cluster.waitActive();
}
fs.delete(file, true);
}
private void setDataNodesDead(DatanodeID[] dnIDs) throws IOException {
for (DatanodeID dn : dnIDs) {
DatanodeDescriptor dnd =
NameNodeAdapter.getDatanode(cluster.getNamesystem(), dn);
DFSTestUtil.setDatanodeDead(dnd);
}
BlockManagerTestUtil.checkHeartbeat(cluster.getNamesystem().getBlockManager());
}
private void restartDataNode(int dn) {
try {
cluster.restartDataNode(dn, true, true);
} catch (IOException e) {
}
}
private int[] sortTargetsByReplicas(ExtendedBlock[] blocks, int[] targetDNs) {
int[] result = new int[blocks.length];
for (int i = 0; i < blocks.length; i++) {
result[i] = -1;
for (int j = 0; j < targetDNs.length; j++) {
if (targetDNs[j] != -1) {
File replica = cluster.getBlockFile(targetDNs[j], blocks[i]);
if (replica != null) {
result[i] = targetDNs[j];
targetDNs[j] = -1;
break;
}
}
}
if (result[i] == -1) {
Assert.fail("Failed to recover striped block: " + blocks[i].getBlockId());
}
}
return result;
}
private byte[] readReplica(File replica) throws IOException {
int length = (int)replica.length();
ByteArrayOutputStream content = new ByteArrayOutputStream(length);
FileInputStream in = new FileInputStream(replica);
try {
byte[] buffer = new byte[1024];
int total = 0;
while (total < length) {
int n = in.read(buffer);
if (n <= 0) {
break;
}
content.write(buffer, 0, n);
total += n;
}
if (total < length) {
Assert.fail("Failed to read all content of replica");
}
return content.toByteArray();
} finally {
in.close();
}
}
private LocatedBlocks waitForRecoveryFinished(Path file) throws Exception {
final int ATTEMPTS = 60;
for (int i = 0; i < ATTEMPTS; i++) {
LocatedBlocks locatedBlocks = getLocatedBlocks(file);
LocatedStripedBlock lastBlock =
(LocatedStripedBlock)locatedBlocks.getLastLocatedBlock();
DatanodeInfo[] storageInfos = lastBlock.getLocations();
if (storageInfos.length >= groupSize) {
return locatedBlocks;
}
Thread.sleep(1000);
}
throw new IOException ("Time out waiting for EC block recovery.");
}
private LocatedBlocks getLocatedBlocks(Path file) throws IOException {
return fs.getClient().getLocatedBlocks(file.toString(), 0, Long.MAX_VALUE);
}
private void testCreateStripedFile(Path file, int dataLen)
throws IOException {
final byte[] data = new byte[dataLen];
DFSUtil.getRandom().nextBytes(data);
writeContents(file, data);
}
void writeContents(Path file, byte[] contents)
throws IOException {
FSDataOutputStream out = fs.create(file);
try {
out.write(contents, 0, contents.length);
} finally {
out.close();
}
}
}