diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/ContainerMapping.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/ContainerMapping.java index 8a82c821628..5950d9a7fd4 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/ContainerMapping.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/ContainerMapping.java @@ -27,9 +27,11 @@ import org.apache.hadoop.ozone.lease.LeaseManager; import org.apache.hadoop.ozone.protocol.proto.OzoneProtos; import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.ReplicationFactor; import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.ReplicationType; -import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos; +import org.apache.hadoop.ozone.protocol.proto + .StorageContainerDatanodeProtocolProtos; import org.apache.hadoop.ozone.protocol.proto .StorageContainerDatanodeProtocolProtos.ContainerReportsRequestProto; +import org.apache.hadoop.ozone.scm.container.closer.ContainerCloser; import org.apache.hadoop.ozone.scm.container.replication.ContainerSupervisor; import org.apache.hadoop.ozone.scm.exceptions.SCMException; import org.apache.hadoop.ozone.scm.node.NodeManager; @@ -57,6 +59,9 @@ import java.util.concurrent.locks.ReentrantLock; import static org.apache.hadoop.ozone.OzoneConsts.SCM_CONTAINER_DB; import static org.apache.hadoop.ozone.scm.exceptions.SCMException.ResultCodes .FAILED_TO_CHANGE_CONTAINER_STATE; +import static org.apache.hadoop.scm.ScmConfigKeys + .OZONE_SCM_CONTAINER_SIZE_DEFAULT; +import static org.apache.hadoop.scm.ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_GB; /** * Mapping class contains the mapping from a name to a pipeline mapping. This @@ -77,6 +82,8 @@ public class ContainerMapping implements Mapping { private final LeaseManager containerLeaseManager; private final ContainerSupervisor containerSupervisor; private final float containerCloseThreshold; + private final ContainerCloser closer; + private final long size; /** * Constructs a mapping class that creates mapping between container names @@ -98,6 +105,7 @@ public class ContainerMapping implements Mapping { cacheSizeMB) throws IOException { this.nodeManager = nodeManager; this.cacheSize = cacheSizeMB; + this.closer = new ContainerCloser(nodeManager, conf); File metaDir = OzoneUtils.getOzoneMetaDirPath(conf); @@ -113,6 +121,10 @@ public class ContainerMapping implements Mapping { this.lock = new ReentrantLock(); this.pipelineSelector = new PipelineSelector(nodeManager, conf); + + // To be replaced with code getStorageSize once it is committed. + size = conf.getLong(OZONE_SCM_CONTAINER_SIZE_GB, + OZONE_SCM_CONTAINER_SIZE_DEFAULT) * 1024 * 1024 * 1024; this.containerStateManager = new ContainerStateManager(conf, this); this.containerSupervisor = @@ -342,6 +354,7 @@ public class ContainerMapping implements Mapping { /** * Returns the container State Manager. + * * @return ContainerStateManager */ @Override @@ -351,6 +364,18 @@ public class ContainerMapping implements Mapping { /** * Process container report from Datanode. + *

+ * Processing follows a very simple logic for time being. + *

+ * 1. Datanodes report the current State -- denoted by the datanodeState + *

+ * 2. We are the older SCM state from the Database -- denoted by + * the knownState. + *

+ * 3. We copy the usage etc. from currentState to newState and log that + * newState to the DB. This allows us SCM to bootup again and read the + * state of the world from the DB, and then reconcile the state from + * container reports, when they arrive. * * @param reports Container report */ @@ -360,63 +385,37 @@ public class ContainerMapping implements Mapping { List containerInfos = reports.getReportsList(); containerSupervisor.handleContainerReport(reports); - for (StorageContainerDatanodeProtocolProtos.ContainerInfo containerInfo : + for (StorageContainerDatanodeProtocolProtos.ContainerInfo datanodeState : containerInfos) { - byte[] dbKey = containerInfo.getContainerNameBytes().toByteArray(); + byte[] dbKey = datanodeState.getContainerNameBytes().toByteArray(); lock.lock(); try { byte[] containerBytes = containerStore.get(dbKey); if (containerBytes != null) { - OzoneProtos.SCMContainerInfo oldInfo = + OzoneProtos.SCMContainerInfo knownState = OzoneProtos.SCMContainerInfo.PARSER.parseFrom(containerBytes); - OzoneProtos.SCMContainerInfo.Builder builder = - OzoneProtos.SCMContainerInfo.newBuilder(); - builder.setContainerName(oldInfo.getContainerName()); - builder.setPipeline(oldInfo.getPipeline()); - // If used size is greater than allocated size, we will be updating - // allocated size with used size. This update is done as a fallback - // mechanism in case SCM crashes without properly updating allocated - // size. Correct allocated value will be updated by - // ContainerStateManager during SCM shutdown. - long usedSize = containerInfo.getUsed(); - long allocated = oldInfo.getAllocatedBytes() > usedSize ? - oldInfo.getAllocatedBytes() : usedSize; - builder.setAllocatedBytes(allocated); - builder.setUsedBytes(containerInfo.getUsed()); - builder.setNumberOfKeys(containerInfo.getKeyCount()); - builder.setState(oldInfo.getState()); - builder.setStateEnterTime(oldInfo.getStateEnterTime()); - builder.setContainerID(oldInfo.getContainerID()); - if (oldInfo.getOwner() != null) { - builder.setOwner(oldInfo.getOwner()); - } - OzoneProtos.SCMContainerInfo newContainerInfo = builder.build(); - containerStore.put(dbKey, newContainerInfo.toByteArray()); - float containerUsedPercentage = 1.0f * - containerInfo.getUsed() / containerInfo.getSize(); - // TODO: Handling of containers which are already in close queue. - if (containerUsedPercentage >= containerCloseThreshold) { - // TODO: The container has to be moved to close container queue. - // For now, we are just updating the container state to CLOSING. - // Close container implementation can decide on how to maintain - // list of containers to be closed, this is the place where we - // have to add the containers to that list. - OzoneProtos.LifeCycleState state = updateContainerState( - ContainerInfo.fromProtobuf(newContainerInfo).getContainerName(), - OzoneProtos.LifeCycleEvent.FINALIZE); - if (state != OzoneProtos.LifeCycleState.CLOSING) { - LOG.error("Failed to close container {}, reason : Not able to " + - "update container state, current container state: {}.", - containerInfo.getContainerName(), state); - } + OzoneProtos.SCMContainerInfo newState = + reconcileState(datanodeState, knownState); + + // FIX ME: This can be optimized, we write twice to memory, where a + // single write would work well. + // + // We need to write this to DB again since the closed only write + // the updated State. + containerStore.put(dbKey, newState.toByteArray()); + + // If the container is closed, then state is already written to SCM + // DB.TODO: So can we can write only once to DB. + if (closeContainerIfNeeded(newState)) { + LOG.info("Closing the Container: {}", newState.getContainerName()); } } else { // Container not found in our container db. LOG.error("Error while processing container report from datanode :" + - " {}, for container: {}, reason: container doesn't exist in" + - "container database.", reports.getDatanodeID(), - containerInfo.getContainerName()); + " {}, for container: {}, reason: container doesn't exist in" + + "container database.", reports.getDatanodeID(), + datanodeState.getContainerName()); } } finally { lock.unlock(); @@ -424,11 +423,110 @@ public class ContainerMapping implements Mapping { } } + /** + * Reconciles the state from Datanode with the state in SCM. + * + * @param datanodeState - State from the Datanode. + * @param knownState - State inside SCM. + * @return new SCM State for this container. + */ + private OzoneProtos.SCMContainerInfo reconcileState( + StorageContainerDatanodeProtocolProtos.ContainerInfo datanodeState, + OzoneProtos.SCMContainerInfo knownState) { + OzoneProtos.SCMContainerInfo.Builder builder = + OzoneProtos.SCMContainerInfo.newBuilder(); + builder.setContainerName(knownState.getContainerName()); + builder.setPipeline(knownState.getPipeline()); + // If used size is greater than allocated size, we will be updating + // allocated size with used size. This update is done as a fallback + // mechanism in case SCM crashes without properly updating allocated + // size. Correct allocated value will be updated by + // ContainerStateManager during SCM shutdown. + long usedSize = datanodeState.getUsed(); + long allocated = knownState.getAllocatedBytes() > usedSize ? + knownState.getAllocatedBytes() : usedSize; + builder.setAllocatedBytes(allocated); + builder.setUsedBytes(usedSize); + builder.setNumberOfKeys(datanodeState.getKeyCount()); + builder.setState(knownState.getState()); + builder.setStateEnterTime(knownState.getStateEnterTime()); + builder.setContainerID(knownState.getContainerID()); + if (knownState.getOwner() != null) { + builder.setOwner(knownState.getOwner()); + } + return builder.build(); + } + + /** + * Queues the close container command, to datanode and writes the new state + * to container DB. + *

+ * TODO : Remove this 2 ContainerInfo definitions. It is brain dead to have + * one protobuf in one file and another definition in another file. + * + * @param newState - This is the state we maintain in SCM. + * @throws IOException + */ + private boolean closeContainerIfNeeded(OzoneProtos.SCMContainerInfo newState) + throws IOException { + float containerUsedPercentage = 1.0f * + newState.getUsedBytes() / this.size; + + ContainerInfo scmInfo = getContainer(newState.getContainerName()); + if (containerUsedPercentage >= containerCloseThreshold + && !isClosed(scmInfo)) { + // We will call closer till get to the closed state. + // That is SCM will make this call repeatedly until we reach the closed + // state. + closer.close(newState); + + if (shouldClose(scmInfo)) { + // This event moves the Container from Open to Closing State, this is + // a state inside SCM. This is the desired state that SCM wants this + // container to reach. We will know that a container has reached the + // closed state from container reports. This state change should be + // invoked once and only once. + OzoneProtos.LifeCycleState state = updateContainerState( + scmInfo.getContainerName(), + OzoneProtos.LifeCycleEvent.FINALIZE); + if (state != OzoneProtos.LifeCycleState.CLOSING) { + LOG.error("Failed to close container {}, reason : Not able " + + "to " + + "update container state, current container state: {}.", + newState.getContainerName(), state); + return false; + } + return true; + } + } + return false; + } + + /** + * In Container is in closed state, if it is in closed, Deleting or Deleted + * State. + * + * @param info - ContainerInfo. + * @return true if is in open state, false otherwise + */ + private boolean shouldClose(ContainerInfo info) { + return info.getState() == OzoneProtos.LifeCycleState.OPEN; + } + + private boolean isClosed(ContainerInfo info) { + return info.getState() == OzoneProtos.LifeCycleState.CLOSED; + } + + @VisibleForTesting + public ContainerCloser getCloser() { + return closer; + } + /** * Closes this stream and releases any system resources associated with it. * If the stream is * already closed then invoking this method has no effect. - * + *

*

As noted in {@link AutoCloseable#close()}, cases where the close may * fail require careful * attention. It is strongly advised to relinquish the underlying resources @@ -457,7 +555,7 @@ public class ContainerMapping implements Mapping { * containerStateManager, when closing ContainerMapping, we need to update * this in the container store. * - * @throws IOException on failure. + * @throws IOException on failure. */ @VisibleForTesting public void flushContainerInfo() throws IOException { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/closer/ContainerCloser.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/closer/ContainerCloser.java new file mode 100644 index 00000000000..937138737f2 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/closer/ContainerCloser.java @@ -0,0 +1,192 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + * + */ + +package org.apache.hadoop.ozone.scm.container.closer; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.protocol.DatanodeID; +import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos; +import org.apache.hadoop.ozone.protocol.commands.CloseContainerCommand; +import org.apache.hadoop.ozone.protocol.proto.OzoneProtos; +import org.apache.hadoop.ozone.scm.node.NodeManager; +import org.apache.hadoop.util.Time; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.apache.hadoop.ozone.OzoneConfigKeys + .OZONE_CONTAINER_REPORT_INTERVAL; +import static org.apache.hadoop.ozone.OzoneConfigKeys + .OZONE_CONTAINER_REPORT_INTERVAL_DEFAULT; + +/** + * A class that manages closing of containers. This allows transition from a + * open but full container to a closed container, to which no data is written. + */ +public class ContainerCloser { + private static final Logger LOG = + LoggerFactory.getLogger(ContainerCloser.class); + private static final long MULTIPLIER = 3L; + private static final int CLEANUP_WATER_MARK = 1000; + private final NodeManager nodeManager; + private final Map commandIssued; + private final Configuration configuration; + private final AtomicInteger mapCount; + private final long reportInterval; + private final AtomicInteger threadRunCount; + private final AtomicBoolean isRunning; + + /** + * Constructs the ContainerCloser class. + * + * @param nodeManager - NodeManager + * @param conf - Configuration + */ + public ContainerCloser(NodeManager nodeManager, Configuration conf) { + Preconditions.checkNotNull(nodeManager); + Preconditions.checkNotNull(conf); + this.nodeManager = nodeManager; + this.configuration = conf; + this.commandIssued = new ConcurrentHashMap<>(); + this.mapCount = new AtomicInteger(0); + this.threadRunCount = new AtomicInteger(0); + this.isRunning = new AtomicBoolean(false); + this.reportInterval = this.configuration.getTimeDuration( + OZONE_CONTAINER_REPORT_INTERVAL, + OZONE_CONTAINER_REPORT_INTERVAL_DEFAULT, TimeUnit.SECONDS); + Preconditions.checkState(this.reportInterval > 0, + "report interval has to be greater than 0"); + } + + @VisibleForTesting + public static int getCleanupWaterMark() { + return CLEANUP_WATER_MARK; + } + + /** + * Sends a Container Close command to the data nodes where this container + * lives. + * + * @param info - ContainerInfo. + */ + public void close(OzoneProtos.SCMContainerInfo info) { + + if (commandIssued.containsKey(info.getContainerName())) { + // We check if we issued a close command in last 3 * reportInterval secs. + long commandQueueTime = commandIssued.get(info.getContainerName()); + long currentTime = TimeUnit.MILLISECONDS.toSeconds(Time.monotonicNow()); + if (currentTime > commandQueueTime + (MULTIPLIER * reportInterval)) { + commandIssued.remove(info.getContainerName()); + mapCount.decrementAndGet(); + } else { + // Ignore this request, since we just issued a close command. We + // should wait instead of sending a command to datanode again. + return; + } + } + + // if we reached here, it means that we have not issued a command to the + // data node in last (3 times report interval). We are presuming that is + // enough time to close the container. Let us go ahead and queue a close + // to all the datanodes that participate in the container. + // + // Three important things to note here: + // + // 1. It is ok to send this command multiple times to a datanode. Close + // container is an idempotent command, if the container is already closed + // then we have no issues. + // + // 2. The container close command is issued to all datanodes. But + // depending on the pipeline type, some of the datanodes might ignore it. + // + // 3. SCM will see that datanode is closed from container reports, but it + // is possible that datanodes might get close commands since + // this queue can be emptied by a datanode after a close report is send + // to SCM. In that case also, data node will ignore this command. + + OzoneProtos.Pipeline pipeline = info.getPipeline(); + for (HdfsProtos.DatanodeIDProto datanodeID : + pipeline.getPipelineChannel().getMembersList()) { + nodeManager.addDatanodeCommand(DatanodeID.getFromProtoBuf(datanodeID), + new CloseContainerCommand(info.getContainerName())); + } + if (!commandIssued.containsKey(info.getContainerName())) { + commandIssued.put(info.getContainerName(), + TimeUnit.MILLISECONDS.toSeconds(Time.monotonicNow())); + mapCount.incrementAndGet(); + } + // run the hash map cleaner thread if needed, non-blocking call. + runCleanerThreadIfNeeded(); + } + + private void runCleanerThreadIfNeeded() { + // Let us check if we should run a cleaner thread, not using map.size + // since it runs a loop in the case of the concurrentMap. + if (mapCount.get() > CLEANUP_WATER_MARK && + isRunning.compareAndSet(false, true)) { + Runnable entryCleaner = () -> { + LOG.debug("Starting close container Hash map cleaner."); + try { + for (Map.Entry entry : commandIssued.entrySet()) { + long commandQueueTime = entry.getValue(); + if (commandQueueTime + (MULTIPLIER * reportInterval) > + TimeUnit.MILLISECONDS.toSeconds(Time.monotonicNow())) { + + // It is possible for this remove to fail due to race conditions. + // No big deal we will cleanup next time. + commandIssued.remove(entry.getKey()); + mapCount.decrementAndGet(); + } + } + isRunning.compareAndSet(true, false); + LOG.debug("Finished running, close container Hash map cleaner."); + } catch (Exception ex) { + LOG.error("Unable to finish cleaning the closed containers map.", ex); + } + }; + + // Launch the cleaner thread when we need instead of having a daemon + // thread that is sleeping all the time. We need to set the Daemon to + // true to avoid blocking clean exits. + Thread cleanerThread = new ThreadFactoryBuilder() + .setDaemon(true) + .setNameFormat("Closed Container Cleaner Thread - %d") + .build().newThread(entryCleaner); + threadRunCount.incrementAndGet(); + cleanerThread.start(); + } + } + + @VisibleForTesting + public int getThreadRunCount() { + return threadRunCount.get(); + } + + @VisibleForTesting + public int getCloseCount() { + return mapCount.get(); + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/closer/package-info.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/closer/package-info.java new file mode 100644 index 00000000000..2d0f257b334 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/closer/package-info.java @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + * + */ + +/** + * This package has class that close a container. That is move a container from + * open state to close state. + */ +package org.apache.hadoop.ozone.scm.container.closer; \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/NodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/NodeManager.java index ce032b41a0d..77b3bff63aa 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/NodeManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/NodeManager.java @@ -148,5 +148,5 @@ public interface NodeManager extends StorageContainerNodeProtocol, * @param id * @param command */ - default void addDatanodeCommand(DatanodeID id, SCMCommand command) {} + void addDatanodeCommand(DatanodeID id, SCMCommand command); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/ozone-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/ozone-default.xml index 4c7c7236f88..fde4819826c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/ozone-default.xml +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/ozone-default.xml @@ -1106,6 +1106,14 @@ for more info. + + ozone.scm.max.nodepool.processing.threads + 1 + OZONE, MANAGEMENT, PERFORMANCE + + Number of node pools to process in parallel. + + ozone.scm.names diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/container/MockNodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/container/MockNodeManager.java index 13141265b37..95de3510970 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/container/MockNodeManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/container/MockNodeManager.java @@ -37,6 +37,7 @@ import org.apache.hadoop.ozone.scm.container.placement.metrics.SCMNodeStat; import org.apache.hadoop.ozone.scm.node.NodeManager; import org.apache.hadoop.ozone.scm.node.NodePoolManager; import org.mockito.Mockito; +import org.assertj.core.util.Preconditions; import java.io.IOException; import java.util.HashMap; @@ -73,6 +74,7 @@ public class MockNodeManager implements NodeManager { private final Map nodeMetricMap; private final SCMNodeStat aggregateStat; private boolean chillmode; + private final Map> commandMap; public MockNodeManager(boolean initializeFakeNodes, int nodeCount) { this.healthyNodes = new LinkedList<>(); @@ -87,6 +89,7 @@ public class MockNodeManager implements NodeManager { } } chillmode = false; + this.commandMap = new HashMap<>(); } /** @@ -297,6 +300,31 @@ public class MockNodeManager implements NodeManager { return null; } + @Override + public void addDatanodeCommand(DatanodeID id, SCMCommand command) { + if(commandMap.containsKey(id)) { + List commandList = commandMap.get(id); + Preconditions.checkNotNull(commandList); + commandList.add(command); + } else { + List commandList = new LinkedList<>(); + commandList.add(command); + commandMap.put(id, commandList); + } + } + + // Returns the number of commands that is queued to this node manager. + public int getCommandCount(DatanodeID id) { + List list = commandMap.get(id); + return (list == null) ? 0 : list.size(); + } + + public void clearCommandQueue(DatanodeID id) { + if(commandMap.containsKey(id)) { + commandMap.put(id, new LinkedList<>()); + } + } + /** * Closes this stream and releases any system resources associated with it. If * the stream is already closed then invoking this method has no effect. diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/container/closer/TestContainerCloser.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/container/closer/TestContainerCloser.java new file mode 100644 index 00000000000..7e722c0f1a0 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/container/closer/TestContainerCloser.java @@ -0,0 +1,221 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + * + */ + +package org.apache.hadoop.ozone.scm.container.closer; + +import org.apache.commons.lang.RandomStringUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileUtil; +import org.apache.hadoop.hdfs.DFSTestUtil; +import org.apache.hadoop.hdfs.protocol.DatanodeID; +import org.apache.hadoop.ozone.OzoneConfigKeys; +import org.apache.hadoop.ozone.container.common.SCMTestUtils; +import org.apache.hadoop.ozone.protocol.proto.OzoneProtos; +import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos; +import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReportsRequestProto; +import org.apache.hadoop.ozone.scm.container.ContainerMapping; +import org.apache.hadoop.ozone.scm.container.MockNodeManager; +import org.apache.hadoop.ozone.scm.container.TestContainerMapping; +import org.apache.hadoop.scm.container.common.helpers.ContainerInfo; +import org.apache.hadoop.test.GenericTestUtils; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.io.File; +import java.io.IOException; +import java.util.concurrent.TimeUnit; + +import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_CONTAINER_REPORT_INTERVAL; +import static org.apache.hadoop.ozone.protocol.proto.OzoneProtos.LifeCycleEvent.CREATE; +import static org.apache.hadoop.ozone.protocol.proto.OzoneProtos.LifeCycleEvent.CREATED; +import static org.apache.hadoop.scm.ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_DEFAULT; +import static org.apache.hadoop.scm.ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_GB; + +/** + * Test class for Closing Container. + */ +public class TestContainerCloser { + + private static final long GIGABYTE = 1024L * 1024L * 1024L; + private static Configuration configuration; + private static MockNodeManager nodeManager; + private static ContainerMapping mapping; + private static long size; + private static File testDir; + + @BeforeClass + public static void setUp() throws Exception { + configuration = SCMTestUtils.getConf(); + size = configuration.getLong(OZONE_SCM_CONTAINER_SIZE_GB, + OZONE_SCM_CONTAINER_SIZE_DEFAULT) * 1024 * 1024 * 1024; + configuration.setTimeDuration(OZONE_CONTAINER_REPORT_INTERVAL, + 1, TimeUnit.SECONDS); + testDir = GenericTestUtils + .getTestDir(TestContainerMapping.class.getSimpleName()); + configuration.set(OzoneConfigKeys.OZONE_METADATA_DIRS, + testDir.getAbsolutePath()); + nodeManager = new MockNodeManager(true, 10); + mapping = new ContainerMapping(configuration, nodeManager, 128); + } + + @AfterClass + public static void tearDown() throws Exception { + if (mapping != null) { + mapping.close(); + } + FileUtil.fullyDelete(testDir); + } + + @Test + public void testClose() throws IOException { + String containerName = "container-" + RandomStringUtils.randomNumeric(5); + + ContainerInfo info = mapping.allocateContainer( + OzoneProtos.ReplicationType.STAND_ALONE, + OzoneProtos.ReplicationFactor.ONE, containerName, "ozone"); + + //Execute these state transitions so that we can close the container. + mapping.updateContainerState(containerName, CREATE); + mapping.updateContainerState(containerName, CREATED); + long currentCount = mapping.getCloser().getCloseCount(); + long runCount = mapping.getCloser().getThreadRunCount(); + + DatanodeID datanodeID = info.getPipeline().getLeader(); + // Send a container report with used set to 1 GB. This should not close. + sendContainerReport(info, 1 * GIGABYTE); + + // with only one container the cleaner thread should not run. + Assert.assertEquals(0, mapping.getCloser().getThreadRunCount()); + + // With only 1 GB, the container should not be queued for closing. + Assert.assertEquals(0, mapping.getCloser().getCloseCount()); + + // Assert that the Close command was not queued for this Datanode. + Assert.assertEquals(0, nodeManager.getCommandCount(datanodeID)); + + long newUsed = (long) (size * 0.91f); + sendContainerReport(info, newUsed); + + // with only one container the cleaner thread should not run. + Assert.assertEquals(runCount, mapping.getCloser().getThreadRunCount()); + + // and close count will be one. + Assert.assertEquals(1, + mapping.getCloser().getCloseCount() - currentCount); + + // Assert that the Close command was Queued for this Datanode. + Assert.assertEquals(1, nodeManager.getCommandCount(datanodeID)); + } + + @Test + public void testRepeatedClose() throws IOException, + InterruptedException { + // This test asserts that if we queue more than one report then the + // second report is discarded by the system if it lands in the 3 * report + // frequency window. + + configuration.setTimeDuration(OZONE_CONTAINER_REPORT_INTERVAL, 1, + TimeUnit.SECONDS); + String containerName = "container-" + RandomStringUtils.randomNumeric(5); + + ContainerInfo info = mapping.allocateContainer( + OzoneProtos.ReplicationType.STAND_ALONE, + OzoneProtos.ReplicationFactor.ONE, containerName, "ozone"); + + //Execute these state transitions so that we can close the container. + mapping.updateContainerState(containerName, CREATE); + + long currentCount = mapping.getCloser().getCloseCount(); + long runCount = mapping.getCloser().getThreadRunCount(); + + + DatanodeID datanodeID = info.getPipeline().getLeader(); + + // Send this command twice and assert we have only one command in the queue. + sendContainerReport(info, 5 * GIGABYTE); + sendContainerReport(info, 5 * GIGABYTE); + + // Assert that the Close command was Queued for this Datanode. + Assert.assertEquals(1, + nodeManager.getCommandCount(datanodeID)); + // And close count will be one. + Assert.assertEquals(1, + mapping.getCloser().getCloseCount() - currentCount); + Thread.sleep(TimeUnit.SECONDS.toMillis(4)); + + //send another close and the system will queue this to the command queue. + sendContainerReport(info, 5 * GIGABYTE); + Assert.assertEquals(2, + nodeManager.getCommandCount(datanodeID)); + // but the close count will still be one, since from the point of view of + // closer we are closing only one container even if we have send multiple + // close commands to the datanode. + Assert.assertEquals(1, mapping.getCloser().getCloseCount() + - currentCount); + } + + @Test + public void testCleanupThreadRuns() throws IOException, + InterruptedException { + // This test asserts that clean up thread runs once we have closed a + // number above cleanup water mark. + + long runCount = mapping.getCloser().getThreadRunCount(); + + for (int x = 0; x < ContainerCloser.getCleanupWaterMark() + 10; x++) { + String containerName = "container-" + RandomStringUtils.randomNumeric(7); + ContainerInfo info = mapping.allocateContainer( + OzoneProtos.ReplicationType.STAND_ALONE, + OzoneProtos.ReplicationFactor.ONE, containerName, "ozone"); + mapping.updateContainerState(containerName, CREATE); + mapping.updateContainerState(containerName, CREATED); + sendContainerReport(info, 5 * GIGABYTE); + } + + Thread.sleep(TimeUnit.SECONDS.toMillis(1)); + + // Assert that cleanup thread ran at least once. + Assert.assertTrue(mapping.getCloser().getThreadRunCount() - runCount > 0); + } + + private void sendContainerReport(ContainerInfo info, long used) throws + IOException { + ContainerReportsRequestProto.Builder + reports = ContainerReportsRequestProto.newBuilder(); + reports.setType(ContainerReportsRequestProto.reportType.fullReport); + + StorageContainerDatanodeProtocolProtos.ContainerInfo.Builder ciBuilder = + StorageContainerDatanodeProtocolProtos.ContainerInfo.newBuilder(); + ciBuilder.setContainerName(info.getContainerName()) + .setFinalhash("e16cc9d6024365750ed8dbd194ea46d2") + .setSize(size) + .setUsed(used) + .setKeyCount(100000000L) + .setReadCount(100000000L) + .setWriteCount(100000000L) + .setReadBytes(2000000000L) + .setWriteBytes(2000000000L) + .setContainerID(1L); + reports.setDatanodeID( + DFSTestUtil.getLocalDatanodeID().getProtoBufMessage()); + reports.addReports(ciBuilder); + mapping.processContainerReports(reports.build()); + } +} \ No newline at end of file