HDFS-10810. setReplication temporarily removes block from under-construction when batch IBR is enabled. Contributed by Brahma Reddy Battula

Mingliang Liu 2016-10-03 21:59:31 -04:00
parent d9b686a2bd
commit d963ecb918
2 changed files with 68 additions and 5 deletions

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java

@@ -3588,13 +3588,15 @@ public class BlockManager implements BlockStatsMXBean {
         return;
       }
       NumberReplicas repl = countNodes(block);
+      int pendingNum = pendingReplications.getNumReplicas(block);
       int curExpectedReplicas = getReplication(block);
-      if (isNeededReplication(block, repl.liveReplicas())) {
-        neededReplications.update(block, repl.liveReplicas(), repl.readOnlyReplicas(),
-            repl.decommissionedAndDecommissioning(), curExpectedReplicas,
-            curReplicasDelta, expectedReplicasDelta);
+      if (!hasEnoughEffectiveReplicas(block, repl, pendingNum,
+          curExpectedReplicas)) {
+        neededReplications.update(block, repl.liveReplicas() + pendingNum,
+            repl.readOnlyReplicas(), repl.decommissionedAndDecommissioning(),
+            curExpectedReplicas, curReplicasDelta, expectedReplicasDelta);
       } else {
-        int oldReplicas = repl.liveReplicas()-curReplicasDelta;
+        int oldReplicas = repl.liveReplicas() + pendingNum - curReplicasDelta;
         int oldExpectedReplicas = curExpectedReplicas-expectedReplicasDelta;
         neededReplications.remove(block, oldReplicas, repl.readOnlyReplicas(),
             repl.decommissionedAndDecommissioning(), oldExpectedReplicas);
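
The substance of the fix: the old code decided whether a block still needed replication by looking only at live replicas (isNeededReplication), so a replica whose transfer was already scheduled counted for nothing, and updateNeededReplications could drop the block from the under-replicated queue while that transfer was still pending. The new code asks hasEnoughEffectiveReplicas, which folds the pendingReplications count into the total. A minimal sketch of that arithmetic, based only on what the hunk above shows (the real BlockManager helper also consults the block placement policy):

    // Sketch only: models the replica arithmetic visible in the hunk above,
    // not the actual BlockManager implementation.
    public class EffectiveReplicaSketch {
      // Pre-patch view: only live replicas count toward the target.
      static boolean needsReplicationBefore(int live, int expected) {
        return live < expected;
      }

      // Post-patch view: replicas with transfers already scheduled (pending)
      // are treated as effective, so in-flight work is not scheduled twice
      // and the block is not prematurely dropped from the queue.
      static boolean needsReplicationAfter(int live, int pending, int expected) {
        return live + pending < expected;
      }

      public static void main(String[] args) {
        // One live replica, two transfers in flight, replication factor 3.
        System.out.println(needsReplicationBefore(1, 3));   // true
        System.out.println(needsReplicationAfter(1, 2, 3)); // false
      }
    }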

hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileCorruption.java

@@ -18,6 +18,7 @@
 package org.apache.hadoop.hdfs;
 
+import com.google.common.base.Supplier;
 import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
 import static org.junit.Assert.assertEquals;
@@ -38,6 +39,7 @@ import java.util.Set;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.ChecksumException;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
 import org.apache.hadoop.hdfs.protocol.BlockListAsLongs.BlockReportReplica;
@@ -230,6 +232,65 @@ public class TestFileCorruption {
   }
 
+  @Test
+  public void testSetReplicationWhenBatchIBR() throws Exception {
+    Configuration conf = new HdfsConfiguration();
+    conf.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 100);
+    conf.setLong(DFSConfigKeys.DFS_BLOCKREPORT_INCREMENTAL_INTERVAL_MSEC_KEY,
+        30000);
+    conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 1024);
+    conf.setInt(DFSConfigKeys.DFS_NAMENODE_FILE_CLOSE_NUM_COMMITTED_ALLOWED_KEY,
+        1);
+    DistributedFileSystem dfs;
+    try (MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+        .numDataNodes(3).build()) {
+      final int bufferSize = 1024; // write 1024 bytes each time
+      byte[] outBuffer = new byte[bufferSize];
+      dfs = cluster.getFileSystem();
+      String fileName = "/testSetRep1";
+      Path filePath = new Path(fileName);
+      FSDataOutputStream out = dfs.create(filePath);
+      out.write(outBuffer, 0, bufferSize);
+      out.close();
+      // Send the FBR (full block report) to delay the next IBR
+      cluster.triggerBlockReports();
+      GenericTestUtils.waitFor(new Supplier<Boolean>() {
+        @Override
+        public Boolean get() {
+          try {
+            cluster.triggerBlockReports();
+            if (cluster.getNamesystem().getBlocksTotal() == 1) {
+              return true;
+            }
+          } catch (Exception e) {
+            // Ignore the exception
+          }
+          return false;
+        }
+      }, 10, 3000);
+      fileName = "/testSetRep2";
+      filePath = new Path(fileName);
+      out = dfs.create(filePath);
+      out.write(outBuffer, 0, bufferSize);
+      out.close();
+      dfs.setReplication(filePath, (short) 10);
+      // The under-replicated block count should be one after setrep
+      GenericTestUtils.waitFor(new Supplier<Boolean>() {
+        @Override public Boolean get() {
+          try {
+            return cluster.getNamesystem().getBlockManager()
+                .getUnderReplicatedBlocksCount() == 1;
+          } catch (Exception e) {
+            e.printStackTrace();
+            return false;
+          }
+        }
+      }, 10, 3000);
+      assertEquals(0,
+          cluster.getNamesystem().getBlockManager().getMissingBlocksCount());
+    }
+  }
+
   private void markAllBlocksAsCorrupt(BlockManager bm,
       ExtendedBlock blk) throws IOException {
     for (DatanodeStorageInfo info : bm.getStorages(blk.getLocalBlock())) {
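
In outline, the new test stretches the incremental block report interval to 30 seconds so that, right after a file is closed, the NameNode has not yet heard about the new replicas, while DFS_NAMENODE_FILE_CLOSE_NUM_COMMITTED_ALLOWED_KEY = 1 lets the file close while its last block is still only committed. Calling setReplication(..., 10) on a 3-datanode cluster must then leave exactly one block in the under-replicated queue and none in the missing list, which is exactly the accounting the BlockManager change corrects. Incidentally, the Guava Supplier anonymous classes could be lambdas on a Java 8+ branch; a sketch, assuming GenericTestUtils.waitFor keeps the same Supplier<Boolean>-style signature:

    // Sketch only: lambda form of the second wait in the test above.
    GenericTestUtils.waitFor(
        () -> cluster.getNamesystem().getBlockManager()
            .getUnderReplicatedBlocksCount() == 1,
        10,     // poll every 10 ms
        3000);  // time out after 3 seconds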