HDFS-12299. Race Between update pipeline and DN Re-Registration. Contributed by Brahma Reddy Battula.
This commit is contained in:
parent
2cae387402
commit
5a83ffa396
|
@ -1579,7 +1579,8 @@ class DataStreamer extends Daemon {
|
||||||
}
|
}
|
||||||
|
|
||||||
/** update pipeline at the namenode */
|
/** update pipeline at the namenode */
|
||||||
private void updatePipeline(long newGS) throws IOException {
|
@VisibleForTesting
|
||||||
|
void updatePipeline(long newGS) throws IOException {
|
||||||
final ExtendedBlock oldBlock = block.getCurrentBlock();
|
final ExtendedBlock oldBlock = block.getCurrentBlock();
|
||||||
// the new GS has been propagated to all DN, it should be ok to update the
|
// the new GS has been propagated to all DN, it should be ok to update the
|
||||||
// local block state
|
// local block state
|
||||||
|
|
|
@ -84,7 +84,7 @@ public class BlockUnderConstructionFeature {
|
||||||
for(int i = 0; i < targets.length; i++) {
|
for(int i = 0; i < targets.length; i++) {
|
||||||
// Only store non-null DatanodeStorageInfo.
|
// Only store non-null DatanodeStorageInfo.
|
||||||
if (targets[i] != null) {
|
if (targets[i] != null) {
|
||||||
replicas[i] = new ReplicaUnderConstruction(block,
|
replicas[offset++] = new ReplicaUnderConstruction(block,
|
||||||
targets[i], ReplicaState.RBW);
|
targets[i], ReplicaState.RBW);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -38,6 +38,9 @@ import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
|
||||||
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.BlockWrite;
|
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.BlockWrite;
|
||||||
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
||||||
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
||||||
|
import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
|
||||||
|
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
|
||||||
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
|
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
|
||||||
import org.apache.hadoop.hdfs.server.datanode.DataNode;
|
import org.apache.hadoop.hdfs.server.datanode.DataNode;
|
||||||
import org.apache.hadoop.hdfs.server.datanode.DataNodeFaultInjector;
|
import org.apache.hadoop.hdfs.server.datanode.DataNodeFaultInjector;
|
||||||
|
@ -706,4 +709,51 @@ public class TestClientProtocolForPipelineRecovery {
|
||||||
cluster.shutdown();
|
cluster.shutdown();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testUpdatePipeLineAfterDNReg()throws Exception {
|
||||||
|
Configuration conf = new HdfsConfiguration();
|
||||||
|
MiniDFSCluster cluster = null;
|
||||||
|
try {
|
||||||
|
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(2).build();
|
||||||
|
cluster.waitActive();
|
||||||
|
FileSystem fileSys = cluster.getFileSystem();
|
||||||
|
|
||||||
|
Path file = new Path("/testUpdatePipeLineAfterDNReg");
|
||||||
|
FSDataOutputStream out = fileSys.create(file);
|
||||||
|
out.write(1);
|
||||||
|
out.hflush();
|
||||||
|
//Get the First DN and disable the heartbeats and then put in Deadstate
|
||||||
|
DFSOutputStream dfsOut = (DFSOutputStream) out.getWrappedStream();
|
||||||
|
DatanodeInfo[] pipeline = dfsOut.getPipeline();
|
||||||
|
DataNode dn1 = cluster.getDataNode(pipeline[0].getIpcPort());
|
||||||
|
dn1.setHeartbeatsDisabledForTests(true);
|
||||||
|
DatanodeDescriptor dn1Desc = cluster.getNamesystem(0).getBlockManager()
|
||||||
|
.getDatanodeManager().getDatanode(dn1.getDatanodeId());
|
||||||
|
cluster.setDataNodeDead(dn1Desc);
|
||||||
|
//Re-register the DeadNode
|
||||||
|
DatanodeProtocolClientSideTranslatorPB dnp =
|
||||||
|
new DatanodeProtocolClientSideTranslatorPB(
|
||||||
|
cluster.getNameNode().getNameNodeAddress(), conf);
|
||||||
|
dnp.registerDatanode(
|
||||||
|
dn1.getDNRegistrationForBP(cluster.getNamesystem().getBlockPoolId()));
|
||||||
|
DFSOutputStream dfsO = (DFSOutputStream) out.getWrappedStream();
|
||||||
|
String clientName = ((DistributedFileSystem) fileSys).getClient()
|
||||||
|
.getClientName();
|
||||||
|
NamenodeProtocols namenode = cluster.getNameNodeRpc();
|
||||||
|
//Update the genstamp and call updatepipeline
|
||||||
|
LocatedBlock newBlock = namenode
|
||||||
|
.updateBlockForPipeline(dfsO.getBlock(), clientName);
|
||||||
|
dfsO.getStreamer()
|
||||||
|
.updatePipeline(newBlock.getBlock().getGenerationStamp());
|
||||||
|
newBlock = namenode.updateBlockForPipeline(dfsO.getBlock(), clientName);
|
||||||
|
//Should not throw any error Pipeline should be success
|
||||||
|
dfsO.getStreamer()
|
||||||
|
.updatePipeline(newBlock.getBlock().getGenerationStamp());
|
||||||
|
} finally {
|
||||||
|
if (cluster != null) {
|
||||||
|
cluster.shutdown();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue