HDFS-11447. Ozone: SCM: Send node report to SCM with heartbeat. Contributed by Xiaoyu Yao.

This commit is contained in:
Anu Engineer 2017-03-01 11:47:22 -08:00
parent 3eae84c1be
commit 7aa0a44cf1
26 changed files with 994 additions and 80 deletions

View File

@ -19,6 +19,7 @@
package org.apache.hadoop.ozone.container.common.impl;
import com.google.common.base.Preconditions;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.container.common.interfaces
@ -29,12 +30,13 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.LinkedList;
import java.util.List;
/**
* A class that tells the ContainerManager where to place the containers.
* Please note : There is *no* one-to-one correlation between metadata
* metadataLocations and data metadataLocations.
* Locations and data Locations.
*
* For example : A user could map all container files to a
* SSD but leave data/metadata on bunch of other disks.
@ -43,24 +45,30 @@ public class ContainerLocationManagerImpl implements ContainerLocationManager {
private static final Logger LOG =
LoggerFactory.getLogger(ContainerLocationManagerImpl.class);
private final List<StorageLocation> dataLocations;
private final List<ContainerStorageLocation> dataLocations;
private int currentIndex;
private final List<StorageLocation> metadataLocations;
/**
* Constructs a Location Manager.
* @param metadataLocations - Refers to the metadataLocations
* where we store the container metadata.
* @param dataDirs - metadataLocations where we store the actual
* data or chunk files.
* @param conf - configuration.
* @throws IOException
*/
public ContainerLocationManagerImpl(List<StorageLocation> metadataLocations,
List<StorageLocation> dataDirs)
List<StorageLocation> dataDirs, Configuration conf)
throws IOException {
dataLocations = dataDirs;
dataLocations = new LinkedList<>();
for (StorageLocation dataDir : dataDirs) {
dataLocations.add(new ContainerStorageLocation(dataDir, conf));
}
this.metadataLocations = metadataLocations;
}
/**
* Returns the path where the container should be placed from a set of
* metadataLocations.
@ -91,4 +99,45 @@ public class ContainerLocationManagerImpl implements ContainerLocationManager {
currentPath = currentPath.resolve(OzoneConsts.CONTAINER_PREFIX);
return currentPath.resolve(containerName);
}
/**
 * Builds a usage report (capacity / scmUsed / remaining) for every
 * configured data location.
 *
 * @return one StorageLocationReport per data location, in location order.
 * @throws IOException declared for interface compatibility; per-location
 *                     df/du failures are logged and reported as zeros.
 */
@Override
public StorageLocationReport[] getLocationReport() throws IOException {
  StorageLocationReport[] reports =
      new StorageLocationReport[dataLocations.size()];
  for (int idx = 0; idx < dataLocations.size(); idx++) {
    ContainerStorageLocation loc = dataLocations.get(idx);
    long scmUsed = 0;
    long remaining = 0;
    try {
      scmUsed = loc.getScmUsed();
      remaining = loc.getAvailable();
    } catch (IOException ex) {
      // Pass the exception as the last argument so SLF4J logs its stack.
      LOG.warn("Failed to get scmUsed and remaining for container " +
          "storage location {}", loc.getNormalizedUri(), ex);
      // reset scmUsed and remaining if df/du failed.
      scmUsed = 0;
      remaining = 0;
    }
    // TODO: handle failed storage
    // For now, include storage report for location that failed to get df/du.
    StorageLocationReport r = new StorageLocationReport(
        loc.getStorageUuId(), false, loc.getCapacity(),
        scmUsed, remaining);
    // Bug fix: was "reports[idx++] = r;" which incremented idx a second
    // time, skipping every other location and leaving null array slots.
    reports[idx] = r;
  }
  return reports;
}
/**
 * Cleanly stops the background du threads owned by every container
 * storage location.
 *
 * @throws IOException declared for interface compatibility.
 */
@Override
public void shutdown() throws IOException {
  for (int i = 0; i < dataLocations.size(); i++) {
    dataLocations.get(i).shutdown();
  }
}
}

View File

@ -24,6 +24,10 @@ import org.apache.commons.codec.digest.DigestUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileAlreadyExistsException;
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos;
import org.apache.hadoop.ozone.protocol.proto
.StorageContainerDatanodeProtocolProtos.SCMNodeReport;
import org.apache.hadoop.ozone.protocol.proto
.StorageContainerDatanodeProtocolProtos.SCMStorageReport;
import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.ozone.OzoneConsts;
@ -120,7 +124,7 @@ public class ContainerManagerImpl implements ContainerManager {
dataDirs.add(location);
}
this.locationManager =
new ContainerLocationManagerImpl(containerDirs, dataDirs);
new ContainerLocationManagerImpl(containerDirs, dataDirs, config);
} finally {
readUnlock();
@ -395,9 +399,10 @@ public class ContainerManagerImpl implements ContainerManager {
* @throws IOException
*/
@Override
public void shutdown() {
public void shutdown() throws IOException {
Preconditions.checkState(this.hasWriteLock());
this.containerMap.clear();
this.locationManager.shutdown();
}
@ -497,6 +502,25 @@ public class ContainerManagerImpl implements ContainerManager {
return this.keyManager;
}
/**
 * Builds the datanode node report from the per-location storage reports
 * supplied by the location manager.
 *
 * @return node report containing one SCMStorageReport per data location.
 * @throws IOException if the location manager cannot produce its report.
 */
@Override
public SCMNodeReport getNodeReport() throws IOException {
  StorageLocationReport[] reports = locationManager.getLocationReport();
  SCMNodeReport.Builder nrb = SCMNodeReport.newBuilder();
  for (StorageLocationReport report : reports) {
    // Appending preserves location order; no explicit index needed.
    SCMStorageReport srb = SCMStorageReport.newBuilder()
        .setStorageUuid(report.getId())
        .setCapacity(report.getCapacity())
        .setScmUsed(report.getScmUsed())
        .setRemaining(report.getRemaining())
        .build();
    nrb.addStorageReport(srb);
  }
  return nrb.build();
}
/**
* Filter out only container files from the container metadata dir.
*/

View File

@ -0,0 +1,187 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ozone.container.common.impl;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CachingGetSpaceUsed;
import org.apache.hadoop.fs.DF;
import org.apache.hadoop.fs.GetSpaceUsed;
import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.util.ShutdownHookManager;
import org.apache.hadoop.util.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.*;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.Scanner;
import static org.apache.hadoop.util.RunJar.SHUTDOWN_HOOK_PRIORITY;
/**
 * Class that wraps the space usage of the Datanode Container Storage Location
 * by SCM containers.
 */
public class ContainerStorageLocation {
  private static final Logger LOG =
      LoggerFactory.getLogger(ContainerStorageLocation.class);

  // Name of the per-directory cache file that persists the last known
  // scmUsed value across datanode restarts.
  private static final String DU_CACHE_FILE = "scmUsed";
  // Set after shutdown() has persisted scmUsed so the JVM shutdown hook
  // does not write it a second time. volatile: the hook runs on a
  // different thread than shutdown().
  private volatile boolean scmUsedSaved = false;

  private final StorageLocation dataLocation;  // the wrapped data directory
  private final String storageUuId;            // generated storage UUID
  private final DF usage;                      // whole-disk df statistics
  private final GetSpaceUsed scmUsage;         // du of SCM container data
  private final File scmUsedFile;              // on-disk scmUsed cache file

  /**
   * Wraps a storage location and starts background space-usage tracking.
   *
   * @param dataLoc - data directory this object reports on.
   * @param conf - configuration passed to the df/du implementations.
   * @throws IOException if the usage trackers cannot be created.
   */
  public ContainerStorageLocation(StorageLocation dataLoc, Configuration conf)
      throws IOException {
    this.dataLocation = dataLoc;
    this.storageUuId = DatanodeStorage.generateUuid();
    File dataDir = new File(dataLoc.getNormalizedUri().getPath());
    scmUsedFile = new File(dataDir, DU_CACHE_FILE);
    // get overall disk usage
    this.usage = new DF(dataDir, conf);
    // get SCM specific usage; seeded from the cache file (if fresh) so a
    // restart does not have to wait for a full du pass.
    this.scmUsage = new CachingGetSpaceUsed.Builder().setPath(dataDir)
        .setConf(conf)
        .setInitialUsed(loadScmUsed())
        .build();

    // Ensure scm usage is saved during shutdown.
    ShutdownHookManager.get().addShutdownHook(
        new Runnable() {
          @Override
          public void run() {
            if (!scmUsedSaved) {
              saveScmUsed();
            }
          }
        }, SHUTDOWN_HOOK_PRIORITY);
  }

  /** @return the normalized URI of the wrapped data location. */
  public URI getNormalizedUri() {
    return dataLocation.getNormalizedUri();
  }

  /** @return the generated storage UUID of this location. */
  public String getStorageUuId() {
    return storageUuId;
  }

  /** @return total disk capacity in bytes, clamped to be non-negative. */
  public long getCapacity() {
    long capacity = usage.getCapacity();
    return (capacity > 0) ? capacity : 0;
  }

  /**
   * Returns the space still available to SCM on this location: capacity
   * minus scmUsed, but never more than the free space df reports (the disk
   * may be shared with other consumers), and never negative.
   *
   * @throws IOException if scm usage cannot be determined.
   */
  public long getAvailable() throws IOException {
    long remaining = getCapacity() - getScmUsed();
    long available = usage.getAvailable();
    if (remaining > available) {
      remaining = available;
    }
    return (remaining > 0) ? remaining : 0;
  }

  /** @return bytes used by SCM containers on this location. */
  public long getScmUsed() throws IOException{
    return scmUsage.getUsed();
  }

  /**
   * Persists the current scmUsed value and stops the background du thread.
   */
  public void shutdown() {
    saveScmUsed();
    scmUsedSaved = true;  // tells the shutdown hook not to save again

    if (scmUsage instanceof CachingGetSpaceUsed) {
      IOUtils.cleanup(null, ((CachingGetSpaceUsed) scmUsage));
    }
  }

  /**
   * Read in the cached DU value and return it if it is less than 600 seconds
   * old (DU update interval). Slight imprecision of scmUsed is not critical
   * and skipping DU can significantly shorten the startup time.
   * If the cached value is not available or too old, -1 is returned.
   */
  long loadScmUsed() {
    long cachedScmUsed;
    long mtime;
    Scanner sc;

    try {
      sc = new Scanner(scmUsedFile, "UTF-8");
    } catch (FileNotFoundException fnfe) {
      // No cache file yet; caller falls back to a fresh du.
      return -1;
    }

    try {
      // Get the recorded scmUsed from the file.
      if (sc.hasNextLong()) {
        cachedScmUsed = sc.nextLong();
      } else {
        return -1;
      }
      // Get the recorded mtime from the file.
      if (sc.hasNextLong()) {
        mtime = sc.nextLong();
      } else {
        return -1;
      }

      // Return the cached value if mtime is okay.
      if (mtime > 0 && (Time.now() - mtime < 600000L)) {
        LOG.info("Cached ScmUsed found for {} : {} ", dataLocation,
            cachedScmUsed);
        return cachedScmUsed;
      }
      return -1;
    } finally {
      sc.close();
    }
  }

  /**
   * Write the current scmUsed to the cache file.
   */
  void saveScmUsed() {
    if (scmUsedFile.exists() && !scmUsedFile.delete()) {
      LOG.warn("Failed to delete old scmUsed file in {}.", dataLocation);
    }
    OutputStreamWriter out = null;
    try {
      long used = getScmUsed();
      if (used > 0) {
        out = new OutputStreamWriter(new FileOutputStream(scmUsedFile),
            StandardCharsets.UTF_8);
        // mtime is written last, so that truncated writes won't be valid.
        out.write(Long.toString(used) + " " + Long.toString(Time.now()));
        out.flush();
        out.close();
        out = null;
      }
    } catch (IOException ioe) {
      // If write failed, the volume might be bad. Since the cache file is
      // not critical, log the error and continue.
      LOG.warn("Failed to write scmUsed to " + scmUsedFile, ioe);
    } finally {
      IOUtils.cleanup(null, out);
    }
  }
}

View File

@ -0,0 +1,63 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ozone.container.common.impl;
/**
 * Immutable snapshot of the usage statistics of a single datanode storage
 * location that provides back store for containers.
 */
public class StorageLocationReport {
  /** Shared empty array to avoid allocating one per caller. */
  public static final StorageLocationReport[] EMPTY_ARRAY = {};

  private final String id;
  private final boolean failed;
  private final long capacity;
  private final long scmUsed;
  private final long remaining;

  /**
   * Creates a report snapshot.
   *
   * @param id        storage location identifier.
   * @param failed    whether the location is marked failed.
   * @param capacity  total capacity in bytes.
   * @param scmUsed   bytes consumed by SCM containers.
   * @param remaining bytes still available.
   */
  public StorageLocationReport(String id, boolean failed,
      long capacity, long scmUsed, long remaining) {
    this.id = id;
    this.failed = failed;
    this.capacity = capacity;
    this.scmUsed = scmUsed;
    this.remaining = remaining;
  }

  /** @return storage location identifier. */
  public String getId() {
    return id;
  }

  /** @return true if the location is marked failed. */
  public boolean isFailed() {
    return failed;
  }

  /** @return total capacity in bytes. */
  public long getCapacity() {
    return capacity;
  }

  /** @return bytes consumed by SCM containers. */
  public long getScmUsed() {
    return scmUsed;
  }

  /** @return bytes still available. */
  public long getRemaining() {
    return remaining;
  }
}

View File

@ -17,6 +17,8 @@
*/
package org.apache.hadoop.ozone.container.common.interfaces;
import org.apache.hadoop.ozone.container.common.impl.StorageLocationReport;
import java.io.IOException;
import java.nio.file.Path;
@ -41,4 +43,16 @@ public interface ContainerLocationManager {
*/
Path getDataPath(String containerName) throws IOException;
/**
* Returns an array of storage location usage report.
* @return storage location usage report.
*/
StorageLocationReport[] getLocationReport() throws IOException;
/**
* Supports clean shutdown of container.
*
* @throws IOException
*/
void shutdown() throws IOException;
}

View File

@ -24,6 +24,8 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
import org.apache.hadoop.hdfs.util.RwLock;
import org.apache.hadoop.ozone.container.common.helpers.ContainerData;
import org.apache.hadoop.ozone.protocol.proto
.StorageContainerDatanodeProtocolProtos.SCMNodeReport;
import org.apache.hadoop.scm.container.common.helpers.Pipeline;
import java.io.IOException;
@ -93,7 +95,7 @@ public interface ContainerManager extends RwLock {
*
* @throws IOException
*/
void shutdown();
void shutdown() throws IOException;
/**
* Sets the Chunk Manager.
@ -123,4 +125,9 @@ public interface ContainerManager extends RwLock {
*/
KeyManager getKeyManager();
/**
* Get the Node Report of container storage usage.
* @return node report.
*/
SCMNodeReport getNodeReport() throws IOException;
}

View File

@ -0,0 +1,20 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.ozone.container.common.interfaces;
/**
This package contains common ozone container interfaces.
*/

View File

@ -82,6 +82,7 @@ public class DatanodeStateMachine implements Closeable {
try {
LOG.debug("Executing cycle Number : {}", context.getExecutionCount());
nextHB = Time.monotonicNow() + heartbeatFrequency;
context.setReportState(container.getNodeReport());
context.execute(executorService, heartbeatFrequency,
TimeUnit.MILLISECONDS);
now = Time.monotonicNow();

View File

@ -22,6 +22,7 @@ import org.apache.hadoop.ozone.container.common.states.DatanodeState;
import org.apache.hadoop.ozone.container.common.states.datanode
.RunningDatanodeState;
import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMNodeReport;
import java.util.LinkedList;
import java.util.Queue;
@ -43,6 +44,7 @@ public class StateContext {
private final AtomicLong stateExecutionCount;
private final Configuration conf;
private DatanodeStateMachine.DatanodeStates state;
private SCMNodeReport nrState;
/**
* Constructs a StateContext.
@ -59,6 +61,7 @@ public class StateContext {
commandQueue = new LinkedList<>();
lock = new ReentrantLock();
stateExecutionCount = new AtomicLong(0);
nrState = SCMNodeReport.getDefaultInstance();
}
/**
@ -111,6 +114,22 @@ public class StateContext {
this.state = state;
}
/**
 * Returns the most recently cached node report of this datanode state
 * context (set via {@link #setReportState}); defaults to the protobuf
 * default instance until a report has been cached.
 * @return the node report.
 */
public SCMNodeReport getNodeReport() {
  return nrState;
}
/**
 * Caches the latest node report (per-storage-location usage) so that the
 * heartbeat task can attach it to the next heartbeat sent to SCM.
 * @param nrReport - node report
 */
public void setReportState(SCMNodeReport nrReport) {
  this.nrState = nrReport;
}
/**
* Returns the next task to get executed by the datanode state machine.
* @return A callable that will be executed by the

View File

@ -231,6 +231,7 @@ public class RunningDatanodeState implements DatanodeState {
.setConfig(conf)
.setEndpointStateMachine(endpoint)
.setNodeID(getContainerNodeID())
.setContext(context)
.build();
case SHUTDOWN:
break;

View File

@ -23,6 +23,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.ozone.container.common.statemachine
.EndpointStateMachine;
import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
import org.apache.hadoop.ozone.protocol.proto
.StorageContainerDatanodeProtocolProtos.ContainerNodeIDProto;
import org.slf4j.Logger;
@ -41,6 +42,7 @@ public class HeartbeatEndpointTask
private final EndpointStateMachine rpcEndpoint;
private final Configuration conf;
private ContainerNodeIDProto containerNodeIDProto;
private StateContext context;
/**
* Constructs a SCM heart beat.
@ -48,9 +50,10 @@ public class HeartbeatEndpointTask
* @param conf Config.
*/
public HeartbeatEndpointTask(EndpointStateMachine rpcEndpoint,
Configuration conf) {
Configuration conf, StateContext context) {
this.rpcEndpoint = rpcEndpoint;
this.conf = conf;
this.context = context;
}
/**
@ -85,8 +88,9 @@ public class HeartbeatEndpointTask
Preconditions.checkState(this.containerNodeIDProto != null);
DatanodeID datanodeID = DatanodeID.getFromProtoBuf(this
.containerNodeIDProto.getDatanodeID());
// TODO : Add the command to command processor queue.
rpcEndpoint.getEndPoint().sendHeartbeat(datanodeID);
rpcEndpoint.getEndPoint().sendHeartbeat(datanodeID,
this.context.getNodeReport());
rpcEndpoint.zeroMissedCount();
} catch (IOException ex) {
rpcEndpoint.logIfNeeded(ex
@ -112,6 +116,7 @@ public class HeartbeatEndpointTask
private EndpointStateMachine endPointStateMachine;
private Configuration conf;
private ContainerNodeIDProto containerNodeIDProto;
private StateContext context;
/**
* Constructs the builder class.
@ -152,6 +157,16 @@ public class HeartbeatEndpointTask
return this;
}
/**
 * Sets the datanode state context; the built task reads the node report
 * from it when sending a heartbeat.
 * @param stateContext - State context.
 * @return this.
 */
public Builder setContext(StateContext stateContext) {
  this.context = stateContext;
  return this;
}
public HeartbeatEndpointTask build() {
if (endPointStateMachine == null) {
LOG.error("No endpoint specified.");
@ -172,10 +187,9 @@ public class HeartbeatEndpointTask
}
HeartbeatEndpointTask task = new HeartbeatEndpointTask(this
.endPointStateMachine, this.conf);
.endPointStateMachine, this.conf, this.context);
task.setContainerNodeIDProto(containerNodeIDProto);
return task;
}
}
}

View File

@ -29,6 +29,8 @@ import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
import org.apache.hadoop.ozone.container.common.interfaces.ContainerManager;
import org.apache.hadoop.ozone.container.common.interfaces.KeyManager;
import org.apache.hadoop.ozone.container.common.transport.server.XceiverServer;
import org.apache.hadoop.ozone.protocol.proto
.StorageContainerDatanodeProtocolProtos.SCMNodeReport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -138,6 +140,8 @@ public class OzoneContainer {
this.keyManager.shutdown();
this.manager.shutdown();
LOG.info("container services shutdown complete.");
} catch (IOException ex) {
LOG.warn("container service shutdown error:", ex);
} finally {
this.manager.writeUnlock();
}
@ -155,4 +159,11 @@ public class OzoneContainer {
pathList.add(location);
}
}
/**
 * Returns node report of container storage usage.
 * Delegates to the container manager, which aggregates per-location
 * capacity/used/remaining figures.
 * @return node report of container storage usage.
 * @throws IOException if the report cannot be gathered.
 */
public SCMNodeReport getNodeReport() throws IOException {
  return this.manager.getNodeReport();
}
}

View File

@ -22,6 +22,7 @@ import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolPr
import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMHeartbeatResponseProto;
import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMVersionResponseProto;
import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMRegisteredCmdResponseProto;
import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMNodeReport;
import java.io.IOException;
@ -41,11 +42,12 @@ public interface StorageContainerDatanodeProtocol {
/**
* Used by data node to send a Heartbeat.
* @param datanodeID - Datanode ID.
* @param nodeReport - node report state
* @return - SCMHeartbeatResponseProto
* @throws IOException
*/
SCMHeartbeatResponseProto sendHeartbeat(DatanodeID datanodeID)
throws IOException;
SCMHeartbeatResponseProto sendHeartbeat(DatanodeID datanodeID,
SCMNodeReport nodeReport) throws IOException;
/**
* Register Datanode.

View File

@ -21,6 +21,7 @@ import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMVersionRequestProto;
import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMNodeReport;
import java.util.List;
@ -54,8 +55,10 @@ public interface StorageContainerNodeProtocol {
/**
* Send heartbeat to indicate the datanode is alive and doing well.
* @param datanodeID - Datanode ID.
* @param nodeReport - node report.
* @return SCMheartbeat response list
*/
List<SCMCommand> sendHeartbeat(DatanodeID datanodeID,
SCMNodeReport nodeReport);
List<SCMCommand> sendHeartbeat(DatanodeID datanodeID);
}

View File

@ -27,6 +27,8 @@ import org.apache.hadoop.ozone.protocol.proto
.StorageContainerDatanodeProtocolProtos.SCMHeartbeatRequestProto;
import org.apache.hadoop.ozone.protocol.proto
.StorageContainerDatanodeProtocolProtos.SCMHeartbeatResponseProto;
import org.apache.hadoop.ozone.protocol.proto
.StorageContainerDatanodeProtocolProtos.SCMNodeReport;
import org.apache.hadoop.ozone.protocol.proto
.StorageContainerDatanodeProtocolProtos.SCMVersionRequestProto;
import org.apache.hadoop.ozone.protocol.proto
@ -113,15 +115,17 @@ public class StorageContainerDatanodeProtocolClientSideTranslatorPB
* Send by datanode to SCM.
*
* @param datanodeID - DatanodeID
* @param nodeReport - node report
* @throws IOException
*/
@Override
public SCMHeartbeatResponseProto sendHeartbeat(DatanodeID datanodeID)
throws IOException {
SCMHeartbeatRequestProto.Builder req =
SCMHeartbeatRequestProto.newBuilder();
public SCMHeartbeatResponseProto sendHeartbeat(DatanodeID datanodeID,
SCMNodeReport nodeReport) throws IOException {
SCMHeartbeatRequestProto.Builder req = SCMHeartbeatRequestProto
.newBuilder();
req.setDatanodeID(datanodeID.getProtoBufMessage());
req.setNodeReport(nodeReport);
final SCMHeartbeatResponseProto resp;
try {
resp = rpcProxy.sendHeartbeat(NULL_RPC_CONTROLLER, req.build());

View File

@ -78,7 +78,7 @@ public class StorageContainerDatanodeProtocolServerSideTranslatorPB
SCMHeartbeatRequestProto request) throws ServiceException {
try {
return impl.sendHeartbeat(DatanodeID.getFromProtoBuf(request
.getDatanodeID()));
.getDatanodeID()), request.getNodeReport());
} catch (IOException e) {
throw new ServiceException(e);
}

View File

@ -44,6 +44,8 @@ import org.apache.hadoop.ozone.protocol.proto
.StorageContainerDatanodeProtocolProtos.SCMHeartbeatResponseProto;
import org.apache.hadoop.ozone.protocol.proto
.StorageContainerDatanodeProtocolProtos.SCMNodeAddressList;
import org.apache.hadoop.ozone.protocol.proto
.StorageContainerDatanodeProtocolProtos.SCMNodeReport;
import org.apache.hadoop.ozone.protocol.proto
.StorageContainerDatanodeProtocolProtos.SCMRegisteredCmdResponseProto;
import org.apache.hadoop.ozone.protocol.proto
@ -393,9 +395,10 @@ public class StorageContainerManager
* @throws IOException
*/
@Override
public SCMHeartbeatResponseProto sendHeartbeat(DatanodeID datanodeID)
throws IOException {
List<SCMCommand> commands = getScmNodeManager().sendHeartbeat(datanodeID);
public SCMHeartbeatResponseProto sendHeartbeat(DatanodeID datanodeID,
SCMNodeReport nodeReport) throws IOException {
List<SCMCommand> commands =
getScmNodeManager().sendHeartbeat(datanodeID, nodeReport);
List<SCMCommandResponseProto> cmdReponses = new LinkedList<>();
for (SCMCommand cmd : commands) {
cmdReponses.add(getCommandResponse(cmd));

View File

@ -0,0 +1,98 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ozone.scm.node;
import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.ozone.protocol.proto
.StorageContainerDatanodeProtocolProtos.SCMNodeReport;
import static org.apache.hadoop.util.Time.monotonicNow;
/**
 * A single heartbeat waiting in the SCM processing queue, carrying the
 * originating datanode's ID, the time the heartbeat was received, and the
 * node report (if any) that accompanied it.
 */
public class HeartbeatQueueItem {
  private DatanodeID datanodeID;
  private long recvTimestamp;
  private SCMNodeReport nodeReport;

  /**
   * Creates a queue item; use {@link Builder} from outside this package.
   *
   * @param datanodeID - datanode ID of the heartbeat.
   * @param recvTimestamp - heartbeat receive timestamp.
   * @param nodeReport - node report associated with the heartbeat if any.
   */
  HeartbeatQueueItem(DatanodeID datanodeID, long recvTimestamp,
      SCMNodeReport nodeReport) {
    this.datanodeID = datanodeID;
    this.recvTimestamp = recvTimestamp;
    this.nodeReport = nodeReport;
  }

  /** @return heartbeat receive timestamp. */
  public long getRecvTimestamp() {
    return recvTimestamp;
  }

  /** @return node report. */
  public SCMNodeReport getNodeReport() {
    return nodeReport;
  }

  /** @return datanode ID. */
  public DatanodeID getDatanodeID() {
    return datanodeID;
  }

  /**
   * Builder for HeartbeatQueueItem; the receive timestamp defaults to
   * "now" and is overridable only for tests.
   */
  public static class Builder {
    private DatanodeID datanodeID;
    private SCMNodeReport nodeReport;
    // Capture arrival time at builder creation unless a test overrides it.
    private long recvTimestamp = monotonicNow();

    public Builder setDatanodeID(DatanodeID datanodeId) {
      this.datanodeID = datanodeId;
      return this;
    }

    public Builder setNodeReport(SCMNodeReport scmNodeReport) {
      this.nodeReport = scmNodeReport;
      return this;
    }

    @VisibleForTesting
    public Builder setRecvTimestamp(long recvTime) {
      this.recvTimestamp = recvTime;
      return this;
    }

    public HeartbeatQueueItem build() {
      return new HeartbeatQueueItem(datanodeID, recvTimestamp, nodeReport);
    }
  }
}

View File

@ -135,4 +135,16 @@ public interface NodeManager extends StorageContainerNodeProtocol, Closeable,
DEAD
}
/**
* Returns the aggregated node stats.
* @return the aggregated node stats.
*/
SCMNodeStat getStats();
/**
* Return a list of node stats.
* @return a list of individual node stats (live/stale but not dead).
*/
List<SCMNodeStat> getNodeStats();
}

View File

@ -21,6 +21,7 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.commons.collections.map.HashedMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException;
@ -34,8 +35,13 @@ import org.apache.hadoop.ozone.protocol.proto
.ErrorCode;
import org.apache.hadoop.ozone.protocol.proto
.StorageContainerDatanodeProtocolProtos.SCMVersionRequestProto;
import org.apache.hadoop.ozone.protocol
.proto.StorageContainerDatanodeProtocolProtos.SCMNodeReport;
import org.apache.hadoop.ozone.protocol
.proto.StorageContainerDatanodeProtocolProtos.SCMStorageReport;
import org.apache.hadoop.ozone.scm.VersionInfo;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.util.concurrent.HadoopExecutors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -67,7 +73,7 @@ import static org.apache.hadoop.util.Time.monotonicNow;
* <p>
* Each heartbeat that SCMNodeManager receives is put into heartbeatQueue. The
* worker thread wakes up and grabs that heartbeat from the queue. The worker
* thread will lookup the healthynodes map and update the timestamp if the entry
* thread will lookup the healthynodes map and set the timestamp if the entry
* is there. if not it will look up stale and deadnodes map.
* <p>
* The getNode(byState) functions make copy of node maps and then creates a list
@ -85,14 +91,20 @@ public class SCMNodeManager
@VisibleForTesting
static final Logger LOG =
LoggerFactory.getLogger(SCMNodeManager.class);
/**
* Key = NodeID, value = timestamp.
*/
private final Map<String, Long> healthyNodes;
private final Map<String, Long> staleNodes;
private final Map<String, Long> deadNodes;
private final Queue<DatanodeID> heartbeatQueue;
private final Queue<HeartbeatQueueItem> heartbeatQueue;
private final Map<String, DatanodeID> nodes;
// Individual live node stats
private final Map<String, SCMNodeStat> nodeStats;
// Aggregated node stats
private SCMNodeStat scmStat;
// TODO: expose nodeStats and scmStat as metrics
private final AtomicInteger healthyNodeCount;
private final AtomicInteger staleNodeCount;
private final AtomicInteger deadNodeCount;
@ -121,6 +133,8 @@ public class SCMNodeManager
deadNodes = new ConcurrentHashMap<>();
staleNodes = new ConcurrentHashMap<>();
nodes = new HashMap<>();
nodeStats = new HashedMap();
scmStat = new SCMNodeStat();
healthyNodeCount = new AtomicInteger(0);
staleNodeCount = new AtomicInteger(0);
@ -158,7 +172,7 @@ public class SCMNodeManager
*/
@Override
public void removeNode(DatanodeID node) throws UnregisteredNodeException {
// TODO : Fix me.
// TODO : Fix me when adding the SCM CLI.
}
@ -371,9 +385,9 @@ public class SCMNodeManager
// Process the whole queue.
while (!heartbeatQueue.isEmpty() &&
(lastHBProcessedCount < maxHBToProcessPerLoop)) {
DatanodeID datanodeID = heartbeatQueue.poll();
HeartbeatQueueItem hbItem = heartbeatQueue.poll();
synchronized (this) {
handleHeartbeat(datanodeID);
handleHeartbeat(hbItem);
}
// we are shutting down or something give up processing the rest of
// HBs. This will terminate the HB processing thread.
@ -439,7 +453,8 @@ public class SCMNodeManager
// 4. And the most important reason, heartbeats are not blocked even if
// this thread does not run, they will go into the processing queue.
if (!Thread.currentThread().isInterrupted()) {
if (!Thread.currentThread().isInterrupted() &&
!executorService.isShutdown()) {
executorService.schedule(this, heartbeatCheckerIntervalMs, TimeUnit
.MILLISECONDS);
} else {
@ -489,40 +504,85 @@ public class SCMNodeManager
staleNodeCount.decrementAndGet();
deadNodes.put(entry.getKey(), entry.getValue());
deadNodeCount.incrementAndGet();
// Update SCM node stats
SCMNodeStat deadNodeStat = nodeStats.get(entry.getKey());
scmStat.subtract(deadNodeStat);
nodeStats.remove(entry.getKey());
}
/**
* Handles a single heartbeat from a datanode.
*
* @param datanodeID - datanode ID.
* @param hbItem - heartbeat item from a datanode.
*/
private void handleHeartbeat(DatanodeID datanodeID) {
private void handleHeartbeat(HeartbeatQueueItem hbItem) {
lastHBProcessedCount++;
String datanodeID = hbItem.getDatanodeID().getDatanodeUuid();
SCMNodeReport nodeReport = hbItem.getNodeReport();
long recvTimestamp = hbItem.getRecvTimestamp();
long processTimestamp = Time.monotonicNow();
if (LOG.isTraceEnabled()) {
//TODO: add average queue time of heartbeat request as metrics
LOG.trace("Processing Heartbeat from datanode {}: queueing time {}",
datanodeID, processTimestamp - recvTimestamp);
}
// If this node is already in the list of known and healthy nodes
// just update the last timestamp and return.
if (healthyNodes.containsKey(datanodeID.getDatanodeUuid())) {
healthyNodes.put(datanodeID.getDatanodeUuid(), monotonicNow());
// just set the last timestamp and return.
if (healthyNodes.containsKey(datanodeID)) {
healthyNodes.put(datanodeID, processTimestamp);
updateNodeStat(datanodeID, nodeReport);
return;
}
// A stale node has heartbeat us we need to remove the node from stale
// list and move to healthy list.
if (staleNodes.containsKey(datanodeID.getDatanodeUuid())) {
staleNodes.remove(datanodeID.getDatanodeUuid());
healthyNodes.put(datanodeID.getDatanodeUuid(), monotonicNow());
if (staleNodes.containsKey(datanodeID)) {
staleNodes.remove(datanodeID);
healthyNodes.put(datanodeID, processTimestamp);
healthyNodeCount.incrementAndGet();
staleNodeCount.decrementAndGet();
updateNodeStat(datanodeID, nodeReport);
return;
}
// A dead node has heartbeat us, we need to remove that node from dead
// node list and move it to the healthy list.
if (deadNodes.containsKey(datanodeID.getDatanodeUuid())) {
deadNodes.remove(datanodeID.getDatanodeUuid());
healthyNodes.put(datanodeID.getDatanodeUuid(), monotonicNow());
if (deadNodes.containsKey(datanodeID)) {
deadNodes.remove(datanodeID);
healthyNodes.put(datanodeID, processTimestamp);
deadNodeCount.decrementAndGet();
healthyNodeCount.incrementAndGet();
updateNodeStat(datanodeID, nodeReport);
return;
}
LOG.warn("SCM receive heartbeat from unregistered datanode {}", datanodeID);
}
/**
 * Updates the per-node and cluster-wide aggregated storage statistics
 * from a datanode's node report.
 *
 * If the report is null or carries no storage reports, previously
 * recorded stats for this node are left unchanged.
 *
 * @param datanodeID - datanode UUID string.
 * @param nodeReport - storage utilization report sent with the heartbeat,
 *                     may be null.
 */
private void updateNodeStat(String datanodeID, SCMNodeReport nodeReport) {
  SCMNodeStat stat = nodeStats.get(datanodeID);
  if (stat == null) {
    // A node previously declared dead had its stats removed from the map;
    // start from a zeroed stat and rebuild it from this report.
    LOG.debug("SCM updateNodeStat based on heartbeat from previous " +
        "dead datanode {}", datanodeID);
    stat = new SCMNodeStat();
  }

  if (nodeReport != null && nodeReport.getStorageReportCount() > 0) {
    long totalCapacity = 0;
    long totalRemaining = 0;
    long totalScmUsed = 0;
    List<SCMStorageReport> storageReports = nodeReport.getStorageReportList();
    for (SCMStorageReport report : storageReports) {
      totalCapacity += report.getCapacity();
      totalRemaining += report.getRemaining();
      totalScmUsed += report.getScmUsed();
    }

    // Replace this node's previous contribution to the cluster aggregate
    // with the freshly reported totals.
    scmStat.subtract(stat);
    stat.set(totalCapacity, totalScmUsed, totalRemaining);
    nodeStats.put(datanodeID, stat);
    scmStat.add(stat);
  }
}
@ -591,6 +651,7 @@ public class SCMNodeManager
totalNodes.incrementAndGet();
healthyNodes.put(datanodeID.getDatanodeUuid(), monotonicNow());
healthyNodeCount.incrementAndGet();
nodeStats.put(datanodeID.getDatanodeUuid(), new SCMNodeStat());
LOG.info("Data node with ID: {} Registered.",
datanodeID.getDatanodeUuid());
return RegisteredCommand.newBuilder()
@ -625,23 +686,47 @@ public class SCMNodeManager
* Send heartbeat to indicate the datanode is alive and doing well.
*
* @param datanodeID - Datanode ID.
* @param nodeReport - node report.
* @return SCMheartbeat response.
* @throws IOException
*/
@Override
public List<SCMCommand> sendHeartbeat(DatanodeID datanodeID) {
public List<SCMCommand> sendHeartbeat(DatanodeID datanodeID,
SCMNodeReport nodeReport) {
// Checking for NULL to make sure that we don't get
// an exception from ConcurrentList.
// This could be a problem in tests, if this function is invoked via
// protobuf, transport layer will guarantee that this is not null.
if (datanodeID != null) {
heartbeatQueue.add(datanodeID);
heartbeatQueue.add(
new HeartbeatQueueItem.Builder()
.setDatanodeID(datanodeID)
.setNodeReport(nodeReport)
.build());
} else {
LOG.error("Datanode ID in heartbeat is null");
}
return commandQueue.getCommand(datanodeID);
}
/**
 * Returns the aggregated node stats.
 * @return the aggregated node stats.
 */
@Override
public SCMNodeStat getStats() {
  // Hand back a defensive copy so callers cannot mutate the live
  // cluster-wide aggregate maintained by the heartbeat thread.
  return new SCMNodeStat(this.scmStat);
}
/**
 * Return a list of node stats.
 *
 * NOTE(review): this returns the live SCMNodeStat objects held by the
 * node manager, not copies — callers should treat them as read-only.
 *
 * @return a list of individual node stats (live/stale but not dead).
 */
@Override
public List<SCMNodeStat> getNodeStats() {
  // Stream the map values directly instead of iterating the entry set and
  // re-looking up every key (the original did a redundant get() per entry).
  return nodeStats.values().stream().collect(Collectors.toList());
}
}

View File

@ -0,0 +1,94 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ozone.scm.node;
import com.google.common.annotations.VisibleForTesting;
/**
* This class represents the SCM node stat.
*/
public class SCMNodeStat {
private long capacity;
private long scmUsed;
private long remaining;
public SCMNodeStat() {
}
public SCMNodeStat(SCMNodeStat other) {
set(other.capacity, other.scmUsed, other.remaining);
}
/**
* @return the total configured capacity of the node.
*/
public long getCapacity() {
return capacity;
}
/**
* @return the total SCM used space on the node.
*/
public long getScmUsed() {
return scmUsed;
}
/**
* @return the total remaining space available on the node.
*/
public long getRemaining() {
return remaining;
}
@VisibleForTesting
public void set(long total, long used, long remain) {
this.capacity = total;
this.scmUsed = used;
this.remaining = remain;
}
public SCMNodeStat add(SCMNodeStat stat) {
this.capacity += stat.getCapacity();
this.scmUsed += stat.getScmUsed();
this.remaining += stat.getRemaining();
return this;
}
public SCMNodeStat subtract(SCMNodeStat stat) {
this.capacity -= stat.getCapacity();
this.scmUsed -= stat.getScmUsed();
this.remaining -= stat.getRemaining();
return this;
}
@Override
public boolean equals(Object to) {
return this == to ||
(to instanceof SCMNodeStat &&
capacity == ((SCMNodeStat) to).getCapacity() &&
scmUsed == ((SCMNodeStat) to).getScmUsed() &&
remaining == ((SCMNodeStat) to).getRemaining());
}
@Override
public int hashCode() {
assert false : "hashCode not designed";
return 42; // any arbitrary constant will do
}
}

View File

@ -47,6 +47,23 @@ import "DatanodeContainerProtocol.proto";
*/
message SCMHeartbeatRequestProto {
required DatanodeIDProto datanodeID = 1;
optional SCMNodeReport nodeReport= 2;
}
/**
 * This message is sent along with the heartbeat to report datanode
 * storage utilization to SCM. It carries one SCMStorageReport per
 * storage volume on the datanode.
 */
message SCMNodeReport {
  repeated SCMStorageReport storageReport= 1;
}
/**
 * Utilization report for a single storage volume, identified by its
 * storage UUID. Capacity/scmUsed/remaining are presumably in bytes —
 * TODO(review): confirm units against the datanode reporting code.
 */
message SCMStorageReport {
  required string storageUuid = 1;
  optional uint64 capacity = 2 [default = 0];
  optional uint64 scmUsed = 3 [default = 0];
  optional uint64 remaining = 4 [default = 0];
  optional StorageTypeProto storageType = 5 [default = DISK];
}
message SCMRegisterRequestProto {

View File

@ -22,6 +22,8 @@ import org.apache.hadoop.ozone.protocol.VersionResponse;
import org.apache.hadoop.ozone.protocol.commands.NullCommand;
import org.apache.hadoop.ozone.protocol.proto
.StorageContainerDatanodeProtocolProtos;
import org.apache.hadoop.ozone.protocol.proto
.StorageContainerDatanodeProtocolProtos.SCMNodeReport;
import org.apache.hadoop.ozone.scm.VersionInfo;
import java.io.IOException;
@ -104,12 +106,13 @@ public class ScmTestMock implements StorageContainerDatanodeProtocol {
* Used by data node to send a Heartbeat.
*
* @param datanodeID - Datanode ID.
* @param nodeReport - node report.
* @return - SCMHeartbeatResponseProto
* @throws IOException
*/
@Override
public StorageContainerDatanodeProtocolProtos.SCMHeartbeatResponseProto
sendHeartbeat(DatanodeID datanodeID)
sendHeartbeat(DatanodeID datanodeID, SCMNodeReport nodeReport)
throws IOException {
rpcCount.incrementAndGet();
heartbeatCount.incrementAndGet();

View File

@ -237,7 +237,7 @@ public class TestEndPoint {
SCMTestUtils.createEndpoint(SCMTestUtils.getConf(),
serverAddress, 1000)) {
SCMHeartbeatResponseProto responseProto = rpcEndPoint.getEndPoint()
.sendHeartbeat(dataNode);
.sendHeartbeat(dataNode, null);
Assert.assertNotNull(responseProto);
Assert.assertEquals(1, responseProto.getCommandsCount());
Assert.assertNotNull(responseProto.getCommandsList().get(0));
@ -257,7 +257,7 @@ public class TestEndPoint {
.build();
rpcEndPoint.setState(EndpointStateMachine.EndPointStates.HEARTBEAT);
HeartbeatEndpointTask endpointTask =
new HeartbeatEndpointTask(rpcEndPoint, SCMTestUtils.getConf());
new HeartbeatEndpointTask(rpcEndPoint, SCMTestUtils.getConf(), null);
endpointTask.setContainerNodeIDProto(containerNodeID);
endpointTask.call();
Assert.assertNotNull(endpointTask.getContainerNodeIDProto());

View File

@ -23,7 +23,11 @@ import org.apache.hadoop.ozone.protocol.VersionResponse;
import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
import org.apache.hadoop.ozone.protocol.proto
.StorageContainerDatanodeProtocolProtos;
import org.apache.hadoop.ozone.protocol.proto
.StorageContainerDatanodeProtocolProtos.SCMNodeReport;
import org.apache.hadoop.ozone.scm.node.NodeManager;
import org.apache.hadoop.ozone.scm.node.SCMNodeStat;
import java.io.IOException;
import java.util.LinkedList;
@ -172,6 +176,24 @@ public class MockNodeManager implements NodeManager {
return false;
}
/**
 * Returns the aggregated node stats.
 * @return the aggregated node stats.
 */
@Override
public SCMNodeStat getStats() {
  // Mock stub: stat tracking is not implemented in this test double.
  // NOTE(review): returns null, not an empty stat — tests using this mock
  // must not dereference the result.
  return null;
}
/**
 * Return a list of node stats.
 * @return a list of individual node stats (live/stale but not dead).
 */
@Override
public List<SCMNodeStat> getNodeStats() {
  // Mock stub: per-node stat tracking is not implemented in this test
  // double. NOTE(review): returns null rather than an empty list.
  return null;
}
/**
* Closes this stream and releases any system resources associated with it. If
* the stream is already closed then invoking this method has no effect.
@ -233,10 +255,12 @@ public class MockNodeManager implements NodeManager {
* Send heartbeat to indicate the datanode is alive and doing well.
*
* @param datanodeID - Datanode ID.
* @param nodeReport - node report.
* @return SCMheartbeat response list
*/
@Override
public List<SCMCommand> sendHeartbeat(DatanodeID datanodeID) {
public List<SCMCommand> sendHeartbeat(DatanodeID datanodeID,
SCMNodeReport nodeReport) {
return null;
}
}

View File

@ -22,6 +22,10 @@ import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.OzoneConfiguration;
import org.apache.hadoop.ozone.container.common.SCMTestUtils;
import org.apache.hadoop.ozone.protocol.proto
.StorageContainerDatanodeProtocolProtos.SCMNodeReport;
import org.apache.hadoop.ozone.protocol.proto
.StorageContainerDatanodeProtocolProtos.SCMStorageReport;
import org.apache.hadoop.test.GenericTestUtils;
import org.hamcrest.CoreMatchers;
import org.junit.Assert;
@ -103,7 +107,7 @@ public class TestNodeManager {
// Send some heartbeats from different nodes.
for (int x = 0; x < nodeManager.getMinimumChillModeNodes(); x++) {
DatanodeID datanodeID = SCMTestUtils.getDatanodeID(nodeManager);
nodeManager.sendHeartbeat(datanodeID);
nodeManager.sendHeartbeat(datanodeID, null);
}
// Wait for 4 seconds max.
@ -149,7 +153,7 @@ public class TestNodeManager {
// Need 100 nodes to come out of chill mode, only one node is sending HB.
nodeManager.setMinimumChillModeNodes(100);
nodeManager.sendHeartbeat(SCMTestUtils.getDatanodeID(nodeManager));
nodeManager.sendHeartbeat(SCMTestUtils.getDatanodeID(nodeManager), null);
GenericTestUtils.waitFor(() -> nodeManager.waitForHeartbeatThead(), 100,
4 * 1000);
assertFalse("Not enough heartbeat, Node manager should have been in " +
@ -175,7 +179,7 @@ public class TestNodeManager {
// Send 10 heartbeat from same node, and assert we never leave chill mode.
for (int x = 0; x < 10; x++) {
nodeManager.sendHeartbeat(datanodeID);
nodeManager.sendHeartbeat(datanodeID, null);
}
GenericTestUtils.waitFor(() -> nodeManager.waitForHeartbeatThead(), 100,
@ -204,7 +208,7 @@ public class TestNodeManager {
nodeManager.close();
// These should never be processed.
nodeManager.sendHeartbeat(datanodeID);
nodeManager.sendHeartbeat(datanodeID, null);
// Let us just wait for 2 seconds to prove that HBs are not processed.
Thread.sleep(2 * 1000);
@ -231,7 +235,7 @@ public class TestNodeManager {
for (int x = 0; x < count; x++) {
DatanodeID datanodeID = SCMTestUtils.getDatanodeID(nodeManager);
nodeManager.sendHeartbeat(datanodeID);
nodeManager.sendHeartbeat(datanodeID, null);
}
GenericTestUtils.waitFor(() -> nodeManager.waitForHeartbeatThead(), 100,
4 * 1000);
@ -317,14 +321,18 @@ public class TestNodeManager {
DatanodeID staleNode = SCMTestUtils.getDatanodeID(nodeManager);
// Heartbeat once
nodeManager.sendHeartbeat(staleNode);
nodeManager.sendHeartbeat(staleNode, null);
// Heartbeat all other nodes.
nodeList.forEach(nodeManager::sendHeartbeat);
for (DatanodeID dn : nodeList) {
nodeManager.sendHeartbeat(dn, null);
}
// Wait for 2 seconds .. and heartbeat good nodes again.
Thread.sleep(2 * 1000);
nodeList.forEach(nodeManager::sendHeartbeat);
for (DatanodeID dn : nodeList) {
nodeManager.sendHeartbeat(dn, null);
}
// Wait for 2 more seconds, 3 seconds is the stale window for this test
Thread.sleep(2 * 1000);
@ -367,19 +375,25 @@ public class TestNodeManager {
DatanodeID deadNode = SCMTestUtils.getDatanodeID(nodeManager);
// Heartbeat once
nodeManager.sendHeartbeat(deadNode);
nodeManager.sendHeartbeat(deadNode, null);
// Heartbeat all other nodes.
nodeList.forEach(nodeManager::sendHeartbeat);
for (DatanodeID dn : nodeList) {
nodeManager.sendHeartbeat(dn, null);
}
// Wait for 2 seconds .. and heartbeat good nodes again.
Thread.sleep(2 * 1000);
nodeList.forEach(nodeManager::sendHeartbeat);
for (DatanodeID dn : nodeList) {
nodeManager.sendHeartbeat(dn, null);
}
Thread.sleep(3 * 1000);
// heartbeat good nodes again.
nodeList.forEach(nodeManager::sendHeartbeat);
for (DatanodeID dn : nodeList) {
nodeManager.sendHeartbeat(dn, null);
}
// 6 seconds is the dead window for this test , so we wait a total of
// 7 seconds to make sure that the node moves into dead state.
@ -408,7 +422,7 @@ public class TestNodeManager {
try (SCMNodeManager nodeManager = createNodeManager(getConf())) {
GenericTestUtils.LogCapturer logCapturer =
GenericTestUtils.LogCapturer.captureLogs(SCMNodeManager.LOG);
nodeManager.sendHeartbeat(null);
nodeManager.sendHeartbeat(null, null);
logCapturer.stopCapturing();
assertThat(logCapturer.getOutput(), containsString("Datanode ID in " +
"heartbeat is null"));
@ -484,9 +498,9 @@ public class TestNodeManager {
SCMTestUtils.getDatanodeID(nodeManager, "StaleNode");
DatanodeID deadNode =
SCMTestUtils.getDatanodeID(nodeManager, "DeadNode");
nodeManager.sendHeartbeat(healthyNode);
nodeManager.sendHeartbeat(staleNode);
nodeManager.sendHeartbeat(deadNode);
nodeManager.sendHeartbeat(healthyNode, null);
nodeManager.sendHeartbeat(staleNode, null);
nodeManager.sendHeartbeat(deadNode, null);
// Sleep so that heartbeat processing thread gets to run.
Thread.sleep(500);
@ -512,12 +526,12 @@ public class TestNodeManager {
* the 3 second windows.
*/
nodeManager.sendHeartbeat(healthyNode);
nodeManager.sendHeartbeat(staleNode);
nodeManager.sendHeartbeat(deadNode);
nodeManager.sendHeartbeat(healthyNode, null);
nodeManager.sendHeartbeat(staleNode, null);
nodeManager.sendHeartbeat(deadNode, null);
Thread.sleep(1500);
nodeManager.sendHeartbeat(healthyNode);
nodeManager.sendHeartbeat(healthyNode, null);
Thread.sleep(2 * 1000);
assertEquals(1, nodeManager.getNodeCount(HEALTHY));
@ -537,10 +551,10 @@ public class TestNodeManager {
* staleNode to move to stale state and deadNode to move to dead state.
*/
nodeManager.sendHeartbeat(healthyNode);
nodeManager.sendHeartbeat(staleNode);
nodeManager.sendHeartbeat(healthyNode, null);
nodeManager.sendHeartbeat(staleNode, null);
Thread.sleep(1500);
nodeManager.sendHeartbeat(healthyNode);
nodeManager.sendHeartbeat(healthyNode, null);
Thread.sleep(2 * 1000);
// 3.5 seconds have elapsed for stale node, so it moves into Stale.
@ -570,9 +584,9 @@ public class TestNodeManager {
* Cluster State : let us heartbeat all the nodes and verify that we get
* back all the nodes in healthy state.
*/
nodeManager.sendHeartbeat(healthyNode);
nodeManager.sendHeartbeat(staleNode);
nodeManager.sendHeartbeat(deadNode);
nodeManager.sendHeartbeat(healthyNode, null);
nodeManager.sendHeartbeat(staleNode, null);
nodeManager.sendHeartbeat(deadNode, null);
Thread.sleep(500);
//Assert all nodes are healthy.
assertEquals(3, nodeManager.getAllNodes().size());
@ -591,8 +605,9 @@ public class TestNodeManager {
private void heartbeatNodeSet(SCMNodeManager manager, List<DatanodeID> list,
int sleepDuration) throws InterruptedException {
while (!Thread.currentThread().isInterrupted()) {
list.forEach(manager::sendHeartbeat);
Thread.sleep(sleepDuration);
for (DatanodeID dn : list) {
manager.sendHeartbeat(dn, null);
} Thread.sleep(sleepDuration);
}
}
@ -676,7 +691,10 @@ public class TestNodeManager {
// No Thread just one time HBs the node manager, so that these will be
// marked as dead nodes eventually.
deadNodeList.forEach(nodeManager::sendHeartbeat);
for (DatanodeID dn : deadNodeList) {
nodeManager.sendHeartbeat(dn, null);
}
Thread thread1 = new Thread(healthyNodeTask);
thread1.setDaemon(true);
@ -828,7 +846,7 @@ public class TestNodeManager {
try (SCMNodeManager nodeManager = createNodeManager(conf)) {
nodeManager.setMinimumChillModeNodes(10);
DatanodeID datanodeID = SCMTestUtils.getDatanodeID(nodeManager);
nodeManager.sendHeartbeat(datanodeID);
nodeManager.sendHeartbeat(datanodeID, null);
String status = nodeManager.getChillModeStatus();
Assert.assertThat(status, CoreMatchers.containsString("Still in chill " +
"mode. Waiting on nodes to report in."));
@ -858,7 +876,7 @@ public class TestNodeManager {
// Assert that node manager force enter cannot be overridden by nodes HBs.
for(int x= 0; x < 20; x++) {
DatanodeID datanode = SCMTestUtils.getDatanodeID(nodeManager);
nodeManager.sendHeartbeat(datanode);
nodeManager.sendHeartbeat(datanode, null);
}
Thread.sleep(500);
@ -873,6 +891,147 @@ public class TestNodeManager {
CoreMatchers.containsString("Out of chill mode."));
assertFalse(nodeManager.isInManualChillMode());
}
}
/**
 * Test multiple nodes sending initial heartbeat with their node report.
 * Verifies that the SCM aggregate stats equal the per-node report values
 * multiplied by the number of reporting nodes.
 *
 * @throws IOException
 * @throws InterruptedException
 * @throws TimeoutException
 */
@Test
public void testScmStatsFromNodeReport() throws IOException,
    InterruptedException, TimeoutException {
  Configuration conf = getConf();
  final int nodeCount = 10;
  final long capacity = 2000;
  final long used = 100;
  final long remaining = capacity - used;
  try (SCMNodeManager nodeManager = createNodeManager(conf)) {
    // Register nodeCount datanodes, each heartbeating once with an
    // identical single-volume storage report.
    for (int x = 0; x < nodeCount; x++) {
      DatanodeID datanodeID = SCMTestUtils.getDatanodeID(nodeManager);
      SCMNodeReport.Builder nrb = SCMNodeReport.newBuilder();
      SCMStorageReport.Builder srb = SCMStorageReport.newBuilder();
      srb.setStorageUuid(UUID.randomUUID().toString());
      // Use the precomputed 'remaining' constant (the original re-derived
      // capacity - used inline and invoked a discarded build()).
      srb.setCapacity(capacity).setScmUsed(used).setRemaining(remaining);
      nodeManager.sendHeartbeat(datanodeID,
          nrb.addStorageReport(srb).build());
    }

    // Wait for the heartbeat processing thread to drain the queue.
    GenericTestUtils.waitFor(() -> nodeManager.waitForHeartbeatThead(), 100,
        4 * 1000);
    assertEquals(nodeCount, nodeManager.getNodeCount(HEALTHY));
    assertEquals(capacity * nodeCount,
        nodeManager.getStats().getCapacity());
    assertEquals(used * nodeCount,
        nodeManager.getStats().getScmUsed());
    assertEquals(remaining * nodeCount,
        nodeManager.getStats().getRemaining());
  }
}
/**
 * Test single node stat update based on nodereport from different heartbeat
 * status (healthy, stale and dead).
 *
 * Phases: (1) heartbeat with increasing usage while healthy — latest
 * report wins; (2) node goes stale — stats unchanged; (3) node goes dead —
 * stats removed from both per-node and aggregate views; (4) node
 * heartbeats again — stats restored from the new report.
 *
 * @throws IOException
 * @throws InterruptedException
 * @throws TimeoutException
 */
@Test
public void testScmNodeReportUpdate() throws IOException,
    InterruptedException, TimeoutException {
  Configuration conf = getConf();
  final int heartbeatCount = 5;
  final int nodeCount = 1;
  final int interval = 100;
  // Fast heartbeat processing; node is stale after 3s, dead after 6s.
  conf.setInt(OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL_MS, interval);
  conf.setInt(OZONE_SCM_HEARTBEAT_INTERVAL_SECONDS, 1);
  conf.setInt(OZONE_SCM_STALENODE_INTERVAL_MS, 3 * 1000);
  conf.setInt(OZONE_SCM_DEADNODE_INTERVAL_MS, 6 * 1000);
  try (SCMNodeManager nodeManager = createNodeManager(conf)) {
    DatanodeID datanodeID = SCMTestUtils.getDatanodeID(nodeManager);
    final long capacity = 2000;
    final long usedPerHeartbeat = 100;
    // Phase 1: heartbeat several times with growing usage; the node
    // manager should retain only the most recent report's numbers.
    for (int x = 0; x < heartbeatCount; x++) {
      SCMNodeReport.Builder nrb = SCMNodeReport.newBuilder();
      SCMStorageReport.Builder srb = SCMStorageReport.newBuilder();
      srb.setStorageUuid(UUID.randomUUID().toString());
      srb.setCapacity(capacity).setScmUsed(x * usedPerHeartbeat)
          .setRemaining(capacity - x * usedPerHeartbeat).build();
      nrb.addStorageReport(srb);
      nodeManager.sendHeartbeat(datanodeID, nrb.build());
      Thread.sleep(100);
    }
    GenericTestUtils.waitFor(() -> nodeManager.waitForHeartbeatThead(), 100,
        4 * 1000);
    assertEquals(nodeCount, nodeManager.getNodeCount(HEALTHY));
    // The last report sent had x == heartbeatCount - 1.
    final long expectedScmUsed = usedPerHeartbeat * (heartbeatCount -1);
    final long expectedRemaining = capacity -
        usedPerHeartbeat * (heartbeatCount - 1);
    assertEquals(capacity, nodeManager.getStats().getCapacity());
    assertEquals(expectedScmUsed, nodeManager.getStats().getScmUsed());
    assertEquals(expectedRemaining, nodeManager.getStats().getRemaining());
    // Test NodeManager#getNodeStats
    assertEquals(nodeCount, nodeManager.getNodeStats().size());
    assertEquals(capacity, nodeManager.getNodeStats().get(0).getCapacity());
    assertEquals(expectedScmUsed,
        nodeManager.getNodeStats().get(0).getScmUsed());
    assertEquals(expectedRemaining,
        nodeManager.getNodeStats().get(0).getRemaining());
    // Phase 2: wait up to 4s so that the node becomes stale.
    // Verify the usage info should be unchanged.
    GenericTestUtils.waitFor(
        () -> nodeManager.getNodeCount(NodeManager.NODESTATE.STALE) == 1, 100,
        4 * 1000);
    assertEquals(nodeCount, nodeManager.getNodeStats().size());
    assertEquals(capacity, nodeManager.getNodeStats().get(0).getCapacity());
    assertEquals(expectedScmUsed,
        nodeManager.getNodeStats().get(0).getScmUsed());
    assertEquals(expectedRemaining,
        nodeManager.getNodeStats().get(0).getRemaining());
    // Phase 3: wait up to 3 more seconds so the node becomes dead.
    // Verify usage info is dropped from both views (dead node stats are
    // subtracted from the aggregate and removed from the per-node map).
    GenericTestUtils.waitFor(
        () -> nodeManager.getNodeCount(NodeManager.NODESTATE.DEAD) == 1, 100,
        3 * 1000);
    assertEquals(0, nodeManager.getNodeStats().size());
    assertEquals(0, nodeManager.getStats().getCapacity());
    assertEquals(0, nodeManager.getStats().getScmUsed());
    assertEquals(0, nodeManager.getStats().getRemaining());
    // Phase 4: send a new report to bring the dead node back to healthy.
    SCMNodeReport.Builder nrb = SCMNodeReport.newBuilder();
    SCMStorageReport.Builder srb = SCMStorageReport.newBuilder();
    srb.setStorageUuid(UUID.randomUUID().toString());
    srb.setCapacity(capacity).setScmUsed(expectedScmUsed)
        .setRemaining(expectedRemaining).build();
    nrb.addStorageReport(srb);
    nodeManager.sendHeartbeat(datanodeID, nrb.build());
    // Wait up to 5 seconds so that the dead node becomes healthy.
    // Verify usage info should be updated from the new report.
    GenericTestUtils.waitFor(
        () -> nodeManager.getNodeCount(NodeManager.NODESTATE.HEALTHY) == 1,
        100, 5 * 1000);
    assertEquals(nodeCount, nodeManager.getNodeStats().size());
    assertEquals(capacity, nodeManager.getNodeStats().get(0).getCapacity());
    assertEquals(expectedScmUsed,
        nodeManager.getNodeStats().get(0).getScmUsed());
    assertEquals(expectedRemaining,
        nodeManager.getNodeStats().get(0).getRemaining());
  }
}
}