HDFS-9113. ErasureCodingWorker#processErasureCodingTasks should not fail to process remaining tasks due to one invalid ECTask. Contributed by Uma Maheswara Rao G.
This commit is contained in:
parent
c457095206
commit
b762199adb
|
@ -438,3 +438,6 @@
|
||||||
|
|
||||||
HDFS-8550. Erasure Coding: Fix FindBugs Multithreaded correctness Warning.
|
HDFS-8550. Erasure Coding: Fix FindBugs Multithreaded correctness Warning.
|
||||||
(Rakesh R via zhz)
|
(Rakesh R via zhz)
|
||||||
|
|
||||||
|
HDFS-9113. ErasureCodingWorker#processErasureCodingTasks should not fail to process
|
||||||
|
remaining tasks due to one invalid ECTask (umamahesh)
|
||||||
|
|
|
@ -175,8 +175,13 @@ public final class ErasureCodingWorker {
|
||||||
*/
|
*/
|
||||||
public void processErasureCodingTasks(Collection<BlockECRecoveryInfo> ecTasks) {
|
public void processErasureCodingTasks(Collection<BlockECRecoveryInfo> ecTasks) {
|
||||||
for (BlockECRecoveryInfo recoveryInfo : ecTasks) {
|
for (BlockECRecoveryInfo recoveryInfo : ecTasks) {
|
||||||
STRIPED_BLK_RECOVERY_THREAD_POOL.submit(new ReconstructAndTransferBlock(
|
try {
|
||||||
recoveryInfo));
|
STRIPED_BLK_RECOVERY_THREAD_POOL
|
||||||
|
.submit(new ReconstructAndTransferBlock(recoveryInfo));
|
||||||
|
} catch (Throwable e) {
|
||||||
|
LOG.warn("Failed to recover striped block "
|
||||||
|
+ recoveryInfo.getExtendedBlock().getLocalBlock(), e);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -17,10 +17,23 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.hadoop.hdfs;
|
package org.apache.hadoop.hdfs;
|
||||||
|
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.junit.Assert.assertTrue;
|
||||||
|
|
||||||
|
import java.io.ByteArrayOutputStream;
|
||||||
|
import java.io.File;
|
||||||
|
import java.io.FileInputStream;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.BitSet;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.concurrent.ThreadLocalRandom;
|
||||||
|
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.FSDataOutputStream;
|
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.hdfs.protocol.DatanodeID;
|
import org.apache.hadoop.hdfs.protocol.DatanodeID;
|
||||||
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
||||||
|
@ -28,30 +41,20 @@ import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
||||||
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
||||||
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
|
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
|
||||||
import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
|
import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
|
||||||
|
|
||||||
import static org.junit.Assert.assertEquals;
|
|
||||||
import static org.junit.Assert.assertTrue;
|
|
||||||
|
|
||||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
|
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
|
||||||
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
|
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
|
||||||
|
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
|
||||||
import org.apache.hadoop.hdfs.server.datanode.DataNode;
|
import org.apache.hadoop.hdfs.server.datanode.DataNode;
|
||||||
|
import org.apache.hadoop.hdfs.server.namenode.ErasureCodingPolicyManager;
|
||||||
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
|
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
|
||||||
|
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
|
||||||
|
import org.apache.hadoop.hdfs.server.protocol.BlockECRecoveryCommand.BlockECRecoveryInfo;
|
||||||
import org.apache.hadoop.hdfs.util.StripedBlockUtil;
|
import org.apache.hadoop.hdfs.util.StripedBlockUtil;
|
||||||
import org.junit.After;
|
import org.junit.After;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
import java.io.ByteArrayOutputStream;
|
|
||||||
import java.io.File;
|
|
||||||
import java.io.FileInputStream;
|
|
||||||
import java.io.IOException;
|
|
||||||
import java.util.BitSet;
|
|
||||||
import java.util.HashMap;
|
|
||||||
import java.util.List;
|
|
||||||
import java.util.Map;
|
|
||||||
import java.util.concurrent.ThreadLocalRandom;
|
|
||||||
|
|
||||||
public class TestRecoverStripedFile {
|
public class TestRecoverStripedFile {
|
||||||
public static final Log LOG = LogFactory.getLog(TestRecoverStripedFile.class);
|
public static final Log LOG = LogFactory.getLog(TestRecoverStripedFile.class);
|
||||||
|
|
||||||
|
@ -383,4 +386,33 @@ public class TestRecoverStripedFile {
|
||||||
private LocatedBlocks getLocatedBlocks(Path file) throws IOException {
|
private LocatedBlocks getLocatedBlocks(Path file) throws IOException {
|
||||||
return fs.getClient().getLocatedBlocks(file.toString(), 0, Long.MAX_VALUE);
|
return fs.getClient().getLocatedBlocks(file.toString(), 0, Long.MAX_VALUE);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Tests that processErasureCodingTasks should not throw exceptions out due to
|
||||||
|
* invalid ECTask submission.
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testProcessErasureCodingTasksSubmitionShouldSucceed()
|
||||||
|
throws Exception {
|
||||||
|
DataNode dataNode = cluster.dataNodes.get(0).datanode;
|
||||||
|
|
||||||
|
// Pack invalid(dummy) parameters in ecTasks. Irrespective of parameters, each task
|
||||||
|
// thread pool submission should succeed, so that it will not prevent
|
||||||
|
// processing other tasks in the list if any exceptions.
|
||||||
|
int size = cluster.dataNodes.size();
|
||||||
|
short[] liveIndices = new short[size];
|
||||||
|
DatanodeInfo[] dataDNs = new DatanodeInfo[size + 1];
|
||||||
|
DatanodeStorageInfo targetDnInfos_1 = BlockManagerTestUtil
|
||||||
|
.newDatanodeStorageInfo(DFSTestUtil.getLocalDatanodeDescriptor(),
|
||||||
|
new DatanodeStorage("s01"));
|
||||||
|
DatanodeStorageInfo[] dnStorageInfo = new DatanodeStorageInfo[] {
|
||||||
|
targetDnInfos_1 };
|
||||||
|
|
||||||
|
BlockECRecoveryInfo invalidECInfo = new BlockECRecoveryInfo(
|
||||||
|
new ExtendedBlock("bp-id", 123456), dataDNs, dnStorageInfo, liveIndices,
|
||||||
|
ErasureCodingPolicyManager.getSystemDefaultPolicy());
|
||||||
|
List<BlockECRecoveryInfo> ecTasks = new ArrayList<BlockECRecoveryInfo>();
|
||||||
|
ecTasks.add(invalidECInfo);
|
||||||
|
dataNode.getErasureCodingWorker().processErasureCodingTasks(ecTasks);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue