HDDS-1553. Add metrics in rack aware container placement policy. (#1361)

This commit is contained in:
Sammi Chen 2019-09-07 08:11:54 +08:00 committed by Xiaoyu Yao
parent 73575701ab
commit c46d43ab13
13 changed files with 368 additions and 30 deletions

View File

@ -191,7 +191,7 @@ public class ReplicationManager implements MetricsSource {
@VisibleForTesting
@SuppressFBWarnings(value="NN_NAKED_NOTIFY",
justification="Used only for testing")
synchronized void processContainersNow() {
public synchronized void processContainersNow() {
notify();
}

View File

@ -43,7 +43,8 @@ public final class ContainerPlacementPolicyFactory {
public static ContainerPlacementPolicy getPolicy(Configuration conf,
final NodeManager nodeManager, NetworkTopology clusterMap,
final boolean fallback) throws SCMException{
final boolean fallback, SCMContainerPlacementMetrics metrics)
throws SCMException{
final Class<? extends ContainerPlacementPolicy> placementClass = conf
.getClass(ScmConfigKeys.OZONE_SCM_CONTAINER_PLACEMENT_IMPL_KEY,
OZONE_SCM_CONTAINER_PLACEMENT_IMPL_DEFAULT,
@ -51,7 +52,8 @@ public final class ContainerPlacementPolicyFactory {
Constructor<? extends ContainerPlacementPolicy> constructor;
try {
constructor = placementClass.getDeclaredConstructor(NodeManager.class,
Configuration.class, NetworkTopology.class, boolean.class);
Configuration.class, NetworkTopology.class, boolean.class,
SCMContainerPlacementMetrics.class);
LOG.info("Create container placement policy of type " +
placementClass.getCanonicalName());
} catch (NoSuchMethodException e) {
@ -64,7 +66,8 @@ public final class ContainerPlacementPolicyFactory {
}
try {
return constructor.newInstance(nodeManager, conf, clusterMap, fallback);
return constructor.newInstance(nodeManager, conf, clusterMap, fallback,
metrics);
} catch (Exception e) {
throw new RuntimeException("Failed to instantiate class " +
placementClass.getCanonicalName() + " for " + e.getMessage());

View File

@ -79,7 +79,7 @@ public final class SCMContainerPlacementCapacity extends SCMCommonPolicy {
*/
public SCMContainerPlacementCapacity(final NodeManager nodeManager,
final Configuration conf, final NetworkTopology networkTopology,
final boolean fallback) {
final boolean fallback, final SCMContainerPlacementMetrics metrics) {
super(nodeManager, conf);
}

View File

@ -0,0 +1,111 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdds.scm.container.placement.algorithms;
import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.metrics2.MetricsCollector;
import org.apache.hadoop.metrics2.MetricsInfo;
import org.apache.hadoop.metrics2.MetricsSource;
import org.apache.hadoop.metrics2.MetricsSystem;
import org.apache.hadoop.metrics2.annotation.Metric;
import org.apache.hadoop.metrics2.annotation.Metrics;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.metrics2.lib.Interns;
import org.apache.hadoop.metrics2.lib.MetricsRegistry;
import org.apache.hadoop.metrics2.lib.MutableCounterLong;
/**
* This class is for maintaining Topology aware container placement statistics.
*/
@Metrics(about="SCM Container Placement Metrics", context = "ozone")
public class SCMContainerPlacementMetrics implements MetricsSource {
public static final String SOURCE_NAME =
SCMContainerPlacementMetrics.class.getSimpleName();
private static final MetricsInfo RECORD_INFO = Interns.info(SOURCE_NAME,
"SCM Placement Metrics");
private static MetricsRegistry registry;
// total datanode allocation request count
@Metric private MutableCounterLong datanodeRequestCount;
// datanode allocation attempt count, including success, fallback and failed
@Metric private MutableCounterLong datanodeChooseAttemptCount;
// datanode successful allocation count
@Metric private MutableCounterLong datanodeChooseSuccessCount;
// datanode allocated with some allocation constrains compromised
@Metric private MutableCounterLong datanodeChooseFallbackCount;
public SCMContainerPlacementMetrics() {
}
public static SCMContainerPlacementMetrics create() {
MetricsSystem ms = DefaultMetricsSystem.instance();
registry = new MetricsRegistry(RECORD_INFO);
return ms.register(SOURCE_NAME, "SCM Placement Metrics",
new SCMContainerPlacementMetrics());
}
public void incrDatanodeRequestCount(long count) {
System.out.println("request + 1");
this.datanodeRequestCount.incr(count);
}
public void incrDatanodeChooseSuccessCount() {
System.out.println("success + 1");
this.datanodeChooseSuccessCount.incr(1);
}
public void incrDatanodeChooseFallbackCount() {
System.out.println("fallback + 1");
this.datanodeChooseFallbackCount.incr(1);
}
public void incrDatanodeChooseAttemptCount() {
System.out.println("attempt + 1");
this.datanodeChooseAttemptCount.incr(1);
}
public void unRegister() {
MetricsSystem ms = DefaultMetricsSystem.instance();
ms.unregisterSource(SOURCE_NAME);
}
@VisibleForTesting
public long getDatanodeRequestCount() {
return this.datanodeRequestCount.value();
}
@VisibleForTesting
public long getDatanodeChooseSuccessCount() {
return this.datanodeChooseSuccessCount.value();
}
@VisibleForTesting
public long getDatanodeChooseFallbackCount() {
return this.datanodeChooseFallbackCount.value();
}
@VisibleForTesting
public long getDatanodeChooseAttemptCount() {
return this.datanodeChooseAttemptCount.value();
}
@Override
public void getMetrics(MetricsCollector collector, boolean all) {
registry.snapshot(collector.addRecord(registry.info().name()), true);
}
}

View File

@ -53,6 +53,7 @@ public final class SCMContainerPlacementRackAware extends SCMCommonPolicy {
private boolean fallback;
private static final int RACK_LEVEL = 1;
private static final int MAX_RETRY= 3;
private final SCMContainerPlacementMetrics metrics;
/**
* Constructs a Container Placement with rack awareness.
@ -66,10 +67,11 @@ public final class SCMContainerPlacementRackAware extends SCMCommonPolicy {
*/
public SCMContainerPlacementRackAware(final NodeManager nodeManager,
final Configuration conf, final NetworkTopology networkTopology,
final boolean fallback) {
final boolean fallback, final SCMContainerPlacementMetrics metrics) {
super(nodeManager, conf);
this.networkTopology = networkTopology;
this.fallback = fallback;
this.metrics = metrics;
}
/**
@ -93,7 +95,7 @@ public final class SCMContainerPlacementRackAware extends SCMCommonPolicy {
List<DatanodeDetails> excludedNodes, List<DatanodeDetails> favoredNodes,
int nodesRequired, final long sizeRequired) throws SCMException {
Preconditions.checkArgument(nodesRequired > 0);
metrics.incrDatanodeRequestCount(nodesRequired);
int datanodeCount = networkTopology.getNumOfLeafNode(NetConstants.ROOT);
int excludedNodesCount = excludedNodes == null ? 0 : excludedNodes.size();
if (datanodeCount < nodesRequired + excludedNodesCount) {
@ -241,9 +243,11 @@ public final class SCMContainerPlacementRackAware extends SCMCommonPolicy {
int ancestorGen = RACK_LEVEL;
int maxRetry = MAX_RETRY;
List<Node> excludedNodesForCapacity = null;
boolean isFallbacked = false;
while(true) {
Node node = networkTopology.chooseRandom(NetConstants.ROOT, null,
excludedNodes, affinityNode, ancestorGen);
metrics.incrDatanodeChooseAttemptCount();
if (node == null) {
// cannot find the node which meets all constrains
LOG.warn("Failed to find the datanode. excludedNodes:" +
@ -251,6 +255,7 @@ public final class SCMContainerPlacementRackAware extends SCMCommonPolicy {
", affinityNode:" +
(affinityNode == null ? "" : affinityNode.getNetworkFullPath()));
if (fallback) {
isFallbacked = true;
// fallback, don't consider the affinity node
if (affinityNode != null) {
affinityNode = null;
@ -267,11 +272,15 @@ public final class SCMContainerPlacementRackAware extends SCMCommonPolicy {
" excludedNodes and affinityNode constrains.", null);
}
if (hasEnoughSpace((DatanodeDetails)node, sizeRequired)) {
LOG.debug("Datanode {} is chosen. Required size is {}",
LOG.warn("Datanode {} is chosen. Required size is {}",
node.toString(), sizeRequired);
if (excludedNodes != null && excludedNodesForCapacity != null) {
excludedNodes.removeAll(excludedNodesForCapacity);
}
metrics.incrDatanodeChooseSuccessCount();
if (isFallbacked) {
metrics.incrDatanodeChooseFallbackCount();
}
return node;
} else {
maxRetry--;

View File

@ -51,7 +51,7 @@ public final class SCMContainerPlacementRandom extends SCMCommonPolicy
*/
public SCMContainerPlacementRandom(final NodeManager nodeManager,
final Configuration conf, final NetworkTopology networkTopology,
final boolean fallback) {
final boolean fallback, final SCMContainerPlacementMetrics metrics) {
super(nodeManager, conf);
}

View File

@ -42,6 +42,8 @@ import org.apache.hadoop.hdds.scm.block.DeletedBlockLogImpl;
import org.apache.hadoop.hdds.scm.block.PendingDeleteHandler;
import org.apache.hadoop.hdds.scm.container.ReplicationManager.ReplicationManagerConfiguration;
import org.apache.hadoop.hdds.scm.container.placement.algorithms.ContainerPlacementPolicyFactory;
import org.apache.hadoop.hdds.scm.container.placement.algorithms
.SCMContainerPlacementMetrics;
import org.apache.hadoop.hdds.scm.net.NetworkTopology;
import org.apache.hadoop.hdds.scm.net.NetworkTopologyImpl;
import org.apache.hadoop.hdds.scm.safemode.SafeModeHandler;
@ -387,9 +389,11 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
conf, scmStorageConfig, eventQueue, clusterMap);
}
SCMContainerPlacementMetrics placementMetrics =
SCMContainerPlacementMetrics.create();
ContainerPlacementPolicy containerPlacementPolicy =
ContainerPlacementPolicyFactory.getPolicy(conf, scmNodeManager,
clusterMap, true);
clusterMap, true, placementMetrics);
if (configurator.getPipelineManager() != null) {
pipelineManager = configurator.getPipelineManager();

View File

@ -100,7 +100,8 @@ public class TestContainerPlacementFactory {
.thenReturn(new SCMNodeMetric(storageCapacity, 70L, 30L));
ContainerPlacementPolicy policy = ContainerPlacementPolicyFactory
.getPolicy(conf, nodeManager, cluster, true);
.getPolicy(conf, nodeManager, cluster, true,
SCMContainerPlacementMetrics.create());
int nodeNum = 3;
List<DatanodeDetails> datanodeDetails =
@ -117,7 +118,7 @@ public class TestContainerPlacementFactory {
@Test
public void testDefaultPolicy() throws IOException {
ContainerPlacementPolicy policy = ContainerPlacementPolicyFactory
.getPolicy(conf, null, null, true);
.getPolicy(conf, null, null, true, null);
Assert.assertSame(SCMContainerPlacementRandom.class, policy.getClass());
}
@ -138,7 +139,7 @@ public class TestContainerPlacementFactory {
// set a placement class which does't have the right constructor implemented
conf.set(ScmConfigKeys.OZONE_SCM_CONTAINER_PLACEMENT_IMPL_KEY,
DummyImpl.class.getName());
ContainerPlacementPolicyFactory.getPolicy(conf, null, null, true);
ContainerPlacementPolicyFactory.getPolicy(conf, null, null, true, null);
}
@Test(expected = RuntimeException.class)
@ -146,6 +147,6 @@ public class TestContainerPlacementFactory {
// set a placement class not implemented
conf.set(ScmConfigKeys.OZONE_SCM_CONTAINER_PLACEMENT_IMPL_KEY,
"org.apache.hadoop.hdds.scm.container.placement.algorithm.HelloWorld");
ContainerPlacementPolicyFactory.getPolicy(conf, null, null, true);
ContainerPlacementPolicyFactory.getPolicy(conf, null, null, true, null);
}
}

View File

@ -64,7 +64,8 @@ public class TestSCMContainerPlacementCapacity {
.thenReturn(new SCMNodeMetric(100L, 70L, 30L));
SCMContainerPlacementCapacity scmContainerPlacementRandom =
new SCMContainerPlacementCapacity(mockNodeManager, conf, null, true);
new SCMContainerPlacementCapacity(mockNodeManager, conf, null, true,
null);
List<DatanodeDetails> existingNodes = new ArrayList<>();
existingNodes.add(datanodes.get(0));

View File

@ -41,6 +41,8 @@ import java.util.List;
import static org.apache.hadoop.hdds.scm.net.NetConstants.LEAF_SCHEMA;
import static org.apache.hadoop.hdds.scm.net.NetConstants.RACK_SCHEMA;
import static org.apache.hadoop.hdds.scm.net.NetConstants.ROOT_SCHEMA;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.anyObject;
import static org.mockito.Mockito.when;
@ -58,6 +60,7 @@ public class TestSCMContainerPlacementRackAware {
private SCMContainerPlacementRackAware policyNoFallback;
// node storage capacity
private static final long STORAGE_CAPACITY = 100L;
private SCMContainerPlacementMetrics metrics;
@Before
public void setup() {
@ -93,10 +96,11 @@ public class TestSCMContainerPlacementRackAware {
.thenReturn(new SCMNodeMetric(STORAGE_CAPACITY, 70L, 30L));
// create placement policy instances
policy =
new SCMContainerPlacementRackAware(nodeManager, conf, cluster, true);
policyNoFallback =
new SCMContainerPlacementRackAware(nodeManager, conf, cluster, false);
metrics = SCMContainerPlacementMetrics.create();
policy = new SCMContainerPlacementRackAware(
nodeManager, conf, cluster, true, metrics);
policyNoFallback = new SCMContainerPlacementRackAware(
nodeManager, conf, cluster, false, metrics);
}
@ -181,7 +185,6 @@ public class TestSCMContainerPlacementRackAware {
@Test
public void testFallback() throws SCMException {
// 5 replicas. there are only 3 racks. policy with fallback should
// allocate the 5th datanode though it will break the rack rule(first
// 2 replicas on same rack, others on different racks).
@ -195,14 +198,45 @@ public class TestSCMContainerPlacementRackAware {
datanodeDetails.get(2)));
Assert.assertFalse(cluster.isSameParent(datanodeDetails.get(1),
datanodeDetails.get(2)));
Assert.assertFalse(cluster.isSameParent(datanodeDetails.get(0),
datanodeDetails.get(3)));
Assert.assertFalse(cluster.isSameParent(datanodeDetails.get(2),
datanodeDetails.get(3)));
// get metrics
long totalRequest = metrics.getDatanodeRequestCount();
long successCount = metrics.getDatanodeChooseSuccessCount();
long tryCount = metrics.getDatanodeChooseAttemptCount();
long compromiseCount = metrics.getDatanodeChooseFallbackCount();
// verify metrics
Assert.assertTrue(totalRequest == nodeNum);
Assert.assertTrue(successCount == nodeNum);
Assert.assertTrue(tryCount > nodeNum);
Assert.assertTrue(compromiseCount >= 1);
}
@Test(expected = SCMException.class)
@Test
public void testNoFallback() throws SCMException {
// 5 replicas. there are only 3 racks. policy prohibit fallback should fail.
int nodeNum = 5;
try {
policyNoFallback.chooseDatanodes(null, null, nodeNum, 15);
fail("Fallback prohibited, this call should fail");
} catch (Exception e) {
assertTrue(e.getClass().getSimpleName().equals("SCMException"));
}
// get metrics
long totalRequest = metrics.getDatanodeRequestCount();
long successCount = metrics.getDatanodeChooseSuccessCount();
long tryCount = metrics.getDatanodeChooseAttemptCount();
long compromiseCount = metrics.getDatanodeChooseFallbackCount();
Assert.assertTrue(totalRequest == nodeNum);
Assert.assertTrue(successCount >= 3);
Assert.assertTrue(tryCount >= nodeNum);
Assert.assertTrue(compromiseCount == 0);
}
@Test
@ -244,11 +278,28 @@ public class TestSCMContainerPlacementRackAware {
.equals(favoredNodes.get(0).getNetworkFullPath()));
}
@Test(expected = SCMException.class)
@Test
public void testNoInfiniteLoop() throws SCMException {
int nodeNum = 1;
try {
// request storage space larger than node capability
policy.chooseDatanodes(null, null, nodeNum, STORAGE_CAPACITY + 15);
fail("Storage requested exceeds capacity, this call should fail");
} catch (Exception e) {
assertTrue(e.getClass().getSimpleName().equals("SCMException"));
}
// get metrics
long totalRequest = metrics.getDatanodeRequestCount();
long successCount = metrics.getDatanodeChooseSuccessCount();
long tryCount = metrics.getDatanodeChooseAttemptCount();
long compromiseCount = metrics.getDatanodeChooseFallbackCount();
Assert.assertTrue(totalRequest == nodeNum);
Assert.assertTrue(successCount == 0);
Assert.assertTrue(tryCount > nodeNum);
Assert.assertTrue(compromiseCount == 0);
}
@Test
@ -270,7 +321,8 @@ public class TestSCMContainerPlacementRackAware {
// choose nodes to host 3 replica
int nodeNum = 3;
SCMContainerPlacementRackAware newPolicy =
new SCMContainerPlacementRackAware(nodeManager, conf, clusterMap, true);
new SCMContainerPlacementRackAware(nodeManager, conf, clusterMap, true,
metrics);
List<DatanodeDetails> datanodeDetails =
newPolicy.chooseDatanodes(null, null, nodeNum, 15);
Assert.assertEquals(nodeNum, datanodeDetails.size());

View File

@ -59,7 +59,8 @@ public class TestSCMContainerPlacementRandom {
.thenReturn(new SCMNodeMetric(100L, 90L, 10L));
SCMContainerPlacementRandom scmContainerPlacementRandom =
new SCMContainerPlacementRandom(mockNodeManager, conf, null, true);
new SCMContainerPlacementRandom(mockNodeManager, conf, null, true,
null);
List<DatanodeDetails> existingNodes = new ArrayList<>();
existingNodes.add(datanodes.get(0));

View File

@ -81,10 +81,10 @@ public class TestContainerPlacement {
SCMContainerPlacementCapacity capacityPlacer = new
SCMContainerPlacementCapacity(nodeManagerCapacity, new Configuration(),
null, true);
null, true, null);
SCMContainerPlacementRandom randomPlacer = new
SCMContainerPlacementRandom(nodeManagerRandom, new Configuration(),
null, true);
null, true, null);
for (int x = 0; x < opsCount; x++) {
long containerSize = random.nextInt(100) * OzoneConsts.GB;

View File

@ -0,0 +1,156 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ozone.scm;
import org.apache.hadoop.hdds.HddsUtils;
import org.apache.hadoop.hdds.client.ReplicationType;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.container.placement.algorithms
.SCMContainerPlacementMetrics;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
import org.apache.hadoop.metrics2.MetricsRecordBuilder;
import org.apache.hadoop.net.DNSToSwitchMapping;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.net.StaticMapping;
import org.apache.hadoop.ozone.MiniOzoneCluster;
import org.apache.hadoop.ozone.client.ObjectStore;
import org.apache.hadoop.ozone.client.OzoneBucket;
import org.apache.hadoop.ozone.client.OzoneClient;
import org.apache.hadoop.ozone.client.OzoneClientFactory;
import org.apache.hadoop.ozone.client.OzoneVolume;
import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
import org.apache.hadoop.ozone.om.OMConfigKeys;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.UUID;
import java.util.stream.Collectors;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic
.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY;
import static org.apache.hadoop.hdds.client.ReplicationFactor.THREE;
import static org.apache.hadoop.test.MetricsAsserts.getLongCounter;
import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
/**
* Test cases to verify the metrics exposed by SCMPipelineManager.
*/
public class TestSCMContainerPlacementPolicyMetrics {
private MiniOzoneCluster cluster;
private MetricsRecordBuilder metrics;
private static OzoneClient ozClient = null;
private static ObjectStore store = null;
@Before
public void setup() throws Exception {
OzoneConfiguration conf = new OzoneConfiguration();
conf.set(ScmConfigKeys.OZONE_SCM_CONTAINER_PLACEMENT_IMPL_KEY,
"org.apache.hadoop.hdds.scm.container.placement.algorithms." +
"SCMContainerPlacementRackAware");
conf.setBoolean(OMConfigKeys.OZONE_OM_RATIS_ENABLE_KEY, true);
conf.setClass(NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
StaticMapping.class, DNSToSwitchMapping.class);
StaticMapping.addNodeToRack(NetUtils.normalizeHostNames(
Collections.singleton(HddsUtils.getHostName(conf))).get(0),
"/rack1");
cluster = MiniOzoneCluster.newBuilder(conf)
.setNumDatanodes(4)
.build();
cluster.waitForClusterToBeReady();
metrics = getMetrics(SCMContainerPlacementMetrics.class.getSimpleName());
ozClient = OzoneClientFactory.getRpcClient(conf);
store = ozClient.getObjectStore();
}
/**
* Verifies container placement metric.
*/
@Test(timeout = 60000)
public void test() throws IOException {
String volumeName = UUID.randomUUID().toString();
String bucketName = UUID.randomUUID().toString();
String value = "sample value";
store.createVolume(volumeName);
OzoneVolume volume = store.getVolume(volumeName);
volume.createBucket(bucketName);
OzoneBucket bucket = volume.getBucket(bucketName);
String keyName = UUID.randomUUID().toString();
// Write data into a key
try (OzoneOutputStream out = bucket.createKey(keyName,
value.getBytes().length, ReplicationType.RATIS,
THREE, new HashMap<>())) {
out.write(value.getBytes());
}
// close container
PipelineManager manager =
cluster.getStorageContainerManager().getPipelineManager();
List<Pipeline> pipelines = manager.getPipelines().stream().filter(p ->
p.getType() == HddsProtos.ReplicationType.RATIS &&
p.getFactor() == HddsProtos.ReplicationFactor.THREE)
.collect(Collectors.toList());
Pipeline targetPipeline = pipelines.get(0);
List<DatanodeDetails> nodes = targetPipeline.getNodes();
manager.finalizeAndDestroyPipeline(pipelines.get(0), true);
// kill datanode to trigger under-replicated container replication
cluster.shutdownHddsDatanode(nodes.get(0));
try {
Thread.sleep(5 * 1000);
} catch (InterruptedException e) {
}
cluster.getStorageContainerManager().getReplicationManager()
.processContainersNow();
try {
Thread.sleep(30 * 1000);
} catch (InterruptedException e) {
}
long totalRequest = getLongCounter("DatanodeRequestCount", metrics);
long tryCount = getLongCounter("DatanodeChooseAttemptCount", metrics);
long sucessCount =
getLongCounter("DatanodeChooseSuccessCount", metrics);
long compromiseCount =
getLongCounter("DatanodeChooseFallbackCount", metrics);
// Seems no under-replicated closed containers get replicated
Assert.assertTrue(totalRequest == 0);
Assert.assertTrue(tryCount == 0);
Assert.assertTrue(sucessCount == 0);
Assert.assertTrue(compromiseCount == 0);
}
@After
public void teardown() {
cluster.shutdown();
}
}