diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
index 3de60b24d44..3c91ca10b54 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
@@ -32,7 +32,6 @@ import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
-import java.util.Map.Entry;
 import java.util.Set;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CancellationException;
@@ -909,7 +908,8 @@ public class DFSInputStream extends FSInputStream
     }
   }
 
-  protected synchronized int readWithStrategy(ReaderStrategy strategy, int off, int len) throws IOException {
+  protected synchronized int readWithStrategy(ReaderStrategy strategy, int off,
+      int len) throws IOException {
     dfsClient.checkOpen();
     if (closed.get()) {
       throw new IOException("Stream closed");
@@ -959,7 +959,7 @@
           // Check if need to report block replicas corruption either read
           // was successful or ChecksumException occured.
           reportCheckSumFailure(corruptedBlockMap,
-              currentLocatedBlock.getLocations().length);
+              currentLocatedBlock.getLocations().length, false);
         }
       }
     }
@@ -1492,7 +1492,8 @@
         // Check and report if any block replicas are corrupted.
         // BlockMissingException may be caught if all block replicas are
        // corrupted.
-        reportCheckSumFailure(corruptedBlockMap, blk.getLocations().length);
+        reportCheckSumFailure(corruptedBlockMap, blk.getLocations().length,
+            false);
       }
 
       remaining -= bytesToRead;
@@ -1508,6 +1509,7 @@
 
   /**
    * DFSInputStream reports checksum failure.
+   * For replicated blocks, we have the following logic:
    * Case I : client has tried multiple data nodes and at least one of the
    * attempts has succeeded. We report the other failures as corrupted block to
    * namenode.
@@ -1515,29 +1517,39 @@
    * only report if the total number of replica is 1. We do not
    * report otherwise since this maybe due to the client is a handicapped client
    * (who can not read).
+   *
+   * For erasure-coded blocks, each block in corruptedBlockMap is an internal
+   * block in a block group, and there is usually only one DataNode
+   * corresponding to each internal block. For this case we simply report the
+   * corrupted blocks to NameNode and ignore the above logic.
+   *
    * @param corruptedBlockMap map of corrupted blocks
    * @param dataNodeCount number of data nodes who contains the block replicas
    */
   protected void reportCheckSumFailure(
       Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap,
-      int dataNodeCount) {
+      int dataNodeCount, boolean isStriped) {
     if (corruptedBlockMap.isEmpty()) {
       return;
     }
-    Iterator<Entry<ExtendedBlock, Set<DatanodeInfo>>> it = corruptedBlockMap
-        .entrySet().iterator();
-    Entry<ExtendedBlock, Set<DatanodeInfo>> entry = it.next();
-    ExtendedBlock blk = entry.getKey();
-    Set<DatanodeInfo> dnSet = entry.getValue();
-    if (((dnSet.size() < dataNodeCount) && (dnSet.size() > 0))
-        || ((dataNodeCount == 1) && (dnSet.size() == dataNodeCount))) {
-      DatanodeInfo[] locs = new DatanodeInfo[dnSet.size()];
-      int i = 0;
-      for (DatanodeInfo dn:dnSet) {
-        locs[i++] = dn;
+    List<LocatedBlock> reportList = new ArrayList<>(corruptedBlockMap.size());
+    for (Map.Entry<ExtendedBlock, Set<DatanodeInfo>> entry :
+        corruptedBlockMap.entrySet()) {
+      ExtendedBlock blk = entry.getKey();
+      Set<DatanodeInfo> dnSet = entry.getValue();
+      if (isStriped || ((dnSet.size() < dataNodeCount) && (dnSet.size() > 0))
+          || ((dataNodeCount == 1) && (dnSet.size() == dataNodeCount))) {
+        DatanodeInfo[] locs = new DatanodeInfo[dnSet.size()];
+        int i = 0;
+        for (DatanodeInfo dn:dnSet) {
+          locs[i++] = dn;
+        }
+        reportList.add(new LocatedBlock(blk, locs));
       }
-      LocatedBlock [] lblocks = { new LocatedBlock(blk, locs) };
-      dfsClient.reportChecksumFailure(src, lblocks);
+    }
+    if (reportList.size() > 0) {
+      dfsClient.reportChecksumFailure(src,
+          reportList.toArray(new LocatedBlock[reportList.size()]));
     }
     corruptedBlockMap.clear();
   }
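The reportCheckSumFailure() change above is the client-side half of the fix: instead of reporting at most one corrupted block, the method now walks the whole map and, for striped reads, bypasses the replica-count heuristic entirely. Below is a minimal, self-contained sketch of that decision rule only; plain String ids stand in for ExtendedBlock/DatanodeInfo, so this is an illustration, not the HDFS API.

```java
import java.util.*;

public class ChecksumReportSketch {
  /** Returns the block ids that should be reported to the NameNode. */
  static List<String> blocksToReport(Map<String, Set<String>> corruptedBlockMap,
      int dataNodeCount, boolean isStriped) {
    List<String> report = new ArrayList<>(corruptedBlockMap.size());
    for (Map.Entry<String, Set<String>> e : corruptedBlockMap.entrySet()) {
      Set<String> dnSet = e.getValue();
      // Striped: every corrupted internal block is reported.
      // Replicated: keep the old heuristic (don't blame all replicas of a
      // multi-replica block, since the client itself may be unable to read).
      if (isStriped || (dnSet.size() < dataNodeCount && dnSet.size() > 0)
          || (dataNodeCount == 1 && dnSet.size() == dataNodeCount)) {
        report.add(e.getKey());
      }
    }
    return report;
  }

  public static void main(String[] args) {
    Map<String, Set<String>> corrupted = new HashMap<>();
    corrupted.put("blk_1073741825", new HashSet<>(Arrays.asList("dn1", "dn2", "dn3")));
    // Replicated file with 3 replicas, all 3 reads failed: nothing is reported.
    System.out.println(blocksToReport(corrupted, 3, false)); // []
    // Striped read of an internal block: reported unconditionally.
    System.out.println(blocksToReport(corrupted, 3, true));  // [blk_1073741825]
  }
}
```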
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java
index 743325677bc..d15e536a501 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java
@@ -451,7 +451,7 @@ public class DFSStripedInputStream extends DFSInputStream {
         // Check if need to report block replicas corruption either read
         // was successful or ChecksumException occured.
         reportCheckSumFailure(corruptedBlockMap,
-            currentLocatedBlock.getLocations().length);
+            currentLocatedBlock.getLocations().length, true);
       }
     }
     return -1;
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java
index e8653c8a08b..dbd53a3ea0f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java
@@ -22,7 +22,6 @@ import com.google.common.annotations.VisibleForTesting;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.fs.StorageType;
-import org.apache.hadoop.hdfs.DFSClient;
 import org.apache.hadoop.hdfs.DFSStripedOutputStream;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
@@ -35,6 +34,8 @@ import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
 import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureDecoder;
 import org.apache.hadoop.security.token.Token;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.nio.ByteBuffer;
 import java.util.*;
@@ -72,6 +73,8 @@ import java.util.concurrent.TimeUnit;
 @InterfaceAudience.Private
 public class StripedBlockUtil {
 
+  public static final Logger LOG = LoggerFactory.getLogger(StripedBlockUtil.class);
+
   /**
    * This method parses a striped block group into individual blocks.
    *
@@ -221,15 +224,11 @@ public class StripedBlockUtil {
         return new StripingChunkReadResult(StripingChunkReadResult.TIMEOUT);
       }
     } catch (ExecutionException e) {
-      if (DFSClient.LOG.isDebugEnabled()) {
-        DFSClient.LOG.debug("Exception during striped read task", e);
-      }
+      LOG.debug("Exception during striped read task", e);
       return new StripingChunkReadResult(futures.remove(future),
           StripingChunkReadResult.FAILED);
     } catch (CancellationException e) {
-      if (DFSClient.LOG.isDebugEnabled()) {
-        DFSClient.LOG.debug("Exception during striped read task", e);
-      }
+      LOG.debug("Exception during striped read task", e);
       return new StripingChunkReadResult(futures.remove(future),
           StripingChunkReadResult.CANCELLED);
     }
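StripedBlockUtil now logs through its own slf4j logger and drops the explicit isDebugEnabled() guard. A small standalone sketch of the pattern, assuming nothing beyond the slf4j API: the guard is redundant when the message is a constant or uses {} placeholders, because debug() performs the level check itself.

```java
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class LoggingSketch {
  private static final Logger LOG = LoggerFactory.getLogger(LoggingSketch.class);

  static void handle(Exception e, int chunkIndex) {
    // Constant message plus throwable: nothing is built up front, so no guard.
    LOG.debug("Exception during striped read task", e);
    // Parameterized form: the message is only formatted if DEBUG is enabled.
    LOG.debug("Marking chunk {} as failed", chunkIndex);
  }
}
```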
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 43adf62e260..940fa906396 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -406,6 +406,9 @@ Trunk (Unreleased)
     HDFS-9615. Fix variable name typo in DFSConfigKeys. (Ray Chiang via
     Arpit Agarwal)
 
+    HDFS-9646. ErasureCodingWorker may fail when recovering data blocks with
+    length less than the first internal block. (jing9)
+
   BREAKDOWN OF HDFS-7285 SUBTASKS AND RELATED JIRAS
 
     HDFS-7347. Configurable erasure coding policy for individual files and
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/ErasureCodingWorker.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/ErasureCodingWorker.java
index 5588eec8cab..6ad71648586 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/ErasureCodingWorker.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/ErasureCodingWorker.java
@@ -32,8 +32,10 @@ import java.util.BitSet;
 import java.util.Collection;
 import java.util.EnumSet;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CompletionService;
 import java.util.concurrent.ExecutorCompletionService;
@@ -46,6 +48,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.ChecksumException;
 import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.hdfs.BlockReader;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
@@ -121,9 +124,8 @@ public final class ErasureCodingWorker {
   }
 
   private void initializeStripedReadThreadPool(int num) {
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Using striped reads; pool threads=" + num);
-    }
+    LOG.debug("Using striped reads; pool threads=" + num);
+
     STRIPED_READ_THREAD_POOL = new ThreadPoolExecutor(1, num, 60,
         TimeUnit.SECONDS, new SynchronousQueue<Runnable>(),
         new Daemon.DaemonFactory() {
@@ -148,9 +150,7 @@ public final class ErasureCodingWorker {
   }
 
   private void initializeStripedBlkRecoveryThreadPool(int num) {
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Using striped block recovery; pool threads=" + num);
-    }
+    LOG.debug("Using striped block recovery; pool threads=" + num);
     STRIPED_BLK_RECOVERY_THREAD_POOL = new ThreadPoolExecutor(2, num, 60,
         TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(),
         new Daemon.DaemonFactory() {
@@ -368,11 +368,11 @@ public final class ErasureCodingWorker {
      * @return StripedReader
      */
    private StripedReader addStripedReader(int i, long offsetInBlock) {
-      StripedReader reader = new StripedReader(liveIndices[i]);
+      final ExtendedBlock block = getBlock(blockGroup, liveIndices[i]);
+      StripedReader reader = new StripedReader(liveIndices[i], block, sources[i]);
      stripedReaders.add(reader);
 
-      BlockReader blockReader = newBlockReader(
-          getBlock(blockGroup, liveIndices[i]), offsetInBlock, sources[i]);
+      BlockReader blockReader = newBlockReader(block, offsetInBlock, sources[i]);
      if (blockReader != null) {
        initChecksumAndBufferSizeIfNeeded(blockReader);
        reader.blockReader = blockReader;
@@ -435,19 +435,27 @@ public final class ErasureCodingWorker {
        throw new IOException(error);
      }
 
-      long firstStripedBlockLength = getBlockLen(blockGroup, 0);
-      while (positionInBlock < firstStripedBlockLength) {
-        int toRead = Math.min(
-            bufferSize, (int)(firstStripedBlockLength - positionInBlock));
+      long maxTargetLength = 0;
+      for (short targetIndex : targetIndices) {
+        maxTargetLength = Math.max(maxTargetLength,
+            getBlockLen(blockGroup, targetIndex));
+      }
+      while (positionInBlock < maxTargetLength) {
+        final int toRecover = (int) Math.min(
+            bufferSize, maxTargetLength - positionInBlock);
        // step1: read from minimum source DNs required for reconstruction.
-        // The returned success list is the source DNs we do real read from
-        success = readMinimumStripedData4Recovery(success);
+        // The returned success list is the source DNs we do real read from
+        Map<ExtendedBlock, Set<DatanodeInfo>> corruptionMap = new HashMap<>();
+        try {
+          success = readMinimumStripedData4Recovery(success, toRecover,
+              corruptionMap);
+        } finally {
+          // report corrupted blocks to NN
+          reportCorruptedBlocks(corruptionMap);
+        }
 
        // step2: decode to reconstruct targets
-        long remaining = firstStripedBlockLength - positionInBlock;
-        int toRecoverLen = remaining < bufferSize ?
-            (int)remaining : bufferSize;
-        recoverTargets(success, targetsStatus, toRecoverLen);
+        recoverTargets(success, targetsStatus, toRecover);
 
        // step3: transfer data
        if (transferData2Targets(targetsStatus) == 0) {
@@ -456,7 +464,7 @@
        }
 
        clearBuffers();
-        positionInBlock += toRead;
+        positionInBlock += toRecover;
      }
 
      endTargetBlocks(targetsStatus);
@@ -513,10 +521,11 @@
      }
    }
 
-    private long getReadLength(int index) {
+    /** the reading length should not exceed the length for recovery */
+    private int getReadLength(int index, int recoverLength) {
      long blockLen = getBlockLen(blockGroup, index);
      long remaining = blockLen - positionInBlock;
-      return remaining > bufferSize ? bufferSize : remaining;
+      return (int) Math.min(remaining, recoverLength);
    }
 
    /**
@@ -529,11 +538,15 @@
     * operations and next iteration read.
     *
     * @param success the initial success list of source DNs we think best
+     * @param recoverLength the length to recover.
     * @return updated success list of source DNs we do real read
     * @throws IOException
     */
-    private int[] readMinimumStripedData4Recovery(final int[] success)
+    private int[] readMinimumStripedData4Recovery(final int[] success,
+        int recoverLength, Map<ExtendedBlock, Set<DatanodeInfo>> corruptionMap)
        throws IOException {
+      Preconditions.checkArgument(recoverLength >= 0 &&
+          recoverLength <= bufferSize);
      int nsuccess = 0;
      int[] newSuccess = new int[minRequiredSources];
      BitSet used = new BitSet(sources.length);
      /*
       */
      for (int i = 0; i < minRequiredSources; i++) {
        StripedReader reader = stripedReaders.get(success[i]);
-        if (getReadLength(liveIndices[success[i]]) > 0) {
-          Callable<Void> readCallable = readFromBlock(
-              reader.blockReader, reader.buffer);
+        final int toRead = getReadLength(liveIndices[success[i]],
+            recoverLength);
+        if (toRead > 0) {
+          Callable<Void> readCallable = readFromBlock(reader, reader.buffer,
+              toRead, corruptionMap);
          Future<Void> f = readService.submit(readCallable);
          futures.put(f, success[i]);
        } else {
@@ -570,10 +585,10 @@
          StripedReader failedReader = stripedReaders.get(result.index);
          closeBlockReader(failedReader.blockReader);
          failedReader.blockReader = null;
-          resultIndex = scheduleNewRead(used);
+          resultIndex = scheduleNewRead(used, recoverLength, corruptionMap);
        } else if (result.state == StripingChunkReadResult.TIMEOUT) {
          // If timeout, we also schedule a new read.
-          resultIndex = scheduleNewRead(used);
+          resultIndex = scheduleNewRead(used, recoverLength, corruptionMap);
        }
        if (resultIndex >= 0) {
          newSuccess[nsuccess++] = resultIndex;
@@ -601,6 +616,9 @@
    }
 
    private void paddingBufferToLen(ByteBuffer buffer, int len) {
+      if (len > buffer.limit()) {
+        buffer.limit(len);
+      }
      int toPadding = len - buffer.position();
      for (int i = 0; i < toPadding; i++) {
        buffer.put((byte) 0);
@@ -648,8 +666,8 @@
      int m = 0;
      for (int i = 0; i < targetBuffers.length; i++) {
        if (targetsStatus[i]) {
+          targetBuffers[i].limit(toRecoverLen);
          outputs[m++] = targetBuffers[i];
-          outputs[i].limit(toRecoverLen);
        }
      }
      decoder.decode(inputs, erasedIndices, outputs);
@@ -658,7 +676,7 @@
        if (targetsStatus[i]) {
          long blockLen = getBlockLen(blockGroup, targetIndices[i]);
          long remaining = blockLen - positionInBlock;
-          if (remaining < 0) {
+          if (remaining <= 0) {
            targetBuffers[i].limit(0);
          } else if (remaining < toRecoverLen) {
            targetBuffers[i].limit((int)remaining);
@@ -678,16 +696,19 @@
     * @param used the used source DNs in this iteration.
     * @return the array index of source DN if don't need to do real read.
     */
-    private int scheduleNewRead(BitSet used) {
+    private int scheduleNewRead(BitSet used, int recoverLength,
+        Map<ExtendedBlock, Set<DatanodeInfo>> corruptionMap) {
      StripedReader reader = null;
      // step1: initially we may only have minRequiredSources
      // number of StripedReader, and there may be some source DNs we never
      // read before, so will try to create StripedReader for one new source DN
      // and try to read from it. If found, go to step 3.
      int m = stripedReaders.size();
+      int toRead = 0;
      while (reader == null && m < sources.length) {
        reader = addStripedReader(m, positionInBlock);
-        if (getReadLength(liveIndices[m]) > 0) {
+        toRead = getReadLength(liveIndices[m], recoverLength);
+        if (toRead > 0) {
          if (reader.blockReader == null) {
            reader = null;
            m++;
@@ -706,12 +727,14 @@
      for (int i = 0; reader == null && i < stripedReaders.size(); i++) {
        if (!used.get(i)) {
          StripedReader r = stripedReaders.get(i);
-          if (getReadLength(liveIndices[i]) > 0) {
+          toRead = getReadLength(liveIndices[i], recoverLength);
+          if (toRead > 0) {
            closeBlockReader(r.blockReader);
            r.blockReader = newBlockReader(
                getBlock(blockGroup, liveIndices[i]), positionInBlock,
                sources[i]);
            if (r.blockReader != null) {
+              r.buffer.position(0);
              m = i;
              reader = r;
            }
@@ -725,8 +748,8 @@
 
      // step3: schedule if find a correct source DN and need to do real read.
      if (reader != null) {
-        Callable<Void> readCallable = readFromBlock(
-            reader.blockReader, reader.buffer);
+        Callable<Void> readCallable = readFromBlock(reader, reader.buffer,
+            toRead, corruptionMap);
        Future<Void> f = readService.submit(readCallable);
        futures.put(f, m);
        used.set(m);
@@ -742,15 +765,22 @@
      }
 
    }
 
-    private Callable<Void> readFromBlock(final BlockReader reader,
-        final ByteBuffer buf) {
+    private Callable<Void> readFromBlock(final StripedReader reader,
+        final ByteBuffer buf, final int length,
+        final Map<ExtendedBlock, Set<DatanodeInfo>> corruptionMap) {
      return new Callable<Void>() {
 
        @Override
        public Void call() throws Exception {
          try {
-            actualReadFromBlock(reader, buf);
+            buf.limit(length);
+            actualReadFromBlock(reader.blockReader, buf);
            return null;
+          } catch (ChecksumException e) {
+            LOG.warn("Found Checksum error for " + reader.block + " from "
+                + reader.source + " at " + e.getPos());
+            addCorruptedBlock(reader.block, reader.source, corruptionMap);
+            throw e;
          } catch (IOException e) {
            LOG.info(e.getMessage());
            throw e;
@@ -760,6 +790,30 @@
      };
    }
 
+    private void reportCorruptedBlocks(
+        Map<ExtendedBlock, Set<DatanodeInfo>> corruptionMap) throws IOException {
+      if (!corruptionMap.isEmpty()) {
+        for (Map.Entry<ExtendedBlock, Set<DatanodeInfo>> entry :
+            corruptionMap.entrySet()) {
+          for (DatanodeInfo dnInfo : entry.getValue()) {
+            datanode.reportRemoteBadBlock(dnInfo, entry.getKey());
+          }
+        }
+      }
+    }
+
+    private void addCorruptedBlock(ExtendedBlock blk, DatanodeInfo node,
+        Map<ExtendedBlock, Set<DatanodeInfo>> corruptionMap) {
+      Set<DatanodeInfo> dnSet = corruptionMap.get(blk);
+      if (dnSet == null) {
+        dnSet = new HashSet<>();
+        corruptionMap.put(blk, dnSet);
+      }
+      if (!dnSet.contains(node)) {
+        dnSet.add(node);
+      }
+    }
+
    /**
     * Read bytes from block
     */
@@ -900,14 +954,14 @@
      }
 
      if (zeroStripeBuffers != null) {
-        for (int i = 0; i < zeroStripeBuffers.length; i++) {
-          zeroStripeBuffers[i].clear();
+        for (ByteBuffer zeroStripeBuffer : zeroStripeBuffers) {
+          zeroStripeBuffer.clear();
        }
      }
 
-      for (int i = 0; i < targetBuffers.length; i++) {
-        if (targetBuffers[i] != null) {
-          targetBuffers[i].clear();
+      for (ByteBuffer targetBuffer : targetBuffers) {
+        if (targetBuffer != null) {
+          targetBuffer.clear();
        }
      }
    }
@@ -998,9 +1052,13 @@
    private final short index; // internal block index
    private BlockReader blockReader;
    private ByteBuffer buffer;
+    private final ExtendedBlock block;
+    private final DatanodeInfo source;
 
-    private StripedReader(short index) {
+    StripedReader(short index, ExtendedBlock block, DatanodeInfo source) {
      this.index = index;
+      this.block = block;
+      this.source = source;
    }
  }
}
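The core of HDFS-9646 is the new loop bound in the ErasureCodingWorker diff above: reconstruction now iterates up to the longest block actually being recovered (maxTargetLength) rather than the length of internal block 0, and each source read is clamped by both the bytes remaining in that source and the bytes recovered in the current iteration. The following standalone sketch shows just those two calculations with illustrative lengths; it does not use the real getBlockLen()/StripedBlockUtil helpers.

```java
public class RecoveryLengthSketch {
  static long maxTargetLength(long[] blockLens, short[] targetIndices) {
    long max = 0;
    for (short t : targetIndices) {
      max = Math.max(max, blockLens[t]);
    }
    return max;
  }

  static int readLength(long[] blockLens, int sourceIndex, long positionInBlock,
      int recoverLength) {
    long remaining = blockLens[sourceIndex] - positionInBlock;
    return (int) Math.min(remaining, recoverLength); // <= 0 means nothing to read
  }

  public static void main(String[] args) {
    // Illustrative 6+3 block group: data block 0 holds a full 64KB cell,
    // data block 1 holds only 1KB, the rest are empty, parity blocks are 64KB.
    long[] blockLens = {65536, 1024, 0, 0, 0, 0, 65536, 65536, 65536};
    short[] targets = {1};           // recover only the short data block
    int bufferSize = 64 * 1024;
    long pos = 0;

    long bound = maxTargetLength(blockLens, targets);        // 1024, not 65536
    int toRecover = (int) Math.min(bufferSize, bound - pos);  // 1024
    int sourceRead = readLength(blockLens, 0, pos, toRecover); // source 0 reads 1024
    // Before the fix the loop bound would have been blockLens[0] (65536),
    // making the worker read and reconstruct past the target's real length.
    System.out.println(bound + " " + toRecover + " " + sourceRead);
  }
}
```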
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockECRecoveryCommand.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockECRecoveryCommand.java
index 1cb74b31f35..d0c1786dbe5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockECRecoveryCommand.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockECRecoveryCommand.java
@@ -136,7 +136,7 @@ public class BlockECRecoveryCommand extends DatanodeCommand {
         .append("Recovering ").append(block).append(" From: ")
         .append(Arrays.asList(sources)).append(" To: [")
         .append(Arrays.asList(targets)).append(")\n")
-        .append(" Block Indices: ").append(Arrays.asList(liveBlockIndices))
+        .append(" Block Indices: ").append(Arrays.toString(liveBlockIndices))
         .toString();
   }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReadStripedFileWithDecoding.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReadStripedFileWithDecoding.java
index 32b0216d739..b0af50ee052 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReadStripedFileWithDecoding.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReadStripedFileWithDecoding.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hdfs;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.commons.logging.impl.Log4JLogger;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.fs.Path;
@@ -29,12 +30,16 @@ import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicy;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
+import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
 import org.apache.hadoop.hdfs.util.StripedBlockUtil;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.log4j.Level;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -51,6 +56,14 @@ import static org.apache.hadoop.hdfs.StripedFileTestUtil.numDNs;
 public class TestReadStripedFileWithDecoding {
   static final Log LOG = LogFactory.getLog(TestReadStripedFileWithDecoding.class);
 
+  static {
+    ((Log4JLogger)LogFactory.getLog(BlockPlacementPolicy.class))
+        .getLogger().setLevel(Level.ALL);
+    GenericTestUtils.setLogLevel(BlockManager.LOG, Level.ALL);
+    GenericTestUtils.setLogLevel(BlockManager.blockLog, Level.ALL);
+    GenericTestUtils.setLogLevel(NameNode.stateChangeLog, Level.ALL);
+  }
+
   private MiniDFSCluster cluster;
   private DistributedFileSystem fs;
   private final short dataBlocks = StripedFileTestUtil.NUM_DATA_BLOCKS;
@@ -66,9 +79,9 @@ public class TestReadStripedFileWithDecoding {
     Configuration conf = new HdfsConfiguration();
     conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize);
     conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY, 0);
+    conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_STREAMS_HARD_LIMIT_KEY, 0);
     conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_REPLICATION_CONSIDERLOAD_KEY, false);
-    cluster = new MiniDFSCluster.Builder(new HdfsConfiguration())
-        .numDataNodes(numDNs).build();
+    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDNs).build();
     cluster.getFileSystem().getClient().setErasureCodingPolicy("/", null);
     fs = cluster.getFileSystem();
   }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestRecoverStripedFile.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestRecoverStripedFile.java
index 21352b55ad6..ca9d933fa95 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestRecoverStripedFile.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestRecoverStripedFile.java
@@ -23,11 +23,12 @@ import static org.junit.Assert.assertTrue;
 import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.BitSet;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.ThreadLocalRandom;
+import java.util.Random;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -38,6 +39,7 @@ import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
@@ -64,17 +66,25 @@ public class TestRecoverStripedFile {
   static {
     GenericTestUtils.setLogLevel(DFSClient.LOG, Level.ALL);
+    GenericTestUtils.setLogLevel(BlockManager.LOG, Level.ALL);
+    GenericTestUtils.setLogLevel(BlockManager.blockLog, Level.ALL);
+  }
+
+  enum RecoveryType {
+    DataOnly,
+    ParityOnly,
+    Any
   }
 
   private MiniDFSCluster cluster;
-  private Configuration conf;
   private DistributedFileSystem fs;
 
   // Map: DatanodeID -> datanode index in cluster
-  private Map<DatanodeID, Integer> dnMap = new HashMap<DatanodeID, Integer>();
+  private Map<DatanodeID, Integer> dnMap = new HashMap<>();
+  private final Random random = new Random();
 
   @Before
   public void setup() throws IOException {
-    conf = new Configuration();
+    final Configuration conf = new Configuration();
     conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize);
     conf.setInt(DFSConfigKeys.DFS_DATANODE_STRIPED_READ_BUFFER_SIZE_KEY,
         cellSize - 1);
@@ -104,75 +114,140 @@ public class TestRecoverStripedFile {
   @Test(timeout = 120000)
   public void testRecoverOneParityBlock() throws Exception {
     int fileLen = 10 * blockSize + blockSize/10;
-    assertFileBlocksRecovery("/testRecoverOneParityBlock", fileLen, 0, 1);
+    assertFileBlocksRecovery("/testRecoverOneParityBlock", fileLen,
+        RecoveryType.ParityOnly, 1);
   }
 
   @Test(timeout = 120000)
   public void testRecoverOneParityBlock1() throws Exception {
     int fileLen = cellSize + cellSize/10;
-    assertFileBlocksRecovery("/testRecoverOneParityBlock1", fileLen, 0, 1);
+    assertFileBlocksRecovery("/testRecoverOneParityBlock1", fileLen,
+        RecoveryType.ParityOnly, 1);
   }
 
   @Test(timeout = 120000)
   public void testRecoverOneParityBlock2() throws Exception {
     int fileLen = 1;
-    assertFileBlocksRecovery("/testRecoverOneParityBlock2", fileLen, 0, 1);
+    assertFileBlocksRecovery("/testRecoverOneParityBlock2", fileLen,
+        RecoveryType.ParityOnly, 1);
   }
 
   @Test(timeout = 120000)
   public void testRecoverOneParityBlock3() throws Exception {
     int fileLen = 3 * blockSize + blockSize/10;
-    assertFileBlocksRecovery("/testRecoverOneParityBlock3", fileLen, 0, 1);
+    assertFileBlocksRecovery("/testRecoverOneParityBlock3", fileLen,
+        RecoveryType.ParityOnly, 1);
   }
 
   @Test(timeout = 120000)
   public void testRecoverThreeParityBlocks() throws Exception {
     int fileLen = 10 * blockSize + blockSize/10;
-    assertFileBlocksRecovery("/testRecoverThreeParityBlocks", fileLen, 0, 3);
+    assertFileBlocksRecovery("/testRecoverThreeParityBlocks", fileLen,
+        RecoveryType.ParityOnly, 3);
   }
 
   @Test(timeout = 120000)
   public void testRecoverThreeDataBlocks() throws Exception {
     int fileLen = 10 * blockSize + blockSize/10;
-    assertFileBlocksRecovery("/testRecoverThreeDataBlocks", fileLen, 1, 3);
+    assertFileBlocksRecovery("/testRecoverThreeDataBlocks", fileLen,
+        RecoveryType.DataOnly, 3);
   }
 
   @Test(timeout = 120000)
   public void testRecoverThreeDataBlocks1() throws Exception {
     int fileLen = 3 * blockSize + blockSize/10;
-    assertFileBlocksRecovery("/testRecoverThreeDataBlocks1", fileLen, 1, 3);
+    assertFileBlocksRecovery("/testRecoverThreeDataBlocks1", fileLen,
+        RecoveryType.DataOnly, 3);
   }
 
   @Test(timeout = 120000)
   public void testRecoverOneDataBlock() throws Exception {
     int fileLen = 10 * blockSize + blockSize/10;
-    assertFileBlocksRecovery("/testRecoverOneDataBlock", fileLen, 1, 1);
+    assertFileBlocksRecovery("/testRecoverOneDataBlock", fileLen,
+        RecoveryType.DataOnly, 1);
   }
 
   @Test(timeout = 120000)
   public void testRecoverOneDataBlock1() throws Exception {
     int fileLen = cellSize + cellSize/10;
-    assertFileBlocksRecovery("/testRecoverOneDataBlock1", fileLen, 1, 1);
+    assertFileBlocksRecovery("/testRecoverOneDataBlock1", fileLen,
+        RecoveryType.DataOnly, 1);
   }
 
   @Test(timeout = 120000)
   public void testRecoverOneDataBlock2() throws Exception {
     int fileLen = 1;
-    assertFileBlocksRecovery("/testRecoverOneDataBlock2", fileLen, 1, 1);
+    assertFileBlocksRecovery("/testRecoverOneDataBlock2", fileLen,
+        RecoveryType.DataOnly, 1);
   }
 
   @Test(timeout = 120000)
   public void testRecoverAnyBlocks() throws Exception {
     int fileLen = 3 * blockSize + blockSize/10;
-    assertFileBlocksRecovery("/testRecoverAnyBlocks", fileLen, 2, 2);
+    assertFileBlocksRecovery("/testRecoverAnyBlocks", fileLen,
+        RecoveryType.Any, 2);
   }
 
   @Test(timeout = 120000)
   public void testRecoverAnyBlocks1() throws Exception {
     int fileLen = 10 * blockSize + blockSize/10;
-    assertFileBlocksRecovery("/testRecoverAnyBlocks1", fileLen, 2, 3);
+    assertFileBlocksRecovery("/testRecoverAnyBlocks1", fileLen,
+        RecoveryType.Any, 3);
   }
-  
+
+  private int[] generateDeadDnIndices(RecoveryType type, int deadNum,
+      byte[] indices) {
+    List<Integer> deadList = new ArrayList<>(deadNum);
+    while (deadList.size() < deadNum) {
+      int dead = random.nextInt(indices.length);
+      boolean isOfType = true;
+      if (type == RecoveryType.DataOnly) {
+        isOfType = indices[dead] < dataBlkNum;
+      } else if (type == RecoveryType.ParityOnly) {
+        isOfType = indices[dead] >= dataBlkNum;
+      }
+      if (isOfType && !deadList.contains(dead)) {
+        deadList.add(dead);
+      }
+    }
+    int[] d = new int[deadNum];
+    for (int i = 0; i < deadNum; i++) {
+      d[i] = deadList.get(i);
+    }
+    return d;
+  }
+
+  private void shutdownDataNodes(DataNode dn) throws IOException {
+    /*
+     * Kill the datanode which contains one replica
+     * We need to make sure it dead in namenode: clear its update time and
+     * trigger NN to check heartbeat.
+     */
+    dn.shutdown();
+    cluster.setDataNodeDead(dn.getDatanodeId());
+  }
+
+  private int generateErrors(Map<ExtendedBlock, DataNode> corruptTargets,
+      RecoveryType type)
+    throws IOException {
+    int stoppedDN = 0;
+    for (Map.Entry<ExtendedBlock, DataNode> target : corruptTargets.entrySet()) {
+      if (stoppedDN == 0 || type != RecoveryType.DataOnly
+          || random.nextBoolean()) {
+        // stop at least one DN to trigger recovery
+        LOG.info("Note: stop DataNode " + target.getValue().getDisplayName()
+            + " with internal block " + target.getKey());
+        shutdownDataNodes(target.getValue());
+        stoppedDN++;
+      } else { // corrupt the data on the DN
+        LOG.info("Note: corrupt data on " + target.getValue().getDisplayName()
+            + " with internal block " + target.getKey());
+        cluster.corruptReplica(target.getValue(), target.getKey());
+      }
+    }
+    return stoppedDN;
+  }
+
   /**
    * Test the file blocks recovery.
   * 1. Check the replica is recovered in the target datanode,
@@ -180,11 +255,7 @@
   * 2. Read the file and verify content.
   */
  private void assertFileBlocksRecovery(String fileName, int fileLen,
-      int recovery, int toRecoverBlockNum) throws Exception {
-    if (recovery != 0 && recovery != 1 && recovery != 2) {
-      Assert.fail("Invalid recovery: 0 is to recovery parity blocks,"
-          + "1 is to recovery data blocks, 2 is any.");
-    }
+      RecoveryType type, int toRecoverBlockNum) throws Exception {
    if (toRecoverBlockNum < 1 || toRecoverBlockNum > parityBlkNum) {
      Assert.fail("toRecoverBlockNum should be between 1 ~ " + parityBlkNum);
    }
@@ -192,7 +263,7 @@
    Path file = new Path(fileName);
 
    final byte[] data = new byte[fileLen];
-    ThreadLocalRandom.current().nextBytes(data);
+    Arrays.fill(data, (byte) 1);
    DFSTestUtil.writeFile(fs, file, data);
    StripedFileTestUtil.waitBlockGroupsReported(fs, fileName);
 
@@ -209,26 +280,10 @@
    for (DatanodeInfo storageInfo : storageInfos) {
      bitset.set(dnMap.get(storageInfo));
    }
-    
-    int[] toDead = new int[toRecoverBlockNum];
-    int n = 0;
-    for (int i = 0; i < indices.length; i++) {
-      if (n < toRecoverBlockNum) {
-        if (recovery == 0) {
-          if (indices[i] >= dataBlkNum) {
-            toDead[n++] = i;
-          }
-        } else if (recovery == 1) {
-          if (indices[i] < dataBlkNum) {
-            toDead[n++] = i;
-          }
-        } else {
-          toDead[n++] = i;
-        }
-      } else {
-        break;
-      }
-    }
+
+    int[] dead = generateDeadDnIndices(type, toRecoverBlockNum, indices);
+    LOG.info("Note: indices == " + Arrays.toString(indices)
+        + ". Generate errors on datanodes: " + Arrays.toString(dead));
 
    DatanodeInfo[] dataDNs = new DatanodeInfo[toRecoverBlockNum];
    int[] deadDnIndices = new int[toRecoverBlockNum];
@@ -236,46 +291,41 @@
    File[] replicas = new File[toRecoverBlockNum];
    File[] metadatas = new File[toRecoverBlockNum];
    byte[][] replicaContents = new byte[toRecoverBlockNum][];
+    Map<ExtendedBlock, DataNode> errorMap = new HashMap<>(dead.length);
    for (int i = 0; i < toRecoverBlockNum; i++) {
-      dataDNs[i] = storageInfos[toDead[i]];
+      dataDNs[i] = storageInfos[dead[i]];
      deadDnIndices[i] = dnMap.get(dataDNs[i]);
-      
+
      // Check the block replica file on deadDn before it dead.
      blocks[i] = StripedBlockUtil.constructInternalBlock(
-          lastBlock.getBlock(), cellSize, dataBlkNum, indices[toDead[i]]);
+          lastBlock.getBlock(), cellSize, dataBlkNum, indices[dead[i]]);
+      errorMap.put(blocks[i], cluster.getDataNodes().get(deadDnIndices[i]));
      replicas[i] = cluster.getBlockFile(deadDnIndices[i], blocks[i]);
      metadatas[i] = cluster.getBlockMetadataFile(deadDnIndices[i], blocks[i]);
      // the block replica on the datanode should be the same as expected
      assertEquals(replicas[i].length(),
          StripedBlockUtil.getInternalBlockLength(
-          lastBlock.getBlockSize(), cellSize, dataBlkNum, indices[toDead[i]]));
+          lastBlock.getBlockSize(), cellSize, dataBlkNum, indices[dead[i]]));
      assertTrue(metadatas[i].getName().
          endsWith(blocks[i].getGenerationStamp() + ".meta"));
+      LOG.info("replica " + i + " locates in file: " + replicas[i]);
      replicaContents[i] = DFSTestUtil.readFileAsBytes(replicas[i]);
    }
 
    int cellsNum = (fileLen - 1) / cellSize + 1;
    int groupSize = Math.min(cellsNum, dataBlkNum) + parityBlkNum;
 
-    for (int i = 0; i < toRecoverBlockNum; i++) {
-      /*
-       * Kill the datanode which contains one replica
-       * We need to make sure it dead in namenode: clear its update time and
-       * trigger NN to check heartbeat.
-       */
-      DataNode dn = cluster.getDataNodes().get(deadDnIndices[i]);
-      dn.shutdown();
-      cluster.setDataNodeDead(dn.getDatanodeId());
-    }
+    // shutdown datanodes or generate corruption
+    int stoppedDN = generateErrors(errorMap, type);
 
    // Check the locatedBlocks of the file again
    locatedBlocks = getLocatedBlocks(file);
    lastBlock = (LocatedStripedBlock)locatedBlocks.getLastLocatedBlock();
    storageInfos = lastBlock.getLocations();
-    assertEquals(storageInfos.length, groupSize - toRecoverBlockNum);
+    assertEquals(storageInfos.length, groupSize - stoppedDN);
 
    int[] targetDNs = new int[dnNum - groupSize];
-    n = 0;
+    int n = 0;
    for (int i = 0; i < dnNum; i++) {
      if (!bitset.get(i)) { // not contain replica of the block.
        targetDNs[n++] = i;
@@ -289,9 +339,11 @@
    // Check the replica on the new target node.
    for (int i = 0; i < toRecoverBlockNum; i++) {
      File replicaAfterRecovery = cluster.getBlockFile(targetDNs[i], blocks[i]);
+      LOG.info("replica after recovery " + replicaAfterRecovery);
      File metadataAfterRecovery =
          cluster.getBlockMetadataFile(targetDNs[i], blocks[i]);
      assertEquals(replicaAfterRecovery.length(), replicas[i].length());
+      LOG.info("replica before " + replicas[i]);
      assertTrue(metadataAfterRecovery.getName().
          endsWith(blocks[i].getGenerationStamp() + ".meta"));
      byte[] replicaContentAfterRecovery =
@@ -366,7 +418,7 @@
    BlockECRecoveryInfo invalidECInfo = new BlockECRecoveryInfo(
        new ExtendedBlock("bp-id", 123456), dataDNs, dnStorageInfo, liveIndices,
        ErasureCodingPolicyManager.getSystemDefaultPolicy());
-    List<BlockECRecoveryInfo> ecTasks = new ArrayList<BlockECRecoveryInfo>();
+    List<BlockECRecoveryInfo> ecTasks = new ArrayList<>();
    ecTasks.add(invalidECInfo);
    dataNode.getErasureCodingWorker().processErasureCodingTasks(ecTasks);
  }
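The reworked test chooses which internal blocks to fail from a RecoveryType rather than from hard-coded positions, so data-only, parity-only, and mixed failures are exercised regardless of where the blocks landed. Below is a simplified, self-contained sketch of that selection loop; the names and the 6+3 layout are assumptions made for illustration, not the test's actual helpers.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

public class DeadIndexSketch {
  enum RecoveryType { DataOnly, ParityOnly, Any }

  // Keep drawing random positions in the live-index array until we have
  // deadNum distinct positions whose internal-block index matches the type.
  static int[] pick(RecoveryType type, int deadNum, byte[] indices,
      int dataBlkNum, Random random) {
    List<Integer> dead = new ArrayList<>(deadNum);
    while (dead.size() < deadNum) {
      int candidate = random.nextInt(indices.length);
      boolean matches = type == RecoveryType.Any
          || (type == RecoveryType.DataOnly && indices[candidate] < dataBlkNum)
          || (type == RecoveryType.ParityOnly && indices[candidate] >= dataBlkNum);
      if (matches && !dead.contains(candidate)) {
        dead.add(candidate);
      }
    }
    int[] result = new int[deadNum];
    for (int i = 0; i < deadNum; i++) {
      result[i] = dead.get(i);
    }
    return result;
  }

  public static void main(String[] args) {
    byte[] indices = {0, 1, 2, 3, 4, 5, 6, 7, 8}; // 6 data + 3 parity internal blocks
    System.out.println(java.util.Arrays.toString(
        pick(RecoveryType.ParityOnly, 2, indices, 6, new Random(0))));
  }
}
```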