HDFS-16064. Determine when to invalidate corrupt replicas based on number of usable replicas (#4410)

Co-authored-by: Kevin Wikant <wikak@amazon.com>
Signed-off-by: Akira Ajisaka <aajisaka@apache.org>

parent cb0421095b
commit cfceaebde6
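For readers skimming the diff: the first hunk below changes how BlockManager decides whether a corrupt replica can be invalidated, by counting replicas on decommissioning and entering-maintenance nodes as "usable". The following is a minimal, self-contained sketch of that decision, not the real BlockManager code: Counts is a hypothetical stand-in for NumberReplicas, hasMinStorage is approximated as a plain minimum-replication check, and the sample numbers only mirror the shape of the regression test added below (replication factor 2, one replica on a decommissioning node, two genstamp-stale corrupt replicas).

// Illustrative sketch only -- not the actual BlockManager code.
public class UsableReplicaSketch {

  /** Hypothetical stand-in for NumberReplicas. */
  static final class Counts {
    final int live;                // replicas on healthy, in-service nodes
    final int decommissioning;     // replicas on nodes being decommissioned
    final int enteringMaintenance; // live replicas on nodes entering maintenance
    final int corrupt;             // replicas reported corrupt (e.g. stale genstamp)

    Counts(int live, int decommissioning, int enteringMaintenance, int corrupt) {
      this.live = live;
      this.decommissioning = decommissioning;
      this.enteringMaintenance = enteringMaintenance;
      this.corrupt = corrupt;
    }
  }

  /**
   * Mirrors the patched condition with usable = live + decommissioning +
   * enteringMaintenance. Passing usable = c.live reproduces the pre-patch behavior.
   */
  static boolean mayInvalidateCorruptReplica(Counts c, int usable, int expectedRedundancy,
      int minReplication, boolean corruptedDuringWrite) {
    boolean hasEnoughLiveReplicas = usable >= expectedRedundancy;
    // Rough approximation of hasMinStorage(block, usable).
    boolean minReplicationSatisfied = usable >= minReplication;
    boolean hasMoreCorruptReplicas =
        minReplicationSatisfied && (c.live + c.corrupt) > expectedRedundancy;
    return hasEnoughLiveReplicas || hasMoreCorruptReplicas
        || (minReplicationSatisfied && corruptedDuringWrite);
  }

  public static void main(String[] args) {
    // Shape of the regression test: replication factor 2, the only good replica
    // sits on a decommissioning node, and two replicas went stale during a
    // write-pipeline recovery, so corruptedDuringWrite is assumed true here.
    Counts c = new Counts(0, 1, 0, 2);
    int usableOld = c.live;                                             // 0
    int usableNew = c.live + c.decommissioning + c.enteringMaintenance; // 1
    System.out.println(mayInvalidateCorruptReplica(c, usableOld, 2, 1, true)); // false
    System.out.println(mayInvalidateCorruptReplica(c, usableNew, 2, 1, true)); // true
  }
}

Under the old accounting the decommissioning replica was invisible, so minReplicationSatisfied never held and the stale replicas were never scheduled for invalidation, which stalled both re-replication and decommissioning.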
@@ -1916,23 +1916,29 @@ public class BlockManager implements BlockStatsMXBean {
         b.getReasonCode(), b.getStored().isStriped());

     NumberReplicas numberOfReplicas = countNodes(b.getStored());
-    boolean hasEnoughLiveReplicas = numberOfReplicas.liveReplicas() >=
+    final int numUsableReplicas = numberOfReplicas.liveReplicas() +
+        numberOfReplicas.decommissioning() +
+        numberOfReplicas.liveEnteringMaintenanceReplicas();
+    boolean hasEnoughLiveReplicas = numUsableReplicas >=
         expectedRedundancies;
     boolean minReplicationSatisfied = hasMinStorage(b.getStored(),
-        numberOfReplicas.liveReplicas());
+        numUsableReplicas);
     boolean hasMoreCorruptReplicas = minReplicationSatisfied &&
         (numberOfReplicas.liveReplicas() + numberOfReplicas.corruptReplicas()) >
         expectedRedundancies;
     boolean corruptedDuringWrite = minReplicationSatisfied &&
         b.isCorruptedDuringWrite();
-    // case 1: have enough number of live replicas
-    // case 2: corrupted replicas + live replicas > Replication factor
+    // case 1: have enough number of usable replicas
+    // case 2: corrupted replicas + usable replicas > Replication factor
     // case 3: Block is marked corrupt due to failure while writing. In this
     // case genstamp will be different than that of valid block.
     // In all these cases we can delete the replica.
-    // In case of 3, rbw block will be deleted and valid block can be replicated
+    // In case 3, rbw block will be deleted and valid block can be replicated.
+    // Note NN only becomes aware of corrupt blocks when the block report is sent,
+    // this means that by default it can take up to 6 hours for a corrupt block to
+    // be invalidated, after which the valid block can be replicated.
     if (hasEnoughLiveReplicas || hasMoreCorruptReplicas
         || corruptedDuringWrite) {
       if (b.getStored().isStriped()) {
@@ -3656,7 +3662,7 @@ public class BlockManager implements BlockStatsMXBean {
           ". blockMap has {} but corrupt replicas map has {}",
           storedBlock, numCorruptNodes, corruptReplicasCount);
     }
-    if ((corruptReplicasCount > 0) && (numLiveReplicas >= fileRedundancy)) {
+    if ((corruptReplicasCount > 0) && (numUsableReplicas >= fileRedundancy)) {
       invalidateCorruptReplicas(storedBlock, reportedBlock, num);
     }
     return storedBlock;
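The second hunk above applies the same accounting on the block-report path in the same class: corrupt replicas are scheduled for invalidation only once the usable count reaches the file's replication factor. The diff does not show how numUsableReplicas is computed in that method; assuming it is the same live + decommissioning + entering-maintenance sum as in the first hunk, the guard reduces to this small continuation of the hypothetical sketch above:

  // Hypothetical continuation of the UsableReplicaSketch above, assuming
  // numUsableReplicas is the same sum used in the first hunk.
  static boolean shouldInvalidateAfterBlockReport(Counts c, int fileRedundancy) {
    int numUsableReplicas = c.live + c.decommissioning + c.enteringMaintenance;
    return c.corrupt > 0 && numUsableReplicas >= fileRedundancy;
  }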
@@ -52,6 +52,7 @@ import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.BatchedRemoteIterator.BatchedEntries;
+import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
 import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
 import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
 import org.apache.hadoop.hdfs.protocol.Block;
@@ -1902,4 +1903,285 @@ public class TestDecommission extends AdminStatesBaseTest {
         !BlockManagerTestUtil.isNodeHealthyForDecommissionOrMaintenance(blockManager, node)
             && !node.isAlive()), 500, 20000);
   }
+
+  /*
+  This test reproduces a scenario where an under-replicated block on a decommissioning node
+  cannot be replicated to some datanodes because they have a corrupt replica of the block.
+  The test ensures that the corrupt replicas are eventually invalidated so that the
+  under-replicated block can be replicated to sufficient datanodes & the decommissioning
+  node can be decommissioned.
+  */
+  @Test(timeout = 60000)
+  public void testDeleteCorruptReplicaForUnderReplicatedBlock() throws Exception {
+    // Constants
+    final Path file = new Path("/test-file");
+    final int numDatanode = 3;
+    final short replicationFactor = 2;
+    final int numStoppedNodes = 2;
+    final int numDecommNodes = 1;
+    assertEquals(numDatanode, numStoppedNodes + numDecommNodes);
+
+    // Run monitor every 5 seconds to speed up decommissioning & make the test faster
+    final int datanodeAdminMonitorFixedRateSeconds = 5;
+    getConf().setInt(MiniDFSCluster.DFS_NAMENODE_DECOMMISSION_INTERVAL_TESTING_KEY,
+        datanodeAdminMonitorFixedRateSeconds);
+    // Set block report interval to 6 hours to avoid unexpected block reports.
+    // The default block report interval is different for a MiniDFSCluster
+    getConf().setLong(DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY,
+        DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_DEFAULT);
+    // Run the BlockManager RedundancyMonitor every 3 seconds such that the Namenode
+    // sends under-replicated blocks for replication frequently
+    getConf().setLong(DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_INTERVAL_SECONDS_KEY,
+        DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_INTERVAL_SECONDS_DEFAULT);
+    // Ensure that the DataStreamer client will replace the bad datanode on append failure
+    getConf().set(HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure.POLICY_KEY, "ALWAYS");
+    // Avoid having the DataStreamer client fail the append operation if datanode replacement fails
+    getConf()
+        .setBoolean(HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure.BEST_EFFORT_KEY, true);
+
+    // References to datanodes in the cluster
+    // - 2 datanodes will be stopped to generate corrupt block replicas & then
+    //   restarted later to validate the corrupt replicas are invalidated
+    // - 1 datanode will start decommissioning to make the block under-replicated
+    final List<DatanodeDescriptor> allNodes = new ArrayList<>();
+    final List<DatanodeDescriptor> stoppedNodes = new ArrayList<>();
+    final DatanodeDescriptor decommNode;
+
+    // Create MiniDFSCluster
+    startCluster(1, numDatanode);
+    getCluster().waitActive();
+    final FSNamesystem namesystem = getCluster().getNamesystem();
+    final BlockManager blockManager = namesystem.getBlockManager();
+    final DatanodeManager datanodeManager = blockManager.getDatanodeManager();
+    final DatanodeAdminManager decomManager = datanodeManager.getDatanodeAdminManager();
+    final FileSystem fs = getCluster().getFileSystem();
+
+    // Get DatanodeDescriptors
+    for (final DataNode node : getCluster().getDataNodes()) {
+      allNodes.add(getDatanodeDesriptor(namesystem, node.getDatanodeUuid()));
+    }
+
+    // Create block with 2 FINALIZED replicas
+    // Note that:
+    // - calling hflush leaves block in state ReplicaBeingWritten
+    // - calling close leaves the block in state FINALIZED
+    // - amount of data is kept small because flush is not synchronous
+    LOG.info("Creating Initial Block with {} FINALIZED replicas", replicationFactor);
+    FSDataOutputStream out = fs.create(file, replicationFactor);
+    for (int i = 0; i < 512; i++) {
+      out.write(i);
+    }
+    out.close();
+
+    // Validate the block exists with expected number of replicas
+    assertEquals(1, blockManager.getTotalBlocks());
+    BlockLocation[] blocksInFile = fs.getFileBlockLocations(file, 0, 0);
+    assertEquals(1, blocksInFile.length);
+    List<String> replicasInBlock = Arrays.asList(blocksInFile[0].getNames());
+    assertEquals(replicationFactor, replicasInBlock.size());
+
+    // Identify the DatanodeDescriptors associated with the 2 nodes with replicas.
+    // Each of the nodes with a replica will be stopped later to corrupt the replica
+    DatanodeDescriptor decommNodeTmp = null;
+    for (DatanodeDescriptor node : allNodes) {
+      if (replicasInBlock.contains(node.getName())) {
+        stoppedNodes.add(node);
+      } else {
+        decommNodeTmp = node;
+      }
+    }
+    assertEquals(numStoppedNodes, stoppedNodes.size());
+    assertNotNull(decommNodeTmp);
+    decommNode = decommNodeTmp;
+    final DatanodeDescriptor firstStoppedNode = stoppedNodes.get(0);
+    final DatanodeDescriptor secondStoppedNode = stoppedNodes.get(1);
+    LOG.info("Detected 2 nodes with replicas : {} , {}", firstStoppedNode.getXferAddr(),
+        secondStoppedNode.getXferAddr());
+    LOG.info("Detected 1 node without replica : {}", decommNode.getXferAddr());
+
+    // Stop firstStoppedNode & then append to the block pipeline such that the DataStreamer client:
+    // - detects firstStoppedNode as a bad link in the block pipeline
+    // - replaces firstStoppedNode with decommNode in the block pipeline
+    // The result is that:
+    // - secondStoppedNode & decommNode have a live block replica
+    // - firstStoppedNode has a corrupt replica (corrupt because of old GenStamp)
+    LOG.info("Stopping first node with replica {}", firstStoppedNode.getXferAddr());
+    final List<MiniDFSCluster.DataNodeProperties> stoppedNodeProps = new ArrayList<>();
+    MiniDFSCluster.DataNodeProperties stoppedNodeProp =
+        getCluster().stopDataNode(firstStoppedNode.getXferAddr());
+    stoppedNodeProps.add(stoppedNodeProp);
+    firstStoppedNode.setLastUpdate(213); // Set last heartbeat to be in the past
+    // Wait for NN to detect the datanode as dead
+    GenericTestUtils.waitFor(
+        () -> 2 == datanodeManager.getNumLiveDataNodes() && 1 == datanodeManager
+            .getNumDeadDataNodes(), 500, 30000);
+    // Append to block pipeline
+    appendBlock(fs, file, 2);
+
+    // Stop secondStoppedNode & then append to the block pipeline such that the DataStreamer client:
+    // - detects secondStoppedNode as a bad link in the block pipeline
+    // - attempts to replace secondStoppedNode but cannot because there are no more live nodes
+    // - appends to the block pipeline containing just decommNode
+    // The result is that:
+    // - decommNode has a live block replica
+    // - firstStoppedNode & secondStoppedNode both have a corrupt replica
+    LOG.info("Stopping second node with replica {}", secondStoppedNode.getXferAddr());
+    stoppedNodeProp = getCluster().stopDataNode(secondStoppedNode.getXferAddr());
+    stoppedNodeProps.add(stoppedNodeProp);
+    secondStoppedNode.setLastUpdate(213); // Set last heartbeat to be in the past
+    // Wait for NN to detect the datanode as dead
+    GenericTestUtils.waitFor(() -> numDecommNodes == datanodeManager.getNumLiveDataNodes()
+        && numStoppedNodes == datanodeManager.getNumDeadDataNodes(), 500, 30000);
+    // Append to block pipeline
+    appendBlock(fs, file, 1);
+
+    // Validate block replica locations
+    blocksInFile = fs.getFileBlockLocations(file, 0, 0);
+    assertEquals(1, blocksInFile.length);
+    replicasInBlock = Arrays.asList(blocksInFile[0].getNames());
+    assertEquals(numDecommNodes, replicasInBlock.size());
+    assertTrue(replicasInBlock.contains(decommNode.getName()));
+    LOG.info("Block now has 2 corrupt replicas on [{} , {}] and 1 live replica on {}",
+        firstStoppedNode.getXferAddr(), secondStoppedNode.getXferAddr(), decommNode.getXferAddr());
+
+    LOG.info("Decommission node {} with the live replica", decommNode.getXferAddr());
+    final ArrayList<DatanodeInfo> decommissionedNodes = new ArrayList<>();
+    takeNodeOutofService(0, decommNode.getDatanodeUuid(), 0, decommissionedNodes,
+        AdminStates.DECOMMISSION_INPROGRESS);
+
+    // Wait for the datanode to start decommissioning
+    try {
+      GenericTestUtils.waitFor(() -> decomManager.getNumTrackedNodes() == 0
+          && decomManager.getNumPendingNodes() == numDecommNodes && decommNode.getAdminState()
+          .equals(AdminStates.DECOMMISSION_INPROGRESS), 500, 30000);
+    } catch (Exception e) {
+      blocksInFile = fs.getFileBlockLocations(file, 0, 0);
+      assertEquals(1, blocksInFile.length);
+      replicasInBlock = Arrays.asList(blocksInFile[0].getNames());
+      String errMsg = String.format("Node %s failed to start decommissioning."
+          + " numTrackedNodes=%d , numPendingNodes=%d , adminState=%s , nodesWithReplica=[%s]",
+          decommNode.getXferAddr(), decomManager.getNumTrackedNodes(),
+          decomManager.getNumPendingNodes(), decommNode.getAdminState(),
+          String.join(", ", replicasInBlock));
+      LOG.error(errMsg); // Do not log generic timeout exception
+      fail(errMsg);
+    }
+
+    // Validate block replica locations
+    blocksInFile = fs.getFileBlockLocations(file, 0, 0);
+    assertEquals(1, blocksInFile.length);
+    replicasInBlock = Arrays.asList(blocksInFile[0].getNames());
+    assertEquals(numDecommNodes, replicasInBlock.size());
+    assertEquals(replicasInBlock.get(0), decommNode.getName());
+    LOG.info("Block now has 2 corrupt replicas on [{} , {}] and 1 decommissioning replica on {}",
+        firstStoppedNode.getXferAddr(), secondStoppedNode.getXferAddr(), decommNode.getXferAddr());
+
+    // Restart the 2 stopped datanodes
+    LOG.info("Restarting stopped nodes {} , {}", firstStoppedNode.getXferAddr(),
+        secondStoppedNode.getXferAddr());
+    for (final MiniDFSCluster.DataNodeProperties stoppedNode : stoppedNodeProps) {
+      assertTrue(getCluster().restartDataNode(stoppedNode));
+    }
+    for (final MiniDFSCluster.DataNodeProperties stoppedNode : stoppedNodeProps) {
+      try {
+        getCluster().waitDatanodeFullyStarted(stoppedNode.getDatanode(), 30000);
+        LOG.info("Node {} Restarted", stoppedNode.getDatanode().getXferAddress());
+      } catch (Exception e) {
+        String errMsg = String.format("Node %s Failed to Restart within 30 seconds",
+            stoppedNode.getDatanode().getXferAddress());
+        LOG.error(errMsg); // Do not log generic timeout exception
+        fail(errMsg);
+      }
+    }
+
+    // Trigger block reports for the 2 restarted nodes to ensure their corrupt
+    // block replicas are identified by the namenode
+    for (MiniDFSCluster.DataNodeProperties dnProps : stoppedNodeProps) {
+      DataNodeTestUtils.triggerBlockReport(dnProps.getDatanode());
+    }
+
+    // Validate the datanode is eventually decommissioned
+    // Some changes are needed to ensure replication/decommissioning occur in a timely manner:
+    // - if the namenode sends a DNA_TRANSFER before sending the DNA_INVALIDATEs then:
+    //   - the block will enter the pendingReconstruction queue
+    //   - this prevents the block from being sent for transfer again for some time
+    //   - the solution is to call "clearQueues" so that DNA_TRANSFER is sent again after
+    //     DNA_INVALIDATE
+    // - need to run the check less frequently than the DatanodeAdminMonitor
+    //   such that in between "clearQueues" calls 2 things can occur:
+    //   - DatanodeAdminMonitor runs, which sets the block as neededReplication
+    //   - a datanode heartbeat is received, which sends the DNA_TRANSFER to the node
+    final int checkEveryMillis = datanodeAdminMonitorFixedRateSeconds * 2 * 1000;
+    try {
+      GenericTestUtils.waitFor(() -> {
+        blockManager.clearQueues(); // Clear pendingReconstruction queue
+        return decomManager.getNumTrackedNodes() == 0 && decomManager.getNumPendingNodes() == 0
+            && decommNode.getAdminState().equals(AdminStates.DECOMMISSIONED);
+      }, checkEveryMillis, 40000);
+    } catch (Exception e) {
+      blocksInFile = fs.getFileBlockLocations(file, 0, 0);
+      assertEquals(1, blocksInFile.length);
+      replicasInBlock = Arrays.asList(blocksInFile[0].getNames());
+      String errMsg = String.format("Node %s failed to complete decommissioning."
+          + " numTrackedNodes=%d , numPendingNodes=%d , adminState=%s , nodesWithReplica=[%s]",
+          decommNode.getXferAddr(), decomManager.getNumTrackedNodes(),
+          decomManager.getNumPendingNodes(), decommNode.getAdminState(),
+          String.join(", ", replicasInBlock));
+      LOG.error(errMsg); // Do not log generic timeout exception
+      fail(errMsg);
+    }
+
+    // Validate block replica locations.
+    // Note that in order for decommissioning to complete the block must be
+    // replicated to both of the restarted datanodes; this implies that the
+    // corrupt replicas were invalidated on both of the restarted datanodes.
+    blocksInFile = fs.getFileBlockLocations(file, 0, 0);
+    assertEquals(1, blocksInFile.length);
+    replicasInBlock = Arrays.asList(blocksInFile[0].getNames());
+    assertEquals(numDatanode, replicasInBlock.size());
+    assertTrue(replicasInBlock.contains(decommNode.getName()));
+    for (final DatanodeDescriptor node : stoppedNodes) {
+      assertTrue(replicasInBlock.contains(node.getName()));
+    }
+    LOG.info("Block now has 2 live replicas on [{} , {}] and 1 decommissioned replica on {}",
+        firstStoppedNode.getXferAddr(), secondStoppedNode.getXferAddr(), decommNode.getXferAddr());
+  }
+
+  void appendBlock(final FileSystem fs, final Path file, int expectedReplicas) throws IOException {
+    LOG.info("Appending to the block pipeline");
+    boolean failed = false;
+    Exception failedReason = null;
+    try {
+      FSDataOutputStream out = fs.append(file);
+      for (int i = 0; i < 512; i++) {
+        out.write(i);
+      }
+      out.close();
+    } catch (Exception e) {
+      failed = true;
+      failedReason = e;
+    } finally {
+      BlockLocation[] blocksInFile = fs.getFileBlockLocations(file, 0, 0);
+      assertEquals(1, blocksInFile.length);
+      List<String> replicasInBlock = Arrays.asList(blocksInFile[0].getNames());
+      if (failed) {
+        String errMsg = String.format(
+            "Unexpected exception appending to the block pipeline."
+                + " nodesWithReplica=[%s]", String.join(", ", replicasInBlock));
+        LOG.error(errMsg, failedReason); // Do not swallow the exception
+        fail(errMsg);
+      } else if (expectedReplicas != replicasInBlock.size()) {
+        String errMsg = String.format("Expecting %d replicas in block pipeline,"
+            + " unexpectedly found %d replicas. nodesWithReplica=[%s]", expectedReplicas,
+            replicasInBlock.size(), String.join(", ", replicasInBlock));
+        LOG.error(errMsg);
+        fail(errMsg);
+      } else {
+        String infoMsg = String.format(
+            "Successfully appended block pipeline with %d replicas."
+                + " nodesWithReplica=[%s]",
+            replicasInBlock.size(), String.join(", ", replicasInBlock));
+        LOG.info(infoMsg);
+      }
+    }
+  }
 }
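To exercise the new regression test on a Hadoop source checkout, the usual surefire invocation should work (module path and flags are shown here as an assumption, not something this commit documents):

cd hadoop-hdfs-project/hadoop-hdfs
mvn test -Dtest=TestDecommission#testDeleteCorruptReplicaForUnderReplicatedBlock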