diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerManagerImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerManagerImpl.java
index 88bdebc2af5..2beb5251c64 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerManagerImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerManagerImpl.java
@@ -24,6 +24,10 @@ import org.apache.commons.codec.digest.DigestUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileAlreadyExistsException;
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos;
+import org.apache.hadoop.ozone.protocol.proto
+ .StorageContainerDatanodeProtocolProtos.SCMNodeReport;
+import org.apache.hadoop.ozone.protocol.proto
+ .StorageContainerDatanodeProtocolProtos.SCMStorageReport;
import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.ozone.OzoneConsts;
@@ -120,7 +124,7 @@ public class ContainerManagerImpl implements ContainerManager {
dataDirs.add(location);
}
this.locationManager =
- new ContainerLocationManagerImpl(containerDirs, dataDirs);
+ new ContainerLocationManagerImpl(containerDirs, dataDirs, config);
} finally {
readUnlock();
@@ -395,9 +399,10 @@ public class ContainerManagerImpl implements ContainerManager {
* @throws IOException
*/
@Override
- public void shutdown() {
+ public void shutdown() throws IOException {
Preconditions.checkState(this.hasWriteLock());
this.containerMap.clear();
+ this.locationManager.shutdown();
}
@@ -497,6 +502,25 @@ public class ContainerManagerImpl implements ContainerManager {
return this.keyManager;
}
+ /**
+ * Get the node report.
+ * @return node report.
+ */
+ @Override
+ public SCMNodeReport getNodeReport() throws IOException {
+ StorageLocationReport[] reports = locationManager.getLocationReport();
+ SCMNodeReport.Builder nrb = SCMNodeReport.newBuilder();
+ for (int i = 0; i < reports.length; i++) {
+ SCMStorageReport.Builder srb = SCMStorageReport.newBuilder();
+ nrb.addStorageReport(i, srb.setStorageUuid(reports[i].getId())
+ .setCapacity(reports[i].getCapacity())
+ .setScmUsed(reports[i].getScmUsed())
+ .setRemaining(reports[i].getRemaining())
+ .build());
+ }
+ return nrb.build();
+ }
+
/**
* Filter out only container files from the container metadata dir.
*/
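
For context, a minimal sketch (not part of the patch) of how the report assembled by getNodeReport() above round-trips through the new protobuf messages defined later in StorageContainerDatanodeProtocol.proto; the storage UUID and sizes here are made-up values:

```java
import org.apache.hadoop.ozone.protocol.proto
    .StorageContainerDatanodeProtocolProtos.SCMNodeReport;
import org.apache.hadoop.ozone.protocol.proto
    .StorageContainerDatanodeProtocolProtos.SCMStorageReport;

public final class NodeReportExample {
  public static void main(String[] args) {
    // Datanode side: one report per storage location (values are made up).
    SCMStorageReport disk = SCMStorageReport.newBuilder()
        .setStorageUuid("DS-example-uuid")
        .setCapacity(100L << 30)   // ~100 GB
        .setScmUsed(10L << 30)     // space used by SCM containers
        .setRemaining(90L << 30)
        .build();
    SCMNodeReport report = SCMNodeReport.newBuilder()
        .addStorageReport(disk)
        .build();

    // SCM side: fold the per-location reports into node totals, the same
    // way SCMNodeManager.updateNodeStat() does further down in this patch.
    long capacity = 0;
    for (SCMStorageReport r : report.getStorageReportList()) {
      capacity += r.getCapacity();
    }
    System.out.println("total capacity = " + capacity);
  }
}
```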
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerStorageLocation.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerStorageLocation.java
new file mode 100644
index 00000000000..14dcc3d229a
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerStorageLocation.java
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.container.common.impl;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CachingGetSpaceUsed;
+import org.apache.hadoop.fs.DF;
+import org.apache.hadoop.fs.GetSpaceUsed;
+import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.util.ShutdownHookManager;
+import org.apache.hadoop.util.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.net.URI;
+import java.nio.charset.StandardCharsets;
+import java.util.Scanner;
+
+import static org.apache.hadoop.util.RunJar.SHUTDOWN_HOOK_PRIORITY;
+
+/**
+ * Class that tracks the space usage of a datanode container storage
+ * location used by SCM containers.
+ */
+public class ContainerStorageLocation {
+ private static final Logger LOG =
+ LoggerFactory.getLogger(ContainerStorageLocation.class);
+
+ private static final String DU_CACHE_FILE = "scmUsed";
+ private volatile boolean scmUsedSaved = false;
+
+ private final StorageLocation dataLocation;
+ private final String storageUuId;
+ private final DF usage;
+ private final GetSpaceUsed scmUsage;
+ private final File scmUsedFile;
+
+ public ContainerStorageLocation(StorageLocation dataLoc, Configuration conf)
+ throws IOException {
+ this.dataLocation = dataLoc;
+ this.storageUuId = DatanodeStorage.generateUuid();
+ File dataDir = new File(dataLoc.getNormalizedUri().getPath());
+ scmUsedFile = new File(dataDir, DU_CACHE_FILE);
+ // get overall disk usage
+ this.usage = new DF(dataDir, conf);
+ // get SCM specific usage
+ this.scmUsage = new CachingGetSpaceUsed.Builder().setPath(dataDir)
+ .setConf(conf)
+ .setInitialUsed(loadScmUsed())
+ .build();
+
+ // Ensure scm usage is saved during shutdown.
+ ShutdownHookManager.get().addShutdownHook(
+ new Runnable() {
+ @Override
+ public void run() {
+ if (!scmUsedSaved) {
+ saveScmUsed();
+ }
+ }
+ }, SHUTDOWN_HOOK_PRIORITY);
+ }
+
+ public URI getNormalizedUri() {
+ return dataLocation.getNormalizedUri();
+ }
+
+ public String getStorageUuId() {
+ return storageUuId;
+ }
+ public long getCapacity() {
+ long capacity = usage.getCapacity();
+ return (capacity > 0) ? capacity : 0;
+ }
+
+ public long getAvailable() throws IOException {
+ long remaining = getCapacity() - getScmUsed();
+ long available = usage.getAvailable();
+ if (remaining > available) {
+ remaining = available;
+ }
+ return (remaining > 0) ? remaining : 0;
+ }
+
+ public long getScmUsed() throws IOException {
+ return scmUsage.getUsed();
+ }
+
+ public void shutdown() {
+ saveScmUsed();
+ scmUsedSaved = true;
+
+ if (scmUsage instanceof CachingGetSpaceUsed) {
+ IOUtils.cleanup(null, ((CachingGetSpaceUsed) scmUsage));
+ }
+ }
+
+ /**
+ * Read in the cached DU value and return it if it is less than 600 seconds
+ * old (DU update interval). Slight imprecision of scmUsed is not critical
+ * and skipping DU can significantly shorten the startup time.
+ * If the cached value is not available or too old, -1 is returned.
+ */
+ long loadScmUsed() {
+ long cachedScmUsed;
+ long mtime;
+ Scanner sc;
+
+ try {
+ sc = new Scanner(scmUsedFile, "UTF-8");
+ } catch (FileNotFoundException fnfe) {
+ return -1;
+ }
+
+ try {
+ // Get the recorded scmUsed from the file.
+ if (sc.hasNextLong()) {
+ cachedScmUsed = sc.nextLong();
+ } else {
+ return -1;
+ }
+ // Get the recorded mtime from the file.
+ if (sc.hasNextLong()) {
+ mtime = sc.nextLong();
+ } else {
+ return -1;
+ }
+
+ // Return the cached value if mtime is okay.
+ if (mtime > 0 && (Time.now() - mtime < 600000L)) {
+ LOG.info("Cached ScmUsed found for {} : {} ", dataLocation,
+ cachedScmUsed);
+ return cachedScmUsed;
+ }
+ return -1;
+ } finally {
+ sc.close();
+ }
+ }
+
+ /**
+ * Write the current scmUsed to the cache file.
+ */
+ void saveScmUsed() {
+ if (scmUsedFile.exists() && !scmUsedFile.delete()) {
+ LOG.warn("Failed to delete old scmUsed file in {}.", dataLocation);
+ }
+ OutputStreamWriter out = null;
+ try {
+ long used = getScmUsed();
+ if (used > 0) {
+ out = new OutputStreamWriter(new FileOutputStream(scmUsedFile),
+ StandardCharsets.UTF_8);
+ // mtime is written last, so that truncated writes won't be valid.
+ out.write(Long.toString(used) + " " + Long.toString(Time.now()));
+ out.flush();
+ out.close();
+ out = null;
+ }
+ } catch (IOException ioe) {
+ // If write failed, the volume might be bad. Since the cache file is
+ // not critical, log the error and continue.
+ LOG.warn("Failed to write scmUsed to " + scmUsedFile, ioe);
+ } finally {
+ IOUtils.cleanup(null, out);
+ }
+ }
+}
\ No newline at end of file
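
For reference, the scmUsed cache file written by saveScmUsed() is a single UTF-8 line of the form `<bytesUsed> <mtimeMillis>`, and loadScmUsed() only trusts it for 600 seconds. A self-contained sketch of that parse logic under those assumptions (the class name here is hypothetical):

```java
import java.io.File;
import java.io.FileNotFoundException;
import java.util.Scanner;

public final class ScmUsedCacheSketch {
  /** Returns the cached value, or -1 if missing, malformed, or stale. */
  static long load(File cacheFile, long nowMillis) {
    try (Scanner sc = new Scanner(cacheFile, "UTF-8")) {
      if (!sc.hasNextLong()) {
        return -1;          // no usage value recorded
      }
      long used = sc.nextLong();
      if (!sc.hasNextLong()) {
        return -1;          // truncated write: mtime missing
      }
      long mtime = sc.nextLong();
      // Same freshness window as loadScmUsed(): 600 seconds.
      return (mtime > 0 && nowMillis - mtime < 600_000L) ? used : -1;
    } catch (FileNotFoundException e) {
      return -1;
    }
  }

  public static void main(String[] args) {
    System.out.println(load(new File("scmUsed"), System.currentTimeMillis()));
  }
}
```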
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/impl/StorageLocationReport.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/impl/StorageLocationReport.java
new file mode 100644
index 00000000000..7ef91a91f78
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/impl/StorageLocationReport.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.container.common.impl;
+
+/**
+ * Storage location stats of datanodes that provide backing store for containers.
+ *
+ */
+public class StorageLocationReport {
+ public static final StorageLocationReport[] EMPTY_ARRAY = {};
+
+ private final String id;
+ private final boolean failed;
+ private final long capacity;
+ private final long scmUsed;
+ private final long remaining;
+
+ public StorageLocationReport(String id, boolean failed,
+ long capacity, long scmUsed, long remaining) {
+ this.id = id;
+ this.failed = failed;
+ this.capacity = capacity;
+ this.scmUsed = scmUsed;
+ this.remaining = remaining;
+ }
+
+ public String getId() {
+ return id;
+ }
+
+ public boolean isFailed() {
+ return failed;
+ }
+
+ public long getCapacity() {
+ return capacity;
+ }
+
+ public long getScmUsed() {
+ return scmUsed;
+ }
+
+ public long getRemaining() {
+ return remaining;
+ }
+
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerLocationManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerLocationManager.java
index 00e7223d212..9c5fcea1639 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerLocationManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerLocationManager.java
@@ -17,6 +17,8 @@
*/
package org.apache.hadoop.ozone.container.common.interfaces;
+import org.apache.hadoop.ozone.container.common.impl.StorageLocationReport;
+
import java.io.IOException;
import java.nio.file.Path;
@@ -41,4 +43,16 @@ public interface ContainerLocationManager {
*/
Path getDataPath(String containerName) throws IOException;
+ /**
+ * Returns an array of storage location usage reports.
+ * @return storage location usage reports.
+ */
+ StorageLocationReport[] getLocationReport() throws IOException;
+
+ /**
+ * Supports clean shutdown of the container location manager.
+ *
+ * @throws IOException
+ */
+ void shutdown() throws IOException;
}
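
The matching change to ContainerLocationManagerImpl is not shown in this diff (only its constructor change appears earlier), so the following is a hypothetical sketch of how getLocationReport() can be implemented on top of the new ContainerStorageLocation class; the dataLocations parameter is an assumption, not the patch's actual field:

```java
import java.io.IOException;
import java.util.List;

import org.apache.hadoop.ozone.container.common.impl.ContainerStorageLocation;
import org.apache.hadoop.ozone.container.common.impl.StorageLocationReport;

final class LocationReportSketch {
  static StorageLocationReport[] getLocationReport(
      List<ContainerStorageLocation> dataLocations) throws IOException {
    StorageLocationReport[] reports =
        new StorageLocationReport[dataLocations.size()];
    for (int i = 0; i < reports.length; i++) {
      ContainerStorageLocation loc = dataLocations.get(i);
      reports[i] = new StorageLocationReport(loc.getStorageUuId(),
          false,                // failed: no failure detection in this sketch
          loc.getCapacity(),
          loc.getScmUsed(),
          loc.getAvailable());  // remaining space usable by SCM
    }
    return reports;
  }
}
```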
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerManager.java
index 71bc6049040..b063085297a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerManager.java
@@ -24,6 +24,8 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
import org.apache.hadoop.hdfs.util.RwLock;
import org.apache.hadoop.ozone.container.common.helpers.ContainerData;
+import org.apache.hadoop.ozone.protocol.proto
+ .StorageContainerDatanodeProtocolProtos.SCMNodeReport;
import org.apache.hadoop.scm.container.common.helpers.Pipeline;
import java.io.IOException;
@@ -93,7 +95,7 @@ public interface ContainerManager extends RwLock {
*
* @throws IOException
*/
- void shutdown();
+ void shutdown() throws IOException;
/**
* Sets the Chunk Manager.
@@ -123,4 +125,9 @@ public interface ContainerManager extends RwLock {
*/
KeyManager getKeyManager();
+ /**
+ * Get the Node Report of container storage usage.
+ * @return node report.
+ */
+ SCMNodeReport getNodeReport() throws IOException;
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/package-info.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/package-info.java
new file mode 100644
index 00000000000..d83bf95c362
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/package-info.java
@@ -0,0 +1,20 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.container.common.interfaces;
+/**
+ This package contains common ozone container interfaces.
+ */
\ No newline at end of file
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
index db837344023..5b5ed86db75 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
@@ -82,6 +82,7 @@ public class DatanodeStateMachine implements Closeable {
try {
LOG.debug("Executing cycle Number : {}", context.getExecutionCount());
nextHB = Time.monotonicNow() + heartbeatFrequency;
+ context.setReportState(container.getNodeReport());
context.execute(executorService, heartbeatFrequency,
TimeUnit.MILLISECONDS);
now = Time.monotonicNow();
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java
index e397202276b..4fa8729257e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java
@@ -22,6 +22,7 @@ import org.apache.hadoop.ozone.container.common.states.DatanodeState;
import org.apache.hadoop.ozone.container.common.states.datanode
.RunningDatanodeState;
import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
+import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMNodeReport;
import java.util.LinkedList;
import java.util.Queue;
@@ -43,6 +44,7 @@ public class StateContext {
private final AtomicLong stateExecutionCount;
private final Configuration conf;
private DatanodeStateMachine.DatanodeStates state;
+ private SCMNodeReport nrState;
/**
* Constructs a StateContext.
@@ -59,6 +61,7 @@ public class StateContext {
commandQueue = new LinkedList<>();
lock = new ReentrantLock();
stateExecutionCount = new AtomicLong(0);
+ nrState = SCMNodeReport.getDefaultInstance();
}
/**
@@ -111,6 +114,22 @@ public class StateContext {
this.state = state;
}
+ /**
+ * Returns the node report of the datanode state context.
+ * @return the node report.
+ */
+ public SCMNodeReport getNodeReport() {
+ return nrState;
+ }
+
+ /**
+ * Sets the node report of the datanode state context.
+ * @param nrReport - node report
+ */
+ public void setReportState(SCMNodeReport nrReport) {
+ this.nrState = nrReport;
+ }
+
/**
* Returns the next task to get executed by the datanode state machine.
* @return A callable that will be executed by the
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/states/datanode/RunningDatanodeState.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/states/datanode/RunningDatanodeState.java
index 77b4138bd31..347b465be03 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/states/datanode/RunningDatanodeState.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/states/datanode/RunningDatanodeState.java
@@ -231,6 +231,7 @@ public class RunningDatanodeState implements DatanodeState {
.setConfig(conf)
.setEndpointStateMachine(endpoint)
.setNodeID(getContainerNodeID())
+ .setContext(context)
.build();
case SHUTDOWN:
break;
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java
index 4f877ff8d04..c0226bbf87c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java
@@ -23,6 +23,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.ozone.container.common.statemachine
.EndpointStateMachine;
+import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
import org.apache.hadoop.ozone.protocol.proto
.StorageContainerDatanodeProtocolProtos.ContainerNodeIDProto;
import org.slf4j.Logger;
@@ -41,6 +42,7 @@ public class HeartbeatEndpointTask
private final EndpointStateMachine rpcEndpoint;
private final Configuration conf;
private ContainerNodeIDProto containerNodeIDProto;
+ private StateContext context;
/**
* Constructs a SCM heart beat.
@@ -48,9 +50,10 @@ public class HeartbeatEndpointTask
* @param conf Config.
*/
public HeartbeatEndpointTask(EndpointStateMachine rpcEndpoint,
- Configuration conf) {
+ Configuration conf, StateContext context) {
this.rpcEndpoint = rpcEndpoint;
this.conf = conf;
+ this.context = context;
}
/**
@@ -85,8 +88,9 @@ public class HeartbeatEndpointTask
Preconditions.checkState(this.containerNodeIDProto != null);
DatanodeID datanodeID = DatanodeID.getFromProtoBuf(this
.containerNodeIDProto.getDatanodeID());
- // TODO : Add the command to command processor queue.
- rpcEndpoint.getEndPoint().sendHeartbeat(datanodeID);
+
+ rpcEndpoint.getEndPoint().sendHeartbeat(datanodeID,
+ this.context.getNodeReport());
rpcEndpoint.zeroMissedCount();
} catch (IOException ex) {
rpcEndpoint.logIfNeeded(ex
@@ -112,6 +116,7 @@ public class HeartbeatEndpointTask
private EndpointStateMachine endPointStateMachine;
private Configuration conf;
private ContainerNodeIDProto containerNodeIDProto;
+ private StateContext context;
/**
* Constructs the builder class.
@@ -152,6 +157,16 @@ public class HeartbeatEndpointTask
return this;
}
+ /**
+ * Sets the context.
+ * @param stateContext - State context.
+ * @return this.
+ */
+ public Builder setContext(StateContext stateContext) {
+ this.context = stateContext;
+ return this;
+ }
+
public HeartbeatEndpointTask build() {
if (endPointStateMachine == null) {
LOG.error("No endpoint specified.");
@@ -172,10 +187,9 @@ public class HeartbeatEndpointTask
}
HeartbeatEndpointTask task = new HeartbeatEndpointTask(this
- .endPointStateMachine, this.conf);
+ .endPointStateMachine, this.conf, this.context);
task.setContainerNodeIDProto(containerNodeIDProto);
return task;
}
-
}
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
index 0c3cd91f746..0f77175c9c1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
@@ -29,6 +29,8 @@ import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
import org.apache.hadoop.ozone.container.common.interfaces.ContainerManager;
import org.apache.hadoop.ozone.container.common.interfaces.KeyManager;
import org.apache.hadoop.ozone.container.common.transport.server.XceiverServer;
+import org.apache.hadoop.ozone.protocol.proto
+ .StorageContainerDatanodeProtocolProtos.SCMNodeReport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -138,6 +140,8 @@ public class OzoneContainer {
this.keyManager.shutdown();
this.manager.shutdown();
LOG.info("container services shutdown complete.");
+ } catch (IOException ex) {
+ LOG.warn("container service shutdown error:", ex);
} finally {
this.manager.writeUnlock();
}
@@ -155,4 +159,11 @@ public class OzoneContainer {
pathList.add(location);
}
}
+
+ /**
+ * Returns node report of container storage usage.
+ */
+ public SCMNodeReport getNodeReport() throws IOException {
+ return this.manager.getNodeReport();
+ }
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocol/StorageContainerDatanodeProtocol.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocol/StorageContainerDatanodeProtocol.java
index 6a9dc6723b3..e93baeffc50 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocol/StorageContainerDatanodeProtocol.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocol/StorageContainerDatanodeProtocol.java
@@ -22,6 +22,7 @@ import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolPr
import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMHeartbeatResponseProto;
import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMVersionResponseProto;
import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMRegisteredCmdResponseProto;
+import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMNodeReport;
import java.io.IOException;
@@ -41,11 +42,12 @@ public interface StorageContainerDatanodeProtocol {
/**
* Used by data node to send a Heartbeat.
* @param datanodeID - Datanode ID.
+ * @param nodeReport - node report.
* @return - SCMHeartbeatResponseProto
* @throws IOException
*/
- SCMHeartbeatResponseProto sendHeartbeat(DatanodeID datanodeID)
- throws IOException;
+ SCMHeartbeatResponseProto sendHeartbeat(DatanodeID datanodeID,
+ SCMNodeReport nodeReport) throws IOException;
/**
* Register Datanode.
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocol/StorageContainerNodeProtocol.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocol/StorageContainerNodeProtocol.java
index 6c4fbe3af53..9feec785f0f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocol/StorageContainerNodeProtocol.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocol/StorageContainerNodeProtocol.java
@@ -21,6 +21,7 @@ import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMVersionRequestProto;
+import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMNodeReport;
import java.util.List;
@@ -54,8 +55,10 @@ public interface StorageContainerNodeProtocol {
/**
* Send heartbeat to indicate the datanode is alive and doing well.
* @param datanodeID - Datanode ID.
+ * @param nodeReport - node report.
* @return SCMheartbeat response list
*/
- List<SCMCommand> sendHeartbeat(DatanodeID datanodeID);
+ List<SCMCommand> sendHeartbeat(DatanodeID datanodeID,
+ SCMNodeReport nodeReport);
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolClientSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolClientSideTranslatorPB.java
index ba40c292b89..477fac5ade6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolClientSideTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolClientSideTranslatorPB.java
@@ -27,6 +27,8 @@ import org.apache.hadoop.ozone.protocol.proto
.StorageContainerDatanodeProtocolProtos.SCMHeartbeatRequestProto;
import org.apache.hadoop.ozone.protocol.proto
.StorageContainerDatanodeProtocolProtos.SCMHeartbeatResponseProto;
+import org.apache.hadoop.ozone.protocol.proto
+ .StorageContainerDatanodeProtocolProtos.SCMNodeReport;
import org.apache.hadoop.ozone.protocol.proto
.StorageContainerDatanodeProtocolProtos.SCMVersionRequestProto;
import org.apache.hadoop.ozone.protocol.proto
@@ -113,15 +115,17 @@ public class StorageContainerDatanodeProtocolClientSideTranslatorPB
* Send by datanode to SCM.
*
* @param datanodeID - DatanodeID
+ * @param nodeReport - node report
* @throws IOException
*/
@Override
- public SCMHeartbeatResponseProto sendHeartbeat(DatanodeID datanodeID)
- throws IOException {
- SCMHeartbeatRequestProto.Builder req =
- SCMHeartbeatRequestProto.newBuilder();
+ public SCMHeartbeatResponseProto sendHeartbeat(DatanodeID datanodeID,
+ SCMNodeReport nodeReport) throws IOException {
+ SCMHeartbeatRequestProto.Builder req = SCMHeartbeatRequestProto
+ .newBuilder();
req.setDatanodeID(datanodeID.getProtoBufMessage());
+ // nodeReport is optional; protobuf setters reject null, and callers in
+ // this patch (e.g. TestEndPoint) may pass null when no report is ready.
+ if (nodeReport != null) {
+ req.setNodeReport(nodeReport);
+ }
final SCMHeartbeatResponseProto resp;
try {
resp = rpcProxy.sendHeartbeat(NULL_RPC_CONTROLLER, req.build());
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolServerSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolServerSideTranslatorPB.java
index 62b885a8f34..995290f7886 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolServerSideTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolServerSideTranslatorPB.java
@@ -78,9 +78,9 @@ public class StorageContainerDatanodeProtocolServerSideTranslatorPB
SCMHeartbeatRequestProto request) throws ServiceException {
try {
return impl.sendHeartbeat(DatanodeID.getFromProtoBuf(request
- .getDatanodeID()));
+ .getDatanodeID()), request.getNodeReport());
} catch (IOException e) {
throw new ServiceException(e);
}
}
-}
+}
\ No newline at end of file
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/StorageContainerManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/StorageContainerManager.java
index b6fd4c0ad90..3f31e98c366 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/StorageContainerManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/StorageContainerManager.java
@@ -44,6 +44,8 @@ import org.apache.hadoop.ozone.protocol.proto
.StorageContainerDatanodeProtocolProtos.SCMHeartbeatResponseProto;
import org.apache.hadoop.ozone.protocol.proto
.StorageContainerDatanodeProtocolProtos.SCMNodeAddressList;
+import org.apache.hadoop.ozone.protocol.proto
+ .StorageContainerDatanodeProtocolProtos.SCMNodeReport;
import org.apache.hadoop.ozone.protocol.proto
.StorageContainerDatanodeProtocolProtos.SCMRegisteredCmdResponseProto;
import org.apache.hadoop.ozone.protocol.proto
@@ -393,9 +395,10 @@ public class StorageContainerManager
* @throws IOException
*/
@Override
- public SCMHeartbeatResponseProto sendHeartbeat(DatanodeID datanodeID)
- throws IOException {
- List<SCMCommand> commands = getScmNodeManager().sendHeartbeat(datanodeID);
+ public SCMHeartbeatResponseProto sendHeartbeat(DatanodeID datanodeID,
+ SCMNodeReport nodeReport) throws IOException {
+ List<SCMCommand> commands =
+ getScmNodeManager().sendHeartbeat(datanodeID, nodeReport);
List cmdReponses = new LinkedList<>();
for (SCMCommand cmd : commands) {
cmdReponses.add(getCommandResponse(cmd));
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/HeartbeatQueueItem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/HeartbeatQueueItem.java
new file mode 100644
index 00000000000..894455819ac
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/HeartbeatQueueItem.java
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.scm.node;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.ozone.protocol.proto
+ .StorageContainerDatanodeProtocolProtos.SCMNodeReport;
+
+import static org.apache.hadoop.util.Time.monotonicNow;
+
+/**
+ * This class represents the item in SCM heartbeat queue.
+ */
+public class HeartbeatQueueItem {
+ private DatanodeID datanodeID;
+ private long recvTimestamp;
+ private SCMNodeReport nodeReport;
+
+ /**
+ *
+ * @param datanodeID - datanode ID of the heartbeat.
+ * @param recvTimestamp - heartbeat receive timestamp.
+ * @param nodeReport - node report associated with the heartbeat if any.
+ */
+ HeartbeatQueueItem(DatanodeID datanodeID, long recvTimestamp,
+ SCMNodeReport nodeReport) {
+ this.datanodeID = datanodeID;
+ this.recvTimestamp = recvTimestamp;
+ this.nodeReport = nodeReport;
+ }
+
+ /**
+ * @return datanode ID.
+ */
+ public DatanodeID getDatanodeID() {
+ return datanodeID;
+ }
+
+ /**
+ * @return node report.
+ */
+ public SCMNodeReport getNodeReport() {
+ return nodeReport;
+ }
+
+ /**
+ * @return heartbeat receive timestamp.
+ */
+ public long getRecvTimestamp() {
+ return recvTimestamp;
+ }
+
+ /**
+ * Builder for HeartbeatQueueItem.
+ */
+ public static class Builder {
+ private DatanodeID datanodeID;
+ private SCMNodeReport nodeReport;
+ private long recvTimestamp = monotonicNow();
+
+ public Builder setDatanodeID(DatanodeID datanodeId) {
+ this.datanodeID = datanodeId;
+ return this;
+ }
+
+ public Builder setNodeReport(SCMNodeReport scmNodeReport) {
+ this.nodeReport = scmNodeReport;
+ return this;
+ }
+
+ @VisibleForTesting
+ public Builder setRecvTimestamp(long recvTime) {
+ this.recvTimestamp = recvTime;
+ return this;
+ }
+
+ public HeartbeatQueueItem build() {
+ return new HeartbeatQueueItem(datanodeID, recvTimestamp, nodeReport);
+ }
+ }
+}
\ No newline at end of file
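
Usage sketch for the builder above, mirroring how SCMNodeManager.sendHeartbeat() (later in this patch) enqueues an item; recvTimestamp is left to its monotonicNow() default:

```java
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.ozone.protocol.proto
    .StorageContainerDatanodeProtocolProtos.SCMNodeReport;
import org.apache.hadoop.ozone.scm.node.HeartbeatQueueItem;

final class HeartbeatQueueItemUsage {
  static HeartbeatQueueItem newItem(DatanodeID dn, SCMNodeReport report) {
    return new HeartbeatQueueItem.Builder()
        .setDatanodeID(dn)
        .setNodeReport(report)   // may be null; handleHeartbeat() checks
        .build();                // recvTimestamp defaults to monotonicNow()
  }
}
```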
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/NodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/NodeManager.java
index 47cb7a058bb..899f50ed284 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/NodeManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/NodeManager.java
@@ -135,4 +135,16 @@ public interface NodeManager extends StorageContainerNodeProtocol, Closeable,
DEAD
}
+ /**
+ * Returns the aggregated node stats.
+ * @return the aggregated node stats.
+ */
+ SCMNodeStat getStats();
+
+ /**
+ * Return a list of node stats.
+ * @return a list of individual node stats (live/stale but not dead).
+ */
+ List<SCMNodeStat> getNodeStats();
+
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/SCMNodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/SCMNodeManager.java
index 3fe70840eb4..029056fa9e4 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/SCMNodeManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/SCMNodeManager.java
@@ -21,6 +21,7 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.commons.collections.map.HashedMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException;
@@ -34,8 +35,13 @@ import org.apache.hadoop.ozone.protocol.proto
.ErrorCode;
import org.apache.hadoop.ozone.protocol.proto
.StorageContainerDatanodeProtocolProtos.SCMVersionRequestProto;
+import org.apache.hadoop.ozone.protocol
+ .proto.StorageContainerDatanodeProtocolProtos.SCMNodeReport;
+import org.apache.hadoop.ozone.protocol
+ .proto.StorageContainerDatanodeProtocolProtos.SCMStorageReport;
import org.apache.hadoop.ozone.scm.VersionInfo;
+import org.apache.hadoop.util.Time;
import org.apache.hadoop.util.concurrent.HadoopExecutors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -67,7 +73,7 @@ import static org.apache.hadoop.util.Time.monotonicNow;
*
* Each heartbeat that SCMNodeManager receives is put into heartbeatQueue. The
* worker thread wakes up and grabs that heartbeat from the queue. The worker
- * thread will lookup the healthynodes map and update the timestamp if the entry
+ * thread will lookup the healthynodes map and set the timestamp if the entry
* is there. if not it will look up stale and deadnodes map.
*
* The getNode(byState) functions make copy of node maps and then creates a list
@@ -85,14 +91,20 @@ public class SCMNodeManager
@VisibleForTesting
static final Logger LOG =
LoggerFactory.getLogger(SCMNodeManager.class);
+
/**
* Key = NodeID, value = timestamp.
*/
private final Map<String, Long> healthyNodes;
private final Map<String, Long> staleNodes;
private final Map<String, Long> deadNodes;
- private final Queue<DatanodeID> heartbeatQueue;
+ private final Queue<HeartbeatQueueItem> heartbeatQueue;
private final Map nodes;
+ // Individual live node stats
+ private final Map<String, SCMNodeStat> nodeStats;
+ // Aggregated node stats
+ private SCMNodeStat scmStat;
+ // TODO: expose nodeStats and scmStat as metrics
private final AtomicInteger healthyNodeCount;
private final AtomicInteger staleNodeCount;
private final AtomicInteger deadNodeCount;
@@ -121,6 +133,8 @@ public class SCMNodeManager
deadNodes = new ConcurrentHashMap<>();
staleNodes = new ConcurrentHashMap<>();
nodes = new HashMap<>();
+ nodeStats = new HashedMap();
+ scmStat = new SCMNodeStat();
healthyNodeCount = new AtomicInteger(0);
staleNodeCount = new AtomicInteger(0);
@@ -158,7 +172,7 @@ public class SCMNodeManager
*/
@Override
public void removeNode(DatanodeID node) throws UnregisteredNodeException {
- // TODO : Fix me.
+ // TODO : Fix me when adding the SCM CLI.
}
@@ -371,9 +385,9 @@ public class SCMNodeManager
// Process the whole queue.
while (!heartbeatQueue.isEmpty() &&
(lastHBProcessedCount < maxHBToProcessPerLoop)) {
- DatanodeID datanodeID = heartbeatQueue.poll();
+ HeartbeatQueueItem hbItem = heartbeatQueue.poll();
synchronized (this) {
- handleHeartbeat(datanodeID);
+ handleHeartbeat(hbItem);
}
// we are shutting down or something give up processing the rest of
// HBs. This will terminate the HB processing thread.
@@ -439,7 +453,8 @@ public class SCMNodeManager
// 4. And the most important reason, heartbeats are not blocked even if
// this thread does not run, they will go into the processing queue.
- if (!Thread.currentThread().isInterrupted()) {
+ if (!Thread.currentThread().isInterrupted() &&
+ !executorService.isShutdown()) {
executorService.schedule(this, heartbeatCheckerIntervalMs, TimeUnit
.MILLISECONDS);
} else {
@@ -489,40 +504,85 @@ public class SCMNodeManager
staleNodeCount.decrementAndGet();
deadNodes.put(entry.getKey(), entry.getValue());
deadNodeCount.incrementAndGet();
+
+ // Update SCM node stats
+ SCMNodeStat deadNodeStat = nodeStats.get(entry.getKey());
+ scmStat.subtract(deadNodeStat);
+ nodeStats.remove(entry.getKey());
}
/**
* Handles a single heartbeat from a datanode.
*
- * @param datanodeID - datanode ID.
+ * @param hbItem - heartbeat item from a datanode.
*/
- private void handleHeartbeat(DatanodeID datanodeID) {
+ private void handleHeartbeat(HeartbeatQueueItem hbItem) {
lastHBProcessedCount++;
+ String datanodeID = hbItem.getDatanodeID().getDatanodeUuid();
+ SCMNodeReport nodeReport = hbItem.getNodeReport();
+ long recvTimestamp = hbItem.getRecvTimestamp();
+ long processTimestamp = Time.monotonicNow();
+ if (LOG.isTraceEnabled()) {
+ //TODO: add average queue time of heartbeat request as metrics
+ LOG.trace("Processing Heartbeat from datanode {}: queueing time {}",
+ datanodeID, processTimestamp - recvTimestamp);
+ }
+
// If this node is already in the list of known and healthy nodes
- // just update the last timestamp and return.
- if (healthyNodes.containsKey(datanodeID.getDatanodeUuid())) {
- healthyNodes.put(datanodeID.getDatanodeUuid(), monotonicNow());
+ // just set the last timestamp and return.
+ if (healthyNodes.containsKey(datanodeID)) {
+ healthyNodes.put(datanodeID, processTimestamp);
+ updateNodeStat(datanodeID, nodeReport);
return;
}
// A stale node has heartbeat us we need to remove the node from stale
// list and move to healthy list.
- if (staleNodes.containsKey(datanodeID.getDatanodeUuid())) {
- staleNodes.remove(datanodeID.getDatanodeUuid());
- healthyNodes.put(datanodeID.getDatanodeUuid(), monotonicNow());
+ if (staleNodes.containsKey(datanodeID)) {
+ staleNodes.remove(datanodeID);
+ healthyNodes.put(datanodeID, processTimestamp);
healthyNodeCount.incrementAndGet();
staleNodeCount.decrementAndGet();
+ updateNodeStat(datanodeID, nodeReport);
return;
}
// A dead node has heartbeat us, we need to remove that node from dead
// node list and move it to the healthy list.
- if (deadNodes.containsKey(datanodeID.getDatanodeUuid())) {
- deadNodes.remove(datanodeID.getDatanodeUuid());
- healthyNodes.put(datanodeID.getDatanodeUuid(), monotonicNow());
+ if (deadNodes.containsKey(datanodeID)) {
+ deadNodes.remove(datanodeID);
+ healthyNodes.put(datanodeID, processTimestamp);
deadNodeCount.decrementAndGet();
healthyNodeCount.incrementAndGet();
+ updateNodeStat(datanodeID, nodeReport);
+ return;
+ }
+ LOG.warn("SCM receive heartbeat from unregistered datanode {}", datanodeID);
+ }
+
+ private void updateNodeStat(String datanodeID, SCMNodeReport nodeReport) {
+ SCMNodeStat stat = nodeStats.get(datanodeID);
+ if (stat == null) {
+ LOG.debug("SCM updateNodeStat based on heartbeat from previous" +
+ "dead datanode {}", datanodeID);
+ stat = new SCMNodeStat();
+ }
+
+ if (nodeReport != null && nodeReport.getStorageReportCount() > 0) {
+ long totalCapacity = 0;
+ long totalRemaining = 0;
+ long totalScmUsed = 0;
+ List<SCMStorageReport> storageReports = nodeReport.getStorageReportList();
+ for (SCMStorageReport report : storageReports) {
+ totalCapacity += report.getCapacity();
+ totalRemaining += report.getRemaining();
+ totalScmUsed += report.getScmUsed();
+ }
+ scmStat.subtract(stat);
+ stat.set(totalCapacity, totalScmUsed, totalRemaining);
+ nodeStats.put(datanodeID, stat);
+ scmStat.add(stat);
}
}
@@ -591,6 +651,7 @@ public class SCMNodeManager
totalNodes.incrementAndGet();
healthyNodes.put(datanodeID.getDatanodeUuid(), monotonicNow());
healthyNodeCount.incrementAndGet();
+ nodeStats.put(datanodeID.getDatanodeUuid(), new SCMNodeStat());
LOG.info("Data node with ID: {} Registered.",
datanodeID.getDatanodeUuid());
return RegisteredCommand.newBuilder()
@@ -625,23 +686,47 @@ public class SCMNodeManager
* Send heartbeat to indicate the datanode is alive and doing well.
*
* @param datanodeID - Datanode ID.
+ * @param nodeReport - node report.
* @return SCMheartbeat response.
* @throws IOException
*/
@Override
- public List<SCMCommand> sendHeartbeat(DatanodeID datanodeID) {
+ public List<SCMCommand> sendHeartbeat(DatanodeID datanodeID,
+ SCMNodeReport nodeReport) {
// Checking for NULL to make sure that we don't get
// an exception from ConcurrentList.
// This could be a problem in tests, if this function is invoked via
// protobuf, transport layer will guarantee that this is not null.
if (datanodeID != null) {
- heartbeatQueue.add(datanodeID);
-
+ heartbeatQueue.add(
+ new HeartbeatQueueItem.Builder()
+ .setDatanodeID(datanodeID)
+ .setNodeReport(nodeReport)
+ .build());
} else {
LOG.error("Datanode ID in heartbeat is null");
}
return commandQueue.getCommand(datanodeID);
}
-}
+
+ /**
+ * Returns the aggregated node stats.
+ * @return the aggregated node stats.
+ */
+ @Override
+ public SCMNodeStat getStats() {
+ return new SCMNodeStat(this.scmStat);
+ }
+
+ /**
+ * Return a list of node stats.
+ * @return a list of individual node stats (live/stale but not dead).
+ */
+ @Override
+ public List<SCMNodeStat> getNodeStats() {
+ return nodeStats.entrySet().stream().map(
+ entry -> nodeStats.get(entry.getKey())).collect(Collectors.toList());
+ }
+}
\ No newline at end of file
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/SCMNodeStat.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/SCMNodeStat.java
new file mode 100644
index 00000000000..c206807ede5
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/SCMNodeStat.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.scm.node;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * This class represents the SCM node stat.
+ */
+public class SCMNodeStat {
+ private long capacity;
+ private long scmUsed;
+ private long remaining;
+
+ public SCMNodeStat() {
+ }
+
+ public SCMNodeStat(SCMNodeStat other) {
+ set(other.capacity, other.scmUsed, other.remaining);
+ }
+
+ /**
+ * @return the total configured capacity of the node.
+ */
+ public long getCapacity() {
+ return capacity;
+ }
+
+ /**
+ * @return the total SCM used space on the node.
+ */
+ public long getScmUsed() {
+ return scmUsed;
+ }
+
+ /**
+ * @return the total remaining space available on the node.
+ */
+ public long getRemaining() {
+ return remaining;
+ }
+
+ @VisibleForTesting
+ public void set(long total, long used, long remain) {
+ this.capacity = total;
+ this.scmUsed = used;
+ this.remaining = remain;
+ }
+
+ public SCMNodeStat add(SCMNodeStat stat) {
+ this.capacity += stat.getCapacity();
+ this.scmUsed += stat.getScmUsed();
+ this.remaining += stat.getRemaining();
+ return this;
+ }
+
+ public SCMNodeStat subtract(SCMNodeStat stat) {
+ this.capacity -= stat.getCapacity();
+ this.scmUsed -= stat.getScmUsed();
+ this.remaining -= stat.getRemaining();
+ return this;
+ }
+
+ @Override
+ public boolean equals(Object to) {
+ return this == to ||
+ (to instanceof SCMNodeStat &&
+ capacity == ((SCMNodeStat) to).getCapacity() &&
+ scmUsed == ((SCMNodeStat) to).getScmUsed() &&
+ remaining == ((SCMNodeStat) to).getRemaining());
+ }
+
+ @Override
+ public int hashCode() {
+ assert false : "hashCode not designed";
+ return 42; // any arbitrary constant will do
+ }
+}
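
A short sketch of the subtract/set/add bookkeeping that SCMNodeManager.updateNodeStat() performs with SCMNodeStat when a fresh storage report arrives for a node; the numbers are arbitrary:

```java
import org.apache.hadoop.ozone.scm.node.SCMNodeStat;

public final class StatAggregationSketch {
  public static void main(String[] args) {
    SCMNodeStat aggregate = new SCMNodeStat();
    SCMNodeStat node = new SCMNodeStat();

    // First heartbeat with a report: capacity, scmUsed, remaining.
    node.set(100, 10, 90);
    aggregate.add(node);

    // A later heartbeat reports more usage for the same node: remove the
    // stale contribution, update the node's stat, and add it back.
    aggregate.subtract(node);
    node.set(100, 25, 75);
    aggregate.add(node);

    System.out.println(aggregate.getScmUsed()); // prints 25
  }
}
```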
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/StorageContainerDatanodeProtocol.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/StorageContainerDatanodeProtocol.proto
index 5dea5cc684d..ee1378f1e89 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/StorageContainerDatanodeProtocol.proto
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/StorageContainerDatanodeProtocol.proto
@@ -47,6 +47,23 @@ import "DatanodeContainerProtocol.proto";
*/
message SCMHeartbeatRequestProto {
required DatanodeIDProto datanodeID = 1;
+ optional SCMNodeReport nodeReport = 2;
+}
+
+/**
+* This message is sent along with the heartbeat to report datanode
+* storage utilization to SCM.
+*/
+message SCMNodeReport {
+ repeated SCMStorageReport storageReport = 1;
+}
+
+message SCMStorageReport {
+ required string storageUuid = 1;
+ optional uint64 capacity = 2 [default = 0];
+ optional uint64 scmUsed = 3 [default = 0];
+ optional uint64 remaining = 4 [default = 0];
+ optional StorageTypeProto storageType = 5 [default = DISK];
}
message SCMRegisterRequestProto {
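
On the wire, the report rides as the new optional nodeReport field of the heartbeat request. A sketch of how a client can populate it, guarding against a missing report the same way the null check added to the client-side translator above does; this is illustrative, not the patch's code:

```java
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.ozone.protocol.proto
    .StorageContainerDatanodeProtocolProtos.SCMHeartbeatRequestProto;
import org.apache.hadoop.ozone.protocol.proto
    .StorageContainerDatanodeProtocolProtos.SCMNodeReport;

final class HeartbeatRequestSketch {
  static SCMHeartbeatRequestProto build(DatanodeID dn, SCMNodeReport report) {
    SCMHeartbeatRequestProto.Builder req =
        SCMHeartbeatRequestProto.newBuilder()
            .setDatanodeID(dn.getProtoBufMessage());
    if (report != null) {   // nodeReport is optional in the .proto
      req.setNodeReport(report);
    }
    return req.build();
  }
}
```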
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/common/ScmTestMock.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/common/ScmTestMock.java
index 0e30bd9cfd4..b20202e9d21 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/common/ScmTestMock.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/common/ScmTestMock.java
@@ -22,6 +22,8 @@ import org.apache.hadoop.ozone.protocol.VersionResponse;
import org.apache.hadoop.ozone.protocol.commands.NullCommand;
import org.apache.hadoop.ozone.protocol.proto
.StorageContainerDatanodeProtocolProtos;
+import org.apache.hadoop.ozone.protocol.proto
+ .StorageContainerDatanodeProtocolProtos.SCMNodeReport;
import org.apache.hadoop.ozone.scm.VersionInfo;
import java.io.IOException;
@@ -104,12 +106,13 @@ public class ScmTestMock implements StorageContainerDatanodeProtocol {
* Used by data node to send a Heartbeat.
*
* @param datanodeID - Datanode ID.
+ * @param nodeReport - node report.
* @return - SCMHeartbeatResponseProto
* @throws IOException
*/
@Override
public StorageContainerDatanodeProtocolProtos.SCMHeartbeatResponseProto
- sendHeartbeat(DatanodeID datanodeID)
+ sendHeartbeat(DatanodeID datanodeID, SCMNodeReport nodeReport)
throws IOException {
rpcCount.incrementAndGet();
heartbeatCount.incrementAndGet();
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/common/TestEndPoint.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/common/TestEndPoint.java
index cde99a146a3..45e35fb937a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/common/TestEndPoint.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/common/TestEndPoint.java
@@ -237,7 +237,7 @@ public class TestEndPoint {
SCMTestUtils.createEndpoint(SCMTestUtils.getConf(),
serverAddress, 1000)) {
SCMHeartbeatResponseProto responseProto = rpcEndPoint.getEndPoint()
- .sendHeartbeat(dataNode);
+ .sendHeartbeat(dataNode, null);
Assert.assertNotNull(responseProto);
Assert.assertEquals(1, responseProto.getCommandsCount());
Assert.assertNotNull(responseProto.getCommandsList().get(0));
@@ -257,7 +257,7 @@ public class TestEndPoint {
.build();
rpcEndPoint.setState(EndpointStateMachine.EndPointStates.HEARTBEAT);
HeartbeatEndpointTask endpointTask =
- new HeartbeatEndpointTask(rpcEndPoint, SCMTestUtils.getConf());
+ new HeartbeatEndpointTask(rpcEndPoint, SCMTestUtils.getConf(), null);
endpointTask.setContainerNodeIDProto(containerNodeID);
endpointTask.call();
Assert.assertNotNull(endpointTask.getContainerNodeIDProto());
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/container/MockNodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/container/MockNodeManager.java
index 8cf960d284d..d298d434869 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/container/MockNodeManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/container/MockNodeManager.java
@@ -23,7 +23,11 @@ import org.apache.hadoop.ozone.protocol.VersionResponse;
import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
import org.apache.hadoop.ozone.protocol.proto
.StorageContainerDatanodeProtocolProtos;
+import org.apache.hadoop.ozone.protocol.proto
+ .StorageContainerDatanodeProtocolProtos.SCMNodeReport;
+
import org.apache.hadoop.ozone.scm.node.NodeManager;
+import org.apache.hadoop.ozone.scm.node.SCMNodeStat;
import java.io.IOException;
import java.util.LinkedList;
@@ -172,6 +176,24 @@ public class MockNodeManager implements NodeManager {
return false;
}
+ /**
+ * Returns the aggregated node stats.
+ * @return the aggregated node stats.
+ */
+ @Override
+ public SCMNodeStat getStats() {
+ return null;
+ }
+
+ /**
+ * Return a list of node stats.
+ * @return a list of individual node stats (live/stale but not dead).
+ */
+ @Override
+ public List<SCMNodeStat> getNodeStats() {
+ return null;
+ }
+
/**
* Closes this stream and releases any system resources associated with it. If
* the stream is already closed then invoking this method has no effect.
@@ -233,10 +255,12 @@ public class MockNodeManager implements NodeManager {
* Send heartbeat to indicate the datanode is alive and doing well.
*
* @param datanodeID - Datanode ID.
+ * @param nodeReport - node report.
* @return SCMheartbeat response list
*/
@Override
- public List<SCMCommand> sendHeartbeat(DatanodeID datanodeID) {
+ public List<SCMCommand> sendHeartbeat(DatanodeID datanodeID,
+ SCMNodeReport nodeReport) {
return null;
}
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/node/TestNodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/node/TestNodeManager.java
index 0f028716ac7..355baed66f0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/node/TestNodeManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/node/TestNodeManager.java
@@ -22,6 +22,10 @@ import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.OzoneConfiguration;
import org.apache.hadoop.ozone.container.common.SCMTestUtils;
+import org.apache.hadoop.ozone.protocol.proto
+ .StorageContainerDatanodeProtocolProtos.SCMNodeReport;
+import org.apache.hadoop.ozone.protocol.proto
+ .StorageContainerDatanodeProtocolProtos.SCMStorageReport;
import org.apache.hadoop.test.GenericTestUtils;
import org.hamcrest.CoreMatchers;
import org.junit.Assert;
@@ -103,7 +107,7 @@ public class TestNodeManager {
// Send some heartbeats from different nodes.
for (int x = 0; x < nodeManager.getMinimumChillModeNodes(); x++) {
DatanodeID datanodeID = SCMTestUtils.getDatanodeID(nodeManager);
- nodeManager.sendHeartbeat(datanodeID);
+ nodeManager.sendHeartbeat(datanodeID, null);
}
// Wait for 4 seconds max.
@@ -149,7 +153,7 @@ public class TestNodeManager {
// Need 100 nodes to come out of chill mode, only one node is sending HB.
nodeManager.setMinimumChillModeNodes(100);
- nodeManager.sendHeartbeat(SCMTestUtils.getDatanodeID(nodeManager));
+ nodeManager.sendHeartbeat(SCMTestUtils.getDatanodeID(nodeManager), null);
GenericTestUtils.waitFor(() -> nodeManager.waitForHeartbeatThead(), 100,
4 * 1000);
assertFalse("Not enough heartbeat, Node manager should have been in " +
@@ -175,7 +179,7 @@ public class TestNodeManager {
// Send 10 heartbeat from same node, and assert we never leave chill mode.
for (int x = 0; x < 10; x++) {
- nodeManager.sendHeartbeat(datanodeID);
+ nodeManager.sendHeartbeat(datanodeID, null);
}
GenericTestUtils.waitFor(() -> nodeManager.waitForHeartbeatThead(), 100,
@@ -204,7 +208,7 @@ public class TestNodeManager {
nodeManager.close();
// These should never be processed.
- nodeManager.sendHeartbeat(datanodeID);
+ nodeManager.sendHeartbeat(datanodeID, null);
// Let us just wait for 2 seconds to prove that HBs are not processed.
Thread.sleep(2 * 1000);
@@ -231,7 +235,7 @@ public class TestNodeManager {
for (int x = 0; x < count; x++) {
DatanodeID datanodeID = SCMTestUtils.getDatanodeID(nodeManager);
- nodeManager.sendHeartbeat(datanodeID);
+ nodeManager.sendHeartbeat(datanodeID, null);
}
GenericTestUtils.waitFor(() -> nodeManager.waitForHeartbeatThead(), 100,
4 * 1000);
@@ -317,14 +321,18 @@ public class TestNodeManager {
DatanodeID staleNode = SCMTestUtils.getDatanodeID(nodeManager);
// Heartbeat once
- nodeManager.sendHeartbeat(staleNode);
+ nodeManager.sendHeartbeat(staleNode, null);
// Heartbeat all other nodes.
- nodeList.forEach(nodeManager::sendHeartbeat);
+ for (DatanodeID dn : nodeList) {
+ nodeManager.sendHeartbeat(dn, null);
+ }
// Wait for 2 seconds .. and heartbeat good nodes again.
Thread.sleep(2 * 1000);
- nodeList.forEach(nodeManager::sendHeartbeat);
+ for (DatanodeID dn : nodeList) {
+ nodeManager.sendHeartbeat(dn, null);
+ }
// Wait for 2 more seconds, 3 seconds is the stale window for this test
Thread.sleep(2 * 1000);
@@ -367,19 +375,25 @@ public class TestNodeManager {
DatanodeID deadNode = SCMTestUtils.getDatanodeID(nodeManager);
// Heartbeat once
- nodeManager.sendHeartbeat(deadNode);
+ nodeManager.sendHeartbeat(deadNode, null);
// Heartbeat all other nodes.
- nodeList.forEach(nodeManager::sendHeartbeat);
+ for (DatanodeID dn : nodeList) {
+ nodeManager.sendHeartbeat(dn, null);
+ }
// Wait for 2 seconds .. and heartbeat good nodes again.
Thread.sleep(2 * 1000);
- nodeList.forEach(nodeManager::sendHeartbeat);
+ for (DatanodeID dn : nodeList) {
+ nodeManager.sendHeartbeat(dn, null);
+ }
Thread.sleep(3 * 1000);
// heartbeat good nodes again.
- nodeList.forEach(nodeManager::sendHeartbeat);
+ for (DatanodeID dn : nodeList) {
+ nodeManager.sendHeartbeat(dn, null);
+ }
// 6 seconds is the dead window for this test , so we wait a total of
// 7 seconds to make sure that the node moves into dead state.
@@ -408,7 +422,7 @@ public class TestNodeManager {
try (SCMNodeManager nodeManager = createNodeManager(getConf())) {
GenericTestUtils.LogCapturer logCapturer =
GenericTestUtils.LogCapturer.captureLogs(SCMNodeManager.LOG);
- nodeManager.sendHeartbeat(null);
+ nodeManager.sendHeartbeat(null, null);
logCapturer.stopCapturing();
assertThat(logCapturer.getOutput(), containsString("Datanode ID in " +
"heartbeat is null"));
@@ -484,9 +498,9 @@ public class TestNodeManager {
SCMTestUtils.getDatanodeID(nodeManager, "StaleNode");
DatanodeID deadNode =
SCMTestUtils.getDatanodeID(nodeManager, "DeadNode");
- nodeManager.sendHeartbeat(healthyNode);
- nodeManager.sendHeartbeat(staleNode);
- nodeManager.sendHeartbeat(deadNode);
+ nodeManager.sendHeartbeat(healthyNode, null);
+ nodeManager.sendHeartbeat(staleNode, null);
+ nodeManager.sendHeartbeat(deadNode, null);
// Sleep so that heartbeat processing thread gets to run.
Thread.sleep(500);
@@ -512,12 +526,12 @@ public class TestNodeManager {
* the 3 second windows.
*/
- nodeManager.sendHeartbeat(healthyNode);
- nodeManager.sendHeartbeat(staleNode);
- nodeManager.sendHeartbeat(deadNode);
+ nodeManager.sendHeartbeat(healthyNode, null);
+ nodeManager.sendHeartbeat(staleNode, null);
+ nodeManager.sendHeartbeat(deadNode, null);
Thread.sleep(1500);
- nodeManager.sendHeartbeat(healthyNode);
+ nodeManager.sendHeartbeat(healthyNode, null);
Thread.sleep(2 * 1000);
assertEquals(1, nodeManager.getNodeCount(HEALTHY));
@@ -537,10 +551,10 @@ public class TestNodeManager {
* staleNode to move to stale state and deadNode to move to dead state.
*/
- nodeManager.sendHeartbeat(healthyNode);
- nodeManager.sendHeartbeat(staleNode);
+ nodeManager.sendHeartbeat(healthyNode, null);
+ nodeManager.sendHeartbeat(staleNode, null);
Thread.sleep(1500);
- nodeManager.sendHeartbeat(healthyNode);
+ nodeManager.sendHeartbeat(healthyNode, null);
Thread.sleep(2 * 1000);
// 3.5 seconds have elapsed for stale node, so it moves into Stale.
@@ -570,9 +584,9 @@ public class TestNodeManager {
* Cluster State : let us heartbeat all the nodes and verify that we get
* back all the nodes in healthy state.
*/
- nodeManager.sendHeartbeat(healthyNode);
- nodeManager.sendHeartbeat(staleNode);
- nodeManager.sendHeartbeat(deadNode);
+ nodeManager.sendHeartbeat(healthyNode, null);
+ nodeManager.sendHeartbeat(staleNode, null);
+ nodeManager.sendHeartbeat(deadNode, null);
Thread.sleep(500);
//Assert all nodes are healthy.
assertEquals(3, nodeManager.getAllNodes().size());
@@ -591,8 +605,9 @@ public class TestNodeManager {
private void heartbeatNodeSet(SCMNodeManager manager, List list,
int sleepDuration) throws InterruptedException {
while (!Thread.currentThread().isInterrupted()) {
- list.forEach(manager::sendHeartbeat);
- Thread.sleep(sleepDuration);
+ for (DatanodeID dn : list) {
+ manager.sendHeartbeat(dn, null);
+ }
+ Thread.sleep(sleepDuration);
}
}
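
The switch from list.forEach(manager::sendHeartbeat) to explicit loops follows directly from the signature change: a one-argument method reference no longer matches Consumer<DatanodeID> once sendHeartbeat takes two parameters. A lambda would be an equivalent, more compact alternative (a sketch, not what the patch uses):

    list.forEach(dn -> manager.sendHeartbeat(dn, null));
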
@@ -676,7 +691,10 @@ public class TestNodeManager {
// No Thread just one time HBs the node manager, so that these will be
// marked as dead nodes eventually.
- deadNodeList.forEach(nodeManager::sendHeartbeat);
+ for (DatanodeID dn : deadNodeList) {
+ nodeManager.sendHeartbeat(dn, null);
+ }
+
Thread thread1 = new Thread(healthyNodeTask);
thread1.setDaemon(true);
@@ -828,7 +846,7 @@ public class TestNodeManager {
try (SCMNodeManager nodeManager = createNodeManager(conf)) {
nodeManager.setMinimumChillModeNodes(10);
DatanodeID datanodeID = SCMTestUtils.getDatanodeID(nodeManager);
- nodeManager.sendHeartbeat(datanodeID);
+ nodeManager.sendHeartbeat(datanodeID, null);
String status = nodeManager.getChillModeStatus();
Assert.assertThat(status, CoreMatchers.containsString("Still in chill " +
"mode. Waiting on nodes to report in."));
@@ -858,7 +876,7 @@ public class TestNodeManager {
// Assert that node manager force enter cannot be overridden by nodes HBs.
for(int x= 0; x < 20; x++) {
DatanodeID datanode = SCMTestUtils.getDatanodeID(nodeManager);
- nodeManager.sendHeartbeat(datanode);
+ nodeManager.sendHeartbeat(datanode, null);
}
Thread.sleep(500);
@@ -873,6 +891,147 @@ public class TestNodeManager {
CoreMatchers.containsString("Out of chill mode."));
assertFalse(nodeManager.isInManualChillMode());
}
+ }
+
+ /**
+ * Test multiple nodes sending their initial heartbeats with node reports.
+ *
+ * @throws IOException
+ * @throws InterruptedException
+ * @throws TimeoutException
+ */
+ @Test
+ public void testScmStatsFromNodeReport() throws IOException,
+ InterruptedException, TimeoutException {
+ Configuration conf = getConf();
+ final int nodeCount = 10;
+ final long capacity = 2000;
+ final long used = 100;
+ final long remaining = capacity - used;
+
+ try (SCMNodeManager nodeManager = createNodeManager(conf)) {
+ for (int x = 0; x < nodeCount; x++) {
+ DatanodeID datanodeID = SCMTestUtils.getDatanodeID(nodeManager);
+
+ SCMNodeReport.Builder nrb = SCMNodeReport.newBuilder();
+ SCMStorageReport.Builder srb = SCMStorageReport.newBuilder();
+ srb.setStorageUuid(UUID.randomUUID().toString());
+ srb.setCapacity(capacity).setScmUsed(used)
+ .setRemaining(remaining);
+ nodeManager.sendHeartbeat(datanodeID,
+ nrb.addStorageReport(srb).build());
+ }
+ GenericTestUtils.waitFor(() -> nodeManager.waitForHeartbeatThead(), 100,
+ 4 * 1000);
+ assertEquals(nodeCount, nodeManager.getNodeCount(HEALTHY));
+ assertEquals(capacity * nodeCount,
+ nodeManager.getStats().getCapacity());
+ assertEquals(used * nodeCount,
+ nodeManager.getStats().getScmUsed());
+ assertEquals(remaining * nodeCount,
+ nodeManager.getStats().getRemaining());
+ }
+ }
+
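
SCMNodeReport holds a repeated storage report, so a datanode with several volumes reports each one, and the SCM aggregate presumably sums across volumes as well as across nodes. A hedged sketch of a multi-volume report, reusing the builder pattern from the test above (the three-volume count is hypothetical):

    SCMNodeReport.Builder nrb = SCMNodeReport.newBuilder();
    for (int vol = 0; vol < 3; vol++) {
      // One storage report per volume; random UUIDs stand in for real storage IDs.
      nrb.addStorageReport(SCMStorageReport.newBuilder()
          .setStorageUuid(UUID.randomUUID().toString())
          .setCapacity(capacity)
          .setScmUsed(used)
          .setRemaining(capacity - used));
    }
    nodeManager.sendHeartbeat(datanodeID, nrb.build());
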
+ /**
+ * Test that a single node's stats are updated from its node report as the
+ * node moves through the healthy, stale, and dead states.
+ * @throws IOException
+ * @throws InterruptedException
+ * @throws TimeoutException
+ */
+ @Test
+ public void testScmNodeReportUpdate() throws IOException,
+ InterruptedException, TimeoutException {
+ Configuration conf = getConf();
+ final int heartbeatCount = 5;
+ final int nodeCount = 1;
+ final int interval = 100;
+
+ conf.setInt(OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL_MS, interval);
+ conf.setInt(OZONE_SCM_HEARTBEAT_INTERVAL_SECONDS, 1);
+ conf.setInt(OZONE_SCM_STALENODE_INTERVAL_MS, 3 * 1000);
+ conf.setInt(OZONE_SCM_DEADNODE_INTERVAL_MS, 6 * 1000);
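+ // With these settings the heartbeat thread runs every 100 ms, a node
+ // turns stale after 3 seconds without a heartbeat, and dead after 6.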
+
+ try (SCMNodeManager nodeManager = createNodeManager(conf)) {
+ DatanodeID datanodeID = SCMTestUtils.getDatanodeID(nodeManager);
+ final long capacity = 2000;
+ final long usedPerHeartbeat = 100;
+
+ for (int x = 0; x < heartbeatCount; x++) {
+ SCMNodeReport.Builder nrb = SCMNodeReport.newBuilder();
+ SCMStorageReport.Builder srb = SCMStorageReport.newBuilder();
+ srb.setStorageUuid(UUID.randomUUID().toString());
+ srb.setCapacity(capacity).setScmUsed(x * usedPerHeartbeat)
+ .setRemaining(capacity - x * usedPerHeartbeat);
+ nrb.addStorageReport(srb);
+
+ nodeManager.sendHeartbeat(datanodeID, nrb.build());
+ Thread.sleep(100);
+ }
+
+ GenericTestUtils.waitFor(() -> nodeManager.waitForHeartbeatThead(), 100,
+ 4 * 1000);
+ assertEquals(nodeCount, nodeManager.getNodeCount(HEALTHY));
+
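+ // The loop sent x = 0 .. heartbeatCount - 1, so the last report that was
+ // processed carries scmUsed = usedPerHeartbeat * (heartbeatCount - 1).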
+ final long expectedScmUsed = usedPerHeartbeat * (heartbeatCount - 1);
+ final long expectedRemaining = capacity -
+ usedPerHeartbeat * (heartbeatCount - 1);
+ assertEquals(capacity, nodeManager.getStats().getCapacity());
+ assertEquals(expectedScmUsed, nodeManager.getStats().getScmUsed());
+ assertEquals(expectedRemaining, nodeManager.getStats().getRemaining());
+
+ // Test NodeManager#getNodeStats
+ assertEquals(nodeCount, nodeManager.getNodeStats().size());
+ assertEquals(capacity, nodeManager.getNodeStats().get(0).getCapacity());
+ assertEquals(expectedScmUsed,
+ nodeManager.getNodeStats().get(0).getScmUsed());
+ assertEquals(expectedRemaining,
+ nodeManager.getNodeStats().get(0).getRemaining());
+
+ // Wait up to 4 seconds for the node to become stale, then verify that
+ // its usage info is unchanged.
+ GenericTestUtils.waitFor(
+ () -> nodeManager.getNodeCount(NodeManager.NODESTATE.STALE) == 1, 100,
+ 4 * 1000);
+ assertEquals(nodeCount, nodeManager.getNodeStats().size());
+ assertEquals(capacity, nodeManager.getNodeStats().get(0).getCapacity());
+ assertEquals(expectedScmUsed,
+ nodeManager.getNodeStats().get(0).getScmUsed());
+ assertEquals(expectedRemaining,
+ nodeManager.getNodeStats().get(0).getRemaining());
+
+ // Wait up to 3 more seconds for the node to become dead, then verify
+ // that its usage info has been removed from the stats.
+ GenericTestUtils.waitFor(
+ () -> nodeManager.getNodeCount(NodeManager.NODESTATE.DEAD) == 1, 100,
+ 3 * 1000);
+
+ assertEquals(0, nodeManager.getNodeStats().size());
+ assertEquals(0, nodeManager.getStats().getCapacity());
+ assertEquals(0, nodeManager.getStats().getScmUsed());
+ assertEquals(0, nodeManager.getStats().getRemaining());
+
+ // Send a new report to bring the dead node back to healthy.
+ SCMNodeReport.Builder nrb = SCMNodeReport.newBuilder();
+ SCMStorageReport.Builder srb = SCMStorageReport.newBuilder();
+ srb.setStorageUuid(UUID.randomUUID().toString());
+ srb.setCapacity(capacity).setScmUsed(expectedScmUsed)
+ .setRemaining(expectedRemaining);
+ nrb.addStorageReport(srb);
+ nodeManager.sendHeartbeat(datanodeID, nrb.build());
+
+ // Wait up to 5 seconds for the dead node to become healthy again, then
+ // verify that its usage info is restored from the new report.
+ GenericTestUtils.waitFor(
+ () -> nodeManager.getNodeCount(NodeManager.NODESTATE.HEALTHY) == 1,
+ 100, 5 * 1000);
+ assertEquals(nodeCount, nodeManager.getNodeStats().size());
+ assertEquals(capacity, nodeManager.getNodeStats().get(0).getCapacity());
+ assertEquals(expectedScmUsed,
+ nodeManager.getNodeStats().get(0).getScmUsed());
+ assertEquals(expectedRemaining,
+ nodeManager.getNodeStats().get(0).getRemaining());
+ }
}
}
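
All pre-existing call sites above pass null for the report and the nodes still reach HEALTHY, so a null report is evidently treated as "no stats yet" rather than an error, while a null DatanodeID is logged and dropped. A minimal sketch of the receiving side under those assumptions (enqueue is a hypothetical helper, not the real API; only the null-ID log message is confirmed by the log-capture test above):

    public void sendHeartbeat(DatanodeID datanodeID, SCMNodeReport nodeReport) {
      if (datanodeID == null) {
        // The log-capture test asserts exactly this message.
        LOG.error("Datanode ID in heartbeat is null");
        return;
      }
      // Assumption: a null nodeReport leaves the node's recorded stats
      // untouched; stats change only when a non-null report is processed.
      enqueue(datanodeID, nodeReport);
    }
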