From 8286bf2d1fe7a9051ee93ca6e4dd13e5348d00b8 Mon Sep 17 00:00:00 2001 From: Xiaoyu Yao Date: Wed, 5 Sep 2018 12:26:23 -0700 Subject: [PATCH] HDDS-303. Removing logic to identify containers to be closed from SCM. Contributed by Nanda kumar. --- .../apache/hadoop/hdds/HddsConfigKeys.java | 6 + .../apache/hadoop/hdds/scm/ScmConfigKeys.java | 6 - .../src/main/resources/ozone-default.xml | 4 +- .../container/common/impl/HddsDispatcher.java | 6 +- .../hdds/scm/container/ContainerMapping.java | 82 +------ .../scm/container/closer/ContainerCloser.java | 194 --------------- .../scm/container/TestContainerMapping.java | 43 ---- .../container/closer/TestContainerCloser.java | 228 ------------------ 8 files changed, 16 insertions(+), 553 deletions(-) delete mode 100644 hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/closer/ContainerCloser.java delete mode 100644 hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/closer/TestContainerCloser.java diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsConfigKeys.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsConfigKeys.java index 8272ed7bb22..3bda29fcabe 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsConfigKeys.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsConfigKeys.java @@ -64,4 +64,10 @@ public final class HddsConfigKeys { public static final String HDDS_DB_PROFILE = "hdds.db.profile"; public static final DBProfile HDDS_DEFAULT_DB_PROFILE = DBProfile.SSD; + // Once a container usage crosses this threshold, it is eligible for + // closing. + public static final String HDDS_CONTAINER_CLOSE_THRESHOLD = + "hdds.container.close.threshold"; + public static final float HDDS_CONTAINER_CLOSE_THRESHOLD_DEFAULT = 0.9f; + } diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java index 4c9a3bf7fe4..62d9ef5df59 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java @@ -242,12 +242,6 @@ public final class ScmConfigKeys { "ozone.scm.block.deletion.max.retry"; public static final int OZONE_SCM_BLOCK_DELETION_MAX_RETRY_DEFAULT = 4096; - // Once a container usage crosses this threshold, it is eligible for - // closing. - public static final String OZONE_SCM_CONTAINER_CLOSE_THRESHOLD = - "ozone.scm.container.close.threshold"; - public static final float OZONE_SCM_CONTAINER_CLOSE_THRESHOLD_DEFAULT = 0.9f; - public static final String HDDS_SCM_WATCHER_TIMEOUT = "hdds.scm.watcher.timeout"; diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml b/hadoop-hdds/common/src/main/resources/ozone-default.xml index ca3da417472..2112ae33074 100644 --- a/hadoop-hdds/common/src/main/resources/ozone-default.xml +++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml @@ -930,9 +930,9 @@ - ozone.scm.container.close.threshold + hdds.container.close.threshold 0.9f - OZONE, SCM + OZONE, DATANODE This determines the threshold to be used for closing a container. When the container used percentage reaches this threshold, diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java index ebc26280314..bb5002ae69e 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java @@ -21,11 +21,11 @@ package org.apache.hadoop.ozone.container.common.impl; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.collect.Maps; +import org.apache.hadoop.hdds.HddsConfigKeys; import org.apache.hadoop.hdds.HddsUtils; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; import org.apache.hadoop.hdds.protocol.proto .StorageContainerDatanodeProtocolProtos.ContainerAction; -import org.apache.hadoop.hdds.scm.ScmConfigKeys; import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException; import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics; import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils; @@ -84,8 +84,8 @@ public class HddsDispatcher implements ContainerDispatcher { containerType, conf, containerSet, volumeSet, metrics)); } this.containerCloseThreshold = conf.getFloat( - ScmConfigKeys.OZONE_SCM_CONTAINER_CLOSE_THRESHOLD, - ScmConfigKeys.OZONE_SCM_CONTAINER_CLOSE_THRESHOLD_DEFAULT); + HddsConfigKeys.HDDS_CONTAINER_CLOSE_THRESHOLD, + HddsConfigKeys.HDDS_CONTAINER_CLOSE_THRESHOLD_DEFAULT); } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerMapping.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerMapping.java index 65b97ca8c2c..8f5d8d61f96 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerMapping.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerMapping.java @@ -29,7 +29,6 @@ import org.apache.hadoop.hdds.scm.block.PendingDeleteStatusList; import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline; import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline; import org.apache.hadoop.hdds.scm.ScmConfigKeys; -import org.apache.hadoop.hdds.scm.container.closer.ContainerCloser; import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo; import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID; import org.apache.hadoop.hdds.scm.events.SCMEvents; @@ -91,8 +90,6 @@ public class ContainerMapping implements Mapping { private final PipelineSelector pipelineSelector; private final ContainerStateManager containerStateManager; private final LeaseManager containerLeaseManager; - private final float containerCloseThreshold; - private final ContainerCloser closer; private final EventPublisher eventPublisher; private final long size; @@ -116,7 +113,6 @@ public class ContainerMapping implements Mapping { cacheSizeMB, EventPublisher eventPublisher) throws IOException { this.nodeManager = nodeManager; this.cacheSize = cacheSizeMB; - this.closer = new ContainerCloser(nodeManager, conf); File metaDir = getOzoneMetaDirPath(conf); @@ -140,9 +136,6 @@ public class ContainerMapping implements Mapping { this.pipelineSelector = new PipelineSelector(nodeManager, containerStateManager, conf, eventPublisher); - this.containerCloseThreshold = conf.getFloat( - ScmConfigKeys.OZONE_SCM_CONTAINER_CLOSE_THRESHOLD, - ScmConfigKeys.OZONE_SCM_CONTAINER_CLOSE_THRESHOLD_DEFAULT); this.eventPublisher = eventPublisher; long containerCreationLeaseTimeout = conf.getTimeDuration( @@ -461,18 +454,18 @@ public class ContainerMapping implements Mapping { /** * Return a container matching the attributes specified. * - * @param size - Space needed in the Container. + * @param sizeRequired - Space needed in the Container. * @param owner - Owner of the container - A specific nameservice. * @param type - Replication Type {StandAlone, Ratis} * @param factor - Replication Factor {ONE, THREE} * @param state - State of the Container-- {Open, Allocated etc.} * @return ContainerInfo, null if there is no match found. */ - public ContainerWithPipeline getMatchingContainerWithPipeline(final long size, - String owner, ReplicationType type, ReplicationFactor factor, - LifeCycleState state) throws IOException { + public ContainerWithPipeline getMatchingContainerWithPipeline( + final long sizeRequired, String owner, ReplicationType type, + ReplicationFactor factor, LifeCycleState state) throws IOException { ContainerInfo containerInfo = getStateManager() - .getMatchingContainer(size, owner, type, factor, state); + .getMatchingContainer(sizeRequired, owner, type, factor, state); if (containerInfo == null) { return null; } @@ -563,20 +556,6 @@ public class ContainerMapping implements Mapping { // the updated State. containerStore.put(dbKey, newState.toByteArray()); - // If the container is closed, then state is already written to SCM - Pipeline pipeline = - pipelineSelector.getPipeline( - PipelineID.getFromProtobuf(newState.getPipelineID()), - newState.getReplicationType()); - if(pipeline == null) { - pipeline = pipelineSelector - .getReplicationPipeline(newState.getReplicationType(), - newState.getReplicationFactor()); - } - // DB.TODO: So can we can write only once to DB. - if (closeContainerIfNeeded(newState, pipeline)) { - LOG.info("Closing the Container: {}", newState.getContainerID()); - } } else { // Container not found in our container db. LOG.error("Error while processing container report from datanode :" + @@ -637,52 +616,6 @@ public class ContainerMapping implements Mapping { return builder.build(); } - /** - * Queues the close container command, to datanode and writes the new state - * to container DB. - *

- * TODO : Remove this 2 ContainerInfo definitions. It is brain dead to have - * one protobuf in one file and another definition in another file. - * - * @param newState - This is the state we maintain in SCM. - * @param pipeline - * @throws IOException - */ - private boolean closeContainerIfNeeded(SCMContainerInfo newState, - Pipeline pipeline) - throws IOException { - float containerUsedPercentage = 1.0f * - newState.getUsedBytes() / this.size; - - ContainerInfo scmInfo = getContainer(newState.getContainerID()); - if (containerUsedPercentage >= containerCloseThreshold - && !isClosed(scmInfo)) { - // We will call closer till get to the closed state. - // That is SCM will make this call repeatedly until we reach the closed - // state. - closer.close(newState, pipeline); - - if (shouldClose(scmInfo)) { - // This event moves the Container from Open to Closing State, this is - // a state inside SCM. This is the desired state that SCM wants this - // container to reach. We will know that a container has reached the - // closed state from container reports. This state change should be - // invoked once and only once. - HddsProtos.LifeCycleState state = updateContainerState( - scmInfo.getContainerID(), - HddsProtos.LifeCycleEvent.FINALIZE); - if (state != HddsProtos.LifeCycleState.CLOSING) { - LOG.error("Failed to close container {}, reason : Not able " + - "to " + - "update container state, current container state: {}.", - newState.getContainerID(), state); - return false; - } - return true; - } - } - return false; - } /** * In Container is in closed state, if it is in closed, Deleting or Deleted @@ -699,11 +632,6 @@ public class ContainerMapping implements Mapping { return info.getState() == HddsProtos.LifeCycleState.CLOSED; } - @VisibleForTesting - public ContainerCloser getCloser() { - return closer; - } - /** * Closes this stream and releases any system resources associated with it. * If the stream is diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/closer/ContainerCloser.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/closer/ContainerCloser.java deleted file mode 100644 index 7e049285161..00000000000 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/closer/ContainerCloser.java +++ /dev/null @@ -1,194 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with this - * work for additional information regarding copyright ownership. The ASF - * licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - * - */ - -package org.apache.hadoop.hdds.scm.container.closer; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Preconditions; -import com.google.common.util.concurrent.ThreadFactoryBuilder; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hdds.protocol.proto.HddsProtos.SCMContainerInfo; -import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline; -import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID; -import org.apache.hadoop.hdds.scm.node.NodeManager; -import org.apache.hadoop.hdds.protocol.DatanodeDetails; -import org.apache.hadoop.ozone.protocol.commands.CloseContainerCommand; -import org.apache.hadoop.util.Time; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; - -import static org.apache.hadoop.hdds.HddsConfigKeys - .HDDS_CONTAINER_REPORT_INTERVAL; -import static org.apache.hadoop.hdds.HddsConfigKeys - .HDDS_CONTAINER_REPORT_INTERVAL_DEFAULT; - -/** - * A class that manages closing of containers. This allows transition from a - * open but full container to a closed container, to which no data is written. - */ -public class ContainerCloser { - private static final Logger LOG = - LoggerFactory.getLogger(ContainerCloser.class); - private static final long MULTIPLIER = 3L; - private static final int CLEANUP_WATER_MARK = 1000; - private final NodeManager nodeManager; - private final Map commandIssued; - private final Configuration configuration; - private final AtomicInteger mapCount; - private final long reportInterval; - private final AtomicInteger threadRunCount; - private final AtomicBoolean isRunning; - - /** - * Constructs the ContainerCloser class. - * - * @param nodeManager - NodeManager - * @param conf - Configuration - */ - public ContainerCloser(NodeManager nodeManager, Configuration conf) { - Preconditions.checkNotNull(nodeManager); - Preconditions.checkNotNull(conf); - this.nodeManager = nodeManager; - this.configuration = conf; - this.commandIssued = new ConcurrentHashMap<>(); - this.mapCount = new AtomicInteger(0); - this.threadRunCount = new AtomicInteger(0); - this.isRunning = new AtomicBoolean(false); - this.reportInterval = this.configuration.getTimeDuration( - HDDS_CONTAINER_REPORT_INTERVAL, - HDDS_CONTAINER_REPORT_INTERVAL_DEFAULT, TimeUnit.MILLISECONDS); - Preconditions.checkState(this.reportInterval > 0, - "report interval has to be greater than 0"); - } - - @VisibleForTesting - public static int getCleanupWaterMark() { - return CLEANUP_WATER_MARK; - } - - /** - * Sends a Container Close command to the data nodes where this container - * lives. - * - * @param info - ContainerInfo. - * @param pipeline - */ - public void close(SCMContainerInfo info, - Pipeline pipeline) { - - if (commandIssued.containsKey(info.getContainerID())) { - // We check if we issued a close command in last 3 * reportInterval secs. - long commandQueueTime = commandIssued.get(info.getContainerID()); - long currentTime = Time.monotonicNow(); - if (currentTime > commandQueueTime + (MULTIPLIER * reportInterval)) { - commandIssued.remove(info.getContainerID()); - mapCount.decrementAndGet(); - } else { - // Ignore this request, since we just issued a close command. We - // should wait instead of sending a command to datanode again. - return; - } - } - - // if we reached here, it means that we have not issued a command to the - // data node in last (3 times report interval). We are presuming that is - // enough time to close the container. Let us go ahead and queue a close - // to all the datanodes that participate in the container. - // - // Three important things to note here: - // - // 1. It is ok to send this command multiple times to a datanode. Close - // container is an idempotent command, if the container is already closed - // then we have no issues. - // - // 2. The container close command is issued to all datanodes. But - // depending on the pipeline type, some of the datanodes might ignore it. - // - // 3. SCM will see that datanode is closed from container reports, but it - // is possible that datanodes might get close commands since - // this queue can be emptied by a datanode after a close report is send - // to SCM. In that case also, data node will ignore this command. - - for (DatanodeDetails datanodeDetails : pipeline.getMachines()) { - nodeManager.addDatanodeCommand(datanodeDetails.getUuid(), - new CloseContainerCommand(info.getContainerID(), - info.getReplicationType(), - PipelineID.getFromProtobuf(info.getPipelineID()))); - } - if (!commandIssued.containsKey(info.getContainerID())) { - commandIssued.put(info.getContainerID(), Time.monotonicNow()); - mapCount.incrementAndGet(); - } - // run the hash map cleaner thread if needed, non-blocking call. - runCleanerThreadIfNeeded(); - } - - private void runCleanerThreadIfNeeded() { - // Let us check if we should run a cleaner thread, not using map.size - // since it runs a loop in the case of the concurrentMap. - if (mapCount.get() > CLEANUP_WATER_MARK && - isRunning.compareAndSet(false, true)) { - Runnable entryCleaner = () -> { - LOG.debug("Starting close container Hash map cleaner."); - try { - for (Map.Entry entry : commandIssued.entrySet()) { - long commandQueueTime = entry.getValue(); - if (commandQueueTime + (MULTIPLIER * reportInterval) > - Time.monotonicNow()) { - - // It is possible for this remove to fail due to race conditions. - // No big deal we will cleanup next time. - commandIssued.remove(entry.getKey()); - mapCount.decrementAndGet(); - } - } - isRunning.compareAndSet(true, false); - LOG.debug("Finished running, close container Hash map cleaner."); - } catch (Exception ex) { - LOG.error("Unable to finish cleaning the closed containers map.", ex); - } - }; - - // Launch the cleaner thread when we need instead of having a daemon - // thread that is sleeping all the time. We need to set the Daemon to - // true to avoid blocking clean exits. - Thread cleanerThread = new ThreadFactoryBuilder() - .setDaemon(true) - .setNameFormat("Closed Container Cleaner Thread - %d") - .build().newThread(entryCleaner); - threadRunCount.incrementAndGet(); - cleanerThread.start(); - } - } - - @VisibleForTesting - public int getThreadRunCount() { - return threadRunCount.get(); - } - - @VisibleForTesting - public int getCloseCount() { - return mapCount.get(); - } -} diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerMapping.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerMapping.java index c5686f51326..481f94c00c4 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerMapping.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerMapping.java @@ -288,49 +288,6 @@ public class TestContainerMapping { } } - @Test - public void testContainerCloseWithContainerReport() throws IOException { - ContainerInfo info = createContainer(); - DatanodeDetails datanodeDetails = TestUtils.randomDatanodeDetails(); - List reports = - new ArrayList<>(); - - StorageContainerDatanodeProtocolProtos.ContainerInfo.Builder ciBuilder = - StorageContainerDatanodeProtocolProtos.ContainerInfo.newBuilder(); - ciBuilder.setFinalhash("7c45eb4d7ed5e0d2e89aaab7759de02e") - .setSize(5368709120L) - .setUsed(5368705120L) - .setKeyCount(500000000L) - .setReadCount(500000000L) - .setWriteCount(500000000L) - .setReadBytes(5368705120L) - .setWriteBytes(5368705120L) - .setContainerID(info.getContainerID()) - .setDeleteTransactionId(0); - - reports.add(ciBuilder.build()); - - ContainerReportsProto.Builder crBuilder = - ContainerReportsProto.newBuilder(); - crBuilder.addAllReports(reports); - - mapping.processContainerReports(datanodeDetails, crBuilder.build(), false); - - ContainerInfo updatedContainer = - mapping.getContainer(info.getContainerID()); - Assert.assertEquals(500000000L, - updatedContainer.getNumberOfKeys()); - Assert.assertEquals(5368705120L, updatedContainer.getUsedBytes()); - NavigableSet pendingCloseContainers = mapping.getStateManager() - .getMatchingContainerIDs( - containerOwner, - xceiverClientManager.getType(), - xceiverClientManager.getFactor(), - HddsProtos.LifeCycleState.CLOSING); - Assert.assertTrue( - pendingCloseContainers.contains(updatedContainer.containerID())); - } - @Test public void testCloseContainer() throws IOException { ContainerInfo info = createContainer(); diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/closer/TestContainerCloser.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/closer/TestContainerCloser.java deleted file mode 100644 index 210df088f84..00000000000 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/closer/TestContainerCloser.java +++ /dev/null @@ -1,228 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with this - * work for additional information regarding copyright ownership. The ASF - * licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - * - */ - -package org.apache.hadoop.hdds.scm.container.closer; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.conf.StorageUnit; -import org.apache.hadoop.fs.FileUtil; -import org.apache.hadoop.hdds.scm.TestUtils; -import org.apache.hadoop.hdds.scm.container.ContainerMapping; -import org.apache.hadoop.hdds.scm.container.MockNodeManager; -import org.apache.hadoop.hdds.scm.container.TestContainerMapping; -import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline; -import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo; -import org.apache.hadoop.hdds.protocol.DatanodeDetails; -import org.apache.hadoop.hdds.protocol.proto.HddsProtos; -import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos; -import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.ContainerReportsProto; -import org.apache.hadoop.hdds.server.events.EventQueue; -import org.apache.hadoop.ozone.OzoneConfigKeys; -import org.apache.hadoop.ozone.container.common.SCMTestUtils; -import org.apache.hadoop.test.GenericTestUtils; -import org.junit.AfterClass; -import org.junit.Assert; -import org.junit.BeforeClass; -import org.junit.Test; - -import java.io.File; -import java.io.IOException; -import java.util.concurrent.TimeUnit; - -import static org.apache.hadoop.hdds.HddsConfigKeys - .HDDS_CONTAINER_REPORT_INTERVAL; -import static org.apache.hadoop.hdds.scm.ScmConfigKeys - .OZONE_SCM_CONTAINER_SIZE_DEFAULT; -import static org.apache.hadoop.hdds.scm.ScmConfigKeys - .OZONE_SCM_CONTAINER_SIZE; -import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleEvent - .CREATE; -import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleEvent - .CREATED; - -/** - * Test class for Closing Container. - */ -public class TestContainerCloser { - - private static final long GIGABYTE = 1024L * 1024L * 1024L; - private static Configuration configuration; - private static MockNodeManager nodeManager; - private static ContainerMapping mapping; - private static long size; - private static File testDir; - - @BeforeClass - public static void setUp() throws Exception { - configuration = SCMTestUtils.getConf(); - size = (long)configuration.getStorageSize(OZONE_SCM_CONTAINER_SIZE, - OZONE_SCM_CONTAINER_SIZE_DEFAULT, StorageUnit.BYTES); - configuration.setTimeDuration(HDDS_CONTAINER_REPORT_INTERVAL, - 1, TimeUnit.SECONDS); - testDir = GenericTestUtils - .getTestDir(TestContainerMapping.class.getSimpleName()); - configuration.set(OzoneConfigKeys.OZONE_METADATA_DIRS, - testDir.getAbsolutePath()); - nodeManager = new MockNodeManager(true, 10); - mapping = new ContainerMapping(configuration, nodeManager, 128, - new EventQueue()); - } - - @AfterClass - public static void tearDown() throws Exception { - if (mapping != null) { - mapping.close(); - } - FileUtil.fullyDelete(testDir); - } - - @Test - public void testClose() throws IOException { - ContainerWithPipeline containerWithPipeline = mapping.allocateContainer( - HddsProtos.ReplicationType.STAND_ALONE, - HddsProtos.ReplicationFactor.ONE, "ozone"); - ContainerInfo info = containerWithPipeline.getContainerInfo(); - - //Execute these state transitions so that we can close the container. - mapping.updateContainerState(info.getContainerID(), CREATE); - mapping.updateContainerState(info.getContainerID(), CREATED); - long currentCount = mapping.getCloser().getCloseCount(); - long runCount = mapping.getCloser().getThreadRunCount(); - - DatanodeDetails datanode = containerWithPipeline.getPipeline().getLeader(); - // Send a container report with used set to 1 GB. This should not close. - sendContainerReport(info, 1 * GIGABYTE); - - // with only one container the cleaner thread should not run. - Assert.assertEquals(0, mapping.getCloser().getThreadRunCount()); - - // With only 1 GB, the container should not be queued for closing. - Assert.assertEquals(0, mapping.getCloser().getCloseCount()); - - // Assert that the Close command was not queued for this Datanode. - Assert.assertEquals(0, nodeManager.getCommandCount(datanode)); - - long newUsed = (long) (size * 0.91f); - sendContainerReport(info, newUsed); - - // with only one container the cleaner thread should not run. - Assert.assertEquals(runCount, mapping.getCloser().getThreadRunCount()); - - // and close count will be one. - Assert.assertEquals(1, - mapping.getCloser().getCloseCount() - currentCount); - - // Assert that the Close command was Queued for this Datanode. - Assert.assertEquals(1, nodeManager.getCommandCount(datanode)); - } - - @Test - public void testRepeatedClose() throws IOException, - InterruptedException { - // This test asserts that if we queue more than one report then the - // second report is discarded by the system if it lands in the 3 * report - // frequency window. - - configuration.setTimeDuration(HDDS_CONTAINER_REPORT_INTERVAL, 1, - TimeUnit.SECONDS); - - ContainerWithPipeline containerWithPipeline = mapping.allocateContainer( - HddsProtos.ReplicationType.STAND_ALONE, - HddsProtos.ReplicationFactor.ONE, "ozone"); - ContainerInfo info = containerWithPipeline.getContainerInfo(); - - //Execute these state transitions so that we can close the container. - mapping.updateContainerState(info.getContainerID(), CREATE); - - long currentCount = mapping.getCloser().getCloseCount(); - long runCount = mapping.getCloser().getThreadRunCount(); - - DatanodeDetails datanodeDetails = containerWithPipeline.getPipeline() - .getLeader(); - - // Send this command twice and assert we have only one command in queue. - sendContainerReport(info, 5 * GIGABYTE); - sendContainerReport(info, 5 * GIGABYTE); - - // Assert that the Close command was Queued for this Datanode. - Assert.assertEquals(1, - nodeManager.getCommandCount(datanodeDetails)); - // And close count will be one. - Assert.assertEquals(1, - mapping.getCloser().getCloseCount() - currentCount); - Thread.sleep(TimeUnit.SECONDS.toMillis(4)); - - //send another close and the system will queue this to the command queue. - sendContainerReport(info, 5 * GIGABYTE); - Assert.assertEquals(2, - nodeManager.getCommandCount(datanodeDetails)); - // but the close count will still be one, since from the point of view of - // closer we are closing only one container even if we have send multiple - // close commands to the datanode. - Assert.assertEquals(1, mapping.getCloser().getCloseCount() - - currentCount); - } - - @Test - public void testCleanupThreadRuns() throws IOException, - InterruptedException { - // This test asserts that clean up thread runs once we have closed a - // number above cleanup water mark. - - long runCount = mapping.getCloser().getThreadRunCount(); - - for (int x = 0; x < ContainerCloser.getCleanupWaterMark() + 10; x++) { - ContainerWithPipeline containerWithPipeline = mapping.allocateContainer( - HddsProtos.ReplicationType.STAND_ALONE, - HddsProtos.ReplicationFactor.ONE, "ozone"); - ContainerInfo info = containerWithPipeline.getContainerInfo(); - mapping.updateContainerState(info.getContainerID(), CREATE); - mapping.updateContainerState(info.getContainerID(), CREATED); - sendContainerReport(info, 5 * GIGABYTE); - } - - Thread.sleep(TimeUnit.SECONDS.toMillis(1)); - - // Assert that cleanup thread ran at least once. - Assert.assertTrue(mapping.getCloser().getThreadRunCount() - runCount > 0); - } - - private void sendContainerReport(ContainerInfo info, long used) throws - IOException { - ContainerReportsProto.Builder - reports = ContainerReportsProto.newBuilder(); - - StorageContainerDatanodeProtocolProtos.ContainerInfo.Builder ciBuilder = - StorageContainerDatanodeProtocolProtos.ContainerInfo.newBuilder(); - ciBuilder.setContainerID(info.getContainerID()) - .setFinalhash("e16cc9d6024365750ed8dbd194ea46d2") - .setSize(size) - .setUsed(used) - .setKeyCount(100000000L) - .setReadCount(100000000L) - .setWriteCount(100000000L) - .setReadBytes(2000000000L) - .setWriteBytes(2000000000L) - .setDeleteTransactionId(0); - reports.addReports(ciBuilder); - mapping.processContainerReports(TestUtils.randomDatanodeDetails(), - reports.build(), false); - } -} \ No newline at end of file