HDDS-303. Removing logic to identify containers to be closed from SCM. Contributed by Nanda kumar.

This commit is contained in:
Xiaoyu Yao 2018-09-05 12:26:23 -07:00
parent 9af96d4ed4
commit 8286bf2d1f
8 changed files with 16 additions and 553 deletions

View File

@ -64,4 +64,10 @@ public final class HddsConfigKeys {
public static final String HDDS_DB_PROFILE = "hdds.db.profile";
public static final DBProfile HDDS_DEFAULT_DB_PROFILE = DBProfile.SSD;
// Once a container usage crosses this threshold, it is eligible for
// closing.
public static final String HDDS_CONTAINER_CLOSE_THRESHOLD =
"hdds.container.close.threshold";
public static final float HDDS_CONTAINER_CLOSE_THRESHOLD_DEFAULT = 0.9f;
}

View File

@ -242,12 +242,6 @@ public final class ScmConfigKeys {
"ozone.scm.block.deletion.max.retry";
public static final int OZONE_SCM_BLOCK_DELETION_MAX_RETRY_DEFAULT = 4096;
// Once a container usage crosses this threshold, it is eligible for
// closing.
public static final String OZONE_SCM_CONTAINER_CLOSE_THRESHOLD =
"ozone.scm.container.close.threshold";
public static final float OZONE_SCM_CONTAINER_CLOSE_THRESHOLD_DEFAULT = 0.9f;
public static final String HDDS_SCM_WATCHER_TIMEOUT =
"hdds.scm.watcher.timeout";

View File

@ -930,9 +930,9 @@
</description>
</property>
<property>
<name>ozone.scm.container.close.threshold</name>
<name>hdds.container.close.threshold</name>
<value>0.9f</value>
<tag>OZONE, SCM</tag>
<tag>OZONE, DATANODE</tag>
<description>
This determines the threshold to be used for closing a container.
When the container used percentage reaches this threshold,

View File

@ -21,11 +21,11 @@ package org.apache.hadoop.ozone.container.common.impl;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;
import org.apache.hadoop.hdds.HddsConfigKeys;
import org.apache.hadoop.hdds.HddsUtils;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.ContainerAction;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics;
import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils;
@ -84,8 +84,8 @@ public class HddsDispatcher implements ContainerDispatcher {
containerType, conf, containerSet, volumeSet, metrics));
}
this.containerCloseThreshold = conf.getFloat(
ScmConfigKeys.OZONE_SCM_CONTAINER_CLOSE_THRESHOLD,
ScmConfigKeys.OZONE_SCM_CONTAINER_CLOSE_THRESHOLD_DEFAULT);
HddsConfigKeys.HDDS_CONTAINER_CLOSE_THRESHOLD,
HddsConfigKeys.HDDS_CONTAINER_CLOSE_THRESHOLD_DEFAULT);
}

View File

@ -29,7 +29,6 @@ import org.apache.hadoop.hdds.scm.block.PendingDeleteStatusList;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.container.closer.ContainerCloser;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID;
import org.apache.hadoop.hdds.scm.events.SCMEvents;
@ -91,8 +90,6 @@ public class ContainerMapping implements Mapping {
private final PipelineSelector pipelineSelector;
private final ContainerStateManager containerStateManager;
private final LeaseManager<ContainerInfo> containerLeaseManager;
private final float containerCloseThreshold;
private final ContainerCloser closer;
private final EventPublisher eventPublisher;
private final long size;
@ -116,7 +113,6 @@ public class ContainerMapping implements Mapping {
cacheSizeMB, EventPublisher eventPublisher) throws IOException {
this.nodeManager = nodeManager;
this.cacheSize = cacheSizeMB;
this.closer = new ContainerCloser(nodeManager, conf);
File metaDir = getOzoneMetaDirPath(conf);
@ -140,9 +136,6 @@ public class ContainerMapping implements Mapping {
this.pipelineSelector = new PipelineSelector(nodeManager,
containerStateManager, conf, eventPublisher);
this.containerCloseThreshold = conf.getFloat(
ScmConfigKeys.OZONE_SCM_CONTAINER_CLOSE_THRESHOLD,
ScmConfigKeys.OZONE_SCM_CONTAINER_CLOSE_THRESHOLD_DEFAULT);
this.eventPublisher = eventPublisher;
long containerCreationLeaseTimeout = conf.getTimeDuration(
@ -461,18 +454,18 @@ public class ContainerMapping implements Mapping {
/**
* Return a container matching the attributes specified.
*
* @param size - Space needed in the Container.
* @param sizeRequired - Space needed in the Container.
* @param owner - Owner of the container - A specific nameservice.
* @param type - Replication Type {StandAlone, Ratis}
* @param factor - Replication Factor {ONE, THREE}
* @param state - State of the Container-- {Open, Allocated etc.}
* @return ContainerInfo, null if there is no match found.
*/
public ContainerWithPipeline getMatchingContainerWithPipeline(final long size,
String owner, ReplicationType type, ReplicationFactor factor,
LifeCycleState state) throws IOException {
public ContainerWithPipeline getMatchingContainerWithPipeline(
final long sizeRequired, String owner, ReplicationType type,
ReplicationFactor factor, LifeCycleState state) throws IOException {
ContainerInfo containerInfo = getStateManager()
.getMatchingContainer(size, owner, type, factor, state);
.getMatchingContainer(sizeRequired, owner, type, factor, state);
if (containerInfo == null) {
return null;
}
@ -563,20 +556,6 @@ public class ContainerMapping implements Mapping {
// the updated State.
containerStore.put(dbKey, newState.toByteArray());
// If the container is closed, then state is already written to SCM
Pipeline pipeline =
pipelineSelector.getPipeline(
PipelineID.getFromProtobuf(newState.getPipelineID()),
newState.getReplicationType());
if(pipeline == null) {
pipeline = pipelineSelector
.getReplicationPipeline(newState.getReplicationType(),
newState.getReplicationFactor());
}
// DB.TODO: So can we can write only once to DB.
if (closeContainerIfNeeded(newState, pipeline)) {
LOG.info("Closing the Container: {}", newState.getContainerID());
}
} else {
// Container not found in our container db.
LOG.error("Error while processing container report from datanode :" +
@ -637,52 +616,6 @@ public class ContainerMapping implements Mapping {
return builder.build();
}
/**
* Queues the close container command, to datanode and writes the new state
* to container DB.
* <p>
* TODO : Remove this 2 ContainerInfo definitions. It is brain dead to have
* one protobuf in one file and another definition in another file.
*
* @param newState - This is the state we maintain in SCM.
* @param pipeline
* @throws IOException
*/
private boolean closeContainerIfNeeded(SCMContainerInfo newState,
Pipeline pipeline)
throws IOException {
float containerUsedPercentage = 1.0f *
newState.getUsedBytes() / this.size;
ContainerInfo scmInfo = getContainer(newState.getContainerID());
if (containerUsedPercentage >= containerCloseThreshold
&& !isClosed(scmInfo)) {
// We will call closer till get to the closed state.
// That is SCM will make this call repeatedly until we reach the closed
// state.
closer.close(newState, pipeline);
if (shouldClose(scmInfo)) {
// This event moves the Container from Open to Closing State, this is
// a state inside SCM. This is the desired state that SCM wants this
// container to reach. We will know that a container has reached the
// closed state from container reports. This state change should be
// invoked once and only once.
HddsProtos.LifeCycleState state = updateContainerState(
scmInfo.getContainerID(),
HddsProtos.LifeCycleEvent.FINALIZE);
if (state != HddsProtos.LifeCycleState.CLOSING) {
LOG.error("Failed to close container {}, reason : Not able " +
"to " +
"update container state, current container state: {}.",
newState.getContainerID(), state);
return false;
}
return true;
}
}
return false;
}
/**
* In Container is in closed state, if it is in closed, Deleting or Deleted
@ -699,11 +632,6 @@ public class ContainerMapping implements Mapping {
return info.getState() == HddsProtos.LifeCycleState.CLOSED;
}
@VisibleForTesting
public ContainerCloser getCloser() {
return closer;
}
/**
* Closes this stream and releases any system resources associated with it.
* If the stream is

View File

@ -1,194 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*
*/
package org.apache.hadoop.hdds.scm.container.closer;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.SCMContainerInfo;
import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID;
import org.apache.hadoop.hdds.scm.node.NodeManager;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.ozone.protocol.commands.CloseContainerCommand;
import org.apache.hadoop.util.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import static org.apache.hadoop.hdds.HddsConfigKeys
.HDDS_CONTAINER_REPORT_INTERVAL;
import static org.apache.hadoop.hdds.HddsConfigKeys
.HDDS_CONTAINER_REPORT_INTERVAL_DEFAULT;
/**
* A class that manages closing of containers. This allows transition from a
* open but full container to a closed container, to which no data is written.
*/
public class ContainerCloser {
private static final Logger LOG =
LoggerFactory.getLogger(ContainerCloser.class);
private static final long MULTIPLIER = 3L;
private static final int CLEANUP_WATER_MARK = 1000;
private final NodeManager nodeManager;
private final Map<Long, Long> commandIssued;
private final Configuration configuration;
private final AtomicInteger mapCount;
private final long reportInterval;
private final AtomicInteger threadRunCount;
private final AtomicBoolean isRunning;
/**
* Constructs the ContainerCloser class.
*
* @param nodeManager - NodeManager
* @param conf - Configuration
*/
public ContainerCloser(NodeManager nodeManager, Configuration conf) {
Preconditions.checkNotNull(nodeManager);
Preconditions.checkNotNull(conf);
this.nodeManager = nodeManager;
this.configuration = conf;
this.commandIssued = new ConcurrentHashMap<>();
this.mapCount = new AtomicInteger(0);
this.threadRunCount = new AtomicInteger(0);
this.isRunning = new AtomicBoolean(false);
this.reportInterval = this.configuration.getTimeDuration(
HDDS_CONTAINER_REPORT_INTERVAL,
HDDS_CONTAINER_REPORT_INTERVAL_DEFAULT, TimeUnit.MILLISECONDS);
Preconditions.checkState(this.reportInterval > 0,
"report interval has to be greater than 0");
}
@VisibleForTesting
public static int getCleanupWaterMark() {
return CLEANUP_WATER_MARK;
}
/**
* Sends a Container Close command to the data nodes where this container
* lives.
*
* @param info - ContainerInfo.
* @param pipeline
*/
public void close(SCMContainerInfo info,
Pipeline pipeline) {
if (commandIssued.containsKey(info.getContainerID())) {
// We check if we issued a close command in last 3 * reportInterval secs.
long commandQueueTime = commandIssued.get(info.getContainerID());
long currentTime = Time.monotonicNow();
if (currentTime > commandQueueTime + (MULTIPLIER * reportInterval)) {
commandIssued.remove(info.getContainerID());
mapCount.decrementAndGet();
} else {
// Ignore this request, since we just issued a close command. We
// should wait instead of sending a command to datanode again.
return;
}
}
// if we reached here, it means that we have not issued a command to the
// data node in last (3 times report interval). We are presuming that is
// enough time to close the container. Let us go ahead and queue a close
// to all the datanodes that participate in the container.
//
// Three important things to note here:
//
// 1. It is ok to send this command multiple times to a datanode. Close
// container is an idempotent command, if the container is already closed
// then we have no issues.
//
// 2. The container close command is issued to all datanodes. But
// depending on the pipeline type, some of the datanodes might ignore it.
//
// 3. SCM will see that datanode is closed from container reports, but it
// is possible that datanodes might get close commands since
// this queue can be emptied by a datanode after a close report is send
// to SCM. In that case also, data node will ignore this command.
for (DatanodeDetails datanodeDetails : pipeline.getMachines()) {
nodeManager.addDatanodeCommand(datanodeDetails.getUuid(),
new CloseContainerCommand(info.getContainerID(),
info.getReplicationType(),
PipelineID.getFromProtobuf(info.getPipelineID())));
}
if (!commandIssued.containsKey(info.getContainerID())) {
commandIssued.put(info.getContainerID(), Time.monotonicNow());
mapCount.incrementAndGet();
}
// run the hash map cleaner thread if needed, non-blocking call.
runCleanerThreadIfNeeded();
}
private void runCleanerThreadIfNeeded() {
// Let us check if we should run a cleaner thread, not using map.size
// since it runs a loop in the case of the concurrentMap.
if (mapCount.get() > CLEANUP_WATER_MARK &&
isRunning.compareAndSet(false, true)) {
Runnable entryCleaner = () -> {
LOG.debug("Starting close container Hash map cleaner.");
try {
for (Map.Entry<Long, Long> entry : commandIssued.entrySet()) {
long commandQueueTime = entry.getValue();
if (commandQueueTime + (MULTIPLIER * reportInterval) >
Time.monotonicNow()) {
// It is possible for this remove to fail due to race conditions.
// No big deal we will cleanup next time.
commandIssued.remove(entry.getKey());
mapCount.decrementAndGet();
}
}
isRunning.compareAndSet(true, false);
LOG.debug("Finished running, close container Hash map cleaner.");
} catch (Exception ex) {
LOG.error("Unable to finish cleaning the closed containers map.", ex);
}
};
// Launch the cleaner thread when we need instead of having a daemon
// thread that is sleeping all the time. We need to set the Daemon to
// true to avoid blocking clean exits.
Thread cleanerThread = new ThreadFactoryBuilder()
.setDaemon(true)
.setNameFormat("Closed Container Cleaner Thread - %d")
.build().newThread(entryCleaner);
threadRunCount.incrementAndGet();
cleanerThread.start();
}
}
@VisibleForTesting
public int getThreadRunCount() {
return threadRunCount.get();
}
@VisibleForTesting
public int getCloseCount() {
return mapCount.get();
}
}

View File

@ -288,49 +288,6 @@ public class TestContainerMapping {
}
}
@Test
public void testContainerCloseWithContainerReport() throws IOException {
ContainerInfo info = createContainer();
DatanodeDetails datanodeDetails = TestUtils.randomDatanodeDetails();
List<StorageContainerDatanodeProtocolProtos.ContainerInfo> reports =
new ArrayList<>();
StorageContainerDatanodeProtocolProtos.ContainerInfo.Builder ciBuilder =
StorageContainerDatanodeProtocolProtos.ContainerInfo.newBuilder();
ciBuilder.setFinalhash("7c45eb4d7ed5e0d2e89aaab7759de02e")
.setSize(5368709120L)
.setUsed(5368705120L)
.setKeyCount(500000000L)
.setReadCount(500000000L)
.setWriteCount(500000000L)
.setReadBytes(5368705120L)
.setWriteBytes(5368705120L)
.setContainerID(info.getContainerID())
.setDeleteTransactionId(0);
reports.add(ciBuilder.build());
ContainerReportsProto.Builder crBuilder =
ContainerReportsProto.newBuilder();
crBuilder.addAllReports(reports);
mapping.processContainerReports(datanodeDetails, crBuilder.build(), false);
ContainerInfo updatedContainer =
mapping.getContainer(info.getContainerID());
Assert.assertEquals(500000000L,
updatedContainer.getNumberOfKeys());
Assert.assertEquals(5368705120L, updatedContainer.getUsedBytes());
NavigableSet<ContainerID> pendingCloseContainers = mapping.getStateManager()
.getMatchingContainerIDs(
containerOwner,
xceiverClientManager.getType(),
xceiverClientManager.getFactor(),
HddsProtos.LifeCycleState.CLOSING);
Assert.assertTrue(
pendingCloseContainers.contains(updatedContainer.containerID()));
}
@Test
public void testCloseContainer() throws IOException {
ContainerInfo info = createContainer();

View File

@ -1,228 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*
*/
package org.apache.hadoop.hdds.scm.container.closer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.StorageUnit;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.hdds.scm.TestUtils;
import org.apache.hadoop.hdds.scm.container.ContainerMapping;
import org.apache.hadoop.hdds.scm.container.MockNodeManager;
import org.apache.hadoop.hdds.scm.container.TestContainerMapping;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
import org.apache.hadoop.hdds.server.events.EventQueue;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.container.common.SCMTestUtils;
import org.apache.hadoop.test.GenericTestUtils;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import java.io.File;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import static org.apache.hadoop.hdds.HddsConfigKeys
.HDDS_CONTAINER_REPORT_INTERVAL;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys
.OZONE_SCM_CONTAINER_SIZE_DEFAULT;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys
.OZONE_SCM_CONTAINER_SIZE;
import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleEvent
.CREATE;
import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleEvent
.CREATED;
/**
* Test class for Closing Container.
*/
public class TestContainerCloser {
private static final long GIGABYTE = 1024L * 1024L * 1024L;
private static Configuration configuration;
private static MockNodeManager nodeManager;
private static ContainerMapping mapping;
private static long size;
private static File testDir;
@BeforeClass
public static void setUp() throws Exception {
configuration = SCMTestUtils.getConf();
size = (long)configuration.getStorageSize(OZONE_SCM_CONTAINER_SIZE,
OZONE_SCM_CONTAINER_SIZE_DEFAULT, StorageUnit.BYTES);
configuration.setTimeDuration(HDDS_CONTAINER_REPORT_INTERVAL,
1, TimeUnit.SECONDS);
testDir = GenericTestUtils
.getTestDir(TestContainerMapping.class.getSimpleName());
configuration.set(OzoneConfigKeys.OZONE_METADATA_DIRS,
testDir.getAbsolutePath());
nodeManager = new MockNodeManager(true, 10);
mapping = new ContainerMapping(configuration, nodeManager, 128,
new EventQueue());
}
@AfterClass
public static void tearDown() throws Exception {
if (mapping != null) {
mapping.close();
}
FileUtil.fullyDelete(testDir);
}
@Test
public void testClose() throws IOException {
ContainerWithPipeline containerWithPipeline = mapping.allocateContainer(
HddsProtos.ReplicationType.STAND_ALONE,
HddsProtos.ReplicationFactor.ONE, "ozone");
ContainerInfo info = containerWithPipeline.getContainerInfo();
//Execute these state transitions so that we can close the container.
mapping.updateContainerState(info.getContainerID(), CREATE);
mapping.updateContainerState(info.getContainerID(), CREATED);
long currentCount = mapping.getCloser().getCloseCount();
long runCount = mapping.getCloser().getThreadRunCount();
DatanodeDetails datanode = containerWithPipeline.getPipeline().getLeader();
// Send a container report with used set to 1 GB. This should not close.
sendContainerReport(info, 1 * GIGABYTE);
// with only one container the cleaner thread should not run.
Assert.assertEquals(0, mapping.getCloser().getThreadRunCount());
// With only 1 GB, the container should not be queued for closing.
Assert.assertEquals(0, mapping.getCloser().getCloseCount());
// Assert that the Close command was not queued for this Datanode.
Assert.assertEquals(0, nodeManager.getCommandCount(datanode));
long newUsed = (long) (size * 0.91f);
sendContainerReport(info, newUsed);
// with only one container the cleaner thread should not run.
Assert.assertEquals(runCount, mapping.getCloser().getThreadRunCount());
// and close count will be one.
Assert.assertEquals(1,
mapping.getCloser().getCloseCount() - currentCount);
// Assert that the Close command was Queued for this Datanode.
Assert.assertEquals(1, nodeManager.getCommandCount(datanode));
}
@Test
public void testRepeatedClose() throws IOException,
InterruptedException {
// This test asserts that if we queue more than one report then the
// second report is discarded by the system if it lands in the 3 * report
// frequency window.
configuration.setTimeDuration(HDDS_CONTAINER_REPORT_INTERVAL, 1,
TimeUnit.SECONDS);
ContainerWithPipeline containerWithPipeline = mapping.allocateContainer(
HddsProtos.ReplicationType.STAND_ALONE,
HddsProtos.ReplicationFactor.ONE, "ozone");
ContainerInfo info = containerWithPipeline.getContainerInfo();
//Execute these state transitions so that we can close the container.
mapping.updateContainerState(info.getContainerID(), CREATE);
long currentCount = mapping.getCloser().getCloseCount();
long runCount = mapping.getCloser().getThreadRunCount();
DatanodeDetails datanodeDetails = containerWithPipeline.getPipeline()
.getLeader();
// Send this command twice and assert we have only one command in queue.
sendContainerReport(info, 5 * GIGABYTE);
sendContainerReport(info, 5 * GIGABYTE);
// Assert that the Close command was Queued for this Datanode.
Assert.assertEquals(1,
nodeManager.getCommandCount(datanodeDetails));
// And close count will be one.
Assert.assertEquals(1,
mapping.getCloser().getCloseCount() - currentCount);
Thread.sleep(TimeUnit.SECONDS.toMillis(4));
//send another close and the system will queue this to the command queue.
sendContainerReport(info, 5 * GIGABYTE);
Assert.assertEquals(2,
nodeManager.getCommandCount(datanodeDetails));
// but the close count will still be one, since from the point of view of
// closer we are closing only one container even if we have send multiple
// close commands to the datanode.
Assert.assertEquals(1, mapping.getCloser().getCloseCount()
- currentCount);
}
@Test
public void testCleanupThreadRuns() throws IOException,
InterruptedException {
// This test asserts that clean up thread runs once we have closed a
// number above cleanup water mark.
long runCount = mapping.getCloser().getThreadRunCount();
for (int x = 0; x < ContainerCloser.getCleanupWaterMark() + 10; x++) {
ContainerWithPipeline containerWithPipeline = mapping.allocateContainer(
HddsProtos.ReplicationType.STAND_ALONE,
HddsProtos.ReplicationFactor.ONE, "ozone");
ContainerInfo info = containerWithPipeline.getContainerInfo();
mapping.updateContainerState(info.getContainerID(), CREATE);
mapping.updateContainerState(info.getContainerID(), CREATED);
sendContainerReport(info, 5 * GIGABYTE);
}
Thread.sleep(TimeUnit.SECONDS.toMillis(1));
// Assert that cleanup thread ran at least once.
Assert.assertTrue(mapping.getCloser().getThreadRunCount() - runCount > 0);
}
private void sendContainerReport(ContainerInfo info, long used) throws
IOException {
ContainerReportsProto.Builder
reports = ContainerReportsProto.newBuilder();
StorageContainerDatanodeProtocolProtos.ContainerInfo.Builder ciBuilder =
StorageContainerDatanodeProtocolProtos.ContainerInfo.newBuilder();
ciBuilder.setContainerID(info.getContainerID())
.setFinalhash("e16cc9d6024365750ed8dbd194ea46d2")
.setSize(size)
.setUsed(used)
.setKeyCount(100000000L)
.setReadCount(100000000L)
.setWriteCount(100000000L)
.setReadBytes(2000000000L)
.setWriteBytes(2000000000L)
.setDeleteTransactionId(0);
reports.addReports(ciBuilder);
mapping.processContainerReports(TestUtils.randomDatanodeDetails(),
reports.build(), false);
}
}