HDDS-79. Remove ReportState from SCMHeartbeatRequestProto. Contributed by Nanda kumar.

This commit is contained in:
Xiaoyu Yao 2018-05-22 15:46:59 -07:00
parent 43be9ab44f
commit 68c7fd8e60
24 changed files with 63 additions and 305 deletions

View File

@ -34,8 +34,6 @@
.StorageContainerDatanodeProtocolProtos; .StorageContainerDatanodeProtocolProtos;
import org.apache.hadoop.hdds.protocol.proto import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.ContainerReportsRequestProto; .StorageContainerDatanodeProtocolProtos.ContainerReportsRequestProto;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.ReportState;
import org.apache.hadoop.hdds.protocol.proto import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.SCMNodeReport; .StorageContainerDatanodeProtocolProtos.SCMNodeReport;
import org.apache.hadoop.hdds.protocol.proto import org.apache.hadoop.hdds.protocol.proto
@ -1072,16 +1070,8 @@ public long getBytesUsed(long containerId) {
@Override @Override
public long getNumKeys(long containerId) { public long getNumKeys(long containerId) {
ContainerData cData = containerMap.get(containerId); ContainerData cData = containerMap.get(containerId);
return cData.getKeyCount(); } return cData.getKeyCount();
/**
* Get the container report state to send via HB to SCM.
*
* @return container report state.
*/
@Override
public ReportState getContainerReportState() {
return containerReportManager.getContainerReportState();
} }
} }

View File

@ -19,15 +19,12 @@
import org.apache.commons.lang3.RandomUtils; import org.apache.commons.lang3.RandomUtils;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.ReportState;
import org.apache.hadoop.ozone.OzoneConfigKeys; import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.container.common.interfaces import org.apache.hadoop.ozone.container.common.interfaces
.ContainerReportManager; .ContainerReportManager;
import org.apache.hadoop.util.Time; import org.apache.hadoop.util.Time;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import static org.apache.hadoop.hdds.scm.HddsServerUtil.getScmHeartbeatInterval; import static org.apache.hadoop.hdds.scm.HddsServerUtil.getScmHeartbeatInterval;
@ -40,15 +37,9 @@ public class ContainerReportManagerImpl implements ContainerReportManager {
private long lastContainerReportTime; private long lastContainerReportTime;
private final long containerReportInterval; private final long containerReportInterval;
private final long heartbeatInterval; private final long heartbeatInterval;
private AtomicLong reportCount;
private static final ReportState NO_CONTAINER_REPORTSTATE =
ReportState.newBuilder()
.setState(ReportState.states.noContainerReports)
.setCount(0).build();
public ContainerReportManagerImpl(Configuration config) { public ContainerReportManagerImpl(Configuration config) {
this.lastContainerReportTime = -1; this.lastContainerReportTime = -1;
this.reportCount = new AtomicLong(0L);
this.containerReportInterval = config.getTimeDuration( this.containerReportInterval = config.getTimeDuration(
OzoneConfigKeys.OZONE_CONTAINER_REPORT_INTERVAL, OzoneConfigKeys.OZONE_CONTAINER_REPORT_INTERVAL,
OzoneConfigKeys.OZONE_CONTAINER_REPORT_INTERVAL_DEFAULT, OzoneConfigKeys.OZONE_CONTAINER_REPORT_INTERVAL_DEFAULT,
@ -56,32 +47,18 @@ public ContainerReportManagerImpl(Configuration config) {
this.heartbeatInterval = getScmHeartbeatInterval(config); this.heartbeatInterval = getScmHeartbeatInterval(config);
} }
public ReportState getContainerReportState() { public boolean shouldSendContainerReport() {
if (lastContainerReportTime < 0) { if (lastContainerReportTime < 0) {
return getFullContainerReportState(); return true;
} else { }
// Add a random delay (0~30s) on top of the container report // Add a random delay (0~30s) on top of the container report
// interval (60s) so that the SCM is not overwhelmed by the container reports // interval (60s) so that the SCM is not overwhelmed by the container reports
// sent in sync. // sent in sync.
if (Time.monotonicNow() - lastContainerReportTime > if (Time.monotonicNow() - lastContainerReportTime >
(containerReportInterval + getRandomReportDelay())) { (containerReportInterval + getRandomReportDelay())) {
return getFullContainerReportState(); return true;
} else {
return getNoContainerReportState();
} }
} return false;
}
private ReportState getFullContainerReportState() {
ReportState.Builder rsBuilder = ReportState.newBuilder();
rsBuilder.setState(ReportState.states.completeContinerReport);
rsBuilder.setCount(reportCount.incrementAndGet());
this.lastContainerReportTime = Time.monotonicNow();
return rsBuilder.build();
}
private ReportState getNoContainerReportState() {
return NO_CONTAINER_REPORTSTATE;
} }
private long getRandomReportDelay() { private long getRandomReportDelay() {

View File

@ -28,8 +28,6 @@
import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.ContainerReportsRequestProto; .StorageContainerDatanodeProtocolProtos.ContainerReportsRequestProto;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.ReportState;
import org.apache.hadoop.hdds.protocol.proto import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.SCMNodeReport; .StorageContainerDatanodeProtocolProtos.SCMNodeReport;
import org.apache.hadoop.ozone.container.common.helpers.ContainerData; import org.apache.hadoop.ozone.container.common.helpers.ContainerData;
@ -266,9 +264,4 @@ void closeContainer(long containerID)
*/ */
long getNumKeys(long containerId); long getNumKeys(long containerId);
/**
* Get the container report state to send via HB to SCM.
* @return container report state.
*/
ReportState getContainerReportState();
} }

View File

@ -17,16 +17,14 @@
*/ */
package org.apache.hadoop.ozone.container.common.interfaces; package org.apache.hadoop.ozone.container.common.interfaces;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ReportState;
/** /**
* Interface for container report manager operations. * Interface for container report manager operations.
*/ */
public interface ContainerReportManager { public interface ContainerReportManager {
/** /**
* Get the container report state. * Check if we have to send container report.
* @return the container report state. * @return true if container report has to be sent.
*/ */
ReportState getContainerReportState(); boolean shouldSendContainerReport();
} }

View File

@ -135,7 +135,6 @@ private void start() throws IOException {
LOG.debug("Executing cycle Number : {}", context.getExecutionCount()); LOG.debug("Executing cycle Number : {}", context.getExecutionCount());
nextHB.set(Time.monotonicNow() + heartbeatFrequency); nextHB.set(Time.monotonicNow() + heartbeatFrequency);
context.setReportState(container.getNodeReport()); context.setReportState(container.getNodeReport());
context.setContainerReportState(container.getContainerReportState());
context.execute(executorService, heartbeatFrequency, context.execute(executorService, heartbeatFrequency,
TimeUnit.MILLISECONDS); TimeUnit.MILLISECONDS);
now = Time.monotonicNow(); now = Time.monotonicNow();

View File

@ -17,8 +17,6 @@
package org.apache.hadoop.ozone.container.common.statemachine; package org.apache.hadoop.ozone.container.common.statemachine;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.ReportState;
import org.apache.hadoop.hdds.protocol.proto import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.SCMNodeReport; .StorageContainerDatanodeProtocolProtos.SCMNodeReport;
import org.apache.hadoop.ozone.container.common.states.DatanodeState; import org.apache.hadoop.ozone.container.common.states.DatanodeState;
@ -40,9 +38,6 @@
import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantLock;
import static org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.ReportState.states
.noContainerReports;
import static org.apache.hadoop.ozone.OzoneConsts.INVALID_PORT; import static org.apache.hadoop.ozone.OzoneConsts.INVALID_PORT;
/** /**
@ -58,9 +53,6 @@ public class StateContext {
private final Configuration conf; private final Configuration conf;
private DatanodeStateMachine.DatanodeStates state; private DatanodeStateMachine.DatanodeStates state;
private SCMNodeReport nrState; private SCMNodeReport nrState;
private ReportState reportState;
private static final ReportState DEFAULT_REPORT_STATE =
ReportState.newBuilder().setState(noContainerReports).setCount(0).build();
/** /**
* Constructs a StateContext. * Constructs a StateContext.
@ -212,7 +204,6 @@ public void execute(ExecutorService service, long time, TimeUnit unit)
if (isExiting(newState)) { if (isExiting(newState)) {
task.onExit(); task.onExit();
} }
this.clearReportState();
this.setState(newState); this.setState(newState);
} }
} }
@ -253,33 +244,4 @@ public long getExecutionCount() {
return stateExecutionCount.get(); return stateExecutionCount.get();
} }
/**
* Gets the ReportState.
* @return ReportState.
*/
public synchronized ReportState getContainerReportState() {
if (reportState == null) {
return DEFAULT_REPORT_STATE;
}
return reportState;
}
/**
* Sets the ReportState.
* @param rState - ReportState.
*/
public synchronized void setContainerReportState(ReportState rState) {
this.reportState = rState;
}
/**
* Clears report state after it has been communicated.
*/
public synchronized void clearReportState() {
if(reportState != null) {
setContainerReportState(null);
}
}
} }

View File

@ -99,8 +99,7 @@ public EndpointStateMachine.EndPointStates call() throws Exception {
Preconditions.checkState(this.datanodeDetailsProto != null); Preconditions.checkState(this.datanodeDetailsProto != null);
SCMHeartbeatResponseProto reponse = rpcEndpoint.getEndPoint() SCMHeartbeatResponseProto reponse = rpcEndpoint.getEndPoint()
.sendHeartbeat(datanodeDetailsProto, this.context.getNodeReport(), .sendHeartbeat(datanodeDetailsProto, this.context.getNodeReport());
this.context.getContainerReportState());
processResponse(reponse, datanodeDetailsProto); processResponse(reponse, datanodeDetailsProto);
rpcEndpoint.setLastSuccessfulHeartbeat(ZonedDateTime.now()); rpcEndpoint.setLastSuccessfulHeartbeat(ZonedDateTime.now());
rpcEndpoint.zeroMissedCount(); rpcEndpoint.zeroMissedCount();

View File

@ -24,8 +24,6 @@
import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.ContainerReportsRequestProto; .StorageContainerDatanodeProtocolProtos.ContainerReportsRequestProto;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.ReportState;
import org.apache.hadoop.hdds.protocol.proto import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.SCMNodeReport; .StorageContainerDatanodeProtocolProtos.SCMNodeReport;
import org.apache.hadoop.ozone.OzoneConfigKeys; import org.apache.hadoop.ozone.OzoneConfigKeys;
@ -267,11 +265,4 @@ public ContainerManager getContainerManager() {
return this.manager; return this.manager;
} }
/**
* Get the container report state to send via HB to SCM.
* @return the container report state.
*/
public ReportState getContainerReportState() {
return this.manager.getContainerReportState();
}
} }

View File

@ -27,8 +27,6 @@
.StorageContainerDatanodeProtocolProtos.ContainerReportsRequestProto; .StorageContainerDatanodeProtocolProtos.ContainerReportsRequestProto;
import org.apache.hadoop.hdds.protocol.proto import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.ContainerReportsResponseProto; .StorageContainerDatanodeProtocolProtos.ContainerReportsResponseProto;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.ReportState;
import org.apache.hadoop.hdds.protocol.proto import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.SCMHeartbeatResponseProto; .StorageContainerDatanodeProtocolProtos.SCMHeartbeatResponseProto;
import org.apache.hadoop.hdds.protocol.proto import org.apache.hadoop.hdds.protocol.proto
@ -59,12 +57,11 @@ SCMVersionResponseProto getVersion(SCMVersionRequestProto versionRequest)
* Used by data node to send a Heartbeat. * Used by data node to send a Heartbeat.
* @param datanodeDetails - Datanode Details. * @param datanodeDetails - Datanode Details.
* @param nodeReport - node report state * @param nodeReport - node report state
* @param reportState - container report state.
* @return - SCMHeartbeatResponseProto * @return - SCMHeartbeatResponseProto
* @throws IOException * @throws IOException
*/ */
SCMHeartbeatResponseProto sendHeartbeat(DatanodeDetailsProto datanodeDetails, SCMHeartbeatResponseProto sendHeartbeat(DatanodeDetailsProto datanodeDetails,
SCMNodeReport nodeReport, ReportState reportState) throws IOException; SCMNodeReport nodeReport) throws IOException;
/** /**
* Register Datanode. * Register Datanode.

View File

@ -19,8 +19,6 @@
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.DatanodeDetailsProto; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.DatanodeDetailsProto;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.ReportState;
import org.apache.hadoop.hdds.protocol.proto import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.SCMNodeReport; .StorageContainerDatanodeProtocolProtos.SCMNodeReport;
import org.apache.hadoop.hdds.protocol.proto import org.apache.hadoop.hdds.protocol.proto
@ -61,10 +59,9 @@ SCMCommand register(DatanodeDetailsProto datanodeDetails, SCMNodeReport
* Send heartbeat to indicate the datanode is alive and doing well. * Send heartbeat to indicate the datanode is alive and doing well.
* @param datanodeDetails - Datanode ID. * @param datanodeDetails - Datanode ID.
* @param nodeReport - node report. * @param nodeReport - node report.
* @param reportState - container report.
* @return SCM heartbeat response list * @return SCM heartbeat response list
*/ */
List<SCMCommand> sendHeartbeat(DatanodeDetailsProto datanodeDetails, List<SCMCommand> sendHeartbeat(DatanodeDetailsProto datanodeDetails,
SCMNodeReport nodeReport, ReportState reportState); SCMNodeReport nodeReport);
} }

View File

@ -28,8 +28,6 @@
.StorageContainerDatanodeProtocolProtos.ContainerReportsRequestProto; .StorageContainerDatanodeProtocolProtos.ContainerReportsRequestProto;
import org.apache.hadoop.hdds.protocol.proto import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.ContainerReportsResponseProto; .StorageContainerDatanodeProtocolProtos.ContainerReportsResponseProto;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.ReportState;
import org.apache.hadoop.hdds.protocol.proto import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.SCMHeartbeatRequestProto; .StorageContainerDatanodeProtocolProtos.SCMHeartbeatRequestProto;
import org.apache.hadoop.hdds.protocol.proto import org.apache.hadoop.hdds.protocol.proto
@ -133,12 +131,11 @@ public SCMVersionResponseProto getVersion(SCMVersionRequestProto
@Override @Override
public SCMHeartbeatResponseProto sendHeartbeat( public SCMHeartbeatResponseProto sendHeartbeat(
DatanodeDetailsProto datanodeDetailsProto, DatanodeDetailsProto datanodeDetailsProto,
SCMNodeReport nodeReport, ReportState reportState) throws IOException { SCMNodeReport nodeReport) throws IOException {
SCMHeartbeatRequestProto.Builder req = SCMHeartbeatRequestProto SCMHeartbeatRequestProto.Builder req = SCMHeartbeatRequestProto
.newBuilder(); .newBuilder();
req.setDatanodeDetails(datanodeDetailsProto); req.setDatanodeDetails(datanodeDetailsProto);
req.setNodeReport(nodeReport); req.setNodeReport(nodeReport);
req.setContainerReportState(reportState);
final SCMHeartbeatResponseProto resp; final SCMHeartbeatResponseProto resp;
try { try {
resp = rpcProxy.sendHeartbeat(NULL_RPC_CONTROLLER, req.build()); resp = rpcProxy.sendHeartbeat(NULL_RPC_CONTROLLER, req.build());

View File

@ -88,8 +88,7 @@ public StorageContainerDatanodeProtocolServerSideTranslatorPB(
SCMHeartbeatRequestProto request) throws ServiceException { SCMHeartbeatRequestProto request) throws ServiceException {
try { try {
return impl.sendHeartbeat(request.getDatanodeDetails(), return impl.sendHeartbeat(request.getDatanodeDetails(),
request.getNodeReport(), request.getNodeReport());
request.getContainerReportState());
} catch (IOException e) { } catch (IOException e) {
throw new ServiceException(e); throw new ServiceException(e);
} }

View File

@ -42,45 +42,6 @@ import "hdds.proto";
message SCMHeartbeatRequestProto { message SCMHeartbeatRequestProto {
required DatanodeDetailsProto datanodeDetails = 1; required DatanodeDetailsProto datanodeDetails = 1;
optional SCMNodeReport nodeReport = 2; optional SCMNodeReport nodeReport = 2;
optional ReportState containerReportState = 3;
}
enum DatanodeContainerState {
closed = 0;
open = 1;
}
/**
NodeState contains messages from datanode to SCM saying that it has
some information that SCM might be interested in.*/
message ReportState {
enum states {
noContainerReports = 0;
completeContinerReport = 1;
deltaContainerReport = 2;
}
required states state = 1;
required int64 count = 2 [default = 0];
}
/**
This message is used to persist the information about a container in the
SCM database, This information allows SCM to startup faster and avoid having
all container info in memory all the time.
*/
message ContainerPersistanceProto {
required DatanodeContainerState state = 1;
required hadoop.hdds.Pipeline pipeline = 2;
required ContainerInfo info = 3;
}
/**
This message is used to do a quick look up of which containers are effected
if a node goes down
*/
message NodeContianerMapping {
repeated string contianerName = 1;
} }
/** /**

View File

@ -29,8 +29,6 @@
.ContainerBlocksDeletionACKResponseProto; .ContainerBlocksDeletionACKResponseProto;
import org.apache.hadoop.hdds.protocol.proto import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.ContainerInfo; .StorageContainerDatanodeProtocolProtos.ContainerInfo;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.ReportState;
import org.apache.hadoop.hdds.protocol.proto import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.SCMCommandResponseProto; .StorageContainerDatanodeProtocolProtos.SCMCommandResponseProto;
import org.apache.hadoop.hdds.protocol.proto import org.apache.hadoop.hdds.protocol.proto
@ -53,7 +51,6 @@ public class ScmTestMock implements StorageContainerDatanodeProtocol {
private int rpcResponseDelay; private int rpcResponseDelay;
private AtomicInteger heartbeatCount = new AtomicInteger(0); private AtomicInteger heartbeatCount = new AtomicInteger(0);
private AtomicInteger rpcCount = new AtomicInteger(0); private AtomicInteger rpcCount = new AtomicInteger(0);
private ReportState reportState;
private AtomicInteger containerReportsCount = new AtomicInteger(0); private AtomicInteger containerReportsCount = new AtomicInteger(0);
// Map of datanode to containers // Map of datanode to containers
@ -177,11 +174,10 @@ private void sleepIfNeeded() {
@Override @Override
public StorageContainerDatanodeProtocolProtos.SCMHeartbeatResponseProto public StorageContainerDatanodeProtocolProtos.SCMHeartbeatResponseProto
sendHeartbeat(DatanodeDetailsProto datanodeDetailsProto, sendHeartbeat(DatanodeDetailsProto datanodeDetailsProto,
SCMNodeReport nodeReport, ReportState scmReportState) SCMNodeReport nodeReport)
throws IOException { throws IOException {
rpcCount.incrementAndGet(); rpcCount.incrementAndGet();
heartbeatCount.incrementAndGet(); heartbeatCount.incrementAndGet();
this.reportState = scmReportState;
sleepIfNeeded(); sleepIfNeeded();
List<SCMCommandResponseProto> List<SCMCommandResponseProto>
cmdResponses = new LinkedList<>(); cmdResponses = new LinkedList<>();
@ -298,19 +294,12 @@ public ContainerBlocksDeletionACKResponseProto sendContainerBlocksDeletionACK(
.newBuilder().getDefaultInstanceForType(); .newBuilder().getDefaultInstanceForType();
} }
public ReportState getReportState() {
return this.reportState;
}
/** /**
* Reset the mock Scm for test to get a fresh start without rebuild MockScm. * Reset the mock Scm for test to get a fresh start without rebuild MockScm.
*/ */
public void reset() { public void reset() {
heartbeatCount.set(0); heartbeatCount.set(0);
rpcCount.set(0); rpcCount.set(0);
reportState = ReportState.newBuilder()
.setState(ReportState.states.noContainerReports)
.setCount(0).build();
containerReportsCount.set(0); containerReportsCount.set(0);
nodeContainers.clear(); nodeContainers.clear();

View File

@ -21,8 +21,6 @@
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.scm.ScmConfigKeys; import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos;
import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ozone.OzoneConfigKeys; import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils; import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils;
@ -305,11 +303,6 @@ public void testDatanodeStateContext() throws IOException,
for (ScmTestMock mock : mockServers) { for (ScmTestMock mock : mockServers) {
Assert.assertEquals(1, mock.getHeartbeatCount()); Assert.assertEquals(1, mock.getHeartbeatCount());
// Assert that heartbeat did indeed carry that State that we said
// have in the datanode.
Assert.assertEquals(mock.getReportState().getState().getNumber(),
StorageContainerDatanodeProtocolProtos.ReportState.states
.noContainerReports.getNumber());
} }
} }
} }

View File

@ -20,8 +20,6 @@
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.ReportState;
import org.apache.hadoop.hdds.protocol.proto import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.SCMNodeReport; .StorageContainerDatanodeProtocolProtos.SCMNodeReport;
@ -34,21 +32,18 @@ public class HeartbeatQueueItem {
private DatanodeDetails datanodeDetails; private DatanodeDetails datanodeDetails;
private long recvTimestamp; private long recvTimestamp;
private SCMNodeReport nodeReport; private SCMNodeReport nodeReport;
private ReportState containerReportState;
/** /**
* *
* @param datanodeDetails - datanode ID of the heartbeat. * @param datanodeDetails - datanode ID of the heartbeat.
* @param recvTimestamp - heartbeat receive timestamp. * @param recvTimestamp - heartbeat receive timestamp.
* @param nodeReport - node report associated with the heartbeat if any. * @param nodeReport - node report associated with the heartbeat if any.
* @param containerReportState - container report state.
*/ */
HeartbeatQueueItem(DatanodeDetails datanodeDetails, long recvTimestamp, HeartbeatQueueItem(DatanodeDetails datanodeDetails, long recvTimestamp,
SCMNodeReport nodeReport, ReportState containerReportState) { SCMNodeReport nodeReport) {
this.datanodeDetails = datanodeDetails; this.datanodeDetails = datanodeDetails;
this.recvTimestamp = recvTimestamp; this.recvTimestamp = recvTimestamp;
this.nodeReport = nodeReport; this.nodeReport = nodeReport;
this.containerReportState = containerReportState;
} }
/** /**
@ -65,13 +60,6 @@ public SCMNodeReport getNodeReport() {
return nodeReport; return nodeReport;
} }
/**
* @return container report state.
*/
public ReportState getContainerReportState() {
return containerReportState;
}
/** /**
* @return heartbeat receive timestamp. * @return heartbeat receive timestamp.
*/ */
@ -85,7 +73,6 @@ public long getRecvTimestamp() {
public static class Builder { public static class Builder {
private DatanodeDetails datanodeDetails; private DatanodeDetails datanodeDetails;
private SCMNodeReport nodeReport; private SCMNodeReport nodeReport;
private ReportState containerReportState;
private long recvTimestamp = monotonicNow(); private long recvTimestamp = monotonicNow();
public Builder setDatanodeDetails(DatanodeDetails dnDetails) { public Builder setDatanodeDetails(DatanodeDetails dnDetails) {
@ -98,11 +85,6 @@ public Builder setNodeReport(SCMNodeReport scmNodeReport) {
return this; return this;
} }
public Builder setContainerReportState(ReportState crs) {
this.containerReportState = crs;
return this;
}
@VisibleForTesting @VisibleForTesting
public Builder setRecvTimestamp(long recvTime) { public Builder setRecvTimestamp(long recvTime) {
this.recvTimestamp = recvTime; this.recvTimestamp = recvTime;
@ -110,8 +92,7 @@ public Builder setRecvTimestamp(long recvTime) {
} }
public HeartbeatQueueItem build() { public HeartbeatQueueItem build() {
return new HeartbeatQueueItem(datanodeDetails, recvTimestamp, nodeReport, return new HeartbeatQueueItem(datanodeDetails, recvTimestamp, nodeReport);
containerReportState);
} }
} }
} }

View File

@ -30,8 +30,6 @@
import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.DatanodeDetailsProto; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.DatanodeDetailsProto;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.ReportState;
import org.apache.hadoop.hdds.protocol.proto import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.SCMNodeReport; .StorageContainerDatanodeProtocolProtos.SCMNodeReport;
import org.apache.hadoop.hdds.protocol.proto import org.apache.hadoop.hdds.protocol.proto
@ -48,7 +46,6 @@
import org.apache.hadoop.ozone.protocol.commands.RegisteredCommand; import org.apache.hadoop.ozone.protocol.commands.RegisteredCommand;
import org.apache.hadoop.ozone.protocol.commands.ReregisterCommand; import org.apache.hadoop.ozone.protocol.commands.ReregisterCommand;
import org.apache.hadoop.ozone.protocol.commands.SCMCommand; import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
import org.apache.hadoop.ozone.protocol.commands.SendContainerCommand;
import org.apache.hadoop.util.Time; import org.apache.hadoop.util.Time;
import org.apache.hadoop.util.concurrent.HadoopExecutors; import org.apache.hadoop.util.concurrent.HadoopExecutors;
import org.slf4j.Logger; import org.slf4j.Logger;
@ -609,8 +606,6 @@ private void handleHeartbeat(HeartbeatQueueItem hbItem) {
if (healthyNodes.containsKey(datanodeUuid)) { if (healthyNodes.containsKey(datanodeUuid)) {
healthyNodes.put(datanodeUuid, processTimestamp); healthyNodes.put(datanodeUuid, processTimestamp);
updateNodeStat(datanodeUuid, nodeReport); updateNodeStat(datanodeUuid, nodeReport);
updateCommandQueue(datanodeUuid,
hbItem.getContainerReportState().getState());
return; return;
} }
@ -622,8 +617,6 @@ private void handleHeartbeat(HeartbeatQueueItem hbItem) {
healthyNodeCount.incrementAndGet(); healthyNodeCount.incrementAndGet();
staleNodeCount.decrementAndGet(); staleNodeCount.decrementAndGet();
updateNodeStat(datanodeUuid, nodeReport); updateNodeStat(datanodeUuid, nodeReport);
updateCommandQueue(datanodeUuid,
hbItem.getContainerReportState().getState());
return; return;
} }
@ -635,8 +628,6 @@ private void handleHeartbeat(HeartbeatQueueItem hbItem) {
deadNodeCount.decrementAndGet(); deadNodeCount.decrementAndGet();
healthyNodeCount.incrementAndGet(); healthyNodeCount.incrementAndGet();
updateNodeStat(datanodeUuid, nodeReport); updateNodeStat(datanodeUuid, nodeReport);
updateCommandQueue(datanodeUuid,
hbItem.getContainerReportState().getState());
return; return;
} }
@ -671,22 +662,6 @@ private void updateNodeStat(UUID dnId, SCMNodeReport nodeReport) {
} }
} }
private void updateCommandQueue(UUID dnId,
ReportState.states containerReportState) {
if (containerReportState != null) {
switch (containerReportState) {
case completeContinerReport:
commandQueue.addCommand(dnId,
SendContainerCommand.newBuilder().build());
return;
case deltaContainerReport:
case noContainerReports:
default:
// do nothing
}
}
}
/** /**
* Closes this stream and releases any system resources associated with it. If * Closes this stream and releases any system resources associated with it. If
* the stream is already closed then invoking this method has no effect. * the stream is already closed then invoking this method has no effect.
@ -829,14 +804,12 @@ private SCMCommand verifyDatanodeUUID(DatanodeDetails datanodeDetails) {
* *
* @param datanodeDetailsProto - DatanodeDetailsProto. * @param datanodeDetailsProto - DatanodeDetailsProto.
* @param nodeReport - node report. * @param nodeReport - node report.
* @param containerReportState - container report state.
* @return SCM heartbeat response. * @return SCM heartbeat response.
* @throws IOException * @throws IOException
*/ */
@Override @Override
public List<SCMCommand> sendHeartbeat( public List<SCMCommand> sendHeartbeat(
DatanodeDetailsProto datanodeDetailsProto, SCMNodeReport nodeReport, DatanodeDetailsProto datanodeDetailsProto, SCMNodeReport nodeReport) {
ReportState containerReportState) {
Preconditions.checkNotNull(datanodeDetailsProto, "Heartbeat is missing " + Preconditions.checkNotNull(datanodeDetailsProto, "Heartbeat is missing " +
"DatanodeDetails."); "DatanodeDetails.");
@ -851,7 +824,6 @@ public List<SCMCommand> sendHeartbeat(
new HeartbeatQueueItem.Builder() new HeartbeatQueueItem.Builder()
.setDatanodeDetails(datanodeDetails) .setDatanodeDetails(datanodeDetails)
.setNodeReport(nodeReport) .setNodeReport(nodeReport)
.setContainerReportState(containerReportState)
.build()); .build());
return commandQueue.getCommand(datanodeDetails.getUuid()); return commandQueue.getCommand(datanodeDetails.getUuid());
} else { } else {

View File

@ -153,12 +153,10 @@ public SCMVersionResponseProto getVersion(SCMVersionRequestProto
@Override @Override
public SCMHeartbeatResponseProto sendHeartbeat( public SCMHeartbeatResponseProto sendHeartbeat(
HddsProtos.DatanodeDetailsProto datanodeDetails, HddsProtos.DatanodeDetailsProto datanodeDetails,
StorageContainerDatanodeProtocolProtos.SCMNodeReport nodeReport, StorageContainerDatanodeProtocolProtos.SCMNodeReport nodeReport)
StorageContainerDatanodeProtocolProtos.ReportState reportState)
throws IOException { throws IOException {
List<SCMCommand> commands = List<SCMCommand> commands =
scm.getScmNodeManager().sendHeartbeat(datanodeDetails, nodeReport, scm.getScmNodeManager().sendHeartbeat(datanodeDetails, nodeReport);
reportState);
List<SCMCommandResponseProto> cmdResponses = new LinkedList<>(); List<SCMCommandResponseProto> cmdResponses = new LinkedList<>();
for (SCMCommand cmd : commands) { for (SCMCommand cmd : commands) {
cmdResponses.add(getCommandResponse(cmd, datanodeDetails.getUuid())); cmdResponses.add(getCommandResponse(cmd, datanodeDetails.getUuid()));

View File

@ -23,8 +23,6 @@
import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException; import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException;
import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.ReportState;
import org.apache.hadoop.hdds.protocol.proto import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.SCMNodeReport; .StorageContainerDatanodeProtocolProtos.SCMNodeReport;
import org.apache.hadoop.hdds.protocol.proto import org.apache.hadoop.hdds.protocol.proto
@ -387,13 +385,12 @@ public SCMCommand register(HddsProtos.DatanodeDetailsProto datanodeDetails,
* *
* @param datanodeDetails - Datanode ID. * @param datanodeDetails - Datanode ID.
* @param nodeReport - node report. * @param nodeReport - node report.
* @param containerReportState - container report state.
* @return SCMheartbeat response list * @return SCMheartbeat response list
*/ */
@Override @Override
public List<SCMCommand> sendHeartbeat( public List<SCMCommand> sendHeartbeat(
HddsProtos.DatanodeDetailsProto datanodeDetails, HddsProtos.DatanodeDetailsProto datanodeDetails,
SCMNodeReport nodeReport, ReportState containerReportState) { SCMNodeReport nodeReport) {
if ((datanodeDetails != null) && (nodeReport != null) && (nodeReport if ((datanodeDetails != null) && (nodeReport != null) && (nodeReport
.getStorageReportCount() > 0)) { .getStorageReportCount() > 0)) {
SCMNodeStat stat = this.nodeMetricMap.get(datanodeDetails.getUuid()); SCMNodeStat stat = this.nodeMetricMap.get(datanodeDetails.getUuid());

View File

@ -26,15 +26,12 @@
import org.apache.hadoop.hdds.scm.XceiverClientManager; import org.apache.hadoop.hdds.scm.XceiverClientManager;
import org.apache.hadoop.hdds.scm.container.ContainerMapping; import org.apache.hadoop.hdds.scm.container.ContainerMapping;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo; import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
import org.apache.hadoop.hdds.scm.container.placement.algorithms import org.apache.hadoop.hdds.scm.container.placement.algorithms
.ContainerPlacementPolicy; .ContainerPlacementPolicy;
import org.apache.hadoop.hdds.scm.container.placement.algorithms import org.apache.hadoop.hdds.scm.container.placement.algorithms
.SCMContainerPlacementCapacity; .SCMContainerPlacementCapacity;
import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.ReportState;
import org.apache.hadoop.hdds.protocol.proto import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.SCMStorageReport; .StorageContainerDatanodeProtocolProtos.SCMStorageReport;
import org.apache.hadoop.ozone.OzoneConfigKeys; import org.apache.hadoop.ozone.OzoneConfigKeys;
@ -70,10 +67,6 @@ public class TestContainerPlacement {
private static XceiverClientManager xceiverClientManager = private static XceiverClientManager xceiverClientManager =
new XceiverClientManager(new OzoneConfiguration()); new XceiverClientManager(new OzoneConfiguration());
private ReportState reportState = ReportState.newBuilder()
.setState(ReportState.states.noContainerReports)
.setCount(0).build();
/** /**
* Returns a new copy of Configuration. * Returns a new copy of Configuration.
* *
@ -143,7 +136,7 @@ public void testContainerPlacementCapacity() throws IOException,
List<SCMStorageReport> reports = TestUtils List<SCMStorageReport> reports = TestUtils
.createStorageReport(capacity, used, remaining, path, null, id, 1); .createStorageReport(capacity, used, remaining, path, null, id, 1);
nodeManager.sendHeartbeat(datanodeDetails.getProtoBufMessage(), nodeManager.sendHeartbeat(datanodeDetails.getProtoBufMessage(),
TestUtils.createNodeReport(reports), reportState); TestUtils.createNodeReport(reports));
} }
GenericTestUtils.waitFor(() -> nodeManager.waitForHeartbeatProcessed(), GenericTestUtils.waitFor(() -> nodeManager.waitForHeartbeatProcessed(),

View File

@ -25,10 +25,6 @@
import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.ReportState;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.SCMNodeReport;
import org.apache.hadoop.hdds.protocol.proto import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.SCMStorageReport; .StorageContainerDatanodeProtocolProtos.SCMStorageReport;
import org.apache.hadoop.ozone.OzoneConfigKeys; import org.apache.hadoop.ozone.OzoneConfigKeys;
@ -82,10 +78,6 @@ public class TestNodeManager {
private File testDir; private File testDir;
private ReportState reportState = ReportState.newBuilder()
.setState(ReportState.states.noContainerReports)
.setCount(0).build();
@Rule @Rule
public ExpectedException thrown = ExpectedException.none(); public ExpectedException thrown = ExpectedException.none();
@ -153,7 +145,7 @@ public void testScmHeartbeat() throws IOException,
DatanodeDetails datanodeDetails = TestUtils.getDatanodeDetails( DatanodeDetails datanodeDetails = TestUtils.getDatanodeDetails(
nodeManager); nodeManager);
nodeManager.sendHeartbeat(datanodeDetails.getProtoBufMessage(), nodeManager.sendHeartbeat(datanodeDetails.getProtoBufMessage(),
null, reportState); null);
} }
// Wait for 4 seconds max. // Wait for 4 seconds max.
@ -200,8 +192,7 @@ public void testScmNotEnoughHeartbeats() throws IOException,
// Need 100 nodes to come out of chill mode, only one node is sending HB. // Need 100 nodes to come out of chill mode, only one node is sending HB.
nodeManager.setMinimumChillModeNodes(100); nodeManager.setMinimumChillModeNodes(100);
nodeManager.sendHeartbeat(TestUtils.getDatanodeDetails(nodeManager) nodeManager.sendHeartbeat(TestUtils.getDatanodeDetails(nodeManager)
.getProtoBufMessage(), .getProtoBufMessage(), null);
null, reportState);
GenericTestUtils.waitFor(() -> nodeManager.waitForHeartbeatProcessed(), GenericTestUtils.waitFor(() -> nodeManager.waitForHeartbeatProcessed(),
100, 4 * 1000); 100, 4 * 1000);
assertFalse("Not enough heartbeat, Node manager should have" + assertFalse("Not enough heartbeat, Node manager should have" +
@ -229,7 +220,7 @@ public void testScmSameNodeHeartbeats() throws IOException,
// Send 10 heartbeat from same node, and assert we never leave chill mode. // Send 10 heartbeat from same node, and assert we never leave chill mode.
for (int x = 0; x < 10; x++) { for (int x = 0; x < 10; x++) {
nodeManager.sendHeartbeat(datanodeDetails.getProtoBufMessage(), nodeManager.sendHeartbeat(datanodeDetails.getProtoBufMessage(),
null, reportState); null);
} }
GenericTestUtils.waitFor(() -> nodeManager.waitForHeartbeatProcessed(), GenericTestUtils.waitFor(() -> nodeManager.waitForHeartbeatProcessed(),
@ -260,7 +251,7 @@ public void testScmShutdown() throws IOException, InterruptedException,
// These should never be processed. // These should never be processed.
nodeManager.sendHeartbeat(datanodeDetails.getProtoBufMessage(), nodeManager.sendHeartbeat(datanodeDetails.getProtoBufMessage(),
null, reportState); null);
// Let us just wait for 2 seconds to prove that HBs are not processed. // Let us just wait for 2 seconds to prove that HBs are not processed.
Thread.sleep(2 * 1000); Thread.sleep(2 * 1000);
@ -289,8 +280,7 @@ public void testScmHeartbeatAfterRestart() throws Exception {
nodemanager.register(datanodeDetails.getProtoBufMessage(), nodemanager.register(datanodeDetails.getProtoBufMessage(),
TestUtils.createNodeReport(reports)); TestUtils.createNodeReport(reports));
List<SCMCommand> command = nodemanager.sendHeartbeat( List<SCMCommand> command = nodemanager.sendHeartbeat(
datanodeDetails.getProtoBufMessage(), datanodeDetails.getProtoBufMessage(), null);
null, reportState);
Assert.assertTrue(nodemanager.getAllNodes().contains(datanodeDetails)); Assert.assertTrue(nodemanager.getAllNodes().contains(datanodeDetails));
Assert.assertTrue("On regular HB calls, SCM responses a " Assert.assertTrue("On regular HB calls, SCM responses a "
+ "datanode with an empty command list", command.isEmpty()); + "datanode with an empty command list", command.isEmpty());
@ -309,7 +299,7 @@ public void testScmHeartbeatAfterRestart() throws Exception {
@Override public Boolean get() { @Override public Boolean get() {
List<SCMCommand> command = List<SCMCommand> command =
nodemanager.sendHeartbeat(datanodeDetails.getProtoBufMessage(), nodemanager.sendHeartbeat(datanodeDetails.getProtoBufMessage(),
null, reportState); null);
return command.size() == 1 && command.get(0).getType() return command.size() == 1 && command.get(0).getType()
.equals(SCMCmdType.reregisterCommand); .equals(SCMCmdType.reregisterCommand);
} }
@ -341,7 +331,7 @@ public void testScmHealthyNodeCount() throws IOException,
DatanodeDetails datanodeDetails = TestUtils.getDatanodeDetails( DatanodeDetails datanodeDetails = TestUtils.getDatanodeDetails(
nodeManager); nodeManager);
nodeManager.sendHeartbeat(datanodeDetails.getProtoBufMessage(), nodeManager.sendHeartbeat(datanodeDetails.getProtoBufMessage(),
null, reportState); null);
} }
GenericTestUtils.waitFor(() -> nodeManager.waitForHeartbeatProcessed(), GenericTestUtils.waitFor(() -> nodeManager.waitForHeartbeatProcessed(),
100, 4 * 1000); 100, 4 * 1000);
@ -433,18 +423,18 @@ public void testScmDetectStaleAndDeadNode() throws IOException,
// Heartbeat once // Heartbeat once
nodeManager.sendHeartbeat(staleNode.getProtoBufMessage(), nodeManager.sendHeartbeat(staleNode.getProtoBufMessage(),
null, reportState); null);
// Heartbeat all other nodes. // Heartbeat all other nodes.
for (DatanodeDetails dn : nodeList) { for (DatanodeDetails dn : nodeList) {
nodeManager.sendHeartbeat(dn.getProtoBufMessage(), null, reportState); nodeManager.sendHeartbeat(dn.getProtoBufMessage(), null);
} }
// Wait for 2 seconds .. and heartbeat good nodes again. // Wait for 2 seconds .. and heartbeat good nodes again.
Thread.sleep(2 * 1000); Thread.sleep(2 * 1000);
for (DatanodeDetails dn : nodeList) { for (DatanodeDetails dn : nodeList) {
nodeManager.sendHeartbeat(dn.getProtoBufMessage(), null, reportState); nodeManager.sendHeartbeat(dn.getProtoBufMessage(), null);
} }
// Wait for 2 seconds, wait a total of 4 seconds to make sure that the // Wait for 2 seconds, wait a total of 4 seconds to make sure that the
@ -461,7 +451,7 @@ public void testScmDetectStaleAndDeadNode() throws IOException,
// heartbeat good nodes again. // heartbeat good nodes again.
for (DatanodeDetails dn : nodeList) { for (DatanodeDetails dn : nodeList) {
nodeManager.sendHeartbeat(dn.getProtoBufMessage(), null, reportState); nodeManager.sendHeartbeat(dn.getProtoBufMessage(), null);
} }
// 6 seconds is the dead window for this test , so we wait a total of // 6 seconds is the dead window for this test , so we wait a total of
@ -497,7 +487,7 @@ public void testScmDetectStaleAndDeadNode() throws IOException,
public void testScmCheckForErrorOnNullDatanodeDetails() throws IOException, public void testScmCheckForErrorOnNullDatanodeDetails() throws IOException,
InterruptedException, TimeoutException { InterruptedException, TimeoutException {
try (SCMNodeManager nodeManager = createNodeManager(getConf())) { try (SCMNodeManager nodeManager = createNodeManager(getConf())) {
nodeManager.sendHeartbeat(null, null, reportState); nodeManager.sendHeartbeat(null, null);
} catch (NullPointerException npe) { } catch (NullPointerException npe) {
GenericTestUtils.assertExceptionContains("Heartbeat is missing " + GenericTestUtils.assertExceptionContains("Heartbeat is missing " +
"DatanodeDetails.", npe); "DatanodeDetails.", npe);
@ -575,11 +565,11 @@ public void testScmClusterIsInExpectedState1() throws IOException,
DatanodeDetails deadNode = DatanodeDetails deadNode =
TestUtils.getDatanodeDetails(nodeManager); TestUtils.getDatanodeDetails(nodeManager);
nodeManager.sendHeartbeat( nodeManager.sendHeartbeat(
healthyNode.getProtoBufMessage(), null, reportState); healthyNode.getProtoBufMessage(), null);
nodeManager.sendHeartbeat( nodeManager.sendHeartbeat(
staleNode.getProtoBufMessage(), null, reportState); staleNode.getProtoBufMessage(), null);
nodeManager.sendHeartbeat( nodeManager.sendHeartbeat(
deadNode.getProtoBufMessage(), null, reportState); deadNode.getProtoBufMessage(), null);
// Sleep so that heartbeat processing thread gets to run. // Sleep so that heartbeat processing thread gets to run.
Thread.sleep(500); Thread.sleep(500);
@ -606,15 +596,15 @@ public void testScmClusterIsInExpectedState1() throws IOException,
*/ */
nodeManager.sendHeartbeat( nodeManager.sendHeartbeat(
healthyNode.getProtoBufMessage(), null, reportState); healthyNode.getProtoBufMessage(), null);
nodeManager.sendHeartbeat( nodeManager.sendHeartbeat(
staleNode.getProtoBufMessage(), null, reportState); staleNode.getProtoBufMessage(), null);
nodeManager.sendHeartbeat( nodeManager.sendHeartbeat(
deadNode.getProtoBufMessage(), null, reportState); deadNode.getProtoBufMessage(), null);
Thread.sleep(1500); Thread.sleep(1500);
nodeManager.sendHeartbeat( nodeManager.sendHeartbeat(
healthyNode.getProtoBufMessage(), null, reportState); healthyNode.getProtoBufMessage(), null);
Thread.sleep(2 * 1000); Thread.sleep(2 * 1000);
assertEquals(1, nodeManager.getNodeCount(HEALTHY)); assertEquals(1, nodeManager.getNodeCount(HEALTHY));
@ -635,12 +625,12 @@ public void testScmClusterIsInExpectedState1() throws IOException,
*/ */
nodeManager.sendHeartbeat( nodeManager.sendHeartbeat(
healthyNode.getProtoBufMessage(), null, reportState); healthyNode.getProtoBufMessage(), null);
nodeManager.sendHeartbeat( nodeManager.sendHeartbeat(
staleNode.getProtoBufMessage(), null, reportState); staleNode.getProtoBufMessage(), null);
Thread.sleep(1500); Thread.sleep(1500);
nodeManager.sendHeartbeat( nodeManager.sendHeartbeat(
healthyNode.getProtoBufMessage(), null, reportState); healthyNode.getProtoBufMessage(), null);
Thread.sleep(2 * 1000); Thread.sleep(2 * 1000);
// 3.5 seconds have elapsed for stale node, so it moves into Stale. // 3.5 seconds have elapsed for stale node, so it moves into Stale.
@ -674,11 +664,11 @@ public void testScmClusterIsInExpectedState1() throws IOException,
* back all the nodes in healthy state. * back all the nodes in healthy state.
*/ */
nodeManager.sendHeartbeat( nodeManager.sendHeartbeat(
healthyNode.getProtoBufMessage(), null, reportState); healthyNode.getProtoBufMessage(), null);
nodeManager.sendHeartbeat( nodeManager.sendHeartbeat(
staleNode.getProtoBufMessage(), null, reportState); staleNode.getProtoBufMessage(), null);
nodeManager.sendHeartbeat( nodeManager.sendHeartbeat(
deadNode.getProtoBufMessage(), null, reportState); deadNode.getProtoBufMessage(), null);
Thread.sleep(500); Thread.sleep(500);
//Assert all nodes are healthy. //Assert all nodes are healthy.
assertEquals(3, nodeManager.getAllNodes().size()); assertEquals(3, nodeManager.getAllNodes().size());
@ -699,7 +689,7 @@ private void heartbeatNodeSet(SCMNodeManager manager,
int sleepDuration) throws InterruptedException { int sleepDuration) throws InterruptedException {
while (!Thread.currentThread().isInterrupted()) { while (!Thread.currentThread().isInterrupted()) {
for (DatanodeDetails dn : list) { for (DatanodeDetails dn : list) {
manager.sendHeartbeat(dn.getProtoBufMessage(), null, reportState); manager.sendHeartbeat(dn.getProtoBufMessage(), null);
} }
Thread.sleep(sleepDuration); Thread.sleep(sleepDuration);
} }
@ -785,7 +775,7 @@ public void testScmClusterIsInExpectedState2() throws IOException,
// No Thread just one time HBs the node manager, so that these will be // No Thread just one time HBs the node manager, so that these will be
// marked as dead nodes eventually. // marked as dead nodes eventually.
for (DatanodeDetails dn : deadNodeList) { for (DatanodeDetails dn : deadNodeList) {
nodeManager.sendHeartbeat(dn.getProtoBufMessage(), null, reportState); nodeManager.sendHeartbeat(dn.getProtoBufMessage(), null);
} }
@ -950,7 +940,7 @@ public void testScmEnterAndExitChillMode() throws IOException,
DatanodeDetails datanodeDetails = TestUtils.getDatanodeDetails( DatanodeDetails datanodeDetails = TestUtils.getDatanodeDetails(
nodeManager); nodeManager);
nodeManager.sendHeartbeat( nodeManager.sendHeartbeat(
datanodeDetails.getProtoBufMessage(), null, reportState); datanodeDetails.getProtoBufMessage(), null);
String status = nodeManager.getChillModeStatus(); String status = nodeManager.getChillModeStatus();
Assert.assertThat(status, containsString("Still in chill " + Assert.assertThat(status, containsString("Still in chill " +
"mode, waiting on nodes to report in.")); "mode, waiting on nodes to report in."));
@ -978,7 +968,7 @@ public void testScmEnterAndExitChillMode() throws IOException,
for (int x = 0; x < 20; x++) { for (int x = 0; x < 20; x++) {
DatanodeDetails datanode = TestUtils.getDatanodeDetails(nodeManager); DatanodeDetails datanode = TestUtils.getDatanodeDetails(nodeManager);
nodeManager.sendHeartbeat(datanode.getProtoBufMessage(), nodeManager.sendHeartbeat(datanode.getProtoBufMessage(),
null, reportState); null);
} }
Thread.sleep(500); Thread.sleep(500);
@ -1023,7 +1013,7 @@ public void testScmStatsFromNodeReport() throws IOException,
.createStorageReport(capacity, used, free, storagePath, .createStorageReport(capacity, used, free, storagePath,
null, dnId, 1); null, dnId, 1);
nodeManager.sendHeartbeat(datanodeDetails.getProtoBufMessage(), nodeManager.sendHeartbeat(datanodeDetails.getProtoBufMessage(),
TestUtils.createNodeReport(reports), reportState); TestUtils.createNodeReport(reports));
} }
GenericTestUtils.waitFor(() -> nodeManager.waitForHeartbeatProcessed(), GenericTestUtils.waitFor(() -> nodeManager.waitForHeartbeatProcessed(),
100, 4 * 1000); 100, 4 * 1000);
@ -1073,7 +1063,7 @@ public void testScmNodeReportUpdate() throws IOException,
null, dnId, 1); null, dnId, 1);
nodeManager.sendHeartbeat(datanodeDetails.getProtoBufMessage(), nodeManager.sendHeartbeat(datanodeDetails.getProtoBufMessage(),
TestUtils.createNodeReport(reports), reportState); TestUtils.createNodeReport(reports));
Thread.sleep(100); Thread.sleep(100);
} }
@ -1154,7 +1144,7 @@ public void testScmNodeReportUpdate() throws IOException,
.createStorageReport(capacity, expectedScmUsed, expectedRemaining, .createStorageReport(capacity, expectedScmUsed, expectedRemaining,
storagePath, null, dnId, 1); storagePath, null, dnId, 1);
nodeManager.sendHeartbeat(datanodeDetails.getProtoBufMessage(), nodeManager.sendHeartbeat(datanodeDetails.getProtoBufMessage(),
TestUtils.createNodeReport(reports), reportState); TestUtils.createNodeReport(reports));
// Wait up to 5 seconds so that the dead node becomes healthy // Wait up to 5 seconds so that the dead node becomes healthy
// Verify usage info should be updated. // Verify usage info should be updated.

View File

@ -71,9 +71,6 @@
import static org.apache.hadoop.hdds.scm.TestUtils.getDatanodeDetails; import static org.apache.hadoop.hdds.scm.TestUtils.getDatanodeDetails;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY;
import static org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.ReportState.states
.noContainerReports;
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_METADATA_DIRS; import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_METADATA_DIRS;
import static org.apache.hadoop.ozone.container.common.ContainerTestUtils import static org.apache.hadoop.ozone.container.common.ContainerTestUtils
.createEndpoint; .createEndpoint;
@ -88,8 +85,6 @@ public class TestEndPoint {
private static RPC.Server scmServer; private static RPC.Server scmServer;
private static ScmTestMock scmServerImpl; private static ScmTestMock scmServerImpl;
private static File testDir; private static File testDir;
private static StorageContainerDatanodeProtocolProtos.ReportState
defaultReportState;
@AfterClass @AfterClass
public static void tearDown() throws Exception { public static void tearDown() throws Exception {
@ -106,9 +101,6 @@ public static void setUp() throws Exception {
scmServer = SCMTestUtils.startScmRpcServer(SCMTestUtils.getConf(), scmServer = SCMTestUtils.startScmRpcServer(SCMTestUtils.getConf(),
scmServerImpl, serverAddress, 10); scmServerImpl, serverAddress, 10);
testDir = PathUtils.getTestDir(TestEndPoint.class); testDir = PathUtils.getTestDir(TestEndPoint.class);
defaultReportState = StorageContainerDatanodeProtocolProtos.
ReportState.newBuilder().setState(noContainerReports).
setCount(0).build();
} }
@Test @Test
@ -305,8 +297,7 @@ public void testHeartbeat() throws Exception {
String storageId = UUID.randomUUID().toString(); String storageId = UUID.randomUUID().toString();
SCMHeartbeatResponseProto responseProto = rpcEndPoint.getEndPoint() SCMHeartbeatResponseProto responseProto = rpcEndPoint.getEndPoint()
.sendHeartbeat(dataNode.getProtoBufMessage(), .sendHeartbeat(dataNode.getProtoBufMessage(),
TestUtils.createNodeReport(getStorageReports(storageId)), TestUtils.createNodeReport(getStorageReports(storageId)));
defaultReportState);
Assert.assertNotNull(responseProto); Assert.assertNotNull(responseProto);
Assert.assertEquals(0, responseProto.getCommandsCount()); Assert.assertEquals(0, responseProto.getCommandsCount());
} }

View File

@ -26,8 +26,6 @@
import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.ReportState;
import org.apache.hadoop.hdds.protocol.proto import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.SCMNodeReport; .StorageContainerDatanodeProtocolProtos.SCMNodeReport;
import org.apache.hadoop.hdds.protocol.proto import org.apache.hadoop.hdds.protocol.proto
@ -293,12 +291,11 @@ public SCMCommand register(HddsProtos.DatanodeDetailsProto dd,
* *
* @param dd - Datanode Details. * @param dd - Datanode Details.
* @param nodeReport - node report. * @param nodeReport - node report.
* @param containerReportState - container report state.
* @return SCMheartbeat response list * @return SCMheartbeat response list
*/ */
@Override @Override
public List<SCMCommand> sendHeartbeat(HddsProtos.DatanodeDetailsProto dd, public List<SCMCommand> sendHeartbeat(HddsProtos.DatanodeDetailsProto dd,
SCMNodeReport nodeReport, ReportState containerReportState) { SCMNodeReport nodeReport) {
return null; return null;
} }

View File

@ -33,7 +33,6 @@
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeType; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeType;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction; import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ReportState;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCmdType; import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCmdType;
import org.apache.hadoop.hdds.scm.server.StorageContainerManager; import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
import org.apache.hadoop.hdds.scm.server.StorageContainerManager.StartupOption; import org.apache.hadoop.hdds.scm.server.StorageContainerManager.StartupOption;
@ -302,11 +301,9 @@ public void testBlockDeletingThrottling() throws Exception {
GenericTestUtils.waitFor(() -> { GenericTestUtils.waitFor(() -> {
NodeManager nodeManager = cluster.getStorageContainerManager() NodeManager nodeManager = cluster.getStorageContainerManager()
.getScmNodeManager(); .getScmNodeManager();
ReportState reportState = ReportState.newBuilder()
.setState(ReportState.states.noContainerReports).setCount(0).build();
List<SCMCommand> commands = nodeManager.sendHeartbeat( List<SCMCommand> commands = nodeManager.sendHeartbeat(
nodeManager.getNodes(NodeState.HEALTHY).get(0).getProtoBufMessage(), nodeManager.getNodes(NodeState.HEALTHY).get(0).getProtoBufMessage(),
null, reportState); null);
if (commands != null) { if (commands != null) {
for (SCMCommand cmd : commands) { for (SCMCommand cmd : commands) {