diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerManagerImpl.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerManagerImpl.java index 9355364eac7..af470153244 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerManagerImpl.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerManagerImpl.java @@ -35,11 +35,11 @@ import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos import org.apache.hadoop.hdds.protocol.proto .StorageContainerDatanodeProtocolProtos; import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.ContainerReportsRequestProto; + .StorageContainerDatanodeProtocolProtos.ContainerReportsProto; import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.SCMNodeReport; + .StorageContainerDatanodeProtocolProtos.NodeReportProto; import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.SCMStorageReport; + .StorageContainerDatanodeProtocolProtos.StorageReportProto; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.ozone.OzoneConfigKeys; import org.apache.hadoop.ozone.OzoneConsts; @@ -854,11 +854,11 @@ public class ContainerManagerImpl implements ContainerManager { * @return node report. */ @Override - public SCMNodeReport getNodeReport() throws IOException { + public NodeReportProto getNodeReport() throws IOException { StorageLocationReport[] reports = locationManager.getLocationReport(); - SCMNodeReport.Builder nrb = SCMNodeReport.newBuilder(); + NodeReportProto.Builder nrb = NodeReportProto.newBuilder(); for (int i = 0; i < reports.length; i++) { - SCMStorageReport.Builder srb = SCMStorageReport.newBuilder(); + StorageReportProto.Builder srb = StorageReportProto.newBuilder(); nrb.addStorageReport(reports[i].getProtoBufMessage()); } return nrb.build(); @@ -891,7 +891,7 @@ public class ContainerManagerImpl implements ContainerManager { * @throws IOException */ @Override - public ContainerReportsRequestProto getContainerReport() throws IOException { + public ContainerReportsProto getContainerReport() throws IOException { LOG.debug("Starting container report iteration."); // No need for locking since containerMap is a ConcurrentSkipListMap // And we can never get the exact state since close might happen @@ -899,12 +899,8 @@ public class ContainerManagerImpl implements ContainerManager { List containers = containerMap.values().stream() .collect(Collectors.toList()); - ContainerReportsRequestProto.Builder crBuilder = - ContainerReportsRequestProto.newBuilder(); - - // TODO: support delta based container report - crBuilder.setDatanodeDetails(datanodeDetails.getProtoBufMessage()) - .setType(ContainerReportsRequestProto.reportType.fullReport); + ContainerReportsProto.Builder crBuilder = + ContainerReportsProto.newBuilder(); for (ContainerData container: containers) { long containerId = container.getContainerID(); diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/StorageLocationReport.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/StorageLocationReport.java index a5ad6c2b5e7..87b965679d6 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/StorageLocationReport.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/StorageLocationReport.java @@ -20,7 +20,7 @@ package org.apache.hadoop.ozone.container.common.impl; import org.apache.hadoop.fs.StorageType; import org.apache.hadoop.hdds.protocol.proto. - StorageContainerDatanodeProtocolProtos.SCMStorageReport; + StorageContainerDatanodeProtocolProtos.StorageReportProto; import org.apache.hadoop.hdds.protocol.proto. StorageContainerDatanodeProtocolProtos.StorageTypeProto; @@ -137,8 +137,8 @@ public class StorageLocationReport { * @return SCMStorageReport * @throws IOException In case, the storage type specified is invalid. */ - public SCMStorageReport getProtoBufMessage() throws IOException{ - SCMStorageReport.Builder srb = SCMStorageReport.newBuilder(); + public StorageReportProto getProtoBufMessage() throws IOException{ + StorageReportProto.Builder srb = StorageReportProto.newBuilder(); return srb.setStorageUuid(getId()) .setCapacity(getCapacity()) .setScmUsed(getScmUsed()) @@ -156,7 +156,7 @@ public class StorageLocationReport { * @throws IOException in case of invalid storage type */ - public static StorageLocationReport getFromProtobuf(SCMStorageReport report) + public static StorageLocationReport getFromProtobuf(StorageReportProto report) throws IOException { StorageLocationReport.Builder builder = StorageLocationReport.newBuilder(); builder.setId(report.getStorageUuid()) diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerManager.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerManager.java index ba70953710a..49b68dc2a04 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerManager.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerManager.java @@ -27,9 +27,9 @@ import org.apache.hadoop.hdfs.server.datanode.StorageLocation; import org.apache.hadoop.hdfs.util.RwLock; import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.ContainerReportsRequestProto; + .StorageContainerDatanodeProtocolProtos.ContainerReportsProto; import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.SCMNodeReport; + .StorageContainerDatanodeProtocolProtos.NodeReportProto; import org.apache.hadoop.ozone.container.common.helpers.ContainerData; import java.io.IOException; @@ -171,14 +171,14 @@ public interface ContainerManager extends RwLock { * Get the Node Report of container storage usage. * @return node report. */ - SCMNodeReport getNodeReport() throws IOException; + NodeReportProto getNodeReport() throws IOException; /** * Gets container report. * @return container report. * @throws IOException */ - ContainerReportsRequestProto getContainerReport() throws IOException; + ContainerReportsProto getContainerReport() throws IOException; /** * Gets container reports. diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java index a8fe4949ae4..d0a4217245e 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java @@ -21,8 +21,7 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.protocol.DatanodeDetails; -import org.apache.hadoop.ozone.container.common.statemachine.commandhandler - .CloseContainerHandler; +import org.apache.hadoop.ozone.container.common.statemachine.commandhandler.CloseContainerCommandHandler; import org.apache.hadoop.ozone.container.common.statemachine.commandhandler .CommandDispatcher; import org.apache.hadoop.ozone.container.common.statemachine.commandhandler @@ -86,7 +85,7 @@ public class DatanodeStateMachine implements Closeable { // When we add new handlers just adding a new handler here should do the // trick. commandDispatcher = CommandDispatcher.newBuilder() - .addHandler(new CloseContainerHandler()) + .addHandler(new CloseContainerCommandHandler()) .addHandler(new DeleteBlocksCommandHandler( container.getContainerManager(), conf)) .setConnectionManager(connectionManager) @@ -131,7 +130,7 @@ public class DatanodeStateMachine implements Closeable { try { LOG.debug("Executing cycle Number : {}", context.getExecutionCount()); nextHB.set(Time.monotonicNow() + heartbeatFrequency); - context.setReportState(container.getNodeReport()); + context.setNodeReport(container.getNodeReport()); context.execute(executorService, heartbeatFrequency, TimeUnit.MILLISECONDS); now = Time.monotonicNow(); diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java index 27eb57e456b..4e3c610f778 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java @@ -18,7 +18,7 @@ package org.apache.hadoop.ozone.container.common.statemachine; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.SCMNodeReport; + .StorageContainerDatanodeProtocolProtos.NodeReportProto; import org.apache.hadoop.ozone.container.common.states.DatanodeState; import org.apache.hadoop.ozone.container.common.states.datanode .InitDatanodeState; @@ -52,7 +52,7 @@ public class StateContext { private final AtomicLong stateExecutionCount; private final Configuration conf; private DatanodeStateMachine.DatanodeStates state; - private SCMNodeReport nrState; + private NodeReportProto dnReport; /** * Constructs a StateContext. @@ -69,7 +69,7 @@ public class StateContext { commandQueue = new LinkedList<>(); lock = new ReentrantLock(); stateExecutionCount = new AtomicLong(0); - nrState = SCMNodeReport.getDefaultInstance(); + dnReport = NodeReportProto.getDefaultInstance(); } /** @@ -144,16 +144,16 @@ public class StateContext { * Returns the node report of the datanode state context. * @return the node report. */ - public SCMNodeReport getNodeReport() { - return nrState; + public NodeReportProto getNodeReport() { + return dnReport; } /** * Sets the storage location report of the datanode state context. - * @param nrReport - node report + * @param nodeReport node report */ - public void setReportState(SCMNodeReport nrReport) { - this.nrState = nrReport; + public void setNodeReport(NodeReportProto nodeReport) { + this.dnReport = nodeReport; } /** diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CloseContainerHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CloseContainerCommandHandler.java similarity index 85% rename from hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CloseContainerHandler.java rename to hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CloseContainerCommandHandler.java index d8adc7df0f6..e8c602d1ad3 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CloseContainerHandler.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CloseContainerCommandHandler.java @@ -17,9 +17,9 @@ package org.apache.hadoop.ozone.container.common.statemachine.commandhandler; import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.SCMCloseContainerCmdResponseProto; + .StorageContainerDatanodeProtocolProtos.SCMCommandProto; import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.SCMCmdType; + .StorageContainerDatanodeProtocolProtos.CloseContainerCommandProto; import org.apache.hadoop.ozone.container.common.statemachine .SCMConnectionManager; import org.apache.hadoop.ozone.container.common.statemachine.StateContext; @@ -30,18 +30,18 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * Container Report handler. + * Handler for close container command received from SCM. */ -public class CloseContainerHandler implements CommandHandler { +public class CloseContainerCommandHandler implements CommandHandler { static final Logger LOG = - LoggerFactory.getLogger(CloseContainerHandler.class); + LoggerFactory.getLogger(CloseContainerCommandHandler.class); private int invocationCount; private long totalTime; /** * Constructs a ContainerReport handler. */ - public CloseContainerHandler() { + public CloseContainerCommandHandler() { } /** @@ -62,9 +62,9 @@ public class CloseContainerHandler implements CommandHandler { long containerID = -1; try { - SCMCloseContainerCmdResponseProto + CloseContainerCommandProto closeContainerProto = - SCMCloseContainerCmdResponseProto + CloseContainerCommandProto .parseFrom(command.getProtoBufMessage()); containerID = closeContainerProto.getContainerID(); @@ -84,8 +84,8 @@ public class CloseContainerHandler implements CommandHandler { * @return Type */ @Override - public SCMCmdType getCommandType() { - return SCMCmdType.closeContainerCommand; + public SCMCommandProto.Type getCommandType() { + return SCMCommandProto.Type.closeContainerCommand; } /** diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CommandDispatcher.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CommandDispatcher.java index 40feca32bd0..aedd78fe4bd 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CommandDispatcher.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CommandDispatcher.java @@ -18,7 +18,8 @@ package org.apache.hadoop.ozone.container.common.statemachine.commandhandler; import com.google.common.base.Preconditions; -import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCmdType; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type; import org.apache.hadoop.ozone.container.common.statemachine.SCMConnectionManager; import org.apache.hadoop.ozone.container.common.statemachine.StateContext; import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer; @@ -38,7 +39,7 @@ public final class CommandDispatcher { static final Logger LOG = LoggerFactory.getLogger(CommandDispatcher.class); private final StateContext context; - private final Map handlerMap; + private final Map handlerMap; private final OzoneContainer container; private final SCMConnectionManager connectionManager; diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CommandHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CommandHandler.java index 13d9f7295d9..60e2dc479da 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CommandHandler.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CommandHandler.java @@ -17,8 +17,10 @@ package org.apache.hadoop.ozone.container.common.statemachine.commandhandler; -import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCmdType; -import org.apache.hadoop.ozone.container.common.statemachine.SCMConnectionManager; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.SCMCommandProto; +import org.apache.hadoop.ozone.container.common.statemachine + .SCMConnectionManager; import org.apache.hadoop.ozone.container.common.statemachine.StateContext; import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer; import org.apache.hadoop.ozone.protocol.commands.SCMCommand; @@ -42,7 +44,7 @@ public interface CommandHandler { * Returns the command type that this command handler handles. * @return Type */ - SCMCmdType getCommandType(); + SCMCommandProto.Type getCommandType(); /** * Returns number of times this handler has been invoked. diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteBlocksCommandHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteBlocksCommandHandler.java index 5231660b6d1..ab69bdc38a6 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteBlocksCommandHandler.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteBlocksCommandHandler.java @@ -18,6 +18,8 @@ package org.apache.hadoop.ozone.container.common.statemachine.commandhandler; import com.google.common.primitives.Longs; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.SCMCommandProto; import org.apache.hadoop.hdfs.DFSUtil; import org.apache.hadoop.hdds.protocol.proto .StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKProto; @@ -26,8 +28,6 @@ import org.apache.hadoop.hdds.protocol.proto .DeleteBlockTransactionResult; import org.apache.hadoop.hdds.protocol.proto .StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction; -import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.SCMCmdType; import org.apache.hadoop.ozone.OzoneConsts; import org.apache.hadoop.ozone.container.common.helpers.ContainerData; import org.apache.hadoop.ozone.container.common.helpers @@ -73,10 +73,10 @@ public class DeleteBlocksCommandHandler implements CommandHandler { @Override public void handle(SCMCommand command, OzoneContainer container, StateContext context, SCMConnectionManager connectionManager) { - if (command.getType() != SCMCmdType.deleteBlocksCommand) { + if (command.getType() != SCMCommandProto.Type.deleteBlocksCommand) { LOG.warn("Skipping handling command, expected command " + "type {} but found {}", - SCMCmdType.deleteBlocksCommand, command.getType()); + SCMCommandProto.Type.deleteBlocksCommand, command.getType()); return; } LOG.debug("Processing block deletion command."); @@ -193,8 +193,8 @@ public class DeleteBlocksCommandHandler implements CommandHandler { } @Override - public SCMCmdType getCommandType() { - return SCMCmdType.deleteBlocksCommand; + public SCMCommandProto.Type getCommandType() { + return SCMCommandProto.Type.deleteBlocksCommand; } @Override diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java index 01b4c72428b..337cdfbcf8b 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java @@ -23,7 +23,9 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.DatanodeDetailsProto; import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.SCMCommandResponseProto; + .StorageContainerDatanodeProtocolProtos.SCMHeartbeatRequestProto; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.SCMCommandProto; import org.apache.hadoop.hdds.protocol.proto .StorageContainerDatanodeProtocolProtos.SCMHeartbeatResponseProto; import org.apache.hadoop.ozone.container.common.helpers @@ -97,8 +99,13 @@ public class HeartbeatEndpointTask try { Preconditions.checkState(this.datanodeDetailsProto != null); + SCMHeartbeatRequestProto request = SCMHeartbeatRequestProto.newBuilder() + .setDatanodeDetails(datanodeDetailsProto) + .setNodeReport(context.getNodeReport()) + .build(); + SCMHeartbeatResponseProto reponse = rpcEndpoint.getEndPoint() - .sendHeartbeat(datanodeDetailsProto, this.context.getNodeReport()); + .sendHeartbeat(request); processResponse(reponse, datanodeDetailsProto); rpcEndpoint.setLastSuccessfulHeartbeat(ZonedDateTime.now()); rpcEndpoint.zeroMissedCount(); @@ -125,13 +132,13 @@ public class HeartbeatEndpointTask */ private void processResponse(SCMHeartbeatResponseProto response, final DatanodeDetailsProto datanodeDetails) { - for (SCMCommandResponseProto commandResponseProto : response + Preconditions.checkState(response.getDatanodeUUID() + .equalsIgnoreCase(datanodeDetails.getUuid()), + "Unexpected datanode ID in the response."); + // Verify the response is indeed for this datanode. + for (SCMCommandProto commandResponseProto : response .getCommandsList()) { - // Verify the response is indeed for this datanode. - Preconditions.checkState(commandResponseProto.getDatanodeUUID() - .equalsIgnoreCase(datanodeDetails.getUuid()), - "Unexpected datanode ID in the response."); - switch (commandResponseProto.getCmdType()) { + switch (commandResponseProto.getCommandType()) { case reregisterCommand: if (rpcEndpoint.getState() == EndPointStates.HEARTBEAT) { if (LOG.isDebugEnabled()) { @@ -148,7 +155,8 @@ public class HeartbeatEndpointTask break; case deleteBlocksCommand: DeleteBlocksCommand db = DeleteBlocksCommand - .getFromProtobuf(commandResponseProto.getDeleteBlocksProto()); + .getFromProtobuf( + commandResponseProto.getDeleteBlocksCommandProto()); if (!db.blocksTobeDeleted().isEmpty()) { if (LOG.isDebugEnabled()) { LOG.debug(DeletedContainerBlocksSummary @@ -161,7 +169,7 @@ public class HeartbeatEndpointTask case closeContainerCommand: CloseContainerCommand closeContainer = CloseContainerCommand.getFromProtobuf( - commandResponseProto.getCloseContainerProto()); + commandResponseProto.getCloseContainerCommandProto()); if (LOG.isDebugEnabled()) { LOG.debug("Received SCM container close request for container {}", closeContainer.getContainerID()); @@ -170,7 +178,7 @@ public class HeartbeatEndpointTask break; default: throw new IllegalArgumentException("Unknown response : " - + commandResponseProto.getCmdType().name()); + + commandResponseProto.getCommandType().name()); } } } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/RegisterEndpointTask.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/RegisterEndpointTask.java index 77a70843a87..12b48abb10f 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/RegisterEndpointTask.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/RegisterEndpointTask.java @@ -24,11 +24,11 @@ import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.ozone.container.common.statemachine .EndpointStateMachine; import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.SCMNodeReport; + .StorageContainerDatanodeProtocolProtos.NodeReportProto; import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.ContainerReportsRequestProto; + .StorageContainerDatanodeProtocolProtos.ContainerReportsProto; import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.SCMRegisteredCmdResponseProto; + .StorageContainerDatanodeProtocolProtos.SCMRegisteredResponseProto; import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -104,11 +104,11 @@ public final class RegisterEndpointTask implements rpcEndPoint.lock(); try { - ContainerReportsRequestProto contianerReport = datanodeContainerManager + ContainerReportsProto contianerReport = datanodeContainerManager .getContainerReport(); - SCMNodeReport nodeReport = datanodeContainerManager.getNodeReport(); + NodeReportProto nodeReport = datanodeContainerManager.getNodeReport(); // TODO : Add responses to the command Queue. - SCMRegisteredCmdResponseProto response = rpcEndPoint.getEndPoint() + SCMRegisteredResponseProto response = rpcEndPoint.getEndPoint() .register(datanodeDetails.getProtoBufMessage(), nodeReport, contianerReport); Preconditions.checkState(UUID.fromString(response.getDatanodeUUID()) diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java index 6758479077c..b357fefb79d 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java @@ -19,14 +19,14 @@ package org.apache.hadoop.ozone.container.ozoneimpl; import com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.ContainerReportsProto; import org.apache.hadoop.hdds.scm.ScmConfigKeys; import org.apache.hadoop.hdfs.server.datanode.StorageLocation; import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.ContainerReportsRequestProto; -import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.SCMNodeReport; + .StorageContainerDatanodeProtocolProtos.NodeReportProto; import org.apache.hadoop.ozone.OzoneConfigKeys; import org.apache.hadoop.ozone.container.common.helpers.ContainerData; import org.apache.hadoop.ozone.container.common.impl.ChunkManagerImpl; @@ -219,7 +219,7 @@ public class OzoneContainer { /** * Returns node report of container storage usage. */ - public SCMNodeReport getNodeReport() throws IOException { + public NodeReportProto getNodeReport() throws IOException { return this.manager.getNodeReport(); } @@ -255,7 +255,7 @@ public class OzoneContainer { * @return - container report. * @throws IOException */ - public ContainerReportsRequestProto getContainerReport() throws IOException { + public ContainerReportsProto getContainerReport() throws IOException { return this.manager.getContainerReport(); } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/StorageContainerDatanodeProtocol.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/StorageContainerDatanodeProtocol.java index e2a3bf5fccc..a950a3144a9 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/StorageContainerDatanodeProtocol.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/StorageContainerDatanodeProtocol.java @@ -18,21 +18,21 @@ package org.apache.hadoop.ozone.protocol; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.DatanodeDetailsProto; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.SCMHeartbeatRequestProto; import org.apache.hadoop.hdds.protocol.proto .StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKProto; import org.apache.hadoop.hdds.protocol.proto .StorageContainerDatanodeProtocolProtos .ContainerBlocksDeletionACKResponseProto; import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.ContainerReportsRequestProto; -import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.ContainerReportsResponseProto; + .StorageContainerDatanodeProtocolProtos.ContainerReportsProto; import org.apache.hadoop.hdds.protocol.proto .StorageContainerDatanodeProtocolProtos.SCMHeartbeatResponseProto; import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.SCMNodeReport; + .StorageContainerDatanodeProtocolProtos.NodeReportProto; import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.SCMRegisteredCmdResponseProto; + .StorageContainerDatanodeProtocolProtos.SCMRegisteredResponseProto; import org.apache.hadoop.hdds.protocol.proto .StorageContainerDatanodeProtocolProtos.SCMVersionRequestProto; import org.apache.hadoop.hdds.protocol.proto @@ -55,13 +55,12 @@ public interface StorageContainerDatanodeProtocol { /** * Used by data node to send a Heartbeat. - * @param datanodeDetails - Datanode Details. - * @param nodeReport - node report state + * @param heartbeat Heartbeat * @return - SCMHeartbeatResponseProto * @throws IOException */ - SCMHeartbeatResponseProto sendHeartbeat(DatanodeDetailsProto datanodeDetails, - SCMNodeReport nodeReport) throws IOException; + SCMHeartbeatResponseProto sendHeartbeat(SCMHeartbeatRequestProto heartbeat) + throws IOException; /** * Register Datanode. @@ -70,19 +69,10 @@ public interface StorageContainerDatanodeProtocol { * @param containerReportsRequestProto - Container Reports. * @return SCM Command. */ - SCMRegisteredCmdResponseProto register(DatanodeDetailsProto datanodeDetails, - SCMNodeReport nodeReport, ContainerReportsRequestProto + SCMRegisteredResponseProto register(DatanodeDetailsProto datanodeDetails, + NodeReportProto nodeReport, ContainerReportsProto containerReportsRequestProto) throws IOException; - /** - * Send a container report. - * @param reports -- Container report. - * @return container reports response. - * @throws IOException - */ - ContainerReportsResponseProto sendContainerReport( - ContainerReportsRequestProto reports) throws IOException; - /** * Used by datanode to send block deletion ACK to SCM. * @param request block deletion transactions. diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/StorageContainerNodeProtocol.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/StorageContainerNodeProtocol.java index 14038fb0946..790f58acf25 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/StorageContainerNodeProtocol.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/StorageContainerNodeProtocol.java @@ -18,11 +18,12 @@ package org.apache.hadoop.ozone.protocol; import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.hdds.protocol.proto.HddsProtos.DatanodeDetailsProto; +import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.SCMNodeReport; + .StorageContainerDatanodeProtocolProtos.NodeReportProto; import org.apache.hadoop.hdds.protocol.proto .StorageContainerDatanodeProtocolProtos.SCMVersionRequestProto; +import org.apache.hadoop.ozone.protocol.commands.RegisteredCommand; import org.apache.hadoop.ozone.protocol.commands.SCMCommand; import java.util.List; @@ -49,11 +50,11 @@ public interface StorageContainerNodeProtocol { /** * Register the node if the node finds that it is not registered with any SCM. * @param datanodeDetails DatanodeDetails - * @param nodeReport SCMNodeReport + * @param nodeReport NodeReportProto * @return SCMHeartbeatResponseProto */ - SCMCommand register(DatanodeDetailsProto datanodeDetails, SCMNodeReport - nodeReport); + RegisteredCommand register(DatanodeDetails datanodeDetails, + NodeReportProto nodeReport); /** * Send heartbeat to indicate the datanode is alive and doing well. @@ -61,7 +62,7 @@ public interface StorageContainerNodeProtocol { * @param nodeReport - node report. * @return SCMheartbeat response list */ - List sendHeartbeat(DatanodeDetailsProto datanodeDetails, - SCMNodeReport nodeReport); + List sendHeartbeat(DatanodeDetails datanodeDetails, + NodeReportProto nodeReport); } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/CloseContainerCommand.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/CloseContainerCommand.java index d1d64881344..4f4f82b6002 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/CloseContainerCommand.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/CloseContainerCommand.java @@ -19,18 +19,16 @@ package org.apache.hadoop.ozone.protocol.commands; import com.google.common.base.Preconditions; import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.SCMCloseContainerCmdResponseProto; + .StorageContainerDatanodeProtocolProtos.SCMCommandProto; import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.SCMCmdType; + .StorageContainerDatanodeProtocolProtos.CloseContainerCommandProto; -import static org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.SCMCmdType.closeContainerCommand; /** * Asks datanode to close a container. */ public class CloseContainerCommand - extends SCMCommand { + extends SCMCommand { private long containerID; @@ -44,8 +42,8 @@ public class CloseContainerCommand * @return Type */ @Override - public SCMCmdType getType() { - return closeContainerCommand; + public SCMCommandProto.Type getType() { + return SCMCommandProto.Type.closeContainerCommand; } /** @@ -58,13 +56,13 @@ public class CloseContainerCommand return getProto().toByteArray(); } - public SCMCloseContainerCmdResponseProto getProto() { - return SCMCloseContainerCmdResponseProto.newBuilder() + public CloseContainerCommandProto getProto() { + return CloseContainerCommandProto.newBuilder() .setContainerID(containerID).build(); } public static CloseContainerCommand getFromProtobuf( - SCMCloseContainerCmdResponseProto closeContainerProto) { + CloseContainerCommandProto closeContainerProto) { Preconditions.checkNotNull(closeContainerProto); return new CloseContainerCommand(closeContainerProto.getContainerID()); diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/DeleteBlocksCommand.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/DeleteBlocksCommand.java index a11ca25a304..4fa33f68b00 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/DeleteBlocksCommand.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/DeleteBlocksCommand.java @@ -17,12 +17,12 @@ */ package org.apache.hadoop.ozone.protocol.commands; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.SCMCommandProto; import org.apache.hadoop.hdds.protocol.proto .StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction; import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.SCMCmdType; -import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.SCMDeleteBlocksCmdResponseProto; + .StorageContainerDatanodeProtocolProtos.DeleteBlocksCommandProto; import java.util.List; @@ -30,7 +30,7 @@ import java.util.List; * A SCM command asks a datanode to delete a number of blocks. */ public class DeleteBlocksCommand extends - SCMCommand { + SCMCommand { private List blocksTobeDeleted; @@ -44,8 +44,8 @@ public class DeleteBlocksCommand extends } @Override - public SCMCmdType getType() { - return SCMCmdType.deleteBlocksCommand; + public SCMCommandProto.Type getType() { + return SCMCommandProto.Type.deleteBlocksCommand; } @Override @@ -54,13 +54,13 @@ public class DeleteBlocksCommand extends } public static DeleteBlocksCommand getFromProtobuf( - SCMDeleteBlocksCmdResponseProto deleteBlocksProto) { + DeleteBlocksCommandProto deleteBlocksProto) { return new DeleteBlocksCommand(deleteBlocksProto .getDeletedBlocksTransactionsList()); } - public SCMDeleteBlocksCmdResponseProto getProto() { - return SCMDeleteBlocksCmdResponseProto.newBuilder() + public DeleteBlocksCommandProto getProto() { + return DeleteBlocksCommandProto.newBuilder() .addAllDeletedBlocksTransactions(blocksTobeDeleted).build(); } } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/RegisteredCommand.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/RegisteredCommand.java index 69f2c186fa9..3a5da72f482 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/RegisteredCommand.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/RegisteredCommand.java @@ -19,18 +19,15 @@ package org.apache.hadoop.ozone.protocol.commands; import com.google.common.base.Preconditions; import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.SCMCmdType; + .StorageContainerDatanodeProtocolProtos.SCMRegisteredResponseProto; import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.SCMRegisteredCmdResponseProto; -import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.SCMRegisteredCmdResponseProto + .StorageContainerDatanodeProtocolProtos.SCMRegisteredResponseProto .ErrorCode; /** * Response to Datanode Register call. */ -public class RegisteredCommand extends - SCMCommand { +public class RegisteredCommand { private String datanodeUUID; private String clusterID; private ErrorCode error; @@ -59,16 +56,6 @@ public class RegisteredCommand extends return new Builder(); } - /** - * Returns the type of this command. - * - * @return Type - */ - @Override - public SCMCmdType getType() { - return SCMCmdType.registeredCommand; - } - /** * Returns datanode UUID. * @@ -117,10 +104,9 @@ public class RegisteredCommand extends * * @return A protobuf message. */ - @Override public byte[] getProtoBufMessage() { - SCMRegisteredCmdResponseProto.Builder builder = - SCMRegisteredCmdResponseProto.newBuilder() + SCMRegisteredResponseProto.Builder builder = + SCMRegisteredResponseProto.newBuilder() .setClusterID(this.clusterID) .setDatanodeUUID(this.datanodeUUID) .setErrorCode(this.error); @@ -157,7 +143,7 @@ public class RegisteredCommand extends * @param response - RegisteredCmdResponseProto * @return RegisteredCommand */ - public RegisteredCommand getFromProtobuf(SCMRegisteredCmdResponseProto + public RegisteredCommand getFromProtobuf(SCMRegisteredResponseProto response) { Preconditions.checkNotNull(response); if (response.hasHostname() && response.hasIpAddress()) { diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/ReregisterCommand.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/ReregisterCommand.java index c167d59ddc1..953e31a02ef 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/ReregisterCommand.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/ReregisterCommand.java @@ -18,18 +18,16 @@ package org.apache.hadoop.ozone.protocol.commands; import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.SCMCmdType; + .StorageContainerDatanodeProtocolProtos.SCMCommandProto; import static org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.SCMCmdType.reregisterCommand; -import static org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.SCMReregisterCmdResponseProto; + .StorageContainerDatanodeProtocolProtos.ReregisterCommandProto; /** * Informs a datanode to register itself with SCM again. */ public class ReregisterCommand extends - SCMCommand{ + SCMCommand{ /** * Returns the type of this command. @@ -37,8 +35,8 @@ public class ReregisterCommand extends * @return Type */ @Override - public SCMCmdType getType() { - return reregisterCommand; + public SCMCommandProto.Type getType() { + return SCMCommandProto.Type.reregisterCommand; } /** @@ -51,8 +49,8 @@ public class ReregisterCommand extends return getProto().toByteArray(); } - public SCMReregisterCmdResponseProto getProto() { - return SCMReregisterCmdResponseProto + public ReregisterCommandProto getProto() { + return ReregisterCommandProto .newBuilder() .build(); } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/SCMCommand.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/SCMCommand.java index 73e4194d8cf..35ca802bee1 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/SCMCommand.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/SCMCommand.java @@ -19,7 +19,7 @@ package org.apache.hadoop.ozone.protocol.commands; import com.google.protobuf.GeneratedMessage; import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.SCMCmdType; + .StorageContainerDatanodeProtocolProtos.SCMCommandProto; /** * A class that acts as the base class to convert between Java and SCM @@ -31,7 +31,7 @@ public abstract class SCMCommand { * Returns the type of this command. * @return Type */ - public abstract SCMCmdType getType(); + public abstract SCMCommandProto.Type getType(); /** * Gets the protobuf message of this object. diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolClientSideTranslatorPB.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolClientSideTranslatorPB.java index a56c57a3326..40fe189600d 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolClientSideTranslatorPB.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolClientSideTranslatorPB.java @@ -19,25 +19,24 @@ package org.apache.hadoop.ozone.protocolPB; import com.google.protobuf.RpcController; import com.google.protobuf.ServiceException; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.DatanodeDetailsProto; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.ContainerReportsProto; import org.apache.hadoop.hdds.protocol.proto .StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKProto; import org.apache.hadoop.hdds.protocol.proto .StorageContainerDatanodeProtocolProtos .ContainerBlocksDeletionACKResponseProto; -import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.ContainerReportsRequestProto; -import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.ContainerReportsResponseProto; + import org.apache.hadoop.hdds.protocol.proto .StorageContainerDatanodeProtocolProtos.SCMHeartbeatRequestProto; import org.apache.hadoop.hdds.protocol.proto .StorageContainerDatanodeProtocolProtos.SCMHeartbeatResponseProto; import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.SCMNodeReport; + .StorageContainerDatanodeProtocolProtos.NodeReportProto; import org.apache.hadoop.hdds.protocol.proto .StorageContainerDatanodeProtocolProtos.SCMRegisterRequestProto; import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.SCMRegisteredCmdResponseProto; + .StorageContainerDatanodeProtocolProtos.SCMRegisteredResponseProto; import org.apache.hadoop.hdds.protocol.proto .StorageContainerDatanodeProtocolProtos.SCMVersionRequestProto; import org.apache.hadoop.hdds.protocol.proto @@ -123,22 +122,16 @@ public class StorageContainerDatanodeProtocolClientSideTranslatorPB /** * Send by datanode to SCM. * - * @param datanodeDetailsProto - Datanode Details - * @param nodeReport - node report + * @param heartbeat node heartbeat * @throws IOException */ @Override public SCMHeartbeatResponseProto sendHeartbeat( - DatanodeDetailsProto datanodeDetailsProto, - SCMNodeReport nodeReport) throws IOException { - SCMHeartbeatRequestProto.Builder req = SCMHeartbeatRequestProto - .newBuilder(); - req.setDatanodeDetails(datanodeDetailsProto); - req.setNodeReport(nodeReport); + SCMHeartbeatRequestProto heartbeat) throws IOException { final SCMHeartbeatResponseProto resp; try { - resp = rpcProxy.sendHeartbeat(NULL_RPC_CONTROLLER, req.build()); + resp = rpcProxy.sendHeartbeat(NULL_RPC_CONTROLLER, heartbeat); } catch (ServiceException e) { throw ProtobufHelper.getRemoteException(e); } @@ -154,16 +147,16 @@ public class StorageContainerDatanodeProtocolClientSideTranslatorPB * @return SCM Command. */ @Override - public SCMRegisteredCmdResponseProto register( - DatanodeDetailsProto datanodeDetailsProto, SCMNodeReport nodeReport, - ContainerReportsRequestProto containerReportsRequestProto) + public SCMRegisteredResponseProto register( + DatanodeDetailsProto datanodeDetailsProto, NodeReportProto nodeReport, + ContainerReportsProto containerReportsRequestProto) throws IOException { SCMRegisterRequestProto.Builder req = SCMRegisterRequestProto.newBuilder(); req.setDatanodeDetails(datanodeDetailsProto); req.setContainerReport(containerReportsRequestProto); req.setNodeReport(nodeReport); - final SCMRegisteredCmdResponseProto response; + final SCMRegisteredResponseProto response; try { response = rpcProxy.register(NULL_RPC_CONTROLLER, req.build()); } catch (ServiceException e) { @@ -172,25 +165,6 @@ public class StorageContainerDatanodeProtocolClientSideTranslatorPB return response; } - /** - * Send a container report. - * - * @param reports -- Container report - * @return HeartbeatRespose.nullcommand. - * @throws IOException - */ - @Override - public ContainerReportsResponseProto sendContainerReport( - ContainerReportsRequestProto reports) throws IOException { - final ContainerReportsResponseProto resp; - try { - resp = rpcProxy.sendContainerReport(NULL_RPC_CONTROLLER, reports); - } catch (ServiceException e) { - throw ProtobufHelper.getRemoteException(e); - } - return resp; - } - @Override public ContainerBlocksDeletionACKResponseProto sendContainerBlocksDeletionACK( ContainerBlocksDeletionACKProto deletedBlocks) throws IOException { diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolServerSideTranslatorPB.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolServerSideTranslatorPB.java index 07dba572efa..7e8bd8a2acf 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolServerSideTranslatorPB.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolServerSideTranslatorPB.java @@ -19,18 +19,22 @@ package org.apache.hadoop.ozone.protocolPB; import com.google.protobuf.RpcController; import com.google.protobuf.ServiceException; import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos; + .StorageContainerDatanodeProtocolProtos.NodeReportProto; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.SCMRegisterRequestProto; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.SCMRegisteredResponseProto; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.SCMVersionRequestProto; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.SCMVersionResponseProto; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.ContainerReportsProto; import org.apache.hadoop.hdds.protocol.proto .StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKProto; import org.apache.hadoop.hdds.protocol.proto .StorageContainerDatanodeProtocolProtos .ContainerBlocksDeletionACKResponseProto; -import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.ContainerReportsRequestProto; -import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.ContainerReportsResponseProto; -import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.SCMNodeReport; import org.apache.hadoop.hdds.protocol.proto .StorageContainerDatanodeProtocolProtos.SCMHeartbeatRequestProto; import org.apache.hadoop.hdds.protocol.proto @@ -55,9 +59,8 @@ public class StorageContainerDatanodeProtocolServerSideTranslatorPB } @Override - public StorageContainerDatanodeProtocolProtos.SCMVersionResponseProto - getVersion(RpcController controller, - StorageContainerDatanodeProtocolProtos.SCMVersionRequestProto request) + public SCMVersionResponseProto getVersion(RpcController controller, + SCMVersionRequestProto request) throws ServiceException { try { return impl.getVersion(request); @@ -67,15 +70,13 @@ public class StorageContainerDatanodeProtocolServerSideTranslatorPB } @Override - public StorageContainerDatanodeProtocolProtos.SCMRegisteredCmdResponseProto - register(RpcController controller, StorageContainerDatanodeProtocolProtos - .SCMRegisterRequestProto request) throws ServiceException { + public SCMRegisteredResponseProto register(RpcController controller, + SCMRegisterRequestProto request) throws ServiceException { try { - ContainerReportsRequestProto containerRequestProto = null; - SCMNodeReport scmNodeReport = null; - containerRequestProto = request.getContainerReport(); - scmNodeReport = request.getNodeReport(); - return impl.register(request.getDatanodeDetails(), scmNodeReport, + ContainerReportsProto containerRequestProto = request + .getContainerReport(); + NodeReportProto dnNodeReport = request.getNodeReport(); + return impl.register(request.getDatanodeDetails(), dnNodeReport, containerRequestProto); } catch (IOException e) { throw new ServiceException(e); @@ -83,27 +84,15 @@ public class StorageContainerDatanodeProtocolServerSideTranslatorPB } @Override - public SCMHeartbeatResponseProto - sendHeartbeat(RpcController controller, + public SCMHeartbeatResponseProto sendHeartbeat(RpcController controller, SCMHeartbeatRequestProto request) throws ServiceException { try { - return impl.sendHeartbeat(request.getDatanodeDetails(), - request.getNodeReport()); + return impl.sendHeartbeat(request); } catch (IOException e) { throw new ServiceException(e); } } - @Override - public ContainerReportsResponseProto sendContainerReport( - RpcController controller, ContainerReportsRequestProto request) - throws ServiceException { - try { - return impl.sendContainerReport(request); - } catch (IOException e) { - throw new ServiceException(e); - } - } @Override public ContainerBlocksDeletionACKResponseProto sendContainerBlocksDeletionACK( diff --git a/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto b/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto index 20e6af8c366..cc131e0357f 100644 --- a/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto +++ b/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto @@ -34,6 +34,42 @@ package hadoop.hdds; import "hdds.proto"; +/** + * Request for version info of the software stack on the server. + */ +message SCMVersionRequestProto {} + +/** +* Generic response that is send to a version request. This allows keys to be +* added on the fly and protocol to remain stable. +*/ +message SCMVersionResponseProto { + required uint32 softwareVersion = 1; + repeated hadoop.hdds.KeyValue keys = 2; +} + +message SCMRegisterRequestProto { + required DatanodeDetailsProto datanodeDetails = 1; + required NodeReportProto nodeReport = 2; + required ContainerReportsProto containerReport = 3; +} + +/** + * Datanode ID returned by the SCM. This is similar to name node + * registeration of a datanode. + */ +message SCMRegisteredResponseProto { + enum ErrorCode { + success = 1; + errorNodeNotPermitted = 2; + } + required ErrorCode errorCode = 1; + required string datanodeUUID = 2; + required string clusterID = 3; + optional SCMNodeAddressList addressList = 4; + optional string hostname = 5; + optional string ipAddress = 6; +} /** * This message is send by data node to indicate that it is alive or it is @@ -41,60 +77,38 @@ import "hdds.proto"; */ message SCMHeartbeatRequestProto { required DatanodeDetailsProto datanodeDetails = 1; - optional SCMNodeReport nodeReport = 2; + optional NodeReportProto nodeReport = 2; + optional ContainerReportsProto containerReport = 3; } -/** -A container report contains the following information. -*/ -message ContainerInfo { - optional string finalhash = 2; - optional int64 size = 3; - optional int64 used = 4; - optional int64 keyCount = 5; - // TODO: move the io count to separate message - optional int64 readCount = 6; - optional int64 writeCount = 7; - optional int64 readBytes = 8; - optional int64 writeBytes = 9; - required int64 containerID = 10; - optional hadoop.hdds.LifeCycleState state = 11; +/* + * A group of commands for the datanode to execute + */ +message SCMHeartbeatResponseProto { + required string datanodeUUID = 1; + repeated SCMCommandProto commands = 2; } -// The deleted blocks which are stored in deletedBlock.db of scm. -// We don't use BlockID because this only contians multiple localIDs -// of the same containerID. -message DeletedBlocksTransaction { - required int64 txID = 1; - required int64 containerID = 2; - repeated int64 localID = 3; - // the retry time of sending deleting command to datanode. - required int32 count = 4; -} - -/** -A set of container reports, max count is generally set to -8192 since that keeps the size of the reports under 1 MB. -*/ -message ContainerReportsRequestProto { - enum reportType { - fullReport = 0; - deltaReport = 1; - } - required DatanodeDetailsProto datanodeDetails = 1; - repeated ContainerInfo reports = 2; - required reportType type = 3; -} - -message ContainerReportsResponseProto { +message SCMNodeAddressList { + repeated string addressList = 1; } /** * This message is send along with the heart beat to report datanode -* storage utilization by SCM. +* storage utilization to SCM. */ -message SCMNodeReport { - repeated SCMStorageReport storageReport = 1; +message NodeReportProto { + repeated StorageReportProto storageReport = 1; +} + +message StorageReportProto { + required string storageUuid = 1; + required string storageLocation = 2; + optional uint64 capacity = 3 [default = 0]; + optional uint64 scmUsed = 4 [default = 0]; + optional uint64 remaining = 5 [default = 0]; + optional StorageTypeProto storageType = 6 [default = DISK]; + optional bool failed = 7 [default = false]; } /** @@ -108,117 +122,71 @@ enum StorageTypeProto { PROVIDED = 5; } -message SCMStorageReport { - required string storageUuid = 1; - required string storageLocation = 2; - optional uint64 capacity = 3 [default = 0]; - optional uint64 scmUsed = 4 [default = 0]; - optional uint64 remaining = 5 [default = 0]; - optional StorageTypeProto storageType = 6 [default = DISK]; - optional bool failed = 7 [default = false]; -} - -message SCMRegisterRequestProto { - required DatanodeDetailsProto datanodeDetails = 1; - required SCMNodeReport nodeReport = 2; - required ContainerReportsRequestProto containerReport = 3; -} - /** - * Request for version info of the software stack on the server. - */ -message SCMVersionRequestProto { - -} - -/** -* Generic response that is send to a version request. This allows keys to be -* added on the fly and protocol to remain stable. +A set of container reports, max count is generally set to +8192 since that keeps the size of the reports under 1 MB. */ -message SCMVersionResponseProto { - required uint32 softwareVersion = 1; - repeated hadoop.hdds.KeyValue keys = 2; +message ContainerReportsProto { + repeated ContainerInfo reports = 2; } -message SCMNodeAddressList { - repeated string addressList = 1; -} /** - * Datanode ID returned by the SCM. This is similar to name node - * registeration of a datanode. +A container report contains the following information. +*/ +message ContainerInfo { + optional string finalhash = 1; + optional int64 size = 2; + optional int64 used = 3; + optional int64 keyCount = 4; + // TODO: move the io count to separate message + optional int64 readCount = 5; + optional int64 writeCount = 6; + optional int64 readBytes = 7; + optional int64 writeBytes = 8; + required int64 containerID = 9; + optional hadoop.hdds.LifeCycleState state = 10; +} + +/* + * These are commands returned by SCM for to the datanode to execute. */ -message SCMRegisteredCmdResponseProto { - enum ErrorCode { - success = 1; - errorNodeNotPermitted = 2; +message SCMCommandProto { + enum Type { + reregisterCommand = 1; + deleteBlocksCommand = 2; + closeContainerCommand = 3; + deleteContainerCommand = 4; } - required ErrorCode errorCode = 2; - required string datanodeUUID = 3; - required string clusterID = 4; - optional SCMNodeAddressList addressList = 5; - optional string hostname = 6; - optional string ipAddress = 7; + // TODO: once we start using protoc 3.x, refactor this message using "oneof" + required Type commandType = 1; + optional ReregisterCommandProto reregisterCommandProto = 2; + optional DeleteBlocksCommandProto deleteBlocksCommandProto = 3; + optional CloseContainerCommandProto closeContainerCommandProto = 4; + optional DeleteContainerCommandProto deleteContainerCommandProto = 5; } /** * SCM informs a datanode to register itself again. * With recieving this command, datanode will transit to REGISTER state. */ -message SCMReregisterCmdResponseProto {} +message ReregisterCommandProto {} -/** -This command tells the data node to send in the container report when possible -*/ -message SendContainerReportProto { -} - -/** -This command asks the datanode to close a specific container. -*/ -message SCMCloseContainerCmdResponseProto { - required int64 containerID = 1; -} - -/** -Type of commands supported by SCM to datanode protocol. -*/ -enum SCMCmdType { - versionCommand = 2; - registeredCommand = 3; - reregisterCommand = 4; - deleteBlocksCommand = 5; - closeContainerCommand = 6; -} - -/* - * These are commands returned by SCM for to the datanode to execute. - */ -message SCMCommandResponseProto { - required SCMCmdType cmdType = 2; // Type of the command - optional SCMRegisteredCmdResponseProto registeredProto = 3; - optional SCMVersionResponseProto versionProto = 4; - optional SCMReregisterCmdResponseProto reregisterProto = 5; - optional SCMDeleteBlocksCmdResponseProto deleteBlocksProto = 6; - required string datanodeUUID = 7; - optional SCMCloseContainerCmdResponseProto closeContainerProto = 8; -} - - -/* - * A group of commands for the datanode to execute - */ -message SCMHeartbeatResponseProto { - repeated SCMCommandResponseProto commands = 1; -} // HB response from SCM, contains a list of block deletion transactions. -message SCMDeleteBlocksCmdResponseProto { +message DeleteBlocksCommandProto { repeated DeletedBlocksTransaction deletedBlocksTransactions = 1; } -// SendACK response returned by datanode to SCM, currently empty. -message ContainerBlocksDeletionACKResponseProto { +// The deleted blocks which are stored in deletedBlock.db of scm. +// We don't use BlockID because this only contians multiple localIDs +// of the same containerID. +message DeletedBlocksTransaction { + required int64 txID = 1; + required int64 containerID = 2; + repeated int64 localID = 3; + // the retry time of sending deleting command to datanode. + required int32 count = 4; } // ACK message datanode sent to SCM, contains the result of @@ -231,6 +199,24 @@ message ContainerBlocksDeletionACKProto { repeated DeleteBlockTransactionResult results = 1; } +// SendACK response returned by datanode to SCM, currently empty. +message ContainerBlocksDeletionACKResponseProto { +} + +/** +This command asks the datanode to close a specific container. +*/ +message CloseContainerCommandProto { + required int64 containerID = 1; +} + +/** +This command asks the datanode to close a specific container. +*/ +message DeleteContainerCommandProto { + required int64 containerID = 1; +} + /** * Protocol used from a datanode to StorageContainerManager. * @@ -305,7 +291,7 @@ service StorageContainerDatanodeProtocolService { /** * Registers a data node with SCM. */ - rpc register (SCMRegisterRequestProto) returns (SCMRegisteredCmdResponseProto); + rpc register (SCMRegisterRequestProto) returns (SCMRegisteredResponseProto); /** * Send heartbeat from datanode to SCM. HB's under SCM looks more @@ -314,12 +300,6 @@ service StorageContainerDatanodeProtocolService { */ rpc sendHeartbeat (SCMHeartbeatRequestProto) returns (SCMHeartbeatResponseProto); - /** - send container reports sends the container report to SCM. This will - return a null command as response. - */ - rpc sendContainerReport(ContainerReportsRequestProto) returns (ContainerReportsResponseProto); - /** * Sends the block deletion ACK to SCM. */ diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/ScmTestMock.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/ScmTestMock.java index c57a36643aa..0ee6321c992 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/ScmTestMock.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/ScmTestMock.java @@ -16,12 +16,12 @@ */ package org.apache.hadoop.ozone.container.common; -import com.google.common.base.Preconditions; +import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos; import org.apache.hadoop.hdds.scm.VersionInfo; import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.DatanodeDetailsProto; import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos; + .StorageContainerDatanodeProtocolProtos.SCMHeartbeatRequestProto; import org.apache.hadoop.hdds.protocol.proto .StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKProto; import org.apache.hadoop.hdds.protocol.proto @@ -30,13 +30,13 @@ import org.apache.hadoop.hdds.protocol.proto import org.apache.hadoop.hdds.protocol.proto .StorageContainerDatanodeProtocolProtos.ContainerInfo; import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.SCMCommandResponseProto; + .StorageContainerDatanodeProtocolProtos.SCMCommandProto; import org.apache.hadoop.hdds.protocol.proto .StorageContainerDatanodeProtocolProtos.SCMHeartbeatResponseProto; import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.SCMNodeReport; + .StorageContainerDatanodeProtocolProtos.NodeReportProto; import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.SCMStorageReport; + .StorageContainerDatanodeProtocolProtos.StorageReportProto; import org.apache.hadoop.ozone.protocol.StorageContainerDatanodeProtocol; import org.apache.hadoop.ozone.protocol.VersionResponse; @@ -56,7 +56,7 @@ public class ScmTestMock implements StorageContainerDatanodeProtocol { // Map of datanode to containers private Map> nodeContainers = new HashMap(); - private Map nodeReports = new HashMap<>(); + private Map nodeReports = new HashMap<>(); /** * Returns the number of heartbeats made to this class. * @@ -166,20 +166,17 @@ public class ScmTestMock implements StorageContainerDatanodeProtocol { /** * Used by data node to send a Heartbeat. * - * @param datanodeDetailsProto - DatanodeDetailsProto. - * @param nodeReport - node report. + * @param heartbeat - node heartbeat. * @return - SCMHeartbeatResponseProto * @throws IOException */ @Override public StorageContainerDatanodeProtocolProtos.SCMHeartbeatResponseProto - sendHeartbeat(DatanodeDetailsProto datanodeDetailsProto, - SCMNodeReport nodeReport) - throws IOException { + sendHeartbeat(SCMHeartbeatRequestProto heartbeat) throws IOException { rpcCount.incrementAndGet(); heartbeatCount.incrementAndGet(); sleepIfNeeded(); - List + List cmdResponses = new LinkedList<>(); return SCMHeartbeatResponseProto.newBuilder().addAllCommands(cmdResponses) .build(); @@ -193,21 +190,19 @@ public class ScmTestMock implements StorageContainerDatanodeProtocol { */ @Override public StorageContainerDatanodeProtocolProtos - .SCMRegisteredCmdResponseProto register( - DatanodeDetailsProto datanodeDetailsProto, SCMNodeReport nodeReport, - StorageContainerDatanodeProtocolProtos.ContainerReportsRequestProto + .SCMRegisteredResponseProto register( + DatanodeDetailsProto datanodeDetailsProto, NodeReportProto nodeReport, + StorageContainerDatanodeProtocolProtos.ContainerReportsProto containerReportsRequestProto) throws IOException { rpcCount.incrementAndGet(); - sendContainerReport(containerReportsRequestProto); updateNodeReport(datanodeDetailsProto, nodeReport); sleepIfNeeded(); - return StorageContainerDatanodeProtocolProtos - .SCMRegisteredCmdResponseProto + return StorageContainerDatanodeProtocolProtos.SCMRegisteredResponseProto .newBuilder().setClusterID(UUID.randomUUID().toString()) .setDatanodeUUID(datanodeDetailsProto.getUuid()).setErrorCode( StorageContainerDatanodeProtocolProtos - .SCMRegisteredCmdResponseProto.ErrorCode.success).build(); + .SCMRegisteredResponseProto.ErrorCode.success).build(); } /** @@ -216,19 +211,19 @@ public class ScmTestMock implements StorageContainerDatanodeProtocol { * @param nodeReport */ public void updateNodeReport(DatanodeDetailsProto datanodeDetailsProto, - SCMNodeReport nodeReport) { + NodeReportProto nodeReport) { DatanodeDetails datanode = DatanodeDetails.getFromProtoBuf( datanodeDetailsProto); - SCMNodeReport.Builder datanodeReport = SCMNodeReport.newBuilder(); + NodeReportProto.Builder nodeReportProto = NodeReportProto.newBuilder(); - List storageReports = + List storageReports = nodeReport.getStorageReportList(); - for(SCMStorageReport report : storageReports) { - datanodeReport.addStorageReport(report); + for(StorageReportProto report : storageReports) { + nodeReportProto.addStorageReport(report); } - nodeReports.put(datanode, datanodeReport.build()); + nodeReports.put(datanode, nodeReportProto.build()); } @@ -254,39 +249,6 @@ public class ScmTestMock implements StorageContainerDatanodeProtocol { return 0; } - /** - * Send a container report. - * - * @param reports -- Container report - * @return HeartbeatResponse.nullcommand. - * @throws IOException - */ - @Override - public StorageContainerDatanodeProtocolProtos.ContainerReportsResponseProto - sendContainerReport(StorageContainerDatanodeProtocolProtos - .ContainerReportsRequestProto reports) throws IOException { - Preconditions.checkNotNull(reports); - containerReportsCount.incrementAndGet(); - - DatanodeDetails datanode = DatanodeDetails.getFromProtoBuf( - reports.getDatanodeDetails()); - if (reports.getReportsCount() > 0) { - Map containers = nodeContainers.get(datanode); - if (containers == null) { - containers = new LinkedHashMap(); - nodeContainers.put(datanode, containers); - } - - for (StorageContainerDatanodeProtocolProtos.ContainerInfo report: - reports.getReportsList()) { - containers.put(report.getContainerID(), report); - } - } - - return StorageContainerDatanodeProtocolProtos - .ContainerReportsResponseProto.newBuilder().build(); - } - @Override public ContainerBlocksDeletionACKResponseProto sendContainerBlocksDeletionACK( ContainerBlocksDeletionACKProto request) throws IOException { diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerMapping.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerMapping.java index 2d886215ba5..f5fe46a3a53 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerMapping.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerMapping.java @@ -20,6 +20,7 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.primitives.Longs; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.scm.ScmConfigKeys; import org.apache.hadoop.hdds.scm.container.closer.ContainerCloser; import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo; @@ -33,7 +34,7 @@ import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType; import org.apache.hadoop.hdds.protocol.proto .StorageContainerDatanodeProtocolProtos; import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.ContainerReportsRequestProto; + .StorageContainerDatanodeProtocolProtos.ContainerReportsProto; import org.apache.hadoop.ozone.OzoneConsts; import org.apache.hadoop.ozone.lease.Lease; import org.apache.hadoop.ozone.lease.LeaseException; @@ -368,11 +369,12 @@ public class ContainerMapping implements Mapping { * @param reports Container report */ @Override - public void processContainerReports(ContainerReportsRequestProto reports) + public void processContainerReports(DatanodeDetails datanodeDetails, + ContainerReportsProto reports) throws IOException { List containerInfos = reports.getReportsList(); - containerSupervisor.handleContainerReport(reports); + containerSupervisor.handleContainerReport(datanodeDetails, reports); for (StorageContainerDatanodeProtocolProtos.ContainerInfo datanodeState : containerInfos) { byte[] dbKey = Longs.toByteArray(datanodeState.getContainerID()); @@ -402,7 +404,7 @@ public class ContainerMapping implements Mapping { // Container not found in our container db. LOG.error("Error while processing container report from datanode :" + " {}, for container: {}, reason: container doesn't exist in" + - "container database.", reports.getDatanodeDetails(), + "container database.", datanodeDetails, datanodeState.getContainerID()); } } finally { diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/Mapping.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/Mapping.java index f5601740c57..ee8e344d0df 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/Mapping.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/Mapping.java @@ -16,10 +16,11 @@ */ package org.apache.hadoop.hdds.scm.container; +import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.ContainerReportsRequestProto; + .StorageContainerDatanodeProtocolProtos.ContainerReportsProto; import java.io.Closeable; import java.io.IOException; @@ -98,7 +99,8 @@ public interface Mapping extends Closeable { * * @param reports Container report */ - void processContainerReports(ContainerReportsRequestProto reports) + void processContainerReports(DatanodeDetails datanodeDetails, + ContainerReportsProto reports) throws IOException; } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ContainerSupervisor.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ContainerSupervisor.java index c14303fc8e0..5bd05746bfc 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ContainerSupervisor.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ContainerSupervisor.java @@ -25,7 +25,7 @@ import org.apache.hadoop.hdds.scm.node.NodeManager; import org.apache.hadoop.hdds.scm.node.NodePoolManager; import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.ContainerReportsRequestProto; + .StorageContainerDatanodeProtocolProtos.ContainerReportsProto; import org.apache.hadoop.util.Time; import org.apache.hadoop.util.concurrent.HadoopExecutors; import org.apache.hadoop.util.concurrent.HadoopThreadPoolExecutor; @@ -295,24 +295,21 @@ public class ContainerSupervisor implements Closeable { * @param containerReport -- Container report for a specific container from * a datanode. */ - public void handleContainerReport( - ContainerReportsRequestProto containerReport) { - DatanodeDetails datanodeDetails = DatanodeDetails.getFromProtoBuf( - containerReport.getDatanodeDetails()); + public void handleContainerReport(DatanodeDetails datanodeDetails, + ContainerReportsProto containerReport) { inProgressPoolListLock.readLock().lock(); try { String poolName = poolManager.getNodePool(datanodeDetails); for (InProgressPool ppool : inProgressPoolList) { if (ppool.getPoolName().equalsIgnoreCase(poolName)) { - ppool.handleContainerReport(containerReport); + ppool.handleContainerReport(datanodeDetails, containerReport); return; } } // TODO: Decide if we can do anything else with this report. LOG.debug("Discarding the container report for pool {}. " + "That pool is not currently in the pool reconciliation process." + - " Container Name: {}", poolName, - containerReport.getDatanodeDetails()); + " Container Name: {}", poolName, datanodeDetails); } catch (SCMException e) { LOG.warn("Skipping processing container report from datanode {}, " + "cause: failed to get the corresponding node pool", diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/InProgressPool.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/InProgressPool.java index c444e904d7d..4b547311dac 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/InProgressPool.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/InProgressPool.java @@ -24,7 +24,7 @@ import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.proto .StorageContainerDatanodeProtocolProtos.ContainerInfo; import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.ContainerReportsRequestProto; + .StorageContainerDatanodeProtocolProtos.ContainerReportsProto; import org.apache.hadoop.util.Time; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -178,21 +178,20 @@ public final class InProgressPool { * * @param containerReport - ContainerReport */ - public void handleContainerReport( - ContainerReportsRequestProto containerReport) { + public void handleContainerReport(DatanodeDetails datanodeDetails, + ContainerReportsProto containerReport) { if (status == ProgressStatus.InProgress) { - executorService.submit(processContainerReport(containerReport)); + executorService.submit(processContainerReport(datanodeDetails, + containerReport)); } else { LOG.debug("Cannot handle container report when the pool is in {} status.", status); } } - private Runnable processContainerReport( - ContainerReportsRequestProto reports) { + private Runnable processContainerReport(DatanodeDetails datanodeDetails, + ContainerReportsProto reports) { return () -> { - DatanodeDetails datanodeDetails = - DatanodeDetails.getFromProtoBuf(reports.getDatanodeDetails()); if (processedNodeSet.computeIfAbsent(datanodeDetails.getUuid(), (k) -> true)) { nodeProcessed.incrementAndGet(); diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/HeartbeatQueueItem.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/HeartbeatQueueItem.java index 05a9fc3414c..04658bdf537 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/HeartbeatQueueItem.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/HeartbeatQueueItem.java @@ -21,7 +21,7 @@ package org.apache.hadoop.hdds.scm.node; import com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.SCMNodeReport; + .StorageContainerDatanodeProtocolProtos.NodeReportProto; import static org.apache.hadoop.util.Time.monotonicNow; @@ -31,7 +31,7 @@ import static org.apache.hadoop.util.Time.monotonicNow; public class HeartbeatQueueItem { private DatanodeDetails datanodeDetails; private long recvTimestamp; - private SCMNodeReport nodeReport; + private NodeReportProto nodeReport; /** * @@ -40,7 +40,7 @@ public class HeartbeatQueueItem { * @param nodeReport - node report associated with the heartbeat if any. */ HeartbeatQueueItem(DatanodeDetails datanodeDetails, long recvTimestamp, - SCMNodeReport nodeReport) { + NodeReportProto nodeReport) { this.datanodeDetails = datanodeDetails; this.recvTimestamp = recvTimestamp; this.nodeReport = nodeReport; @@ -56,7 +56,7 @@ public class HeartbeatQueueItem { /** * @return node report. */ - public SCMNodeReport getNodeReport() { + public NodeReportProto getNodeReport() { return nodeReport; } @@ -72,7 +72,7 @@ public class HeartbeatQueueItem { */ public static class Builder { private DatanodeDetails datanodeDetails; - private SCMNodeReport nodeReport; + private NodeReportProto nodeReport; private long recvTimestamp = monotonicNow(); public Builder setDatanodeDetails(DatanodeDetails dnDetails) { @@ -80,8 +80,8 @@ public class HeartbeatQueueItem { return this; } - public Builder setNodeReport(SCMNodeReport scmNodeReport) { - this.nodeReport = scmNodeReport; + public Builder setNodeReport(NodeReportProto report) { + this.nodeReport = report; return this; } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java index 353a069e3b5..b339fb7ce99 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java @@ -28,15 +28,14 @@ import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeStat; import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException; import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.protocol.DatanodeDetails; -import org.apache.hadoop.hdds.protocol.proto.HddsProtos.DatanodeDetailsProto; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState; import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.SCMNodeReport; + .StorageContainerDatanodeProtocolProtos.NodeReportProto; import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.SCMRegisteredCmdResponseProto + .StorageContainerDatanodeProtocolProtos.SCMRegisteredResponseProto .ErrorCode; import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.SCMStorageReport; + .StorageContainerDatanodeProtocolProtos.StorageReportProto; import org.apache.hadoop.hdds.protocol.proto .StorageContainerDatanodeProtocolProtos.SCMVersionRequestProto; import org.apache.hadoop.ipc.Server; @@ -592,7 +591,7 @@ public class SCMNodeManager DatanodeDetails datanodeDetails = hbItem.getDatanodeDetails(); UUID datanodeUuid = datanodeDetails.getUuid(); - SCMNodeReport nodeReport = hbItem.getNodeReport(); + NodeReportProto nodeReport = hbItem.getNodeReport(); long recvTimestamp = hbItem.getRecvTimestamp(); long processTimestamp = Time.monotonicNow(); if (LOG.isTraceEnabled()) { @@ -637,7 +636,7 @@ public class SCMNodeManager new ReregisterCommand()); } - private void updateNodeStat(UUID dnId, SCMNodeReport nodeReport) { + private void updateNodeStat(UUID dnId, NodeReportProto nodeReport) { SCMNodeStat stat = nodeStats.get(dnId); if (stat == null) { LOG.debug("SCM updateNodeStat based on heartbeat from previous" + @@ -649,8 +648,9 @@ public class SCMNodeManager long totalCapacity = 0; long totalRemaining = 0; long totalScmUsed = 0; - List storageReports = nodeReport.getStorageReportList(); - for (SCMStorageReport report : storageReports) { + List storageReports = nodeReport + .getStorageReportList(); + for (StorageReportProto report : storageReports) { totalCapacity += report.getCapacity(); totalRemaining += report.getRemaining(); totalScmUsed+= report.getScmUsed(); @@ -710,7 +710,7 @@ public class SCMNodeManager * Register the node if the node finds that it is not registered with any * SCM. * - * @param datanodeDetailsProto - Send datanodeDetails with Node info. + * @param datanodeDetails - Send datanodeDetails with Node info. * This function generates and assigns new datanode ID * for the datanode. This allows SCM to be run independent * of Namenode if required. @@ -719,13 +719,11 @@ public class SCMNodeManager * @return SCMHeartbeatResponseProto */ @Override - public SCMCommand register(DatanodeDetailsProto datanodeDetailsProto, - SCMNodeReport nodeReport) { + public RegisteredCommand register( + DatanodeDetails datanodeDetails, NodeReportProto nodeReport) { String hostname = null; String ip = null; - DatanodeDetails datanodeDetails = DatanodeDetails.getFromProtoBuf( - datanodeDetailsProto); InetAddress dnAddress = Server.getRemoteIp(); if (dnAddress != null) { // Mostly called inside an RPC, update ip and peer hostname @@ -734,7 +732,7 @@ public class SCMNodeManager datanodeDetails.setHostName(hostname); datanodeDetails.setIpAddress(ip); } - SCMCommand responseCommand = verifyDatanodeUUID(datanodeDetails); + RegisteredCommand responseCommand = verifyDatanodeUUID(datanodeDetails); if (responseCommand != null) { return responseCommand; } @@ -785,7 +783,8 @@ public class SCMNodeManager * @param datanodeDetails - Datanode Details. * @return SCMCommand */ - private SCMCommand verifyDatanodeUUID(DatanodeDetails datanodeDetails) { + private RegisteredCommand verifyDatanodeUUID( + DatanodeDetails datanodeDetails) { if (datanodeDetails.getUuid() != null && nodes.containsKey(datanodeDetails.getUuid())) { LOG.trace("Datanode is already registered. Datanode: {}", @@ -802,34 +801,23 @@ public class SCMNodeManager /** * Send heartbeat to indicate the datanode is alive and doing well. * - * @param datanodeDetailsProto - DatanodeDetailsProto. + * @param datanodeDetails - DatanodeDetailsProto. * @param nodeReport - node report. * @return SCMheartbeat response. * @throws IOException */ @Override public List sendHeartbeat( - DatanodeDetailsProto datanodeDetailsProto, SCMNodeReport nodeReport) { + DatanodeDetails datanodeDetails, NodeReportProto nodeReport) { - Preconditions.checkNotNull(datanodeDetailsProto, "Heartbeat is missing " + + Preconditions.checkNotNull(datanodeDetails, "Heartbeat is missing " + "DatanodeDetails."); - DatanodeDetails datanodeDetails = DatanodeDetails - .getFromProtoBuf(datanodeDetailsProto); - // Checking for NULL to make sure that we don't get - // an exception from ConcurrentList. - // This could be a problem in tests, if this function is invoked via - // protobuf, transport layer will guarantee that this is not null. - if (datanodeDetails != null) { - heartbeatQueue.add( - new HeartbeatQueueItem.Builder() - .setDatanodeDetails(datanodeDetails) - .setNodeReport(nodeReport) - .build()); - return commandQueue.getCommand(datanodeDetails.getUuid()); - } else { - LOG.error("Datanode ID in heartbeat is null"); - } - return null; + heartbeatQueue.add( + new HeartbeatQueueItem.Builder() + .setDatanodeDetails(datanodeDetails) + .setNodeReport(nodeReport) + .build()); + return commandQueue.getCommand(datanodeDetails.getUuid()); } /** diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeStorageStatMap.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeStorageStatMap.java index fa423bb6e76..6ea83dfff58 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeStorageStatMap.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeStorageStatMap.java @@ -23,7 +23,7 @@ import com.google.common.base.Preconditions; import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos; import org.apache.hadoop.hdds.protocol.proto. - StorageContainerDatanodeProtocolProtos.SCMStorageReport; + StorageContainerDatanodeProtocolProtos.StorageReportProto; import org.apache.hadoop.hdds.scm.exceptions.SCMException; import org.apache.hadoop.metrics2.util.MBeans; import org.apache.hadoop.ozone.OzoneConfigKeys; @@ -33,7 +33,11 @@ import org.slf4j.LoggerFactory; import javax.management.ObjectName; import java.io.IOException; -import java.util.*; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.stream.Collectors; @@ -159,7 +163,7 @@ public class SCMNodeStorageStatMap implements SCMNodeStorageStatMXBean { } public StorageReportResult processNodeReport(UUID datanodeID, - StorageContainerDatanodeProtocolProtos.SCMNodeReport nodeReport) + StorageContainerDatanodeProtocolProtos.NodeReportProto nodeReport) throws IOException { Preconditions.checkNotNull(datanodeID); Preconditions.checkNotNull(nodeReport); @@ -170,9 +174,9 @@ public class SCMNodeStorageStatMap implements SCMNodeStorageStatMXBean { Set storagReportSet = new HashSet<>(); Set fullVolumeSet = new HashSet<>(); Set failedVolumeSet = new HashSet<>(); - List + List storageReports = nodeReport.getStorageReportList(); - for (SCMStorageReport report : storageReports) { + for (StorageReportProto report : storageReports) { StorageLocationReport storageReport = StorageLocationReport.getFromProtobuf(report); storagReportSet.add(storageReport); diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeProtocolServer.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeProtocolServer.java index 6e5b7debe55..1b1645db93c 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeProtocolServer.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeProtocolServer.java @@ -25,29 +25,47 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.protobuf.BlockingService; import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos; -import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMHeartbeatResponseProto; -import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMVersionRequestProto; -import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMVersionResponseProto; -import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMRegisteredCmdResponseProto; -import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMReregisterCmdResponseProto; -import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandResponseProto; -import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCmdType; import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.SCMNodeReport; -import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReportsRequestProto; -import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKProto; -import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReportsResponseProto; -import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKResponseProto; -import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKProto.DeleteBlockTransactionResult; + .StorageContainerDatanodeProtocolProtos.SCMHeartbeatRequestProto; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.ContainerReportsProto; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.SCMHeartbeatResponseProto; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.SCMVersionRequestProto; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.SCMVersionResponseProto; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.SCMRegisteredResponseProto; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.ReregisterCommandProto; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.SCMCommandProto; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.NodeReportProto; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKProto; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos + .ContainerBlocksDeletionACKResponseProto; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos + .ContainerBlocksDeletionACKProto.DeleteBlockTransactionResult; -import static org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCmdType.versionCommand; -import static org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCmdType.registeredCommand; -import static org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCmdType.reregisterCommand; -import static org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCmdType.deleteBlocksCommand; -import static org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCmdType.closeContainerCommand; +import static org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.SCMCommandProto + .Type.closeContainerCommand; +import static org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.SCMCommandProto + .Type.deleteBlocksCommand; +import static org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.SCMCommandProto + .Type.reregisterCommand; + import org.apache.hadoop.hdds.scm.HddsServerUtil; @@ -150,96 +168,81 @@ public class SCMDatanodeProtocolServer implements @Override public SCMHeartbeatResponseProto sendHeartbeat( - HddsProtos.DatanodeDetailsProto datanodeDetails, - StorageContainerDatanodeProtocolProtos.SCMNodeReport nodeReport) + SCMHeartbeatRequestProto heartbeat) throws IOException { + // TODO: Add a heartbeat dispatcher. + DatanodeDetails datanodeDetails = DatanodeDetails + .getFromProtoBuf(heartbeat.getDatanodeDetails()); + NodeReportProto nodeReport = heartbeat.getNodeReport(); List commands = scm.getScmNodeManager().sendHeartbeat(datanodeDetails, nodeReport); - List cmdResponses = new LinkedList<>(); + List cmdResponses = new LinkedList<>(); for (SCMCommand cmd : commands) { - cmdResponses.add(getCommandResponse(cmd, datanodeDetails.getUuid())); + cmdResponses.add(getCommandResponse(cmd)); } return SCMHeartbeatResponseProto.newBuilder() + .setDatanodeUUID(datanodeDetails.getUuidString()) .addAllCommands(cmdResponses).build(); } @Override - public SCMRegisteredCmdResponseProto register( - HddsProtos.DatanodeDetailsProto datanodeDetails, SCMNodeReport nodeReport, - ContainerReportsRequestProto containerReportsRequestProto) + public SCMRegisteredResponseProto register( + HddsProtos.DatanodeDetailsProto datanodeDetailsProto, + NodeReportProto nodeReport, + ContainerReportsProto containerReportsProto) throws IOException { + DatanodeDetails datanodeDetails = DatanodeDetails + .getFromProtoBuf(datanodeDetailsProto); // TODO : Return the list of Nodes that forms the SCM HA. - RegisteredCommand registeredCommand = (RegisteredCommand) scm - .getScmNodeManager().register(datanodeDetails, nodeReport); - SCMCmdType type = registeredCommand.getType(); - if (type == SCMCmdType.registeredCommand && registeredCommand.getError() - == SCMRegisteredCmdResponseProto.ErrorCode.success) { - scm.getScmContainerManager().processContainerReports( - containerReportsRequestProto); + RegisteredCommand registeredCommand = scm.getScmNodeManager() + .register(datanodeDetails, nodeReport); + if (registeredCommand.getError() + == SCMRegisteredResponseProto.ErrorCode.success) { + scm.getScmContainerManager().processContainerReports(datanodeDetails, + containerReportsProto); } return getRegisteredResponse(registeredCommand); } @VisibleForTesting - public static SCMRegisteredCmdResponseProto getRegisteredResponse( - SCMCommand cmd) { - Preconditions.checkState(cmd.getClass() == RegisteredCommand.class); - RegisteredCommand rCmd = (RegisteredCommand) cmd; - SCMCmdType type = cmd.getType(); - if (type != SCMCmdType.registeredCommand) { - throw new IllegalArgumentException( - "Registered command is not well " + "formed. Internal Error."); - } - return SCMRegisteredCmdResponseProto.newBuilder() + public static SCMRegisteredResponseProto getRegisteredResponse( + RegisteredCommand cmd) { + return SCMRegisteredResponseProto.newBuilder() // TODO : Fix this later when we have multiple SCM support. // .setAddressList(addressList) - .setErrorCode(rCmd.getError()) - .setClusterID(rCmd.getClusterID()) - .setDatanodeUUID(rCmd.getDatanodeUUID()) + .setErrorCode(cmd.getError()) + .setClusterID(cmd.getClusterID()) + .setDatanodeUUID(cmd.getDatanodeUUID()) .build(); } - @Override - public ContainerReportsResponseProto sendContainerReport( - ContainerReportsRequestProto reports) + public void processContainerReports(DatanodeDetails datanodeDetails, + ContainerReportsProto reports) throws IOException { - updateContainerReportMetrics(reports); - + updateContainerReportMetrics(datanodeDetails, reports); // should we process container reports async? - scm.getScmContainerManager().processContainerReports(reports); - return ContainerReportsResponseProto.newBuilder().build(); + scm.getScmContainerManager() + .processContainerReports(datanodeDetails, reports); } - private void updateContainerReportMetrics( - ContainerReportsRequestProto reports) { - ContainerStat newStat = null; - // TODO: We should update the logic once incremental container report - // type is supported. - if (reports - .getType() == StorageContainerDatanodeProtocolProtos - .ContainerReportsRequestProto.reportType.fullReport) { - newStat = new ContainerStat(); - for (StorageContainerDatanodeProtocolProtos.ContainerInfo info : reports - .getReportsList()) { - newStat.add(new ContainerStat(info.getSize(), info.getUsed(), - info.getKeyCount(), info.getReadBytes(), info.getWriteBytes(), - info.getReadCount(), info.getWriteCount())); - } - - // update container metrics - StorageContainerManager.getMetrics().setLastContainerStat(newStat); + private void updateContainerReportMetrics(DatanodeDetails datanodeDetails, + ContainerReportsProto reports) { + ContainerStat newStat = new ContainerStat(); + for (StorageContainerDatanodeProtocolProtos.ContainerInfo info : reports + .getReportsList()) { + newStat.add(new ContainerStat(info.getSize(), info.getUsed(), + info.getKeyCount(), info.getReadBytes(), info.getWriteBytes(), + info.getReadCount(), info.getWriteCount())); } + // update container metrics + StorageContainerManager.getMetrics().setLastContainerStat(newStat); // Update container stat entry, this will trigger a removal operation if it // exists in cache. - synchronized (scm.getContainerReportCache()) { - String datanodeUuid = reports.getDatanodeDetails().getUuid(); - if (datanodeUuid != null && newStat != null) { - scm.getContainerReportCache().put(datanodeUuid, newStat); - // update global view container metrics - StorageContainerManager.getMetrics().incrContainerStat(newStat); - } - } + String datanodeUuid = datanodeDetails.getUuidString(); + scm.getContainerReportCache().put(datanodeUuid, newStat); + // update global view container metrics + StorageContainerManager.getMetrics().incrContainerStat(newStat); } @@ -298,28 +301,15 @@ public class SCMDatanodeProtocolServer implements * @throws IOException */ @VisibleForTesting - public StorageContainerDatanodeProtocolProtos.SCMCommandResponseProto - getCommandResponse( - SCMCommand cmd, final String datanodeID) throws IOException { - SCMCmdType type = cmd.getType(); - SCMCommandResponseProto.Builder builder = - SCMCommandResponseProto.newBuilder().setDatanodeUUID(datanodeID); - switch (type) { - case registeredCommand: - return builder - .setCmdType(registeredCommand) - .setRegisteredProto(SCMRegisteredCmdResponseProto - .getDefaultInstance()) - .build(); - case versionCommand: - return builder - .setCmdType(versionCommand) - .setVersionProto(SCMVersionResponseProto.getDefaultInstance()) - .build(); + public SCMCommandProto getCommandResponse(SCMCommand cmd) + throws IOException { + SCMCommandProto.Builder builder = + SCMCommandProto.newBuilder(); + switch (cmd.getType()) { case reregisterCommand: return builder - .setCmdType(reregisterCommand) - .setReregisterProto(SCMReregisterCmdResponseProto + .setCommandType(reregisterCommand) + .setReregisterCommandProto(ReregisterCommandProto .getDefaultInstance()) .build(); case deleteBlocksCommand: @@ -335,13 +325,14 @@ public class SCMDatanodeProtocolServer implements .collect(Collectors.toList()); scm.getScmBlockManager().getDeletedBlockLog().incrementCount(txs); return builder - .setCmdType(deleteBlocksCommand) - .setDeleteBlocksProto(((DeleteBlocksCommand) cmd).getProto()) + .setCommandType(deleteBlocksCommand) + .setDeleteBlocksCommandProto(((DeleteBlocksCommand) cmd).getProto()) .build(); case closeContainerCommand: return builder - .setCmdType(closeContainerCommand) - .setCloseContainerProto(((CloseContainerCommand) cmd).getProto()) + .setCommandType(closeContainerCommand) + .setCloseContainerCommandProto( + ((CloseContainerCommand) cmd).getProto()) .build(); default: throw new IllegalArgumentException("Not implemented"); diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/TestUtils.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/TestUtils.java index 5cf0a92c4a6..b8036d7dee6 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/TestUtils.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/TestUtils.java @@ -18,9 +18,9 @@ package org.apache.hadoop.hdds.scm; import com.google.common.base.Preconditions; import org.apache.hadoop.hdds.protocol - .proto.StorageContainerDatanodeProtocolProtos.SCMNodeReport; + .proto.StorageContainerDatanodeProtocolProtos.NodeReportProto; import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.SCMStorageReport; + .StorageContainerDatanodeProtocolProtos.StorageReportProto; import org.apache.hadoop.hdds.protocol.proto .StorageContainerDatanodeProtocolProtos.StorageTypeProto; import org.apache.hadoop.hdds.scm.node.SCMNodeManager; @@ -53,16 +53,17 @@ public final class TestUtils { public static DatanodeDetails getDatanodeDetails(SCMNodeManager nodeManager, String uuid) { DatanodeDetails datanodeDetails = getDatanodeDetails(uuid); - nodeManager.register(datanodeDetails.getProtoBufMessage(), null); + nodeManager.register(datanodeDetails, null); return datanodeDetails; } /** * Create Node Report object. - * @return SCMNodeReport + * @return NodeReportProto */ - public static SCMNodeReport createNodeReport(List reports) { - SCMNodeReport.Builder nodeReport = SCMNodeReport.newBuilder(); + public static NodeReportProto createNodeReport( + List reports) { + NodeReportProto.Builder nodeReport = NodeReportProto.newBuilder(); nodeReport.addAllStorageReport(reports); return nodeReport.build(); } @@ -71,14 +72,14 @@ public final class TestUtils { * Create SCM Storage Report object. * @return list of SCMStorageReport */ - public static List createStorageReport(long capacity, + public static List createStorageReport(long capacity, long used, long remaining, String path, StorageTypeProto type, String id, int count) { - List reportList = new ArrayList<>(); + List reportList = new ArrayList<>(); for (int i = 0; i < count; i++) { Preconditions.checkNotNull(path); Preconditions.checkNotNull(id); - SCMStorageReport.Builder srb = SCMStorageReport.newBuilder(); + StorageReportProto.Builder srb = StorageReportProto.newBuilder(); srb.setStorageUuid(id).setStorageLocation(path).setCapacity(capacity) .setScmUsed(used).setRemaining(remaining); StorageTypeProto storageTypeProto = diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java index a46d7ba41e3..8c59462b407 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java @@ -24,13 +24,14 @@ import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException; import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.SCMNodeReport; + .StorageContainerDatanodeProtocolProtos.NodeReportProto; import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.SCMStorageReport; + .StorageContainerDatanodeProtocolProtos.StorageReportProto; import org.apache.hadoop.hdds.protocol.proto .StorageContainerDatanodeProtocolProtos.SCMVersionRequestProto; import org.apache.hadoop.ozone.OzoneConsts; import org.apache.hadoop.ozone.protocol.VersionResponse; +import org.apache.hadoop.ozone.protocol.commands.RegisteredCommand; import org.apache.hadoop.ozone.protocol.commands.SCMCommand; import org.assertj.core.util.Preconditions; import org.mockito.Mockito; @@ -370,13 +371,13 @@ public class MockNodeManager implements NodeManager { * Register the node if the node finds that it is not registered with any * SCM. * - * @param datanodeDetails DatanodeDetailsProto - * @param nodeReport SCMNodeReport + * @param datanodeDetails DatanodeDetails + * @param nodeReport NodeReportProto * @return SCMHeartbeatResponseProto */ @Override - public SCMCommand register(HddsProtos.DatanodeDetailsProto datanodeDetails, - SCMNodeReport nodeReport) { + public RegisteredCommand register(DatanodeDetails datanodeDetails, + NodeReportProto nodeReport) { return null; } @@ -388,9 +389,8 @@ public class MockNodeManager implements NodeManager { * @return SCMheartbeat response list */ @Override - public List sendHeartbeat( - HddsProtos.DatanodeDetailsProto datanodeDetails, - SCMNodeReport nodeReport) { + public List sendHeartbeat(DatanodeDetails datanodeDetails, + NodeReportProto nodeReport) { if ((datanodeDetails != null) && (nodeReport != null) && (nodeReport .getStorageReportCount() > 0)) { SCMNodeStat stat = this.nodeMetricMap.get(datanodeDetails.getUuid()); @@ -398,8 +398,9 @@ public class MockNodeManager implements NodeManager { long totalCapacity = 0L; long totalRemaining = 0L; long totalScmUsed = 0L; - List storageReports = nodeReport.getStorageReportList(); - for (SCMStorageReport report : storageReports) { + List storageReports = nodeReport + .getStorageReportList(); + for (StorageReportProto report : storageReports) { totalCapacity += report.getCapacity(); totalRemaining += report.getRemaining(); totalScmUsed += report.getScmUsed(); @@ -407,8 +408,7 @@ public class MockNodeManager implements NodeManager { aggregateStat.subtract(stat); stat.set(totalCapacity, totalScmUsed, totalRemaining); aggregateStat.add(stat); - nodeMetricMap.put(DatanodeDetails - .getFromProtoBuf(datanodeDetails).getUuid(), stat); + nodeMetricMap.put(datanodeDetails.getUuid(), stat); } return null; diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerMapping.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerMapping.java index f318316a088..ba2ab64a766 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerMapping.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerMapping.java @@ -28,7 +28,7 @@ import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.hdds.protocol.proto .StorageContainerDatanodeProtocolProtos; import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.ContainerReportsRequestProto; + .StorageContainerDatanodeProtocolProtos.ContainerReportsProto; import org.apache.hadoop.ozone.OzoneConfigKeys; import org.apache.hadoop.ozone.container.common.SCMTestUtils; import org.apache.hadoop.test.GenericTestUtils; @@ -191,8 +191,6 @@ public class TestContainerMapping { public void testFullContainerReport() throws IOException { ContainerInfo info = createContainer(); DatanodeDetails datanodeDetails = TestUtils.getDatanodeDetails(); - ContainerReportsRequestProto.reportType reportType = - ContainerReportsRequestProto.reportType.fullReport; List reports = new ArrayList<>(); StorageContainerDatanodeProtocolProtos.ContainerInfo.Builder ciBuilder = @@ -209,12 +207,11 @@ public class TestContainerMapping { reports.add(ciBuilder.build()); - ContainerReportsRequestProto.Builder crBuilder = - ContainerReportsRequestProto.newBuilder(); - crBuilder.setDatanodeDetails(datanodeDetails.getProtoBufMessage()) - .setType(reportType).addAllReports(reports); + ContainerReportsProto.Builder crBuilder = ContainerReportsProto + .newBuilder(); + crBuilder.addAllReports(reports); - mapping.processContainerReports(crBuilder.build()); + mapping.processContainerReports(datanodeDetails, crBuilder.build()); ContainerInfo updatedContainer = mapping.getContainer(info.getContainerID()); @@ -227,8 +224,6 @@ public class TestContainerMapping { public void testContainerCloseWithContainerReport() throws IOException { ContainerInfo info = createContainer(); DatanodeDetails datanodeDetails = TestUtils.getDatanodeDetails(); - ContainerReportsRequestProto.reportType reportType = - ContainerReportsRequestProto.reportType.fullReport; List reports = new ArrayList<>(); @@ -246,12 +241,11 @@ public class TestContainerMapping { reports.add(ciBuilder.build()); - ContainerReportsRequestProto.Builder crBuilder = - ContainerReportsRequestProto.newBuilder(); - crBuilder.setDatanodeDetails(datanodeDetails.getProtoBufMessage()) - .setType(reportType).addAllReports(reports); + ContainerReportsProto.Builder crBuilder = + ContainerReportsProto.newBuilder(); + crBuilder.addAllReports(reports); - mapping.processContainerReports(crBuilder.build()); + mapping.processContainerReports(datanodeDetails, crBuilder.build()); ContainerInfo updatedContainer = mapping.getContainer(info.getContainerID()); diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/closer/TestContainerCloser.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/closer/TestContainerCloser.java index 15ecbada2e3..0a3efda8302 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/closer/TestContainerCloser.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/closer/TestContainerCloser.java @@ -30,7 +30,7 @@ import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.hdds.protocol.proto .StorageContainerDatanodeProtocolProtos; import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.ContainerReportsRequestProto; + .StorageContainerDatanodeProtocolProtos.ContainerReportsProto; import org.apache.hadoop.ozone.OzoneConfigKeys; import org.apache.hadoop.ozone.container.common.SCMTestUtils; import org.apache.hadoop.test.GenericTestUtils; @@ -199,9 +199,8 @@ public class TestContainerCloser { private void sendContainerReport(ContainerInfo info, long used) throws IOException { - ContainerReportsRequestProto.Builder - reports = ContainerReportsRequestProto.newBuilder(); - reports.setType(ContainerReportsRequestProto.reportType.fullReport); + ContainerReportsProto.Builder + reports = ContainerReportsProto.newBuilder(); StorageContainerDatanodeProtocolProtos.ContainerInfo.Builder ciBuilder = StorageContainerDatanodeProtocolProtos.ContainerInfo.newBuilder(); @@ -214,9 +213,8 @@ public class TestContainerCloser { .setWriteCount(100000000L) .setReadBytes(2000000000L) .setWriteBytes(2000000000L); - reports.setDatanodeDetails( - TestUtils.getDatanodeDetails().getProtoBufMessage()); reports.addReports(ciBuilder); - mapping.processContainerReports(reports.build()); + mapping.processContainerReports(TestUtils.getDatanodeDetails(), + reports.build()); } } \ No newline at end of file diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestContainerPlacement.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestContainerPlacement.java index 09b6cd1020a..5ad28f62656 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestContainerPlacement.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestContainerPlacement.java @@ -33,7 +33,7 @@ import org.apache.hadoop.hdds.scm.container.placement.algorithms import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.SCMStorageReport; + .StorageContainerDatanodeProtocolProtos.StorageReportProto; import org.apache.hadoop.ozone.OzoneConfigKeys; import org.apache.hadoop.ozone.OzoneConsts; import org.apache.hadoop.test.GenericTestUtils; @@ -133,9 +133,9 @@ public class TestContainerPlacement { for (DatanodeDetails datanodeDetails : datanodes) { String id = UUID.randomUUID().toString(); String path = testDir.getAbsolutePath() + "/" + id; - List reports = TestUtils + List reports = TestUtils .createStorageReport(capacity, used, remaining, path, null, id, 1); - nodeManager.sendHeartbeat(datanodeDetails.getProtoBufMessage(), + nodeManager.sendHeartbeat(datanodeDetails, TestUtils.createNodeReport(reports)); } diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestNodeManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestNodeManager.java index de87e50e1f4..2b04d6b8625 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestNodeManager.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestNodeManager.java @@ -19,6 +19,8 @@ package org.apache.hadoop.hdds.scm.node; import com.google.common.base.Supplier; import org.apache.hadoop.fs.FileUtil; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.SCMCommandProto; import org.apache.hadoop.hdds.scm.ScmConfigKeys; import org.apache.hadoop.hdds.scm.TestUtils; import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeStat; @@ -26,7 +28,7 @@ import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.SCMStorageReport; + .StorageContainerDatanodeProtocolProtos.StorageReportProto; import org.apache.hadoop.ozone.OzoneConfigKeys; import org.apache.hadoop.ozone.protocol.commands.SCMCommand; import org.apache.hadoop.test.GenericTestUtils; @@ -63,8 +65,6 @@ import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState.DEAD; import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState .HEALTHY; import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState.STALE; -import static org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.SCMCmdType; import static org.hamcrest.CoreMatchers.containsString; import static org.hamcrest.core.StringStartsWith.startsWith; import static org.junit.Assert.assertEquals; @@ -144,7 +144,7 @@ public class TestNodeManager { for (int x = 0; x < nodeManager.getMinimumChillModeNodes(); x++) { DatanodeDetails datanodeDetails = TestUtils.getDatanodeDetails( nodeManager); - nodeManager.sendHeartbeat(datanodeDetails.getProtoBufMessage(), + nodeManager.sendHeartbeat(datanodeDetails, null); } @@ -191,8 +191,8 @@ public class TestNodeManager { // Need 100 nodes to come out of chill mode, only one node is sending HB. nodeManager.setMinimumChillModeNodes(100); - nodeManager.sendHeartbeat(TestUtils.getDatanodeDetails(nodeManager) - .getProtoBufMessage(), null); + nodeManager.sendHeartbeat(TestUtils.getDatanodeDetails(nodeManager), + null); GenericTestUtils.waitFor(() -> nodeManager.waitForHeartbeatProcessed(), 100, 4 * 1000); assertFalse("Not enough heartbeat, Node manager should have" + @@ -219,7 +219,7 @@ public class TestNodeManager { // Send 10 heartbeat from same node, and assert we never leave chill mode. for (int x = 0; x < 10; x++) { - nodeManager.sendHeartbeat(datanodeDetails.getProtoBufMessage(), + nodeManager.sendHeartbeat(datanodeDetails, null); } @@ -250,7 +250,7 @@ public class TestNodeManager { nodeManager.close(); // These should never be processed. - nodeManager.sendHeartbeat(datanodeDetails.getProtoBufMessage(), + nodeManager.sendHeartbeat(datanodeDetails, null); // Let us just wait for 2 seconds to prove that HBs are not processed. @@ -274,13 +274,13 @@ public class TestNodeManager { DatanodeDetails datanodeDetails = TestUtils.getDatanodeDetails(); String dnId = datanodeDetails.getUuidString(); String storagePath = testDir.getAbsolutePath() + "/" + dnId; - List reports = + List reports = TestUtils.createStorageReport(100, 10, 90, storagePath, null, dnId, 1); try (SCMNodeManager nodemanager = createNodeManager(conf)) { - nodemanager.register(datanodeDetails.getProtoBufMessage(), + nodemanager.register(datanodeDetails, TestUtils.createNodeReport(reports)); List command = nodemanager.sendHeartbeat( - datanodeDetails.getProtoBufMessage(), null); + datanodeDetails, null); Assert.assertTrue(nodemanager.getAllNodes().contains(datanodeDetails)); Assert.assertTrue("On regular HB calls, SCM responses a " + "datanode with an empty command list", command.isEmpty()); @@ -298,10 +298,10 @@ public class TestNodeManager { GenericTestUtils.waitFor(new Supplier() { @Override public Boolean get() { List command = - nodemanager.sendHeartbeat(datanodeDetails.getProtoBufMessage(), + nodemanager.sendHeartbeat(datanodeDetails, null); return command.size() == 1 && command.get(0).getType() - .equals(SCMCmdType.reregisterCommand); + .equals(SCMCommandProto.Type.reregisterCommand); } }, 100, 3 * 1000); } catch (TimeoutException e) { @@ -330,7 +330,7 @@ public class TestNodeManager { for (int x = 0; x < count; x++) { DatanodeDetails datanodeDetails = TestUtils.getDatanodeDetails( nodeManager); - nodeManager.sendHeartbeat(datanodeDetails.getProtoBufMessage(), + nodeManager.sendHeartbeat(datanodeDetails, null); } GenericTestUtils.waitFor(() -> nodeManager.waitForHeartbeatProcessed(), @@ -422,19 +422,19 @@ public class TestNodeManager { DatanodeDetails staleNode = TestUtils.getDatanodeDetails(nodeManager); // Heartbeat once - nodeManager.sendHeartbeat(staleNode.getProtoBufMessage(), + nodeManager.sendHeartbeat(staleNode, null); // Heartbeat all other nodes. for (DatanodeDetails dn : nodeList) { - nodeManager.sendHeartbeat(dn.getProtoBufMessage(), null); + nodeManager.sendHeartbeat(dn, null); } // Wait for 2 seconds .. and heartbeat good nodes again. Thread.sleep(2 * 1000); for (DatanodeDetails dn : nodeList) { - nodeManager.sendHeartbeat(dn.getProtoBufMessage(), null); + nodeManager.sendHeartbeat(dn, null); } // Wait for 2 seconds, wait a total of 4 seconds to make sure that the @@ -451,7 +451,7 @@ public class TestNodeManager { // heartbeat good nodes again. for (DatanodeDetails dn : nodeList) { - nodeManager.sendHeartbeat(dn.getProtoBufMessage(), null); + nodeManager.sendHeartbeat(dn, null); } // 6 seconds is the dead window for this test , so we wait a total of @@ -565,11 +565,11 @@ public class TestNodeManager { DatanodeDetails deadNode = TestUtils.getDatanodeDetails(nodeManager); nodeManager.sendHeartbeat( - healthyNode.getProtoBufMessage(), null); + healthyNode, null); nodeManager.sendHeartbeat( - staleNode.getProtoBufMessage(), null); + staleNode, null); nodeManager.sendHeartbeat( - deadNode.getProtoBufMessage(), null); + deadNode, null); // Sleep so that heartbeat processing thread gets to run. Thread.sleep(500); @@ -596,15 +596,15 @@ public class TestNodeManager { */ nodeManager.sendHeartbeat( - healthyNode.getProtoBufMessage(), null); + healthyNode, null); nodeManager.sendHeartbeat( - staleNode.getProtoBufMessage(), null); + staleNode, null); nodeManager.sendHeartbeat( - deadNode.getProtoBufMessage(), null); + deadNode, null); Thread.sleep(1500); nodeManager.sendHeartbeat( - healthyNode.getProtoBufMessage(), null); + healthyNode, null); Thread.sleep(2 * 1000); assertEquals(1, nodeManager.getNodeCount(HEALTHY)); @@ -625,12 +625,12 @@ public class TestNodeManager { */ nodeManager.sendHeartbeat( - healthyNode.getProtoBufMessage(), null); + healthyNode, null); nodeManager.sendHeartbeat( - staleNode.getProtoBufMessage(), null); + staleNode, null); Thread.sleep(1500); nodeManager.sendHeartbeat( - healthyNode.getProtoBufMessage(), null); + healthyNode, null); Thread.sleep(2 * 1000); // 3.5 seconds have elapsed for stale node, so it moves into Stale. @@ -664,11 +664,11 @@ public class TestNodeManager { * back all the nodes in healthy state. */ nodeManager.sendHeartbeat( - healthyNode.getProtoBufMessage(), null); + healthyNode, null); nodeManager.sendHeartbeat( - staleNode.getProtoBufMessage(), null); + staleNode, null); nodeManager.sendHeartbeat( - deadNode.getProtoBufMessage(), null); + deadNode, null); Thread.sleep(500); //Assert all nodes are healthy. assertEquals(3, nodeManager.getAllNodes().size()); @@ -689,7 +689,7 @@ public class TestNodeManager { int sleepDuration) throws InterruptedException { while (!Thread.currentThread().isInterrupted()) { for (DatanodeDetails dn : list) { - manager.sendHeartbeat(dn.getProtoBufMessage(), null); + manager.sendHeartbeat(dn, null); } Thread.sleep(sleepDuration); } @@ -775,7 +775,7 @@ public class TestNodeManager { // No Thread just one time HBs the node manager, so that these will be // marked as dead nodes eventually. for (DatanodeDetails dn : deadNodeList) { - nodeManager.sendHeartbeat(dn.getProtoBufMessage(), null); + nodeManager.sendHeartbeat(dn, null); } @@ -940,7 +940,7 @@ public class TestNodeManager { DatanodeDetails datanodeDetails = TestUtils.getDatanodeDetails( nodeManager); nodeManager.sendHeartbeat( - datanodeDetails.getProtoBufMessage(), null); + datanodeDetails, null); String status = nodeManager.getChillModeStatus(); Assert.assertThat(status, containsString("Still in chill " + "mode, waiting on nodes to report in.")); @@ -967,8 +967,7 @@ public class TestNodeManager { // Assert that node manager force enter cannot be overridden by nodes HBs. for (int x = 0; x < 20; x++) { DatanodeDetails datanode = TestUtils.getDatanodeDetails(nodeManager); - nodeManager.sendHeartbeat(datanode.getProtoBufMessage(), - null); + nodeManager.sendHeartbeat(datanode, null); } Thread.sleep(500); @@ -1009,10 +1008,10 @@ public class TestNodeManager { String dnId = datanodeDetails.getUuidString(); long free = capacity - used; String storagePath = testDir.getAbsolutePath() + "/" + dnId; - List reports = TestUtils + List reports = TestUtils .createStorageReport(capacity, used, free, storagePath, null, dnId, 1); - nodeManager.sendHeartbeat(datanodeDetails.getProtoBufMessage(), + nodeManager.sendHeartbeat(datanodeDetails, TestUtils.createNodeReport(reports)); } GenericTestUtils.waitFor(() -> nodeManager.waitForHeartbeatProcessed(), @@ -1058,11 +1057,11 @@ public class TestNodeManager { long scmUsed = x * usedPerHeartbeat; long remaining = capacity - scmUsed; String storagePath = testDir.getAbsolutePath() + "/" + dnId; - List reports = TestUtils + List reports = TestUtils .createStorageReport(capacity, scmUsed, remaining, storagePath, null, dnId, 1); - nodeManager.sendHeartbeat(datanodeDetails.getProtoBufMessage(), + nodeManager.sendHeartbeat(datanodeDetails, TestUtils.createNodeReport(reports)); Thread.sleep(100); } @@ -1140,10 +1139,10 @@ public class TestNodeManager { // Send a new report to bring the dead node back to healthy String storagePath = testDir.getAbsolutePath() + "/" + dnId; - List reports = TestUtils + List reports = TestUtils .createStorageReport(capacity, expectedScmUsed, expectedRemaining, storagePath, null, dnId, 1); - nodeManager.sendHeartbeat(datanodeDetails.getProtoBufMessage(), + nodeManager.sendHeartbeat(datanodeDetails, TestUtils.createNodeReport(reports)); // Wait up to 5 seconds so that the dead node becomes healthy diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeStorageStatMap.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeStorageStatMap.java index b8244127b22..072dee7c7fe 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeStorageStatMap.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeStorageStatMap.java @@ -21,9 +21,9 @@ import org.apache.hadoop.fs.StorageType; import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos; import org.apache.hadoop.hdds.protocol.proto. - StorageContainerDatanodeProtocolProtos.SCMNodeReport; + StorageContainerDatanodeProtocolProtos.NodeReportProto; import org.apache.hadoop.hdds.protocol.proto. - StorageContainerDatanodeProtocolProtos.SCMStorageReport; + StorageContainerDatanodeProtocolProtos.StorageReportProto; import org.apache.hadoop.hdds.scm.TestUtils; import org.apache.hadoop.hdds.scm.exceptions.SCMException; import org.apache.hadoop.ozone.OzoneConsts; @@ -134,7 +134,7 @@ public class TestSCMNodeStorageStatMap { @Test public void testProcessNodeReportCheckOneNode() throws IOException { UUID key = getFirstKey(); - List reportList = new ArrayList<>(); + List reportList = new ArrayList<>(); Set reportSet = testData.get(key); SCMNodeStorageStatMap map = new SCMNodeStorageStatMap(conf); map.insertNewDatanode(key, reportSet); @@ -146,16 +146,16 @@ public class TestSCMNodeStorageStatMap { long reportCapacity = report.getCapacity(); long reportScmUsed = report.getScmUsed(); long reportRemaining = report.getRemaining(); - List reports = TestUtils + List reports = TestUtils .createStorageReport(reportCapacity, reportScmUsed, reportRemaining, path, null, storageId, 1); StorageReportResult result = map.processNodeReport(key, TestUtils.createNodeReport(reports)); Assert.assertEquals(result.getStatus(), SCMNodeStorageStatMap.ReportStatus.ALL_IS_WELL); - StorageContainerDatanodeProtocolProtos.SCMNodeReport.Builder nrb = - SCMNodeReport.newBuilder(); - SCMStorageReport srb = reportSet.iterator().next().getProtoBufMessage(); + StorageContainerDatanodeProtocolProtos.NodeReportProto.Builder nrb = + NodeReportProto.newBuilder(); + StorageReportProto srb = reportSet.iterator().next().getProtoBufMessage(); reportList.add(srb); result = map.processNodeReport(key, TestUtils.createNodeReport(reportList)); Assert.assertEquals(result.getStatus(), @@ -168,7 +168,7 @@ public class TestSCMNodeStorageStatMap { Assert.assertEquals(result.getStatus(), SCMNodeStorageStatMap.ReportStatus.STORAGE_OUT_OF_SPACE); // Mark a disk failed - SCMStorageReport srb2 = SCMStorageReport.newBuilder() + StorageReportProto srb2 = StorageReportProto.newBuilder() .setStorageUuid(UUID.randomUUID().toString()) .setStorageLocation(srb.getStorageLocation()).setScmUsed(reportCapacity) .setCapacity(reportCapacity).setRemaining(0).setFailed(true).build(); diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/common/TestEndPoint.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/common/TestEndPoint.java index 1d92cdc2979..34779daf947 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/common/TestEndPoint.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/common/TestEndPoint.java @@ -20,22 +20,21 @@ import org.apache.commons.codec.digest.DigestUtils; import org.apache.commons.lang3.RandomUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileUtil; +import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos; import org.apache.hadoop.hdds.scm.TestUtils; import org.apache.hadoop.hdds.scm.VersionInfo; import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos; + .StorageContainerDatanodeProtocolProtos.ContainerReportsProto; import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.ContainerReportsRequestProto; -import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.ContainerReportsResponseProto; + .StorageContainerDatanodeProtocolProtos.SCMHeartbeatRequestProto; import org.apache.hadoop.hdds.protocol.proto .StorageContainerDatanodeProtocolProtos.SCMHeartbeatResponseProto; import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.SCMRegisteredCmdResponseProto; + .StorageContainerDatanodeProtocolProtos.SCMRegisteredResponseProto; import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.SCMStorageReport; + .StorageContainerDatanodeProtocolProtos.StorageReportProto; import org.apache.hadoop.hdds.protocol.proto .StorageContainerDatanodeProtocolProtos.SCMVersionResponseProto; import org.apache.hadoop.ipc.RPC; @@ -200,7 +199,7 @@ public class TestEndPoint { DatanodeDetails nodeToRegister = getDatanodeDetails(); try (EndpointStateMachine rpcEndPoint = createEndpoint( SCMTestUtils.getConf(), serverAddress, 1000)) { - SCMRegisteredCmdResponseProto responseProto = rpcEndPoint.getEndPoint() + SCMRegisteredResponseProto responseProto = rpcEndPoint.getEndPoint() .register(nodeToRegister.getProtoBufMessage(), TestUtils .createNodeReport( getStorageReports(nodeToRegister.getUuidString())), @@ -215,7 +214,7 @@ public class TestEndPoint { } } - private List getStorageReports(String id) { + private List getStorageReports(String id) { String storagePath = testDir.getAbsolutePath() + "/" + id; return TestUtils.createStorageReport(100, 10, 90, storagePath, null, id, 1); } @@ -293,9 +292,14 @@ public class TestEndPoint { createEndpoint(SCMTestUtils.getConf(), serverAddress, 1000)) { String storageId = UUID.randomUUID().toString(); + SCMHeartbeatRequestProto request = SCMHeartbeatRequestProto.newBuilder() + .setDatanodeDetails(dataNode.getProtoBufMessage()) + .setNodeReport(TestUtils.createNodeReport( + getStorageReports(storageId))) + .build(); + SCMHeartbeatResponseProto responseProto = rpcEndPoint.getEndPoint() - .sendHeartbeat(dataNode.getProtoBufMessage(), - TestUtils.createNodeReport(getStorageReports(storageId))); + .sendHeartbeat(request); Assert.assertNotNull(responseProto); Assert.assertEquals(0, responseProto.getCommandsCount()); } @@ -361,86 +365,11 @@ public class TestEndPoint { lessThanOrEqualTo(rpcTimeout + tolerance)); } - /** - * Returns a new container report. - * @return - */ - ContainerReport getRandomContainerReport() { - return new ContainerReport(RandomUtils.nextLong(), - DigestUtils.sha256Hex("Random")); - } - - /** - * Creates dummy container reports. - * @param count - The number of closed containers to create. - * @return ContainerReportsProto - */ - StorageContainerDatanodeProtocolProtos.ContainerReportsRequestProto - createDummyContainerReports(int count) { - StorageContainerDatanodeProtocolProtos.ContainerReportsRequestProto.Builder - reportsBuilder = StorageContainerDatanodeProtocolProtos - .ContainerReportsRequestProto.newBuilder(); - for (int x = 0; x < count; x++) { - reportsBuilder.addReports(getRandomContainerReport() - .getProtoBufMessage()); - } - reportsBuilder.setDatanodeDetails(getDatanodeDetails() - .getProtoBufMessage()); - reportsBuilder.setType(StorageContainerDatanodeProtocolProtos - .ContainerReportsRequestProto.reportType.fullReport); - return reportsBuilder.build(); - } - - /** - * Tests that rpcEndpoint sendContainerReport works as expected. - * @throws Exception - */ - @Test - public void testContainerReportSend() throws Exception { - final int count = 1000; - scmServerImpl.reset(); - try (EndpointStateMachine rpcEndPoint = - createEndpoint(SCMTestUtils.getConf(), - serverAddress, 1000)) { - ContainerReportsResponseProto responseProto = rpcEndPoint - .getEndPoint().sendContainerReport(createDummyContainerReports( - count)); - Assert.assertNotNull(responseProto); - } - Assert.assertEquals(1, scmServerImpl.getContainerReportsCount()); - Assert.assertEquals(count, scmServerImpl.getContainerCount()); - } - - - /** - * Tests that rpcEndpoint sendContainerReport works as expected. - * @throws Exception - */ - @Test - public void testContainerReport() throws Exception { - final int count = 1000; - scmServerImpl.reset(); - try (EndpointStateMachine rpcEndPoint = - createEndpoint(SCMTestUtils.getConf(), - serverAddress, 1000)) { - ContainerReportsResponseProto responseProto = rpcEndPoint - .getEndPoint().sendContainerReport(createContainerReport(count, - null)); - Assert.assertNotNull(responseProto); - } - Assert.assertEquals(1, scmServerImpl.getContainerReportsCount()); - Assert.assertEquals(count, scmServerImpl.getContainerCount()); - final long expectedKeyCount = count * 1000; - Assert.assertEquals(expectedKeyCount, scmServerImpl.getKeyCount()); - final long expectedBytesUsed = count * OzoneConsts.GB * 2; - Assert.assertEquals(expectedBytesUsed, scmServerImpl.getBytesUsed()); - } - - private ContainerReportsRequestProto createContainerReport( + private ContainerReportsProto createContainerReport( int count, DatanodeDetails datanodeDetails) { - StorageContainerDatanodeProtocolProtos.ContainerReportsRequestProto.Builder + StorageContainerDatanodeProtocolProtos.ContainerReportsProto.Builder reportsBuilder = StorageContainerDatanodeProtocolProtos - .ContainerReportsRequestProto.newBuilder(); + .ContainerReportsProto.newBuilder(); for (int x = 0; x < count; x++) { long containerID = RandomUtils.nextLong(); ContainerReport report = new ContainerReport(containerID, @@ -455,14 +384,6 @@ public class TestEndPoint { reportsBuilder.addReports(report.getProtoBufMessage()); } - if(datanodeDetails == null) { - reportsBuilder.setDatanodeDetails(getDatanodeDetails() - .getProtoBufMessage()); - } else { - reportsBuilder.setDatanodeDetails(datanodeDetails.getProtoBufMessage()); - } - reportsBuilder.setType(StorageContainerDatanodeProtocolProtos - .ContainerReportsRequestProto.reportType.fullReport); return reportsBuilder.build(); } } diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/replication/TestContainerSupervisor.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/replication/TestContainerSupervisor.java deleted file mode 100644 index e197886e008..00000000000 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/replication/TestContainerSupervisor.java +++ /dev/null @@ -1,275 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with this - * work for additional information regarding copyright ownership. The ASF - * licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - *

- * http://www.apache.org/licenses/LICENSE-2.0 - *

- * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - */ -package org.apache.hadoop.ozone.container.replication; - -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hdds.scm.TestUtils; -import org.apache.hadoop.hdds.scm.container.replication.ContainerSupervisor; -import org.apache.hadoop.hdds.scm.container.replication.InProgressPool; -import org.apache.hadoop.hdds.scm.node.CommandQueue; -import org.apache.hadoop.hdds.scm.node.NodeManager; -import org.apache.hadoop.hdds.scm.node.NodePoolManager; -import org.apache.hadoop.hdds.conf.OzoneConfiguration; -import org.apache.hadoop.hdds.protocol.DatanodeDetails; -import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState; -import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.ContainerReportsRequestProto; -import org.apache.hadoop.ozone.container.common.SCMTestUtils; -import org.apache.hadoop.ozone.container.testutils - .ReplicationDatanodeStateManager; -import org.apache.hadoop.ozone.container.testutils.ReplicationNodeManagerMock; -import org.apache.hadoop.ozone.container.testutils - .ReplicationNodePoolManagerMock; -import org.apache.hadoop.test.GenericTestUtils; -import org.apache.hadoop.test.GenericTestUtils.LogCapturer; -import org.junit.After; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Test; -import org.slf4j.event.Level; - -import java.io.IOException; -import java.util.HashMap; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; - -import static org.apache.hadoop.hdds.scm.ScmConfigKeys - .OZONE_SCM_CONTAINER_REPORTS_WAIT_TIMEOUT; -import static org.apache.hadoop.hdds.scm.ScmConfigKeys - .OZONE_SCM_CONTAINER_REPORT_PROCESSING_INTERVAL; -import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState - .HEALTHY; -import static org.apache.ratis.shaded.com.google.common.util.concurrent - .Uninterruptibles.sleepUninterruptibly; - -/** - * Tests for the container manager. - */ -public class TestContainerSupervisor { - final static String POOL_NAME_TEMPLATE = "Pool%d"; - static final int MAX_DATANODES = 72; - static final int POOL_SIZE = 24; - static final int POOL_COUNT = 3; - private LogCapturer logCapturer = LogCapturer.captureLogs( - LogFactory.getLog(ContainerSupervisor.class)); - private List datanodes = new LinkedList<>(); - private NodeManager nodeManager; - private NodePoolManager poolManager; - private CommandQueue commandQueue; - private ContainerSupervisor containerSupervisor; - private ReplicationDatanodeStateManager datanodeStateManager; - - @After - public void tearDown() throws Exception { - logCapturer.stopCapturing(); - GenericTestUtils.setLogLevel(ContainerSupervisor.LOG, Level.INFO); - } - - @Before - public void setUp() throws Exception { - GenericTestUtils.setLogLevel(ContainerSupervisor.LOG, Level.DEBUG); - Map nodeStateMap = new HashMap<>(); - // We are setting up 3 pools with 24 nodes each in this cluster. - // First we create 72 Datanodes. - for (int x = 0; x < MAX_DATANODES; x++) { - DatanodeDetails datanode = TestUtils.getDatanodeDetails(); - datanodes.add(datanode); - nodeStateMap.put(datanode, HEALTHY); - } - - commandQueue = new CommandQueue(); - - // All nodes in this cluster are healthy for time being. - nodeManager = new ReplicationNodeManagerMock(nodeStateMap, commandQueue); - poolManager = new ReplicationNodePoolManagerMock(); - - - Assert.assertEquals("Max datanodes should be equal to POOL_SIZE * " + - "POOL_COUNT", POOL_COUNT * POOL_SIZE, MAX_DATANODES); - - // Start from 1 instead of zero so we can multiply and get the node index. - for (int y = 1; y <= POOL_COUNT; y++) { - String poolName = String.format(POOL_NAME_TEMPLATE, y); - for (int z = 0; z < POOL_SIZE; z++) { - DatanodeDetails id = datanodes.get(y * z); - poolManager.addNode(poolName, id); - } - } - OzoneConfiguration config = SCMTestUtils.getOzoneConf(); - config.setTimeDuration(OZONE_SCM_CONTAINER_REPORTS_WAIT_TIMEOUT, 2, - TimeUnit.SECONDS); - config.setTimeDuration(OZONE_SCM_CONTAINER_REPORT_PROCESSING_INTERVAL, 1, - TimeUnit.SECONDS); - containerSupervisor = new ContainerSupervisor(config, - nodeManager, poolManager); - datanodeStateManager = new ReplicationDatanodeStateManager(nodeManager, - poolManager); - // Sleep for one second to make sure all threads get time to run. - sleepUninterruptibly(1, TimeUnit.SECONDS); - } - - @Test - /** - * Asserts that at least one pool is picked up for processing. - */ - public void testAssertPoolsAreProcessed() { - // This asserts that replication manager has started processing at least - // one pool. - Assert.assertTrue(containerSupervisor.getInProgressPoolCount() > 0); - - // Since all datanodes are flagged as healthy in this test, for each - // datanode we must have queued a command. - Assert.assertEquals("Commands are in queue :", - POOL_SIZE * containerSupervisor.getInProgressPoolCount(), - commandQueue.getCommandsInQueue()); - } - - @Test - /** - * This test sends container reports for 2 containers to a pool in progress. - * Asserts that we are able to find a container with single replica and do - * not find container with 3 replicas. - */ - public void testDetectSingleContainerReplica() throws TimeoutException, - InterruptedException { - long singleNodeContainerID = 9001; - long threeNodeContainerID = 9003; - InProgressPool ppool = containerSupervisor.getInProcessPoolList().get(0); - // Only single datanode reporting that "SingleNodeContainer" exists. - List clist = - datanodeStateManager.getContainerReport(singleNodeContainerID, - ppool.getPool().getPoolName(), 1); - ppool.handleContainerReport(clist.get(0)); - - // Three nodes are going to report that ThreeNodeContainer exists. - clist = datanodeStateManager.getContainerReport(threeNodeContainerID, - ppool.getPool().getPoolName(), 3); - - for (ContainerReportsRequestProto reportsProto : clist) { - ppool.handleContainerReport(reportsProto); - } - GenericTestUtils.waitFor(() -> ppool.getContainerProcessedCount() == 4, - 200, 1000); - ppool.setDoneProcessing(); - - List> containers = ppool.filterContainer(p -> p - .getValue() == 1); - Assert.assertEquals(singleNodeContainerID, - containers.get(0).getKey().longValue()); - int count = containers.get(0).getValue(); - Assert.assertEquals(1L, count); - } - - @Test - /** - * We create three containers, Normal,OveReplicated and WayOverReplicated - * containers. This test asserts that we are able to find the - * over replicated containers. - */ - public void testDetectOverReplica() throws TimeoutException, - InterruptedException { - long normalContainerID = 9000; - long overReplicatedContainerID = 9001; - long wayOverReplicatedContainerID = 9002; - InProgressPool ppool = containerSupervisor.getInProcessPoolList().get(0); - - List clist = - datanodeStateManager.getContainerReport(normalContainerID, - ppool.getPool().getPoolName(), 3); - ppool.handleContainerReport(clist.get(0)); - - clist = datanodeStateManager.getContainerReport(overReplicatedContainerID, - ppool.getPool().getPoolName(), 4); - - for (ContainerReportsRequestProto reportsProto : clist) { - ppool.handleContainerReport(reportsProto); - } - - clist = datanodeStateManager.getContainerReport( - wayOverReplicatedContainerID, ppool.getPool().getPoolName(), 7); - - for (ContainerReportsRequestProto reportsProto : clist) { - ppool.handleContainerReport(reportsProto); - } - - // We ignore container reports from the same datanodes. - // it is possible that these each of these containers get placed - // on same datanodes, so allowing for 4 duplicates in the set of 14. - GenericTestUtils.waitFor(() -> ppool.getContainerProcessedCount() > 10, - 200, 1000); - ppool.setDoneProcessing(); - - List> containers = ppool.filterContainer(p -> p - .getValue() > 3); - Assert.assertEquals(2, containers.size()); - } - - @Test - /** - * This test verifies that all pools are picked up for replica processing. - * - */ - public void testAllPoolsAreProcessed() throws TimeoutException, - InterruptedException { - // Verify that we saw all three pools being picked up for processing. - GenericTestUtils.waitFor(() -> containerSupervisor.getPoolProcessCount() - >= 3, 200, 15 * 1000); - Assert.assertTrue(logCapturer.getOutput().contains("Pool1") && - logCapturer.getOutput().contains("Pool2") && - logCapturer.getOutput().contains("Pool3")); - } - - @Test - /** - * Adds a new pool and tests that we are able to pick up that new pool for - * processing as well as handle container reports for datanodes in that pool. - * @throws TimeoutException - * @throws InterruptedException - */ - public void testAddingNewPoolWorks() - throws TimeoutException, InterruptedException, IOException { - LogCapturer inProgressLog = LogCapturer.captureLogs( - LogFactory.getLog(InProgressPool.class)); - GenericTestUtils.setLogLevel(InProgressPool.LOG, Level.DEBUG); - try { - DatanodeDetails id = TestUtils.getDatanodeDetails(); - ((ReplicationNodeManagerMock) (nodeManager)).addNode(id, HEALTHY); - poolManager.addNode("PoolNew", id); - GenericTestUtils.waitFor(() -> - logCapturer.getOutput().contains("PoolNew"), - 200, 15 * 1000); - - long newContainerID = 7001; - // Assert that we are able to send a container report to this new - // pool and datanode. - List clist = - datanodeStateManager.getContainerReport(newContainerID, - "PoolNew", 1); - containerSupervisor.handleContainerReport(clist.get(0)); - GenericTestUtils.waitFor(() -> - inProgressLog.getOutput() - .contains(Long.toString(newContainerID)) && inProgressLog - .getOutput().contains(id.getUuidString()), - 200, 10 * 1000); - } finally { - inProgressLog.stopCapturing(); - } - } -} diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationDatanodeStateManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationDatanodeStateManager.java deleted file mode 100644 index 50fd18f5655..00000000000 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationDatanodeStateManager.java +++ /dev/null @@ -1,101 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with this - * work for additional information regarding copyright ownership. The ASF - * licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - *

- * http://www.apache.org/licenses/LICENSE-2.0 - *

- * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - */ -package org.apache.hadoop.ozone.container.testutils; - -import com.google.common.primitives.Longs; -import org.apache.commons.codec.digest.DigestUtils; -import org.apache.hadoop.hdds.scm.node.NodeManager; -import org.apache.hadoop.hdds.scm.node.NodePoolManager; -import org.apache.hadoop.hdds.protocol.DatanodeDetails; -import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.ContainerInfo; -import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.ContainerReportsRequestProto; - -import java.util.LinkedList; -import java.util.List; -import java.util.Random; - -import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState - .HEALTHY; - -/** - * This class manages the state of datanode - * in conjunction with the node pool and node managers. - */ -public class ReplicationDatanodeStateManager { - private final NodeManager nodeManager; - private final NodePoolManager poolManager; - private final Random r; - - /** - * The datanode state Manager. - * - * @param nodeManager - * @param poolManager - */ - public ReplicationDatanodeStateManager(NodeManager nodeManager, - NodePoolManager poolManager) { - this.nodeManager = nodeManager; - this.poolManager = poolManager; - r = new Random(); - } - - /** - * Get Container Report as if it is from a datanode in the cluster. - * @param containerID - Container ID. - * @param poolName - Pool Name. - * @param dataNodeCount - Datanode Count. - * @return List of Container Reports. - */ - public List getContainerReport( - long containerID, String poolName, int dataNodeCount) { - List containerList = new LinkedList<>(); - List nodesInPool = poolManager.getNodes(poolName); - - if (nodesInPool == null) { - return containerList; - } - - if (nodesInPool.size() < dataNodeCount) { - throw new IllegalStateException("Not enough datanodes to create " + - "required container reports"); - } - - while (containerList.size() < dataNodeCount && nodesInPool.size() > 0) { - DatanodeDetails id = nodesInPool.get(r.nextInt(nodesInPool.size())); - nodesInPool.remove(id); - containerID++; - // We return container reports only for nodes that are healthy. - if (nodeManager.getNodeState(id) == HEALTHY) { - ContainerInfo info = ContainerInfo.newBuilder() - .setContainerID(containerID) - .setFinalhash(DigestUtils.sha256Hex( - Longs.toByteArray(containerID))) - .setContainerID(containerID) - .build(); - ContainerReportsRequestProto containerReport = - ContainerReportsRequestProto.newBuilder().addReports(info) - .setDatanodeDetails(id.getProtoBufMessage()) - .setType(ContainerReportsRequestProto.reportType.fullReport) - .build(); - containerList.add(containerReport); - } - } - return containerList; - } -} diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationNodeManagerMock.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationNodeManagerMock.java index 3f814d044d5..072d8212470 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationNodeManagerMock.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationNodeManagerMock.java @@ -24,13 +24,13 @@ import org.apache.hadoop.hdds.scm.node.NodeManager; import org.apache.hadoop.hdds.scm.node.NodePoolManager; import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException; import org.apache.hadoop.hdds.protocol.DatanodeDetails; -import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState; import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.SCMNodeReport; + .StorageContainerDatanodeProtocolProtos.NodeReportProto; import org.apache.hadoop.hdds.protocol.proto .StorageContainerDatanodeProtocolProtos.SCMVersionRequestProto; import org.apache.hadoop.ozone.protocol.VersionResponse; +import org.apache.hadoop.ozone.protocol.commands.RegisteredCommand; import org.apache.hadoop.ozone.protocol.commands.SCMCommand; import org.mockito.Mockito; @@ -277,12 +277,12 @@ public class ReplicationNodeManagerMock implements NodeManager { * Register the node if the node finds that it is not registered with any SCM. * * @param dd DatanodeDetailsProto - * @param nodeReport SCMNodeReport + * @param nodeReport NodeReportProto * @return SCMHeartbeatResponseProto */ @Override - public SCMCommand register(HddsProtos.DatanodeDetailsProto dd, - SCMNodeReport nodeReport) { + public RegisteredCommand register(DatanodeDetails dd, + NodeReportProto nodeReport) { return null; } @@ -294,8 +294,8 @@ public class ReplicationNodeManagerMock implements NodeManager { * @return SCMheartbeat response list */ @Override - public List sendHeartbeat(HddsProtos.DatanodeDetailsProto dd, - SCMNodeReport nodeReport) { + public List sendHeartbeat(DatanodeDetails dd, + NodeReportProto nodeReport) { return null; } diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManager.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManager.java index a0d41a8682f..0c1d8f2ca7f 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManager.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManager.java @@ -32,8 +32,10 @@ import org.apache.hadoop.ozone.protocol.commands.SCMCommand; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeType; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState; -import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction; -import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCmdType; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.SCMCommandProto; import org.apache.hadoop.hdds.scm.server.StorageContainerManager; import org.apache.hadoop.hdds.scm.server.StorageContainerManager.StartupOption; import org.apache.hadoop.hdds.scm.block.DeletedBlockLog; @@ -302,12 +304,11 @@ public class TestStorageContainerManager { NodeManager nodeManager = cluster.getStorageContainerManager() .getScmNodeManager(); List commands = nodeManager.sendHeartbeat( - nodeManager.getNodes(NodeState.HEALTHY).get(0).getProtoBufMessage(), - null); + nodeManager.getNodes(NodeState.HEALTHY).get(0), null); if (commands != null) { for (SCMCommand cmd : commands) { - if (cmd.getType() == SCMCmdType.deleteBlocksCommand) { + if (cmd.getType() == SCMCommandProto.Type.deleteBlocksCommand) { List deletedTXs = ((DeleteBlocksCommand) cmd).blocksTobeDeleted(); return deletedTXs != null && deletedTXs.size() == limitSize; diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestSCMMetrics.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestSCMMetrics.java index 1d19bb3c000..1dbe760c0a2 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestSCMMetrics.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestSCMMetrics.java @@ -32,8 +32,10 @@ import org.apache.hadoop.metrics2.MetricsRecordBuilder; import org.apache.hadoop.ozone.MiniOzoneCluster; import org.apache.hadoop.ozone.OzoneConsts; import org.apache.hadoop.ozone.container.common.helpers.ContainerReport; -import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos; -import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReportsRequestProto; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.ContainerReportsProto; import org.apache.hadoop.hdds.scm.container.placement.metrics.ContainerStat; import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMMetrics; import org.apache.hadoop.hdds.scm.node.SCMNodeManager; @@ -75,11 +77,11 @@ public class TestSCMMetrics { ContainerStat stat = new ContainerStat(size, used, keyCount, readBytes, writeBytes, readCount, writeCount); StorageContainerManager scmManager = cluster.getStorageContainerManager(); - - ContainerReportsRequestProto request = createContainerReport(numReport, - stat, null); - String fstDatanodeUuid = request.getDatanodeDetails().getUuid(); - scmManager.getDatanodeProtocolServer().sendContainerReport(request); + DatanodeDetails fstDatanodeDetails = TestUtils.getDatanodeDetails(); + ContainerReportsProto request = createContainerReport(numReport, stat); + String fstDatanodeUuid = fstDatanodeDetails.getUuidString(); + scmManager.getDatanodeProtocolServer().processContainerReports( + fstDatanodeDetails, request); // verify container stat metrics MetricsRecordBuilder scmMetrics = getMetrics(SCMMetrics.SOURCE_NAME); @@ -100,9 +102,11 @@ public class TestSCMMetrics { getLongGauge("LastContainerReportWriteCount", scmMetrics)); // add one new report - request = createContainerReport(1, stat, null); - String sndDatanodeUuid = request.getDatanodeDetails().getUuid(); - scmManager.getDatanodeProtocolServer().sendContainerReport(request); + DatanodeDetails sndDatanodeDetails = TestUtils.getDatanodeDetails(); + request = createContainerReport(1, stat); + String sndDatanodeUuid = sndDatanodeDetails.getUuidString(); + scmManager.getDatanodeProtocolServer().processContainerReports( + sndDatanodeDetails, request); scmMetrics = getMetrics(SCMMetrics.SOURCE_NAME); assertEquals(size * (numReport + 1), @@ -124,12 +128,12 @@ public class TestSCMMetrics { // Re-send reports but with different value for validating // the aggregation. stat = new ContainerStat(100, 50, 3, 50, 60, 5, 6); - scmManager.getDatanodeProtocolServer().sendContainerReport( - createContainerReport(1, stat, fstDatanodeUuid)); + scmManager.getDatanodeProtocolServer().processContainerReports( + fstDatanodeDetails, createContainerReport(1, stat)); stat = new ContainerStat(1, 1, 1, 1, 1, 1, 1); - scmManager.getDatanodeProtocolServer().sendContainerReport( - createContainerReport(1, stat, sndDatanodeUuid)); + scmManager.getDatanodeProtocolServer().processContainerReports( + sndDatanodeDetails, createContainerReport(1, stat)); // the global container metrics value should be updated scmMetrics = getMetrics(SCMMetrics.SOURCE_NAME); @@ -170,11 +174,11 @@ public class TestSCMMetrics { writeBytes, readCount, writeCount); StorageContainerManager scmManager = cluster.getStorageContainerManager(); - String datanodeUuid = cluster.getHddsDatanodes().get(0) - .getDatanodeDetails().getUuidString(); - ContainerReportsRequestProto request = createContainerReport(numReport, - stat, datanodeUuid); - scmManager.getDatanodeProtocolServer().sendContainerReport(request); + DatanodeDetails datanodeDetails = cluster.getHddsDatanodes().get(0) + .getDatanodeDetails(); + ContainerReportsProto request = createContainerReport(numReport, stat); + scmManager.getDatanodeProtocolServer().processContainerReports( + datanodeDetails, request); MetricsRecordBuilder scmMetrics = getMetrics(SCMMetrics.SOURCE_NAME); assertEquals(size * numReport, @@ -216,11 +220,11 @@ public class TestSCMMetrics { } } - private ContainerReportsRequestProto createContainerReport(int numReport, - ContainerStat stat, String datanodeUuid) { - StorageContainerDatanodeProtocolProtos.ContainerReportsRequestProto.Builder + private ContainerReportsProto createContainerReport(int numReport, + ContainerStat stat) { + StorageContainerDatanodeProtocolProtos.ContainerReportsProto.Builder reportsBuilder = StorageContainerDatanodeProtocolProtos - .ContainerReportsRequestProto.newBuilder(); + .ContainerReportsProto.newBuilder(); for (int i = 0; i < numReport; i++) { ContainerReport report = new ContainerReport( @@ -234,24 +238,6 @@ public class TestSCMMetrics { report.setWriteBytes(stat.getWriteBytes().get()); reportsBuilder.addReports(report.getProtoBufMessage()); } - - DatanodeDetails datanodeDetails; - if (datanodeUuid == null) { - datanodeDetails = TestUtils.getDatanodeDetails(); - } else { - datanodeDetails = DatanodeDetails.newBuilder() - .setUuid(datanodeUuid) - .setIpAddress("127.0.0.1") - .setHostName("localhost") - .setContainerPort(0) - .setRatisPort(0) - .setOzoneRestPort(0) - .build(); - } - - reportsBuilder.setDatanodeDetails(datanodeDetails.getProtoBufMessage()); - reportsBuilder.setType(StorageContainerDatanodeProtocolProtos - .ContainerReportsRequestProto.reportType.fullReport); return reportsBuilder.build(); } }