From e8ae632d4c4f13788b0c42dbf297c8f7b9d889f3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?M=C3=A1rton=20Elek?= Date: Mon, 23 Sep 2019 16:40:08 +0200 Subject: [PATCH] HDDS-2068. Make StorageContainerDatanodeProtocolService message based Signed-off-by: Anu Engineer --- ...atanodeProtocolClientSideTranslatorPB.java | 60 +++++---- ...atanodeProtocolServerSideTranslatorPB.java | 121 ++++++++++-------- .../StorageContainerDatanodeProtocol.proto | 58 ++++++--- .../ozone/container/common/SCMTestUtils.java | 4 +- .../scm/server/SCMDatanodeProtocolServer.java | 102 ++++++--------- .../ozone/insight/BaseInsightSubCommand.java | 4 +- .../scm/ScmProtocolBlockLocationInsight.java | 6 +- .../scm/ScmProtocolDatanodeInsight.java | 72 +++++++++++ 8 files changed, 268 insertions(+), 159 deletions(-) create mode 100644 hadoop-ozone/insight/src/main/java/org/apache/hadoop/ozone/insight/scm/ScmProtocolDatanodeInsight.java diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolClientSideTranslatorPB.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolClientSideTranslatorPB.java index 4e1e27e1801..9b446666e5d 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolClientSideTranslatorPB.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolClientSideTranslatorPB.java @@ -24,6 +24,9 @@ import org.apache.hadoop.hdds.protocol.proto import org.apache.hadoop.hdds.protocol.proto .StorageContainerDatanodeProtocolProtos.ContainerReportsProto; +import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMDatanodeRequest; +import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMDatanodeRequest.Builder; +import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMDatanodeResponse; import org.apache.hadoop.hdds.protocol.proto .StorageContainerDatanodeProtocolProtos.SCMHeartbeatRequestProto; import org.apache.hadoop.hdds.protocol.proto @@ -38,6 +41,7 @@ import org.apache.hadoop.hdds.protocol.proto .StorageContainerDatanodeProtocolProtos.SCMVersionRequestProto; import org.apache.hadoop.hdds.protocol.proto .StorageContainerDatanodeProtocolProtos.SCMVersionResponseProto; +import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.Type; import org.apache.hadoop.ipc.ProtobufHelper; import org.apache.hadoop.ipc.ProtocolTranslator; import org.apache.hadoop.ipc.RPC; @@ -45,6 +49,7 @@ import org.apache.hadoop.ozone.protocol.StorageContainerDatanodeProtocol; import java.io.Closeable; import java.io.IOException; +import java.util.function.Consumer; /** * This class is the client-side translator to translate the requests made on @@ -96,6 +101,25 @@ public class StorageContainerDatanodeProtocolClientSideTranslatorPB return rpcProxy; } + /** + * Helper method to wrap the request and send the message. + */ + private SCMDatanodeResponse submitRequest(Type type, + Consumer builderConsumer) throws IOException { + final SCMDatanodeResponse response; + try { + Builder builder = SCMDatanodeRequest.newBuilder() + .setCmdType(type); + builderConsumer.accept(builder); + SCMDatanodeRequest wrapper = builder.build(); + + response = rpcProxy.submitRequest(NULL_RPC_CONTROLLER, wrapper); + } catch (ServiceException ex) { + throw ProtobufHelper.getRemoteException(ex); + } + return response; + } + /** * Returns SCM version. * @@ -104,16 +128,11 @@ public class StorageContainerDatanodeProtocolClientSideTranslatorPB */ @Override public SCMVersionResponseProto getVersion(SCMVersionRequestProto - unused) throws IOException { - SCMVersionRequestProto request = - SCMVersionRequestProto.newBuilder().build(); - final SCMVersionResponseProto response; - try { - response = rpcProxy.getVersion(NULL_RPC_CONTROLLER, request); - } catch (ServiceException ex) { - throw ProtobufHelper.getRemoteException(ex); - } - return response; + request) throws IOException { + return submitRequest(Type.GetVersion, + (builder) -> builder + .setGetVersionRequest(SCMVersionRequestProto.newBuilder().build())) + .getGetVersionResponse(); } /** @@ -126,13 +145,9 @@ public class StorageContainerDatanodeProtocolClientSideTranslatorPB @Override public SCMHeartbeatResponseProto sendHeartbeat( SCMHeartbeatRequestProto heartbeat) throws IOException { - final SCMHeartbeatResponseProto resp; - try { - resp = rpcProxy.sendHeartbeat(NULL_RPC_CONTROLLER, heartbeat); - } catch (ServiceException e) { - throw ProtobufHelper.getRemoteException(e); - } - return resp; + return submitRequest(Type.SendHeartbeat, + (builder) -> builder.setSendHeartbeatRequest(heartbeat)) + .getSendHeartbeatResponse(); } /** @@ -155,13 +170,8 @@ public class StorageContainerDatanodeProtocolClientSideTranslatorPB req.setContainerReport(containerReportsRequestProto); req.setPipelineReports(pipelineReportsProto); req.setNodeReport(nodeReport); - final SCMRegisteredResponseProto response; - try { - response = rpcProxy.register(NULL_RPC_CONTROLLER, req.build()); - } catch (ServiceException e) { - throw ProtobufHelper.getRemoteException(e); - } - return response; + return submitRequest(Type.Register, + (builder) -> builder.setRegisterRequest(req)) + .getRegisterResponse(); } - } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolServerSideTranslatorPB.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolServerSideTranslatorPB.java index 862233276aa..ed704ebf431 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolServerSideTranslatorPB.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolServerSideTranslatorPB.java @@ -16,29 +16,24 @@ */ package org.apache.hadoop.ozone.protocolPB; -import com.google.protobuf.RpcController; -import com.google.protobuf.ServiceException; -import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.PipelineReportsProto; -import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.NodeReportProto; -import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.SCMRegisterRequestProto; -import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.SCMRegisteredResponseProto; -import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.SCMVersionRequestProto; -import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.SCMVersionResponseProto; -import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.ContainerReportsProto; -import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.SCMHeartbeatRequestProto; -import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.SCMHeartbeatResponseProto; +import java.io.IOException; + +import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReportsProto; +import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.NodeReportProto; +import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.PipelineReportsProto; +import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMDatanodeRequest; +import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMDatanodeResponse; +import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMRegisterRequestProto; +import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMRegisteredResponseProto; +import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.Status; +import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.Type; +import org.apache.hadoop.hdds.server.OzoneProtocolMessageDispatcher; import org.apache.hadoop.ozone.protocol.StorageContainerDatanodeProtocol; -import java.io.IOException; +import com.google.protobuf.RpcController; +import com.google.protobuf.ServiceException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * This class is the server-side translator that forwards requests received on @@ -48,47 +43,71 @@ import java.io.IOException; public class StorageContainerDatanodeProtocolServerSideTranslatorPB implements StorageContainerDatanodeProtocolPB { + private static final Logger LOG = LoggerFactory + .getLogger(StorageContainerDatanodeProtocolServerSideTranslatorPB.class); + private final StorageContainerDatanodeProtocol impl; + private final OzoneProtocolMessageDispatcher dispatcher; public StorageContainerDatanodeProtocolServerSideTranslatorPB( - StorageContainerDatanodeProtocol impl) { + StorageContainerDatanodeProtocol impl, + ProtocolMessageMetrics protocolMessageMetrics) { this.impl = impl; + dispatcher = + new OzoneProtocolMessageDispatcher<>("SCMDatanodeProtocol", + protocolMessageMetrics, + LOG); + } + + public SCMRegisteredResponseProto register( + SCMRegisterRequestProto request) throws IOException { + ContainerReportsProto containerRequestProto = request + .getContainerReport(); + NodeReportProto dnNodeReport = request.getNodeReport(); + PipelineReportsProto pipelineReport = request.getPipelineReports(); + return impl.register(request.getDatanodeDetails(), dnNodeReport, + containerRequestProto, pipelineReport); + } @Override - public SCMVersionResponseProto getVersion(RpcController controller, - SCMVersionRequestProto request) + public SCMDatanodeResponse submitRequest(RpcController controller, + SCMDatanodeRequest request) throws ServiceException { + return dispatcher.processRequest(request, this::processMessage, + request.getCmdType(), request.getTraceID()); + } + + public SCMDatanodeResponse processMessage(SCMDatanodeRequest request) throws ServiceException { try { - return impl.getVersion(request); + Type cmdType = request.getCmdType(); + switch (cmdType) { + case GetVersion: + return SCMDatanodeResponse.newBuilder() + .setCmdType(cmdType) + .setStatus(Status.OK) + .setGetVersionResponse( + impl.getVersion(request.getGetVersionRequest())) + .build(); + case SendHeartbeat: + return SCMDatanodeResponse.newBuilder() + .setCmdType(cmdType) + .setStatus(Status.OK) + .setSendHeartbeatResponse( + impl.sendHeartbeat(request.getSendHeartbeatRequest())) + .build(); + case Register: + return SCMDatanodeResponse.newBuilder() + .setCmdType(cmdType) + .setStatus(Status.OK) + .setRegisterResponse(register(request.getRegisterRequest())) + .build(); + default: + throw new ServiceException("Unknown command type: " + cmdType); + } } catch (IOException e) { throw new ServiceException(e); } } - - @Override - public SCMRegisteredResponseProto register(RpcController controller, - SCMRegisterRequestProto request) throws ServiceException { - try { - ContainerReportsProto containerRequestProto = request - .getContainerReport(); - NodeReportProto dnNodeReport = request.getNodeReport(); - PipelineReportsProto pipelineReport = request.getPipelineReports(); - return impl.register(request.getDatanodeDetails(), dnNodeReport, - containerRequestProto, pipelineReport); - } catch (IOException e) { - throw new ServiceException(e); - } - } - - @Override - public SCMHeartbeatResponseProto sendHeartbeat(RpcController controller, - SCMHeartbeatRequestProto request) throws ServiceException { - try { - return impl.sendHeartbeat(request); - } catch (IOException e) { - throw new ServiceException(e); - } - } - } \ No newline at end of file diff --git a/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto b/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto index 1d09dfa902b..a975cd5605f 100644 --- a/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto +++ b/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto @@ -34,6 +34,45 @@ package hadoop.hdds; import "hdds.proto"; + +message SCMDatanodeRequest { + required Type cmdType = 1; // Type of the command + + optional string traceID = 2; + + optional SCMVersionRequestProto getVersionRequest = 3; + optional SCMRegisterRequestProto registerRequest = 4; + optional SCMHeartbeatRequestProto sendHeartbeatRequest = 5; +} + +message SCMDatanodeResponse { + required Type cmdType = 1; // Type of the command + + optional string traceID = 2; + + optional bool success = 3 [default = true]; + + optional string message = 4; + + required Status status = 5; + + optional SCMVersionResponseProto getVersionResponse = 6; + optional SCMRegisteredResponseProto registerResponse = 7; + optional SCMHeartbeatResponseProto sendHeartbeatResponse = 8; + +} + +enum Type { + GetVersion = 1; + Register = 2; + SendHeartbeat = 3; +} + +enum Status { + OK = 1; + ERROR = 2; +} + /** * Request for version info of the software stack on the server. */ @@ -385,21 +424,6 @@ message ReplicateContainerCommandProto { */ service StorageContainerDatanodeProtocolService { - /** - * Gets the version information from the SCM. - */ - rpc getVersion (SCMVersionRequestProto) returns (SCMVersionResponseProto); - - /** - * Registers a data node with SCM. - */ - rpc register (SCMRegisterRequestProto) returns (SCMRegisteredResponseProto); - - /** - * Send heartbeat from datanode to SCM. HB's under SCM looks more - * like life line protocol than HB's under HDFS. In other words, it is - * extremely light weight and contains no data payload. - */ - rpc sendHeartbeat (SCMHeartbeatRequestProto) returns (SCMHeartbeatResponseProto); - + //Message sent from Datanode to SCM as a heartbeat. + rpc submitRequest (SCMDatanodeRequest) returns (SCMDatanodeResponse); } diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/SCMTestUtils.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/SCMTestUtils.java index 514c8224bca..5a7c30ca68f 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/SCMTestUtils.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/SCMTestUtils.java @@ -29,12 +29,14 @@ import org.apache.hadoop.hdfs.DFSUtil; import org.apache.hadoop.ipc.ProtobufRpcEngine; import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.ozone.protocol.StorageContainerDatanodeProtocol; +import org.apache.hadoop.ozone.protocolPB.ProtocolMessageMetrics; import org.apache.hadoop.ozone.protocolPB.StorageContainerDatanodeProtocolPB; import org.apache.hadoop.ozone.protocolPB.StorageContainerDatanodeProtocolServerSideTranslatorPB; import org.apache.hadoop.test.GenericTestUtils; import com.google.protobuf.BlockingService; import static org.apache.hadoop.hdds.scm.ScmConfigKeys.HDDS_DATANODE_DIR_KEY; +import org.mockito.Mockito; /** * Test Endpoint class. @@ -91,7 +93,7 @@ public final class SCMTestUtils { StorageContainerDatanodeProtocolService. newReflectiveBlockingService( new StorageContainerDatanodeProtocolServerSideTranslatorPB( - server)); + server, Mockito.mock(ProtocolMessageMetrics.class))); RPC.Server scmServer = startRpcServer(configuration, rpcServerAddresss, StorageContainerDatanodeProtocolPB.class, scmDatanodeService, diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeProtocolServer.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeProtocolServer.java index 6dd9dab827f..530c0a6d238 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeProtocolServer.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeProtocolServer.java @@ -21,61 +21,32 @@ */ package org.apache.hadoop.hdds.scm.server; -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Preconditions; -import com.google.common.collect.Maps; -import com.google.protobuf.BlockingService; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + import org.apache.hadoop.fs.CommonConfigurationKeys; import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos; - -import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.SCMHeartbeatRequestProto; -import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.ContainerReportsProto; -import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.PipelineReportsProto; -import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.SCMHeartbeatResponseProto; -import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.SCMVersionRequestProto; -import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.SCMVersionResponseProto; -import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.SCMRegisteredResponseProto; -import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.ReregisterCommandProto; -import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.SCMCommandProto; -import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.NodeReportProto; - -import static org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.SCMCommandProto - .Type.closeContainerCommand; -import static org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.SCMCommandProto - .Type.deleteBlocksCommand; -import static org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.SCMCommandProto - .Type.deleteContainerCommand; -import static org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type - .replicateContainerCommand; -import static org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.SCMCommandProto - .Type.reregisterCommand; - - - +import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReportsProto; +import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.NodeReportProto; +import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.PipelineReportsProto; +import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ReregisterCommandProto; +import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandProto; +import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMHeartbeatRequestProto; +import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMHeartbeatResponseProto; +import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMRegisteredResponseProto; +import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMVersionRequestProto; +import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMVersionResponseProto; import org.apache.hadoop.hdds.scm.HddsServerUtil; import org.apache.hadoop.hdds.scm.events.SCMEvents; -import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher - .ReportFromDatanode; -import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher - .PipelineReportFromDatanode; +import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.PipelineReportFromDatanode; +import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.ReportFromDatanode; import org.apache.hadoop.hdds.server.events.EventPublisher; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.ipc.ProtobufRpcEngine; @@ -95,27 +66,28 @@ import org.apache.hadoop.ozone.protocol.commands.DeleteContainerCommand; import org.apache.hadoop.ozone.protocol.commands.RegisteredCommand; import org.apache.hadoop.ozone.protocol.commands.ReplicateContainerCommand; import org.apache.hadoop.ozone.protocol.commands.SCMCommand; +import org.apache.hadoop.ozone.protocolPB.ProtocolMessageMetrics; import org.apache.hadoop.ozone.protocolPB.StorageContainerDatanodeProtocolPB; -import org.apache.hadoop.ozone.protocolPB - .StorageContainerDatanodeProtocolServerSideTranslatorPB; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.net.InetSocketAddress; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.stream.Collectors; +import org.apache.hadoop.ozone.protocolPB.StorageContainerDatanodeProtocolServerSideTranslatorPB; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import com.google.common.collect.Maps; +import com.google.protobuf.BlockingService; +import static org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type.closeContainerCommand; +import static org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type.deleteBlocksCommand; +import static org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type.deleteContainerCommand; +import static org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type.replicateContainerCommand; +import static org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type.reregisterCommand; import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_DATANODE_ADDRESS_KEY; import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_HANDLER_COUNT_DEFAULT; import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_HANDLER_COUNT_KEY; - import static org.apache.hadoop.hdds.scm.events.SCMEvents.CONTAINER_REPORT; import static org.apache.hadoop.hdds.scm.events.SCMEvents.PIPELINE_REPORT; import static org.apache.hadoop.hdds.scm.server.StorageContainerManager.startRpcServer; import static org.apache.hadoop.hdds.server.ServerUtils.updateRPCListenAddress; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Protocol Handler for Datanode Protocol. @@ -138,6 +110,7 @@ public class SCMDatanodeProtocolServer implements private final InetSocketAddress datanodeRpcAddress; private final SCMDatanodeHeartbeatDispatcher heartbeatDispatcher; private final EventPublisher eventPublisher; + private final ProtocolMessageMetrics protocolMessageMetrics; public SCMDatanodeProtocolServer(final OzoneConfiguration conf, StorageContainerManager scm, EventPublisher eventPublisher) @@ -157,12 +130,17 @@ public class SCMDatanodeProtocolServer implements RPC.setProtocolEngine(conf, StorageContainerDatanodeProtocolPB.class, ProtobufRpcEngine.class); + + protocolMessageMetrics = ProtocolMessageMetrics + .create("SCMDatanodeProtocol", "SCM Datanode protocol", + StorageContainerDatanodeProtocolProtos.Type.values()); + BlockingService dnProtoPbService = StorageContainerDatanodeProtocolProtos .StorageContainerDatanodeProtocolService .newReflectiveBlockingService( new StorageContainerDatanodeProtocolServerSideTranslatorPB( - this)); + this, protocolMessageMetrics)); InetSocketAddress datanodeRpcAddr = HddsServerUtil.getScmDataNodeBindAddress(conf); @@ -191,6 +169,7 @@ public class SCMDatanodeProtocolServer implements LOG.info( StorageContainerManager.buildRpcServerStartMessage( "RPC server for DataNodes", datanodeRpcAddress)); + protocolMessageMetrics.register(); datanodeRpcServer.start(); } @@ -370,6 +349,7 @@ public class SCMDatanodeProtocolServer implements LOG.error(" datanodeRpcServer stop failed.", ex); } IOUtils.cleanupWithLogger(LOG, scm.getScmNodeManager()); + protocolMessageMetrics.unregister(); } @Override diff --git a/hadoop-ozone/insight/src/main/java/org/apache/hadoop/ozone/insight/BaseInsightSubCommand.java b/hadoop-ozone/insight/src/main/java/org/apache/hadoop/ozone/insight/BaseInsightSubCommand.java index 95cda4168c5..9a6b0108c27 100644 --- a/hadoop-ozone/insight/src/main/java/org/apache/hadoop/ozone/insight/BaseInsightSubCommand.java +++ b/hadoop-ozone/insight/src/main/java/org/apache/hadoop/ozone/insight/BaseInsightSubCommand.java @@ -31,6 +31,7 @@ import org.apache.hadoop.ozone.insight.scm.EventQueueInsight; import org.apache.hadoop.ozone.insight.scm.NodeManagerInsight; import org.apache.hadoop.ozone.insight.scm.ReplicaManagerInsight; import org.apache.hadoop.ozone.insight.scm.ScmProtocolBlockLocationInsight; +import org.apache.hadoop.ozone.insight.scm.ScmProtocolDatanodeInsight; import org.apache.hadoop.ozone.om.OMConfigKeys; import picocli.CommandLine; @@ -88,7 +89,8 @@ public class BaseInsightSubCommand { insights.put("scm.event-queue", new EventQueueInsight()); insights.put("scm.protocol.block-location", new ScmProtocolBlockLocationInsight()); - + insights.put("scm.protocol.datanode", + new ScmProtocolDatanodeInsight()); insights.put("om.key-manager", new KeyManagerInsight()); insights.put("om.protocol.client", new OmProtocolInsight()); diff --git a/hadoop-ozone/insight/src/main/java/org/apache/hadoop/ozone/insight/scm/ScmProtocolBlockLocationInsight.java b/hadoop-ozone/insight/src/main/java/org/apache/hadoop/ozone/insight/scm/ScmProtocolBlockLocationInsight.java index 5ca0945238b..31c73c04435 100644 --- a/hadoop-ozone/insight/src/main/java/org/apache/hadoop/ozone/insight/scm/ScmProtocolBlockLocationInsight.java +++ b/hadoop-ozone/insight/src/main/java/org/apache/hadoop/ozone/insight/scm/ScmProtocolBlockLocationInsight.java @@ -23,12 +23,12 @@ import java.util.List; import java.util.Map; import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos; +import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocolServerSideTranslatorPB; import org.apache.hadoop.hdds.scm.server.SCMBlockProtocolServer; import org.apache.hadoop.ozone.insight.BaseInsightPoint; import org.apache.hadoop.ozone.insight.Component.Type; import org.apache.hadoop.ozone.insight.LoggerSource; import org.apache.hadoop.ozone.insight.MetricGroupDisplay; -import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocolServerSideTranslatorPB; /** * Insight metric to check the SCM block location protocol behaviour. @@ -42,9 +42,9 @@ public class ScmProtocolBlockLocationInsight extends BaseInsightPoint { new LoggerSource(Type.SCM, ScmBlockLocationProtocolServerSideTranslatorPB.class, defaultLevel(verbose))); - new LoggerSource(Type.SCM, + loggers.add(new LoggerSource(Type.SCM, SCMBlockProtocolServer.class, - defaultLevel(verbose)); + defaultLevel(verbose))); return loggers; } diff --git a/hadoop-ozone/insight/src/main/java/org/apache/hadoop/ozone/insight/scm/ScmProtocolDatanodeInsight.java b/hadoop-ozone/insight/src/main/java/org/apache/hadoop/ozone/insight/scm/ScmProtocolDatanodeInsight.java new file mode 100644 index 00000000000..289af89a7c5 --- /dev/null +++ b/hadoop-ozone/insight/src/main/java/org/apache/hadoop/ozone/insight/scm/ScmProtocolDatanodeInsight.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.ozone.insight.scm; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos; +import org.apache.hadoop.hdds.scm.server.SCMDatanodeProtocolServer; +import org.apache.hadoop.ozone.insight.BaseInsightPoint; +import org.apache.hadoop.ozone.insight.Component.Type; +import org.apache.hadoop.ozone.insight.LoggerSource; +import org.apache.hadoop.ozone.insight.MetricGroupDisplay; +import org.apache.hadoop.ozone.protocolPB.StorageContainerDatanodeProtocolServerSideTranslatorPB; + +/** + * Insight metric to check the SCM datanode protocol behaviour. + */ +public class ScmProtocolDatanodeInsight extends BaseInsightPoint { + + @Override + public List getRelatedLoggers(boolean verbose) { + List loggers = new ArrayList<>(); + loggers.add( + new LoggerSource(Type.SCM, + SCMDatanodeProtocolServer.class, + defaultLevel(verbose))); + loggers.add( + new LoggerSource(Type.SCM, + StorageContainerDatanodeProtocolServerSideTranslatorPB.class, + defaultLevel(verbose))); + return loggers; + } + + @Override + public List getMetrics() { + List metrics = new ArrayList<>(); + + Map filter = new HashMap<>(); + filter.put("servername", "StorageContainerDatanodeProtocolService"); + + addRpcMetrics(metrics, Type.SCM, filter); + + addProtocolMessageMetrics(metrics, "scm_datanode_protocol", + Type.SCM, StorageContainerDatanodeProtocolProtos.Type.values()); + + return metrics; + } + + @Override + public String getDescription() { + return "SCM Datanode protocol endpoint"; + } + +}