From 56f011fd023dd5b1ae5ed92df12f2ebccd3613e1 Mon Sep 17 00:00:00 2001 From: Anu Engineer Date: Wed, 1 Mar 2017 11:47:22 -0800 Subject: [PATCH] HDFS-11447. Ozone: SCM: Send node report to SCM with heartbeat. Contributed by Xiaoyu Yao. --- .../common/impl/ContainerManagerImpl.java | 28 ++- .../common/impl/ContainerStorageLocation.java | 187 +++++++++++++++ .../common/impl/StorageLocationReport.java | 63 +++++ .../interfaces/ContainerLocationManager.java | 14 ++ .../common/interfaces/ContainerManager.java | 9 +- .../common/interfaces/package-info.java | 20 ++ .../statemachine/DatanodeStateMachine.java | 1 + .../common/statemachine/StateContext.java | 19 ++ .../states/datanode/RunningDatanodeState.java | 1 + .../endpoint/HeartbeatEndpointTask.java | 24 +- .../container/ozoneimpl/OzoneContainer.java | 11 + .../StorageContainerDatanodeProtocol.java | 6 +- .../StorageContainerNodeProtocol.java | 5 +- ...atanodeProtocolClientSideTranslatorPB.java | 12 +- ...atanodeProtocolServerSideTranslatorPB.java | 4 +- .../ozone/scm/StorageContainerManager.java | 9 +- .../ozone/scm/node/HeartbeatQueueItem.java | 98 ++++++++ .../hadoop/ozone/scm/node/NodeManager.java | 12 + .../hadoop/ozone/scm/node/SCMNodeManager.java | 127 ++++++++-- .../hadoop/ozone/scm/node/SCMNodeStat.java | 94 ++++++++ .../StorageContainerDatanodeProtocol.proto | 17 ++ .../ozone/container/common/ScmTestMock.java | 5 +- .../ozone/container/common/TestEndPoint.java | 4 +- .../ozone/scm/container/MockNodeManager.java | 26 ++- .../ozone/scm/node/TestNodeManager.java | 221 +++++++++++++++--- 25 files changed, 941 insertions(+), 76 deletions(-) create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerStorageLocation.java create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/impl/StorageLocationReport.java create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/package-info.java create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/HeartbeatQueueItem.java create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/SCMNodeStat.java diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerManagerImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerManagerImpl.java index 88bdebc2af5..2beb5251c64 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerManagerImpl.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerManagerImpl.java @@ -24,6 +24,10 @@ import org.apache.commons.codec.digest.DigestUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileAlreadyExistsException; import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos; +import org.apache.hadoop.ozone.protocol.proto + .StorageContainerDatanodeProtocolProtos.SCMNodeReport; +import org.apache.hadoop.ozone.protocol.proto + .StorageContainerDatanodeProtocolProtos.SCMStorageReport; import org.apache.hadoop.hdfs.server.datanode.StorageLocation; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.ozone.OzoneConsts; @@ -120,7 +124,7 @@ public class ContainerManagerImpl implements ContainerManager { dataDirs.add(location); } this.locationManager = - new 
ContainerLocationManagerImpl(containerDirs, dataDirs); + new ContainerLocationManagerImpl(containerDirs, dataDirs, config); } finally { readUnlock(); @@ -395,9 +399,10 @@ public class ContainerManagerImpl implements ContainerManager { * @throws IOException */ @Override - public void shutdown() { + public void shutdown() throws IOException { Preconditions.checkState(this.hasWriteLock()); this.containerMap.clear(); + this.locationManager.shutdown(); } @@ -497,6 +502,25 @@ public class ContainerManagerImpl implements ContainerManager { return this.keyManager; } + /** + * Get the node report. + * @return node report. + */ + @Override + public SCMNodeReport getNodeReport() throws IOException { + StorageLocationReport[] reports = locationManager.getLocationReport(); + SCMNodeReport.Builder nrb = SCMNodeReport.newBuilder(); + for (int i = 0; i < reports.length; i++) { + SCMStorageReport.Builder srb = SCMStorageReport.newBuilder(); + nrb.addStorageReport(i, srb.setStorageUuid(reports[i].getId()) + .setCapacity(reports[i].getCapacity()) + .setScmUsed(reports[i].getScmUsed()) + .setRemaining(reports[i].getRemaining()) + .build()); + } + return nrb.build(); + } + /** * Filter out only container files from the container metadata dir. */ diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerStorageLocation.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerStorageLocation.java new file mode 100644 index 00000000000..14dcc3d229a --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerStorageLocation.java @@ -0,0 +1,187 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.container.common.impl; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CachingGetSpaceUsed; +import org.apache.hadoop.fs.DF; +import org.apache.hadoop.fs.GetSpaceUsed; +import org.apache.hadoop.hdfs.server.datanode.StorageLocation; +import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage; +import org.apache.hadoop.io.IOUtils; +import org.apache.hadoop.util.ShutdownHookManager; +import org.apache.hadoop.util.Time; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.*; +import java.net.URI; +import java.nio.charset.StandardCharsets; +import java.util.Scanner; + +import static org.apache.hadoop.util.RunJar.SHUTDOWN_HOOK_PRIORITY; + +/** + * Class that wraps the space usage of the Datanode Container Storage Location + * by SCM containers. 
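+ * Overall volume usage comes from DF, while SCM-specific usage is tracked by
+ * a CachingGetSpaceUsed instance whose value is persisted in a "scmUsed"
+ * cache file so that a restart can skip the initial disk scan.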
+ */ +public class ContainerStorageLocation { + private static final Logger LOG = + LoggerFactory.getLogger(ContainerStorageLocation.class); + + private static final String DU_CACHE_FILE = "scmUsed"; + private volatile boolean scmUsedSaved = false; + + private final StorageLocation dataLocation; + private final String storageUuId; + private final DF usage; + private final GetSpaceUsed scmUsage; + private final File scmUsedFile; + + public ContainerStorageLocation(StorageLocation dataLoc, Configuration conf) + throws IOException { + this.dataLocation = dataLoc; + this.storageUuId = DatanodeStorage.generateUuid(); + File dataDir = new File(dataLoc.getNormalizedUri().getPath()); + scmUsedFile = new File(dataDir, DU_CACHE_FILE); + // get overall disk usage + this.usage = new DF(dataDir, conf); + // get SCM specific usage + this.scmUsage = new CachingGetSpaceUsed.Builder().setPath(dataDir) + .setConf(conf) + .setInitialUsed(loadScmUsed()) + .build(); + + // Ensure scm usage is saved during shutdown. + ShutdownHookManager.get().addShutdownHook( + new Runnable() { + @Override + public void run() { + if (!scmUsedSaved) { + saveScmUsed(); + } + } + }, SHUTDOWN_HOOK_PRIORITY); + } + + public URI getNormalizedUri() { + return dataLocation.getNormalizedUri(); + } + + public String getStorageUuId() { + return storageUuId; + } + public long getCapacity() { + long capacity = usage.getCapacity(); + return (capacity > 0) ? capacity : 0; + } + + public long getAvailable() throws IOException { + long remaining = getCapacity() - getScmUsed(); + long available = usage.getAvailable(); + if (remaining > available) { + remaining = available; + } + return (remaining > 0) ? remaining : 0; + } + + public long getScmUsed() throws IOException{ + return scmUsage.getUsed(); + } + + public void shutdown() { + saveScmUsed(); + scmUsedSaved = true; + + if (scmUsage instanceof CachingGetSpaceUsed) { + IOUtils.cleanup(null, ((CachingGetSpaceUsed) scmUsage)); + } + } + + /** + * Read in the cached DU value and return it if it is less than 600 seconds + * old (DU update interval). Slight imprecision of scmUsed is not critical + * and skipping DU can significantly shorten the startup time. + * If the cached value is not available or too old, -1 is returned. + */ + long loadScmUsed() { + long cachedScmUsed; + long mtime; + Scanner sc; + + try { + sc = new Scanner(scmUsedFile, "UTF-8"); + } catch (FileNotFoundException fnfe) { + return -1; + } + + try { + // Get the recorded scmUsed from the file. + if (sc.hasNextLong()) { + cachedScmUsed = sc.nextLong(); + } else { + return -1; + } + // Get the recorded mtime from the file. + if (sc.hasNextLong()) { + mtime = sc.nextLong(); + } else { + return -1; + } + + // Return the cached value if mtime is okay. + if (mtime > 0 && (Time.now() - mtime < 600000L)) { + LOG.info("Cached ScmUsed found for {} : {} ", dataLocation, + cachedScmUsed); + return cachedScmUsed; + } + return -1; + } finally { + sc.close(); + } + } + + /** + * Write the current scmUsed to the cache file. + */ + void saveScmUsed() { + if (scmUsedFile.exists() && !scmUsedFile.delete()) { + LOG.warn("Failed to delete old scmUsed file in {}.", dataLocation); + } + OutputStreamWriter out = null; + try { + long used = getScmUsed(); + if (used > 0) { + out = new OutputStreamWriter(new FileOutputStream(scmUsedFile), + StandardCharsets.UTF_8); + // mtime is written last, so that truncated writes won't be valid. 
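+ // Cache file layout, as read back by loadScmUsed() above: a single line
+ // holding two decimal longs, "<scmUsed> <mtime>", separated by one space.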
+ out.write(Long.toString(used) + " " + Long.toString(Time.now())); + out.flush(); + out.close(); + out = null; + } + } catch (IOException ioe) { + // If write failed, the volume might be bad. Since the cache file is + // not critical, log the error and continue. + LOG.warn("Failed to write scmUsed to " + scmUsedFile, ioe); + } finally { + IOUtils.cleanup(null, out); + } + } +} \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/impl/StorageLocationReport.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/impl/StorageLocationReport.java new file mode 100644 index 00000000000..7ef91a91f78 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/impl/StorageLocationReport.java @@ -0,0 +1,63 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.container.common.impl; + +/** + * Storage location stats of datanodes that provide back store for containers. 
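+ * Each report is an immutable snapshot of a single storage location.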
+ * + */ +public class StorageLocationReport { + public static final StorageLocationReport[] EMPTY_ARRAY = {}; + + private final String id; + private final boolean failed; + private final long capacity; + private final long scmUsed; + private final long remaining; + + public StorageLocationReport(String id, boolean failed, + long capacity, long scmUsed, long remaining) { + this.id = id; + this.failed = failed; + this.capacity = capacity; + this.scmUsed = scmUsed; + this.remaining = remaining; + } + + public String getId() { + return id; + } + + public boolean isFailed() { + return failed; + } + + public long getCapacity() { + return capacity; + } + + public long getScmUsed() { + return scmUsed; + } + + public long getRemaining() { + return remaining; + } + +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerLocationManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerLocationManager.java index 00e7223d212..9c5fcea1639 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerLocationManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerLocationManager.java @@ -17,6 +17,8 @@ */ package org.apache.hadoop.ozone.container.common.interfaces; +import org.apache.hadoop.ozone.container.common.impl.StorageLocationReport; + import java.io.IOException; import java.nio.file.Path; @@ -41,4 +43,16 @@ public interface ContainerLocationManager { */ Path getDataPath(String containerName) throws IOException; + /** + * Returns an array of storage location usage report. + * @return storage location usage report. + */ + StorageLocationReport[] getLocationReport() throws IOException; + + /** + * Supports clean shutdown of container. + * + * @throws IOException + */ + void shutdown() throws IOException; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerManager.java index 71bc6049040..b063085297a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerManager.java @@ -24,6 +24,8 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdfs.server.datanode.StorageLocation; import org.apache.hadoop.hdfs.util.RwLock; import org.apache.hadoop.ozone.container.common.helpers.ContainerData; +import org.apache.hadoop.ozone.protocol.proto + .StorageContainerDatanodeProtocolProtos.SCMNodeReport; import org.apache.hadoop.scm.container.common.helpers.Pipeline; import java.io.IOException; @@ -93,7 +95,7 @@ public interface ContainerManager extends RwLock { * * @throws IOException */ - void shutdown(); + void shutdown() throws IOException; /** * Sets the Chunk Manager. @@ -123,4 +125,9 @@ public interface ContainerManager extends RwLock { */ KeyManager getKeyManager(); + /** + * Get the Node Report of container storage usage. + * @return node report. 
+ */ + SCMNodeReport getNodeReport() throws IOException; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/package-info.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/package-info.java new file mode 100644 index 00000000000..d83bf95c362 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/package-info.java @@ -0,0 +1,20 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.hadoop.ozone.container.common.interfaces; +/** + This package contains common ozone container interfaces. + */ \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java index db837344023..5b5ed86db75 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java @@ -82,6 +82,7 @@ public class DatanodeStateMachine implements Closeable { try { LOG.debug("Executing cycle Number : {}", context.getExecutionCount()); nextHB = Time.monotonicNow() + heartbeatFrequency; + context.setReportState(container.getNodeReport()); context.execute(executorService, heartbeatFrequency, TimeUnit.MILLISECONDS); now = Time.monotonicNow(); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java index e397202276b..4fa8729257e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java @@ -22,6 +22,7 @@ import org.apache.hadoop.ozone.container.common.states.DatanodeState; import org.apache.hadoop.ozone.container.common.states.datanode .RunningDatanodeState; import org.apache.hadoop.ozone.protocol.commands.SCMCommand; +import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMNodeReport; import java.util.LinkedList; import java.util.Queue; @@ -43,6 +44,7 @@ public class StateContext { private final AtomicLong stateExecutionCount; private final Configuration conf; private DatanodeStateMachine.DatanodeStates state; + private SCMNodeReport nrState; /** * Constructs a StateContext. @@ -59,6 +61,7 @@ public class StateContext { commandQueue = new LinkedList<>(); lock = new ReentrantLock(); stateExecutionCount = new AtomicLong(0); + nrState = SCMNodeReport.getDefaultInstance(); } /** @@ -111,6 +114,22 @@ public class StateContext { this.state = state; } + /** + * Returns the node report of the datanode state context. + * @return the node report. + */ + public SCMNodeReport getNodeReport() { + return nrState; + } + + /** + * Sets the storage location report of the datanode state context. + * @param nrReport - node report + */ + public void setReportState(SCMNodeReport nrReport) { + this.nrState = nrReport; + } + /** * Returns the next task to get executed by the datanode state machine. 
* @return A callable that will be executed by the diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/states/datanode/RunningDatanodeState.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/states/datanode/RunningDatanodeState.java index 77b4138bd31..347b465be03 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/states/datanode/RunningDatanodeState.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/states/datanode/RunningDatanodeState.java @@ -231,6 +231,7 @@ public class RunningDatanodeState implements DatanodeState { .setConfig(conf) .setEndpointStateMachine(endpoint) .setNodeID(getContainerNodeID()) + .setContext(context) .build(); case SHUTDOWN: break; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java index 4f877ff8d04..c0226bbf87c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java @@ -23,6 +23,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdfs.protocol.DatanodeID; import org.apache.hadoop.ozone.container.common.statemachine .EndpointStateMachine; +import org.apache.hadoop.ozone.container.common.statemachine.StateContext; import org.apache.hadoop.ozone.protocol.proto .StorageContainerDatanodeProtocolProtos.ContainerNodeIDProto; import org.slf4j.Logger; @@ -41,6 +42,7 @@ public class HeartbeatEndpointTask private final EndpointStateMachine rpcEndpoint; private final Configuration conf; private ContainerNodeIDProto containerNodeIDProto; + private StateContext context; /** * Constructs a SCM heart beat. @@ -48,9 +50,10 @@ public class HeartbeatEndpointTask * @param conf Config. */ public HeartbeatEndpointTask(EndpointStateMachine rpcEndpoint, - Configuration conf) { + Configuration conf, StateContext context) { this.rpcEndpoint = rpcEndpoint; this.conf = conf; + this.context = context; } /** @@ -85,8 +88,9 @@ public class HeartbeatEndpointTask Preconditions.checkState(this.containerNodeIDProto != null); DatanodeID datanodeID = DatanodeID.getFromProtoBuf(this .containerNodeIDProto.getDatanodeID()); - // TODO : Add the command to command processor queue. - rpcEndpoint.getEndPoint().sendHeartbeat(datanodeID); + + rpcEndpoint.getEndPoint().sendHeartbeat(datanodeID, + this.context.getNodeReport()); rpcEndpoint.zeroMissedCount(); } catch (IOException ex) { rpcEndpoint.logIfNeeded(ex @@ -112,6 +116,7 @@ public class HeartbeatEndpointTask private EndpointStateMachine endPointStateMachine; private Configuration conf; private ContainerNodeIDProto containerNodeIDProto; + private StateContext context; /** * Constructs the builder class. @@ -152,6 +157,16 @@ public class HeartbeatEndpointTask return this; } + /** + * Sets the context. + * @param stateContext - State context. + * @return this. 
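+ * The context supplies the node report that is sent with each heartbeat.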
+ */ + public Builder setContext(StateContext stateContext) { + this.context = stateContext; + return this; + } + public HeartbeatEndpointTask build() { if (endPointStateMachine == null) { LOG.error("No endpoint specified."); @@ -172,10 +187,9 @@ public class HeartbeatEndpointTask } HeartbeatEndpointTask task = new HeartbeatEndpointTask(this - .endPointStateMachine, this.conf); + .endPointStateMachine, this.conf, this.context); task.setContainerNodeIDProto(containerNodeIDProto); return task; } - } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java index 0c3cd91f746..0f77175c9c1 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java @@ -29,6 +29,8 @@ import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher; import org.apache.hadoop.ozone.container.common.interfaces.ContainerManager; import org.apache.hadoop.ozone.container.common.interfaces.KeyManager; import org.apache.hadoop.ozone.container.common.transport.server.XceiverServer; +import org.apache.hadoop.ozone.protocol.proto + .StorageContainerDatanodeProtocolProtos.SCMNodeReport; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -138,6 +140,8 @@ public class OzoneContainer { this.keyManager.shutdown(); this.manager.shutdown(); LOG.info("container services shutdown complete."); + } catch (IOException ex) { + LOG.warn("container service shutdown error:", ex); } finally { this.manager.writeUnlock(); } @@ -155,4 +159,11 @@ public class OzoneContainer { pathList.add(location); } } + + /** + * Returns node report of container storage usage. + */ + public SCMNodeReport getNodeReport() throws IOException { + return this.manager.getNodeReport(); + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocol/StorageContainerDatanodeProtocol.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocol/StorageContainerDatanodeProtocol.java index 6a9dc6723b3..e93baeffc50 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocol/StorageContainerDatanodeProtocol.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocol/StorageContainerDatanodeProtocol.java @@ -22,6 +22,7 @@ import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolPr import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMHeartbeatResponseProto; import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMVersionResponseProto; import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMRegisteredCmdResponseProto; +import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMNodeReport; import java.io.IOException; @@ -41,11 +42,12 @@ public interface StorageContainerDatanodeProtocol { /** * Used by data node to send a Heartbeat. * @param datanodeID - Datanode ID. 
+ * @param nodeReport - node report state * @return - SCMHeartbeatResponseProto * @throws IOException */ - SCMHeartbeatResponseProto sendHeartbeat(DatanodeID datanodeID) - throws IOException; + SCMHeartbeatResponseProto sendHeartbeat(DatanodeID datanodeID, + SCMNodeReport nodeReport) throws IOException; /** * Register Datanode. diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocol/StorageContainerNodeProtocol.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocol/StorageContainerNodeProtocol.java index 6c4fbe3af53..9feec785f0f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocol/StorageContainerNodeProtocol.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocol/StorageContainerNodeProtocol.java @@ -21,6 +21,7 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.hdfs.protocol.DatanodeID; import org.apache.hadoop.ozone.protocol.commands.SCMCommand; import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMVersionRequestProto; +import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMNodeReport; import java.util.List; @@ -54,8 +55,10 @@ public interface StorageContainerNodeProtocol { /** * Send heartbeat to indicate the datanode is alive and doing well. * @param datanodeID - Datanode ID. + * @param nodeReport - node report. * @return SCMheartbeat response list */ + List sendHeartbeat(DatanodeID datanodeID, + SCMNodeReport nodeReport); - List sendHeartbeat(DatanodeID datanodeID); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolClientSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolClientSideTranslatorPB.java index ba40c292b89..477fac5ade6 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolClientSideTranslatorPB.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolClientSideTranslatorPB.java @@ -27,6 +27,8 @@ import org.apache.hadoop.ozone.protocol.proto .StorageContainerDatanodeProtocolProtos.SCMHeartbeatRequestProto; import org.apache.hadoop.ozone.protocol.proto .StorageContainerDatanodeProtocolProtos.SCMHeartbeatResponseProto; +import org.apache.hadoop.ozone.protocol.proto + .StorageContainerDatanodeProtocolProtos.SCMNodeReport; import org.apache.hadoop.ozone.protocol.proto .StorageContainerDatanodeProtocolProtos.SCMVersionRequestProto; import org.apache.hadoop.ozone.protocol.proto @@ -113,15 +115,17 @@ public class StorageContainerDatanodeProtocolClientSideTranslatorPB * Send by datanode to SCM. 
* * @param datanodeID - DatanodeID + * @param nodeReport - node report * @throws IOException */ @Override - public SCMHeartbeatResponseProto sendHeartbeat(DatanodeID datanodeID) - throws IOException { - SCMHeartbeatRequestProto.Builder req = - SCMHeartbeatRequestProto.newBuilder(); + public SCMHeartbeatResponseProto sendHeartbeat(DatanodeID datanodeID, + SCMNodeReport nodeReport) throws IOException { + SCMHeartbeatRequestProto.Builder req = SCMHeartbeatRequestProto + .newBuilder(); req.setDatanodeID(datanodeID.getProtoBufMessage()); + req.setNodeReport(nodeReport); final SCMHeartbeatResponseProto resp; try { resp = rpcProxy.sendHeartbeat(NULL_RPC_CONTROLLER, req.build()); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolServerSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolServerSideTranslatorPB.java index 62b885a8f34..995290f7886 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolServerSideTranslatorPB.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolServerSideTranslatorPB.java @@ -78,9 +78,9 @@ public class StorageContainerDatanodeProtocolServerSideTranslatorPB SCMHeartbeatRequestProto request) throws ServiceException { try { return impl.sendHeartbeat(DatanodeID.getFromProtoBuf(request - .getDatanodeID())); + .getDatanodeID()), request.getNodeReport()); } catch (IOException e) { throw new ServiceException(e); } } -} +} \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/StorageContainerManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/StorageContainerManager.java index b6fd4c0ad90..3f31e98c366 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/StorageContainerManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/StorageContainerManager.java @@ -44,6 +44,8 @@ import org.apache.hadoop.ozone.protocol.proto .StorageContainerDatanodeProtocolProtos.SCMHeartbeatResponseProto; import org.apache.hadoop.ozone.protocol.proto .StorageContainerDatanodeProtocolProtos.SCMNodeAddressList; +import org.apache.hadoop.ozone.protocol.proto + .StorageContainerDatanodeProtocolProtos.SCMNodeReport; import org.apache.hadoop.ozone.protocol.proto .StorageContainerDatanodeProtocolProtos.SCMRegisteredCmdResponseProto; import org.apache.hadoop.ozone.protocol.proto @@ -393,9 +395,10 @@ public class StorageContainerManager * @throws IOException */ @Override - public SCMHeartbeatResponseProto sendHeartbeat(DatanodeID datanodeID) - throws IOException { - List commands = getScmNodeManager().sendHeartbeat(datanodeID); + public SCMHeartbeatResponseProto sendHeartbeat(DatanodeID datanodeID, + SCMNodeReport nodeReport) throws IOException { + List commands = + getScmNodeManager().sendHeartbeat(datanodeID, nodeReport); List cmdReponses = new LinkedList<>(); for (SCMCommand cmd : commands) { cmdReponses.add(getCommandResponse(cmd)); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/HeartbeatQueueItem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/HeartbeatQueueItem.java new file mode 100644 index 00000000000..894455819ac --- /dev/null +++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/HeartbeatQueueItem.java @@ -0,0 +1,98 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.scm.node; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.hadoop.hdfs.protocol.DatanodeID; +import org.apache.hadoop.ozone.protocol.proto + .StorageContainerDatanodeProtocolProtos.SCMNodeReport; + +import static org.apache.hadoop.util.Time.monotonicNow; + +/** + * This class represents the item in SCM heartbeat queue. + */ +public class HeartbeatQueueItem { + private DatanodeID datanodeID; + private long recvTimestamp; + private SCMNodeReport nodeReport; + + /** + * + * @param datanodeID - datanode ID of the heartbeat. + * @param recvTimestamp - heartbeat receive timestamp. + * @param nodeReport - node report associated with the heartbeat if any. + */ + HeartbeatQueueItem(DatanodeID datanodeID, long recvTimestamp, + SCMNodeReport nodeReport) { + this.datanodeID = datanodeID; + this.recvTimestamp = recvTimestamp; + this.nodeReport = nodeReport; + } + + /** + * @return datanode ID. + */ + public DatanodeID getDatanodeID() { + return datanodeID; + } + + /** + * @return node report. + */ + public SCMNodeReport getNodeReport() { + return nodeReport; + } + + /** + * @return heartbeat receive timestamp. + */ + public long getRecvTimestamp() { + return recvTimestamp; + } + + /** + * Builder for HeartbeatQueueItem. + */ + public static class Builder { + private DatanodeID datanodeID; + private SCMNodeReport nodeReport; + private long recvTimestamp = monotonicNow(); + + public Builder setDatanodeID(DatanodeID datanodeId) { + this.datanodeID = datanodeId; + return this; + } + + public Builder setNodeReport(SCMNodeReport scmNodeReport) { + this.nodeReport = scmNodeReport; + return this; + } + + @VisibleForTesting + public Builder setRecvTimestamp(long recvTime) { + this.recvTimestamp = recvTime; + return this; + } + + public HeartbeatQueueItem build() { + return new HeartbeatQueueItem(datanodeID, recvTimestamp, nodeReport); + } + } +} \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/NodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/NodeManager.java index 47cb7a058bb..899f50ed284 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/NodeManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/NodeManager.java @@ -135,4 +135,16 @@ public interface NodeManager extends StorageContainerNodeProtocol, Closeable, DEAD } + /** + * Returns the aggregated node stats. + * @return the aggregated node stats. 
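+ * Dead nodes are excluded; their usage is subtracted from the aggregate
+ * once a node is declared dead.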
+ */ + SCMNodeStat getStats(); + + /** + * Return a list of node stats. + * @return a list of individual node stats (live/stale but not dead). + */ + List getNodeStats(); + } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/SCMNodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/SCMNodeManager.java index 3fe70840eb4..029056fa9e4 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/SCMNodeManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/SCMNodeManager.java @@ -21,6 +21,7 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Optional; import com.google.common.base.Preconditions; import com.google.common.util.concurrent.ThreadFactoryBuilder; +import org.apache.commons.collections.map.HashedMap; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdfs.protocol.DatanodeID; import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException; @@ -34,8 +35,13 @@ import org.apache.hadoop.ozone.protocol.proto .ErrorCode; import org.apache.hadoop.ozone.protocol.proto .StorageContainerDatanodeProtocolProtos.SCMVersionRequestProto; +import org.apache.hadoop.ozone.protocol + .proto.StorageContainerDatanodeProtocolProtos.SCMNodeReport; +import org.apache.hadoop.ozone.protocol + .proto.StorageContainerDatanodeProtocolProtos.SCMStorageReport; import org.apache.hadoop.ozone.scm.VersionInfo; +import org.apache.hadoop.util.Time; import org.apache.hadoop.util.concurrent.HadoopExecutors; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -67,7 +73,7 @@ import static org.apache.hadoop.util.Time.monotonicNow; *

* Each heartbeat that SCMNodeManager receives is put into heartbeatQueue. The * worker thread wakes up and grabs that heartbeat from the queue. The worker - * thread will lookup the healthynodes map and update the timestamp if the entry + * thread will lookup the healthynodes map and set the timestamp if the entry * is there. if not it will look up stale and deadnodes map. *
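 * With this change each queued item is a HeartbeatQueueItem carrying an
 * optional node report, which the worker folds into per-node and
 * cluster-wide SCMNodeStat totals.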

* The getNode(byState) functions make copy of node maps and then creates a list @@ -85,14 +91,20 @@ public class SCMNodeManager @VisibleForTesting static final Logger LOG = LoggerFactory.getLogger(SCMNodeManager.class); + /** * Key = NodeID, value = timestamp. */ private final Map healthyNodes; private final Map staleNodes; private final Map deadNodes; - private final Queue heartbeatQueue; + private final Queue heartbeatQueue; private final Map nodes; + // Individual live node stats + private final Map nodeStats; + // Aggregated node stats + private SCMNodeStat scmStat; + // TODO: expose nodeStats and scmStat as metrics private final AtomicInteger healthyNodeCount; private final AtomicInteger staleNodeCount; private final AtomicInteger deadNodeCount; @@ -121,6 +133,8 @@ public class SCMNodeManager deadNodes = new ConcurrentHashMap<>(); staleNodes = new ConcurrentHashMap<>(); nodes = new HashMap<>(); + nodeStats = new HashedMap(); + scmStat = new SCMNodeStat(); healthyNodeCount = new AtomicInteger(0); staleNodeCount = new AtomicInteger(0); @@ -158,7 +172,7 @@ public class SCMNodeManager */ @Override public void removeNode(DatanodeID node) throws UnregisteredNodeException { - // TODO : Fix me. + // TODO : Fix me when adding the SCM CLI. } @@ -371,9 +385,9 @@ public class SCMNodeManager // Process the whole queue. while (!heartbeatQueue.isEmpty() && (lastHBProcessedCount < maxHBToProcessPerLoop)) { - DatanodeID datanodeID = heartbeatQueue.poll(); + HeartbeatQueueItem hbItem = heartbeatQueue.poll(); synchronized (this) { - handleHeartbeat(datanodeID); + handleHeartbeat(hbItem); } // we are shutting down or something give up processing the rest of // HBs. This will terminate the HB processing thread. @@ -439,7 +453,8 @@ public class SCMNodeManager // 4. And the most important reason, heartbeats are not blocked even if // this thread does not run, they will go into the processing queue. - if (!Thread.currentThread().isInterrupted()) { + if (!Thread.currentThread().isInterrupted() && + !executorService.isShutdown()) { executorService.schedule(this, heartbeatCheckerIntervalMs, TimeUnit .MILLISECONDS); } else { @@ -489,40 +504,85 @@ public class SCMNodeManager staleNodeCount.decrementAndGet(); deadNodes.put(entry.getKey(), entry.getValue()); deadNodeCount.incrementAndGet(); + + // Update SCM node stats + SCMNodeStat deadNodeStat = nodeStats.get(entry.getKey()); + scmStat.subtract(deadNodeStat); + nodeStats.remove(entry.getKey()); } /** * Handles a single heartbeat from a datanode. * - * @param datanodeID - datanode ID. + * @param hbItem - heartbeat item from a datanode. */ - private void handleHeartbeat(DatanodeID datanodeID) { + private void handleHeartbeat(HeartbeatQueueItem hbItem) { lastHBProcessedCount++; + String datanodeID = hbItem.getDatanodeID().getDatanodeUuid(); + SCMNodeReport nodeReport = hbItem.getNodeReport(); + long recvTimestamp = hbItem.getRecvTimestamp(); + long processTimestamp = Time.monotonicNow(); + if (LOG.isTraceEnabled()) { + //TODO: add average queue time of heartbeat request as metrics + LOG.trace("Processing Heartbeat from datanode {}: queueing time {}", + datanodeID, processTimestamp - recvTimestamp); + } + // If this node is already in the list of known and healthy nodes - // just update the last timestamp and return. - if (healthyNodes.containsKey(datanodeID.getDatanodeUuid())) { - healthyNodes.put(datanodeID.getDatanodeUuid(), monotonicNow()); + // just set the last timestamp and return. 
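+ // Each known-node branch below calls updateNodeStat(), which swaps the
+ // datanode's previous SCMNodeStat out of the cluster-wide aggregate and
+ // adds the fresh totals; an unregistered node only produces a warning.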
+ if (healthyNodes.containsKey(datanodeID)) { + healthyNodes.put(datanodeID, processTimestamp); + updateNodeStat(datanodeID, nodeReport); return; } // A stale node has heartbeat us we need to remove the node from stale // list and move to healthy list. - if (staleNodes.containsKey(datanodeID.getDatanodeUuid())) { - staleNodes.remove(datanodeID.getDatanodeUuid()); - healthyNodes.put(datanodeID.getDatanodeUuid(), monotonicNow()); + if (staleNodes.containsKey(datanodeID)) { + staleNodes.remove(datanodeID); + healthyNodes.put(datanodeID, processTimestamp); healthyNodeCount.incrementAndGet(); staleNodeCount.decrementAndGet(); + updateNodeStat(datanodeID, nodeReport); return; } // A dead node has heartbeat us, we need to remove that node from dead // node list and move it to the healthy list. - if (deadNodes.containsKey(datanodeID.getDatanodeUuid())) { - deadNodes.remove(datanodeID.getDatanodeUuid()); - healthyNodes.put(datanodeID.getDatanodeUuid(), monotonicNow()); + if (deadNodes.containsKey(datanodeID)) { + deadNodes.remove(datanodeID); + healthyNodes.put(datanodeID, processTimestamp); deadNodeCount.decrementAndGet(); healthyNodeCount.incrementAndGet(); + updateNodeStat(datanodeID, nodeReport); + return; + } + LOG.warn("SCM receive heartbeat from unregistered datanode {}", datanodeID); + } + + private void updateNodeStat(String datanodeID, SCMNodeReport nodeReport) { + SCMNodeStat stat = nodeStats.get(datanodeID); + if (stat == null) { + LOG.debug("SCM updateNodeStat based on heartbeat from previous" + + "dead datanode {}", datanodeID); + stat = new SCMNodeStat(); + } + + if (nodeReport != null && nodeReport.getStorageReportCount() > 0) { + long totalCapacity = 0; + long totalRemaining = 0; + long totalScmUsed = 0; + List storageReports = nodeReport.getStorageReportList(); + for (SCMStorageReport report : storageReports) { + totalCapacity += report.getCapacity(); + totalRemaining += report.getRemaining(); + totalScmUsed += report.getScmUsed(); + } + scmStat.subtract(stat); + stat.set(totalCapacity, totalScmUsed, totalRemaining); + nodeStats.put(datanodeID, stat); + scmStat.add(stat); } } @@ -591,6 +651,7 @@ public class SCMNodeManager totalNodes.incrementAndGet(); healthyNodes.put(datanodeID.getDatanodeUuid(), monotonicNow()); healthyNodeCount.incrementAndGet(); + nodeStats.put(datanodeID.getDatanodeUuid(), new SCMNodeStat()); LOG.info("Data node with ID: {} Registered.", datanodeID.getDatanodeUuid()); return RegisteredCommand.newBuilder() @@ -625,23 +686,47 @@ public class SCMNodeManager * Send heartbeat to indicate the datanode is alive and doing well. * * @param datanodeID - Datanode ID. + * @param nodeReport - node report. * @return SCMheartbeat response. * @throws IOException */ @Override - public List sendHeartbeat(DatanodeID datanodeID) { + public List sendHeartbeat(DatanodeID datanodeID, + SCMNodeReport nodeReport) { // Checking for NULL to make sure that we don't get // an exception from ConcurrentList. // This could be a problem in tests, if this function is invoked via // protobuf, transport layer will guarantee that this is not null. if (datanodeID != null) { - heartbeatQueue.add(datanodeID); - + heartbeatQueue.add( + new HeartbeatQueueItem.Builder() + .setDatanodeID(datanodeID) + .setNodeReport(nodeReport) + .build()); } else { LOG.error("Datanode ID in heartbeat is null"); } return commandQueue.getCommand(datanodeID); } -} + + /** + * Returns the aggregated node stats. + * @return the aggregated node stats. 
+ */ + @Override + public SCMNodeStat getStats() { + return new SCMNodeStat(this.scmStat); + } + + /** + * Return a list of node stats. + * @return a list of individual node stats (live/stale but not dead). + */ + @Override + public List getNodeStats(){ + return nodeStats.entrySet().stream().map( + entry -> nodeStats.get(entry.getKey())).collect(Collectors.toList()); + } +} \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/SCMNodeStat.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/SCMNodeStat.java new file mode 100644 index 00000000000..c206807ede5 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/SCMNodeStat.java @@ -0,0 +1,94 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.scm.node; + +import com.google.common.annotations.VisibleForTesting; + +/** + * This class represents the SCM node stat. + */ +public class SCMNodeStat { + private long capacity; + private long scmUsed; + private long remaining; + + public SCMNodeStat() { + } + + public SCMNodeStat(SCMNodeStat other) { + set(other.capacity, other.scmUsed, other.remaining); + } + + /** + * @return the total configured capacity of the node. + */ + public long getCapacity() { + return capacity; + } + + /** + * @return the total SCM used space on the node. + */ + public long getScmUsed() { + return scmUsed; + } + + /** + * @return the total remaining space available on the node. 
+ */ + public long getRemaining() { + return remaining; + } + + @VisibleForTesting + public void set(long total, long used, long remain) { + this.capacity = total; + this.scmUsed = used; + this.remaining = remain; + } + + public SCMNodeStat add(SCMNodeStat stat) { + this.capacity += stat.getCapacity(); + this.scmUsed += stat.getScmUsed(); + this.remaining += stat.getRemaining(); + return this; + } + + public SCMNodeStat subtract(SCMNodeStat stat) { + this.capacity -= stat.getCapacity(); + this.scmUsed -= stat.getScmUsed(); + this.remaining -= stat.getRemaining(); + return this; + } + + @Override + public boolean equals(Object to) { + return this == to || + (to instanceof SCMNodeStat && + capacity == ((SCMNodeStat) to).getCapacity() && + scmUsed == ((SCMNodeStat) to).getScmUsed() && + remaining == ((SCMNodeStat) to).getRemaining()); + } + + @Override + public int hashCode() { + assert false : "hashCode not designed"; + return 42; // any arbitrary constant will do + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/StorageContainerDatanodeProtocol.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/StorageContainerDatanodeProtocol.proto index 5dea5cc684d..ee1378f1e89 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/StorageContainerDatanodeProtocol.proto +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/StorageContainerDatanodeProtocol.proto @@ -47,6 +47,23 @@ import "DatanodeContainerProtocol.proto"; */ message SCMHeartbeatRequestProto { required DatanodeIDProto datanodeID = 1; + optional SCMNodeReport nodeReport = 2; +} + +/** +* This message is sent along with the heartbeat to report datanode +* storage utilization to SCM. +*/ +message SCMNodeReport { + repeated SCMStorageReport storageReport = 1; +} + +message SCMStorageReport { + required string storageUuid = 1; + optional uint64 capacity = 2 [default = 0]; + optional uint64 scmUsed = 3 [default = 0]; + optional uint64 remaining = 4 [default = 0]; + optional StorageTypeProto storageType = 5 [default = DISK]; } message SCMRegisterRequestProto { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/common/ScmTestMock.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/common/ScmTestMock.java index 0e30bd9cfd4..b20202e9d21 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/common/ScmTestMock.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/common/ScmTestMock.java @@ -22,6 +22,8 @@ import org.apache.hadoop.ozone.protocol.VersionResponse; import org.apache.hadoop.ozone.protocol.commands.NullCommand; import org.apache.hadoop.ozone.protocol.proto .StorageContainerDatanodeProtocolProtos; +import org.apache.hadoop.ozone.protocol.proto + .StorageContainerDatanodeProtocolProtos.SCMNodeReport; import org.apache.hadoop.ozone.scm.VersionInfo; import java.io.IOException; @@ -104,12 +106,13 @@ public class ScmTestMock implements StorageContainerDatanodeProtocol { * Used by data node to send a Heartbeat. * * @param datanodeID - Datanode ID. + * @param nodeReport - node report. 
* @return - SCMHeartbeatResponseProto * @throws IOException */ @Override public StorageContainerDatanodeProtocolProtos.SCMHeartbeatResponseProto - sendHeartbeat(DatanodeID datanodeID) + sendHeartbeat(DatanodeID datanodeID, SCMNodeReport nodeReport) throws IOException { rpcCount.incrementAndGet(); heartbeatCount.incrementAndGet(); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/common/TestEndPoint.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/common/TestEndPoint.java index cde99a146a3..45e35fb937a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/common/TestEndPoint.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/common/TestEndPoint.java @@ -237,7 +237,7 @@ public class TestEndPoint { SCMTestUtils.createEndpoint(SCMTestUtils.getConf(), serverAddress, 1000)) { SCMHeartbeatResponseProto responseProto = rpcEndPoint.getEndPoint() - .sendHeartbeat(dataNode); + .sendHeartbeat(dataNode, null); Assert.assertNotNull(responseProto); Assert.assertEquals(1, responseProto.getCommandsCount()); Assert.assertNotNull(responseProto.getCommandsList().get(0)); @@ -257,7 +257,7 @@ public class TestEndPoint { .build(); rpcEndPoint.setState(EndpointStateMachine.EndPointStates.HEARTBEAT); HeartbeatEndpointTask endpointTask = - new HeartbeatEndpointTask(rpcEndPoint, SCMTestUtils.getConf()); + new HeartbeatEndpointTask(rpcEndPoint, SCMTestUtils.getConf(), null); endpointTask.setContainerNodeIDProto(containerNodeID); endpointTask.call(); Assert.assertNotNull(endpointTask.getContainerNodeIDProto()); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/container/MockNodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/container/MockNodeManager.java index 8cf960d284d..d298d434869 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/container/MockNodeManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/container/MockNodeManager.java @@ -23,7 +23,11 @@ import org.apache.hadoop.ozone.protocol.VersionResponse; import org.apache.hadoop.ozone.protocol.commands.SCMCommand; import org.apache.hadoop.ozone.protocol.proto .StorageContainerDatanodeProtocolProtos; +import org.apache.hadoop.ozone.protocol.proto + .StorageContainerDatanodeProtocolProtos.SCMNodeReport; + import org.apache.hadoop.ozone.scm.node.NodeManager; +import org.apache.hadoop.ozone.scm.node.SCMNodeStat; import java.io.IOException; import java.util.LinkedList; @@ -172,6 +176,24 @@ public class MockNodeManager implements NodeManager { return false; } + /** + * Returns the aggregated node stats. + * @return the aggregated node stats. + */ + @Override + public SCMNodeStat getStats() { + return null; + } + + /** + * Return a list of node stats. + * @return a list of individual node stats (live/stale but not dead). + */ + @Override + public List getNodeStats() { + return null; + } + /** * Closes this stream and releases any system resources associated with it. If * the stream is already closed then invoking this method has no effect. @@ -233,10 +255,12 @@ public class MockNodeManager implements NodeManager { * Send heartbeat to indicate the datanode is alive and doing well. * * @param datanodeID - Datanode ID. + * @param nodeReport - node report. 
* @return SCMheartbeat response list */ @Override - public List sendHeartbeat(DatanodeID datanodeID) { + public List sendHeartbeat(DatanodeID datanodeID, + SCMNodeReport nodeReport) { return null; } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/node/TestNodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/node/TestNodeManager.java index 0f028716ac7..355baed66f0 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/node/TestNodeManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/node/TestNodeManager.java @@ -22,6 +22,10 @@ import org.apache.hadoop.hdfs.protocol.DatanodeID; import org.apache.hadoop.ozone.OzoneConfigKeys; import org.apache.hadoop.ozone.OzoneConfiguration; import org.apache.hadoop.ozone.container.common.SCMTestUtils; +import org.apache.hadoop.ozone.protocol.proto + .StorageContainerDatanodeProtocolProtos.SCMNodeReport; +import org.apache.hadoop.ozone.protocol.proto + .StorageContainerDatanodeProtocolProtos.SCMStorageReport; import org.apache.hadoop.test.GenericTestUtils; import org.hamcrest.CoreMatchers; import org.junit.Assert; @@ -103,7 +107,7 @@ public class TestNodeManager { // Send some heartbeats from different nodes. for (int x = 0; x < nodeManager.getMinimumChillModeNodes(); x++) { DatanodeID datanodeID = SCMTestUtils.getDatanodeID(nodeManager); - nodeManager.sendHeartbeat(datanodeID); + nodeManager.sendHeartbeat(datanodeID, null); } // Wait for 4 seconds max. @@ -149,7 +153,7 @@ public class TestNodeManager { // Need 100 nodes to come out of chill mode, only one node is sending HB. nodeManager.setMinimumChillModeNodes(100); - nodeManager.sendHeartbeat(SCMTestUtils.getDatanodeID(nodeManager)); + nodeManager.sendHeartbeat(SCMTestUtils.getDatanodeID(nodeManager), null); GenericTestUtils.waitFor(() -> nodeManager.waitForHeartbeatThead(), 100, 4 * 1000); assertFalse("Not enough heartbeat, Node manager should have been in " + @@ -175,7 +179,7 @@ public class TestNodeManager { // Send 10 heartbeat from same node, and assert we never leave chill mode. for (int x = 0; x < 10; x++) { - nodeManager.sendHeartbeat(datanodeID); + nodeManager.sendHeartbeat(datanodeID, null); } GenericTestUtils.waitFor(() -> nodeManager.waitForHeartbeatThead(), 100, @@ -204,7 +208,7 @@ public class TestNodeManager { nodeManager.close(); // These should never be processed. - nodeManager.sendHeartbeat(datanodeID); + nodeManager.sendHeartbeat(datanodeID, null); // Let us just wait for 2 seconds to prove that HBs are not processed. Thread.sleep(2 * 1000); @@ -231,7 +235,7 @@ public class TestNodeManager { for (int x = 0; x < count; x++) { DatanodeID datanodeID = SCMTestUtils.getDatanodeID(nodeManager); - nodeManager.sendHeartbeat(datanodeID); + nodeManager.sendHeartbeat(datanodeID, null); } GenericTestUtils.waitFor(() -> nodeManager.waitForHeartbeatThead(), 100, 4 * 1000); @@ -317,14 +321,18 @@ public class TestNodeManager { DatanodeID staleNode = SCMTestUtils.getDatanodeID(nodeManager); // Heartbeat once - nodeManager.sendHeartbeat(staleNode); + nodeManager.sendHeartbeat(staleNode, null); // Heartbeat all other nodes. - nodeList.forEach(nodeManager::sendHeartbeat); + for (DatanodeID dn : nodeList) { + nodeManager.sendHeartbeat(dn, null); + } // Wait for 2 seconds .. and heartbeat good nodes again. 
       Thread.sleep(2 * 1000);
-      nodeList.forEach(nodeManager::sendHeartbeat);
+      for (DatanodeID dn : nodeList) {
+        nodeManager.sendHeartbeat(dn, null);
+      }
 
       // Wait for 2 more seconds, 3 seconds is the stale window for this test
       Thread.sleep(2 * 1000);
@@ -367,19 +375,25 @@
       DatanodeID deadNode = SCMTestUtils.getDatanodeID(nodeManager);
 
       // Heartbeat once
-      nodeManager.sendHeartbeat(deadNode);
+      nodeManager.sendHeartbeat(deadNode, null);
 
       // Heartbeat all other nodes.
-      nodeList.forEach(nodeManager::sendHeartbeat);
+      for (DatanodeID dn : nodeList) {
+        nodeManager.sendHeartbeat(dn, null);
+      }
 
       // Wait for 2 seconds .. and heartbeat good nodes again.
       Thread.sleep(2 * 1000);
-      nodeList.forEach(nodeManager::sendHeartbeat);
+      for (DatanodeID dn : nodeList) {
+        nodeManager.sendHeartbeat(dn, null);
+      }
 
       Thread.sleep(3 * 1000);
 
       // heartbeat good nodes again.
-      nodeList.forEach(nodeManager::sendHeartbeat);
+      for (DatanodeID dn : nodeList) {
+        nodeManager.sendHeartbeat(dn, null);
+      }
 
       // 6 seconds is the dead window for this test, so we wait a total of
       // 7 seconds to make sure that the node moves into dead state.
@@ -408,7 +422,7 @@
     try (SCMNodeManager nodeManager = createNodeManager(getConf())) {
       GenericTestUtils.LogCapturer logCapturer =
           GenericTestUtils.LogCapturer.captureLogs(SCMNodeManager.LOG);
-      nodeManager.sendHeartbeat(null);
+      nodeManager.sendHeartbeat(null, null);
       logCapturer.stopCapturing();
       assertThat(logCapturer.getOutput(), containsString("Datanode ID in " +
           "heartbeat is null"));
@@ -484,9 +498,9 @@
           SCMTestUtils.getDatanodeID(nodeManager, "StaleNode");
       DatanodeID deadNode =
           SCMTestUtils.getDatanodeID(nodeManager, "DeadNode");
-      nodeManager.sendHeartbeat(healthyNode);
-      nodeManager.sendHeartbeat(staleNode);
-      nodeManager.sendHeartbeat(deadNode);
+      nodeManager.sendHeartbeat(healthyNode, null);
+      nodeManager.sendHeartbeat(staleNode, null);
+      nodeManager.sendHeartbeat(deadNode, null);
 
       // Sleep so that heartbeat processing thread gets to run.
       Thread.sleep(500);
@@ -512,12 +526,12 @@
        * the 3 second windows.
        */
 
-      nodeManager.sendHeartbeat(healthyNode);
-      nodeManager.sendHeartbeat(staleNode);
-      nodeManager.sendHeartbeat(deadNode);
+      nodeManager.sendHeartbeat(healthyNode, null);
+      nodeManager.sendHeartbeat(staleNode, null);
+      nodeManager.sendHeartbeat(deadNode, null);
 
       Thread.sleep(1500);
-      nodeManager.sendHeartbeat(healthyNode);
+      nodeManager.sendHeartbeat(healthyNode, null);
       Thread.sleep(2 * 1000);
       assertEquals(1, nodeManager.getNodeCount(HEALTHY));
 
@@ -537,10 +551,10 @@
        * staleNode to move to stale state and deadNode to move to dead state.
        */
 
-      nodeManager.sendHeartbeat(healthyNode);
-      nodeManager.sendHeartbeat(staleNode);
+      nodeManager.sendHeartbeat(healthyNode, null);
+      nodeManager.sendHeartbeat(staleNode, null);
       Thread.sleep(1500);
-      nodeManager.sendHeartbeat(healthyNode);
+      nodeManager.sendHeartbeat(healthyNode, null);
       Thread.sleep(2 * 1000);
 
       // 3.5 seconds have elapsed for stale node, so it moves into Stale.
@@ -570,9 +584,9 @@
        * Cluster State: let us heartbeat all the nodes and verify that we get
        * back all the nodes in healthy state.
        */
-      nodeManager.sendHeartbeat(healthyNode);
-      nodeManager.sendHeartbeat(staleNode);
-      nodeManager.sendHeartbeat(deadNode);
+      nodeManager.sendHeartbeat(healthyNode, null);
+      nodeManager.sendHeartbeat(staleNode, null);
+      nodeManager.sendHeartbeat(deadNode, null);
       Thread.sleep(500);
       // Assert all nodes are healthy.
       assertEquals(3, nodeManager.getAllNodes().size());
@@ -591,8 +605,10 @@
   private void heartbeatNodeSet(SCMNodeManager manager, List<DatanodeID> list,
       int sleepDuration) throws InterruptedException {
     while (!Thread.currentThread().isInterrupted()) {
-      list.forEach(manager::sendHeartbeat);
-      Thread.sleep(sleepDuration);
+      for (DatanodeID dn : list) {
+        manager.sendHeartbeat(dn, null);
+      }
+      Thread.sleep(sleepDuration);
     }
   }
@@ -676,7 +691,10 @@
 
     // No Thread just one time HBs the node manager, so that these will be
     // marked as dead nodes eventually.
-    deadNodeList.forEach(nodeManager::sendHeartbeat);
+    for (DatanodeID dn : deadNodeList) {
+      nodeManager.sendHeartbeat(dn, null);
+    }
+
     Thread thread1 = new Thread(healthyNodeTask);
     thread1.setDaemon(true);
@@ -828,7 +846,7 @@
     try (SCMNodeManager nodeManager = createNodeManager(conf)) {
       nodeManager.setMinimumChillModeNodes(10);
       DatanodeID datanodeID = SCMTestUtils.getDatanodeID(nodeManager);
-      nodeManager.sendHeartbeat(datanodeID);
+      nodeManager.sendHeartbeat(datanodeID, null);
       String status = nodeManager.getChillModeStatus();
       Assert.assertThat(status,
           CoreMatchers.containsString("Still in chill " +
               "mode. Waiting on nodes to report in."));
@@ -858,7 +876,7 @@
 
       // Assert that node manager force enter cannot be overridden by nodes HBs.
       for (int x = 0; x < 20; x++) {
         DatanodeID datanode = SCMTestUtils.getDatanodeID(nodeManager);
-        nodeManager.sendHeartbeat(datanode);
+        nodeManager.sendHeartbeat(datanode, null);
       }
 
       Thread.sleep(500);
@@ -873,6 +891,147 @@
           CoreMatchers.containsString("Out of chill mode."));
       assertFalse(nodeManager.isInManualChillMode());
     }
+  }
+
+  /**
+   * Test multiple nodes sending initial heartbeat with their node report.
+   *
+   * @throws IOException
+   * @throws InterruptedException
+   * @throws TimeoutException
+   */
+  @Test
+  public void testScmStatsFromNodeReport() throws IOException,
+      InterruptedException, TimeoutException {
+    Configuration conf = getConf();
+    final int nodeCount = 10;
+    final long capacity = 2000;
+    final long used = 100;
+    final long remaining = capacity - used;
+
+    try (SCMNodeManager nodeManager = createNodeManager(conf)) {
+      for (int x = 0; x < nodeCount; x++) {
+        DatanodeID datanodeID = SCMTestUtils.getDatanodeID(nodeManager);
+
+        SCMNodeReport.Builder nrb = SCMNodeReport.newBuilder();
+        SCMStorageReport.Builder srb = SCMStorageReport.newBuilder();
+        srb.setStorageUuid(UUID.randomUUID().toString());
+        srb.setCapacity(capacity).setScmUsed(used)
+            .setRemaining(remaining).build();
+        nodeManager.sendHeartbeat(datanodeID,
+            nrb.addStorageReport(srb).build());
+      }
+      GenericTestUtils.waitFor(() -> nodeManager.waitForHeartbeatThead(), 100,
+          4 * 1000);
+      assertEquals(nodeCount, nodeManager.getNodeCount(HEALTHY));
+      assertEquals(capacity * nodeCount,
+          nodeManager.getStats().getCapacity());
+      assertEquals(used * nodeCount,
+          nodeManager.getStats().getScmUsed());
+      assertEquals(remaining * nodeCount,
+          nodeManager.getStats().getRemaining());
+    }
+  }
+
+  /**
+   * Test single node stat update based on node report from different
+   * heartbeat status (healthy, stale and dead).
+   * @throws IOException
+   * @throws InterruptedException
+   * @throws TimeoutException
+   */
+  @Test
+  public void testScmNodeReportUpdate() throws IOException,
+      InterruptedException, TimeoutException {
+    Configuration conf = getConf();
+    final int heartbeatCount = 5;
+    final int nodeCount = 1;
+    final int interval = 100;
+
+    conf.setInt(OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL_MS, interval);
+    conf.setInt(OZONE_SCM_HEARTBEAT_INTERVAL_SECONDS, 1);
+    conf.setInt(OZONE_SCM_STALENODE_INTERVAL_MS, 3 * 1000);
+    conf.setInt(OZONE_SCM_DEADNODE_INTERVAL_MS, 6 * 1000);
+
+    try (SCMNodeManager nodeManager = createNodeManager(conf)) {
+      DatanodeID datanodeID = SCMTestUtils.getDatanodeID(nodeManager);
+      final long capacity = 2000;
+      final long usedPerHeartbeat = 100;
+
+      for (int x = 0; x < heartbeatCount; x++) {
+        SCMNodeReport.Builder nrb = SCMNodeReport.newBuilder();
+        SCMStorageReport.Builder srb = SCMStorageReport.newBuilder();
+        srb.setStorageUuid(UUID.randomUUID().toString());
+        srb.setCapacity(capacity).setScmUsed(x * usedPerHeartbeat)
+            .setRemaining(capacity - x * usedPerHeartbeat).build();
+        nrb.addStorageReport(srb);
+
+        nodeManager.sendHeartbeat(datanodeID, nrb.build());
+        Thread.sleep(100);
+      }
+
+      GenericTestUtils.waitFor(() -> nodeManager.waitForHeartbeatThead(), 100,
+          4 * 1000);
+      assertEquals(nodeCount, nodeManager.getNodeCount(HEALTHY));
+
+      final long expectedScmUsed = usedPerHeartbeat * (heartbeatCount - 1);
+      final long expectedRemaining = capacity -
+          usedPerHeartbeat * (heartbeatCount - 1);
+      assertEquals(capacity, nodeManager.getStats().getCapacity());
+      assertEquals(expectedScmUsed, nodeManager.getStats().getScmUsed());
+      assertEquals(expectedRemaining, nodeManager.getStats().getRemaining());
+
+      // Test NodeManager#getNodeStats
+      assertEquals(nodeCount, nodeManager.getNodeStats().size());
+      assertEquals(capacity, nodeManager.getNodeStats().get(0).getCapacity());
+      assertEquals(expectedScmUsed,
+          nodeManager.getNodeStats().get(0).getScmUsed());
+      assertEquals(expectedRemaining,
+          nodeManager.getNodeStats().get(0).getRemaining());
+
+      // Wait up to 4 seconds so that the node becomes stale.
+      // Verify that the usage info is unchanged.
+      GenericTestUtils.waitFor(
+          () -> nodeManager.getNodeCount(NodeManager.NODESTATE.STALE) == 1, 100,
+          4 * 1000);
+      assertEquals(nodeCount, nodeManager.getNodeStats().size());
+      assertEquals(capacity, nodeManager.getNodeStats().get(0).getCapacity());
+      assertEquals(expectedScmUsed,
+          nodeManager.getNodeStats().get(0).getScmUsed());
+      assertEquals(expectedRemaining,
+          nodeManager.getNodeStats().get(0).getRemaining());
+
+      // Wait up to 3 more seconds so the node becomes dead.
+      // Verify that the usage info is updated.
+      GenericTestUtils.waitFor(
+          () -> nodeManager.getNodeCount(NodeManager.NODESTATE.DEAD) == 1, 100,
+          3 * 1000);
+
+      assertEquals(0, nodeManager.getNodeStats().size());
+      assertEquals(0, nodeManager.getStats().getCapacity());
+      assertEquals(0, nodeManager.getStats().getScmUsed());
+      assertEquals(0, nodeManager.getStats().getRemaining());
+
+      // Send a new report to bring the dead node back to healthy.
+      SCMNodeReport.Builder nrb = SCMNodeReport.newBuilder();
+      SCMStorageReport.Builder srb = SCMStorageReport.newBuilder();
+      srb.setStorageUuid(UUID.randomUUID().toString());
+      srb.setCapacity(capacity).setScmUsed(expectedScmUsed)
+          .setRemaining(expectedRemaining).build();
+      nrb.addStorageReport(srb);
+      nodeManager.sendHeartbeat(datanodeID, nrb.build());
+
+      // Wait up to 5 seconds so that the dead node becomes healthy.
+      // Verify that the usage info is updated.
+      GenericTestUtils.waitFor(
+          () -> nodeManager.getNodeCount(NodeManager.NODESTATE.HEALTHY) == 1,
+          100, 5 * 1000);
+      assertEquals(nodeCount, nodeManager.getNodeStats().size());
+      assertEquals(capacity, nodeManager.getNodeStats().get(0).getCapacity());
+      assertEquals(expectedScmUsed,
+          nodeManager.getNodeStats().get(0).getScmUsed());
+      assertEquals(expectedRemaining,
+          nodeManager.getNodeStats().get(0).getRemaining());
+    }
   }
 }
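
For readers tracing the new heartbeat path end to end: a datanode builds one SCMStorageReport per storage location, wraps the reports in an SCMNodeReport, and sends that report with every heartbeat; on the SCM side, SCMNodeManager folds each report into a per-node SCMNodeStat and into the cluster-wide aggregate returned by getStats(). The following minimal sketch shows the report-building pattern the tests above rely on. It uses only the generated protobuf builders this patch introduces (SCMNodeReport, SCMStorageReport); the helper class and method names (NodeReportSketch, buildNodeReport) are illustrative and not part of the patch.

import java.util.UUID;

import org.apache.hadoop.ozone.protocol.proto
    .StorageContainerDatanodeProtocolProtos.SCMNodeReport;
import org.apache.hadoop.ozone.protocol.proto
    .StorageContainerDatanodeProtocolProtos.SCMStorageReport;

/**
 * Illustrative helper (not part of this patch) showing how a caller
 * assembles the node report that rides along with a heartbeat.
 */
public final class NodeReportSketch {

  private NodeReportSketch() {
  }

  /**
   * Builds a single-volume node report; sizes are in bytes.
   * SCM sums capacity/scmUsed/remaining across all storage reports
   * of a node, and across nodes for the cluster-wide aggregate.
   */
  public static SCMNodeReport buildNodeReport(long capacity, long scmUsed) {
    SCMStorageReport storage = SCMStorageReport.newBuilder()
        .setStorageUuid(UUID.randomUUID().toString()) // one id per volume
        .setCapacity(capacity)
        .setScmUsed(scmUsed)
        .setRemaining(capacity - scmUsed)
        .build();
    return SCMNodeReport.newBuilder()
        .addStorageReport(storage)
        .build();
  }
}

Under this sketch's assumptions, ten nodes each heartbeating with buildNodeReport(2000, 100) should yield an aggregate of 20000 capacity, 1000 used, and 19000 remaining from nodeManager.getStats(), which is exactly the arithmetic testScmStatsFromNodeReport asserts.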