HDFS-8245. Standby namenode doesn't process DELETED_BLOCK if the addblock request is in edit log. Contributed by Rushabh S Shah.
(cherry picked from commit 2d4ae3d18b
)
This commit is contained in:
parent
bbb9f6aafe
commit
f57b1bfbdd
|
@ -73,6 +73,9 @@ Release 2.7.1 - UNRELEASED
|
||||||
HDFS-7894. Rolling upgrade readiness is not updated in jmx until query
|
HDFS-7894. Rolling upgrade readiness is not updated in jmx until query
|
||||||
command is issued. (Brahma Reddy Battula via kihwal)
|
command is issued. (Brahma Reddy Battula via kihwal)
|
||||||
|
|
||||||
|
HDFS-8254. Standby namenode doesn't process DELETED_BLOCK if the add block
|
||||||
|
request is in edit log. (Rushabh S Shah via kihwal)
|
||||||
|
|
||||||
Release 2.7.0 - 2015-04-20
|
Release 2.7.0 - 2015-04-20
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
|
|
@ -2308,10 +2308,17 @@ public class BlockManager {
|
||||||
if (LOG.isDebugEnabled()) {
|
if (LOG.isDebugEnabled()) {
|
||||||
LOG.debug("Processing previouly queued message " + rbi);
|
LOG.debug("Processing previouly queued message " + rbi);
|
||||||
}
|
}
|
||||||
|
if (rbi.getReportedState() == null) {
|
||||||
|
// This is a DELETE_BLOCK request
|
||||||
|
DatanodeStorageInfo storageInfo = rbi.getStorageInfo();
|
||||||
|
removeStoredBlock(rbi.getBlock(),
|
||||||
|
storageInfo.getDatanodeDescriptor());
|
||||||
|
} else {
|
||||||
processAndHandleReportedBlock(rbi.getStorageInfo(),
|
processAndHandleReportedBlock(rbi.getStorageInfo(),
|
||||||
rbi.getBlock(), rbi.getReportedState(), null);
|
rbi.getBlock(), rbi.getReportedState(), null);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Process any remaining queued datanode messages after entering
|
* Process any remaining queued datanode messages after entering
|
||||||
|
@ -3012,6 +3019,17 @@ public class BlockManager {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void removeStoredBlock(DatanodeStorageInfo storageInfo, Block block,
|
||||||
|
DatanodeDescriptor node) {
|
||||||
|
if (shouldPostponeBlocksFromFuture &&
|
||||||
|
namesystem.isGenStampInFuture(block)) {
|
||||||
|
queueReportedBlock(storageInfo, block, null,
|
||||||
|
QUEUE_REASON_FUTURE_GENSTAMP);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
removeStoredBlock(block, node);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Modify (block-->datanode) map. Possibly generate replication tasks, if the
|
* Modify (block-->datanode) map. Possibly generate replication tasks, if the
|
||||||
* removed block is still valid.
|
* removed block is still valid.
|
||||||
|
@ -3189,7 +3207,7 @@ public class BlockManager {
|
||||||
for (ReceivedDeletedBlockInfo rdbi : srdb.getBlocks()) {
|
for (ReceivedDeletedBlockInfo rdbi : srdb.getBlocks()) {
|
||||||
switch (rdbi.getStatus()) {
|
switch (rdbi.getStatus()) {
|
||||||
case DELETED_BLOCK:
|
case DELETED_BLOCK:
|
||||||
removeStoredBlock(rdbi.getBlock(), node);
|
removeStoredBlock(storageInfo, rdbi.getBlock(), node);
|
||||||
deleted++;
|
deleted++;
|
||||||
break;
|
break;
|
||||||
case RECEIVED_BLOCK:
|
case RECEIVED_BLOCK:
|
||||||
|
|
|
@ -43,6 +43,8 @@ import org.apache.hadoop.hdfs.DFSTestUtil;
|
||||||
import org.apache.hadoop.hdfs.DistributedFileSystem;
|
import org.apache.hadoop.hdfs.DistributedFileSystem;
|
||||||
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
||||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||||
|
import org.apache.hadoop.hdfs.MiniDFSNNTopology;
|
||||||
|
import org.apache.hadoop.hdfs.client.BlockReportOptions;
|
||||||
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
||||||
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
||||||
import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
|
import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
|
||||||
|
@ -52,8 +54,11 @@ import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
|
||||||
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
|
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
|
||||||
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
|
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
|
||||||
import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
|
import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
|
||||||
|
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
|
||||||
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
|
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
|
||||||
|
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
|
||||||
import org.apache.hadoop.hdfs.util.DataTransferThrottler;
|
import org.apache.hadoop.hdfs.util.DataTransferThrottler;
|
||||||
|
import org.apache.hadoop.io.IOUtils;
|
||||||
import org.apache.hadoop.net.NetUtils;
|
import org.apache.hadoop.net.NetUtils;
|
||||||
import org.apache.hadoop.util.Time;
|
import org.apache.hadoop.util.Time;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
@ -342,6 +347,98 @@ public class TestBlockReplacement {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Standby namenode doesn't queue Delete block request when the add block
|
||||||
|
* request is in the edit log which are yet to be read.
|
||||||
|
* @throws Exception
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testDeletedBlockWhenAddBlockIsInEdit() throws Exception {
|
||||||
|
Configuration conf = new HdfsConfiguration();
|
||||||
|
cluster = new MiniDFSCluster.Builder(conf)
|
||||||
|
.nnTopology(MiniDFSNNTopology.simpleHATopology())
|
||||||
|
.numDataNodes(1).build();
|
||||||
|
DFSClient client = null;
|
||||||
|
try {
|
||||||
|
cluster.waitActive();
|
||||||
|
assertEquals("Number of namenodes is not 2", 2,
|
||||||
|
cluster.getNumNameNodes());
|
||||||
|
// Transitioning the namenode 0 to active.
|
||||||
|
cluster.transitionToActive(0);
|
||||||
|
assertTrue("Namenode 0 should be in active state",
|
||||||
|
cluster.getNameNode(0).isActiveState());
|
||||||
|
assertTrue("Namenode 1 should be in standby state",
|
||||||
|
cluster.getNameNode(1).isStandbyState());
|
||||||
|
|
||||||
|
// Trigger heartbeat to mark DatanodeStorageInfo#heartbeatedSinceFailover
|
||||||
|
// to true.
|
||||||
|
DataNodeTestUtils.triggerHeartbeat(cluster.getDataNodes().get(0));
|
||||||
|
FileSystem fs = cluster.getFileSystem(0);
|
||||||
|
|
||||||
|
// Trigger blockReport to mark DatanodeStorageInfo#blockContentsStale
|
||||||
|
// to false.
|
||||||
|
cluster.getDataNodes().get(0).triggerBlockReport(
|
||||||
|
new BlockReportOptions.Factory().setIncremental(false).build());
|
||||||
|
|
||||||
|
Path fileName = new Path("/tmp.txt");
|
||||||
|
// create a file with one block
|
||||||
|
DFSTestUtil.createFile(fs, fileName, 10L, (short)1, 1234L);
|
||||||
|
DFSTestUtil.waitReplication(fs,fileName, (short)1);
|
||||||
|
|
||||||
|
client = new DFSClient(cluster.getFileSystem(0).getUri(), conf);
|
||||||
|
List<LocatedBlock> locatedBlocks = client.getNamenode().
|
||||||
|
getBlockLocations("/tmp.txt", 0, 10L).getLocatedBlocks();
|
||||||
|
assertTrue(locatedBlocks.size() == 1);
|
||||||
|
assertTrue(locatedBlocks.get(0).getLocations().length == 1);
|
||||||
|
|
||||||
|
// add a second datanode to the cluster
|
||||||
|
cluster.startDataNodes(conf, 1, true, null, null, null, null);
|
||||||
|
assertEquals("Number of datanodes should be 2", 2,
|
||||||
|
cluster.getDataNodes().size());
|
||||||
|
|
||||||
|
DataNode dn0 = cluster.getDataNodes().get(0);
|
||||||
|
DataNode dn1 = cluster.getDataNodes().get(1);
|
||||||
|
String activeNNBPId = cluster.getNamesystem(0).getBlockPoolId();
|
||||||
|
DatanodeDescriptor sourceDnDesc = NameNodeAdapter.getDatanode(
|
||||||
|
cluster.getNamesystem(0), dn0.getDNRegistrationForBP(activeNNBPId));
|
||||||
|
DatanodeDescriptor destDnDesc = NameNodeAdapter.getDatanode(
|
||||||
|
cluster.getNamesystem(0), dn1.getDNRegistrationForBP(activeNNBPId));
|
||||||
|
|
||||||
|
ExtendedBlock block = DFSTestUtil.getFirstBlock(fs, fileName);
|
||||||
|
|
||||||
|
LOG.info("replaceBlock: " + replaceBlock(block,
|
||||||
|
(DatanodeInfo)sourceDnDesc, (DatanodeInfo)sourceDnDesc,
|
||||||
|
(DatanodeInfo)destDnDesc));
|
||||||
|
// Waiting for the FsDatasetAsyncDsikService to delete the block
|
||||||
|
Thread.sleep(3000);
|
||||||
|
// Triggering the incremental block report to report the deleted block to
|
||||||
|
// namnemode
|
||||||
|
cluster.getDataNodes().get(0).triggerBlockReport(
|
||||||
|
new BlockReportOptions.Factory().setIncremental(true).build());
|
||||||
|
|
||||||
|
cluster.transitionToStandby(0);
|
||||||
|
cluster.transitionToActive(1);
|
||||||
|
|
||||||
|
assertTrue("Namenode 1 should be in active state",
|
||||||
|
cluster.getNameNode(1).isActiveState());
|
||||||
|
assertTrue("Namenode 0 should be in standby state",
|
||||||
|
cluster.getNameNode(0).isStandbyState());
|
||||||
|
client.close();
|
||||||
|
|
||||||
|
// Opening a new client for new active namenode
|
||||||
|
client = new DFSClient(cluster.getFileSystem(1).getUri(), conf);
|
||||||
|
List<LocatedBlock> locatedBlocks1 = client.getNamenode()
|
||||||
|
.getBlockLocations("/tmp.txt", 0, 10L).getLocatedBlocks();
|
||||||
|
|
||||||
|
assertEquals(1, locatedBlocks1.size());
|
||||||
|
assertEquals("The block should be only on 1 datanode ", 1,
|
||||||
|
locatedBlocks1.get(0).getLocations().length);
|
||||||
|
} finally {
|
||||||
|
IOUtils.cleanup(null, client);
|
||||||
|
cluster.shutdown();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @param args
|
* @param args
|
||||||
*/
|
*/
|
||||||
|
|
|
@ -151,10 +151,6 @@ public class TestDNFencing {
|
||||||
banner("NN2 Metadata immediately after failover");
|
banner("NN2 Metadata immediately after failover");
|
||||||
doMetasave(nn2);
|
doMetasave(nn2);
|
||||||
|
|
||||||
// Even though NN2 considers the blocks over-replicated, it should
|
|
||||||
// post-pone the block invalidation because the DNs are still "stale".
|
|
||||||
assertEquals(30, nn2.getNamesystem().getPostponedMisreplicatedBlocks());
|
|
||||||
|
|
||||||
banner("Triggering heartbeats and block reports so that fencing is completed");
|
banner("Triggering heartbeats and block reports so that fencing is completed");
|
||||||
cluster.triggerHeartbeats();
|
cluster.triggerHeartbeats();
|
||||||
cluster.triggerBlockReports();
|
cluster.triggerBlockReports();
|
||||||
|
|
Loading…
Reference in New Issue