From 4c24f2434dd8c09bb104ee660975855eca287fe6 Mon Sep 17 00:00:00 2001 From: Anu Engineer Date: Wed, 2 Oct 2019 16:15:31 -0700 Subject: [PATCH] HDDS-2072. Make StorageContainerLocationProtocolService message based Contributed by Elek, Marton. --- ...ocationProtocolClientSideTranslatorPB.java | 411 ++++++++------- .../main/proto/ScmBlockLocationProtocol.proto | 2 +- .../StorageContainerLocationProtocol.proto | 185 ++++--- ...ocationProtocolServerSideTranslatorPB.java | 474 ++++++++++-------- .../scm/server/SCMClientProtocolServer.java | 12 +- .../ozone/insight/BaseInsightSubCommand.java | 3 + .../scm/ScmProtocolBlockLocationInsight.java | 2 +- .../ScmProtocolContainerLocationInsight.java | 71 +++ 8 files changed, 629 insertions(+), 531 deletions(-) create mode 100644 hadoop-ozone/insight/src/main/java/org/apache/hadoop/ozone/insight/scm/ScmProtocolContainerLocationInsight.java diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java index ab3fcd185ff..01db597dfae 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java @@ -16,64 +16,57 @@ */ package org.apache.hadoop.hdds.scm.protocolPB; -import com.google.common.base.Preconditions; -import com.google.protobuf.RpcController; -import com.google.protobuf.ServiceException; +import java.io.Closeable; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.function.Consumer; + import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos.GetScmInfoResponseProto; +import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos; import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ActivatePipelineRequestProto; -import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.DeactivatePipelineRequestProto; -import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ListPipelineRequestProto; -import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ListPipelineResponseProto; import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ClosePipelineRequestProto; +import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ContainerRequestProto; +import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ContainerResponseProto; +import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.DeactivatePipelineRequestProto; import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ForceExitSafeModeRequestProto; import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ForceExitSafeModeResponseProto; +import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.GetContainerRequestProto; import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.GetContainerWithPipelineRequestProto; -import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.GetContainerWithPipelineResponseProto; import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.InSafeModeRequestProto; -import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.InSafeModeResponseProto; -import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.StartReplicationManagerRequestProto; -import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.StopReplicationManagerRequestProto; +import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ListPipelineRequestProto; +import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ListPipelineResponseProto; +import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.NodeQueryRequestProto; +import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.NodeQueryResponseProto; +import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ObjectStageChangeRequestProto; +import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.PipelineRequestProto; +import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.PipelineResponseProto; import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ReplicationManagerStatusRequestProto; import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ReplicationManagerStatusResponseProto; +import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.SCMDeleteContainerRequestProto; +import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.SCMListContainerRequestProto; +import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.SCMListContainerResponseProto; +import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ScmContainerLocationRequest; +import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ScmContainerLocationRequest.Builder; +import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ScmContainerLocationResponse; +import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.StartReplicationManagerRequestProto; +import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.StopReplicationManagerRequestProto; +import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.Type; import org.apache.hadoop.hdds.scm.ScmInfo; -import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline; import org.apache.hadoop.hdds.scm.container.ContainerInfo; +import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline; import org.apache.hadoop.hdds.scm.pipeline.Pipeline; import org.apache.hadoop.hdds.scm.protocol.StorageContainerLocationProtocol; -import org.apache.hadoop.hdds.protocol.proto.HddsProtos; -import org.apache.hadoop.hdds.protocol.proto - .StorageContainerLocationProtocolProtos.ContainerRequestProto; -import org.apache.hadoop.hdds.protocol.proto - .StorageContainerLocationProtocolProtos.ContainerResponseProto; -import org.apache.hadoop.hdds.protocol.proto - .StorageContainerLocationProtocolProtos.GetContainerRequestProto; -import org.apache.hadoop.hdds.protocol.proto - .StorageContainerLocationProtocolProtos.GetContainerResponseProto; -import org.apache.hadoop.hdds.protocol.proto - .StorageContainerLocationProtocolProtos.NodeQueryRequestProto; -import org.apache.hadoop.hdds.protocol.proto - .StorageContainerLocationProtocolProtos.NodeQueryResponseProto; -import org.apache.hadoop.hdds.protocol.proto - .StorageContainerLocationProtocolProtos.ObjectStageChangeRequestProto; -import org.apache.hadoop.hdds.protocol.proto - .StorageContainerLocationProtocolProtos.PipelineRequestProto; -import org.apache.hadoop.hdds.protocol.proto - .StorageContainerLocationProtocolProtos.PipelineResponseProto; -import org.apache.hadoop.hdds.protocol.proto - .StorageContainerLocationProtocolProtos.SCMDeleteContainerRequestProto; -import org.apache.hadoop.hdds.protocol.proto - .StorageContainerLocationProtocolProtos.SCMListContainerRequestProto; -import org.apache.hadoop.hdds.protocol.proto - .StorageContainerLocationProtocolProtos.SCMListContainerResponseProto; import org.apache.hadoop.hdds.tracing.TracingUtil; import org.apache.hadoop.ipc.ProtobufHelper; import org.apache.hadoop.ipc.ProtocolTranslator; import org.apache.hadoop.ipc.RPC; -import java.io.Closeable; -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; +import com.google.common.base.Preconditions; +import com.google.protobuf.RpcController; +import com.google.protobuf.ServiceException; /** * This class is the client-side translator to translate the requests made on @@ -101,14 +94,35 @@ public final class StorageContainerLocationProtocolClientSideTranslatorPB this.rpcProxy = rpcProxy; } + /** + * Helper method to wrap the request and send the message. + */ + private ScmContainerLocationResponse submitRequest( + StorageContainerLocationProtocolProtos.Type type, + Consumer builderConsumer) throws IOException { + final ScmContainerLocationResponse response; + try { + + Builder builder = ScmContainerLocationRequest.newBuilder() + .setCmdType(type) + .setTraceID(TracingUtil.exportCurrentSpan()); + builderConsumer.accept(builder); + ScmContainerLocationRequest wrapper = builder.build(); + + response = rpcProxy.submitRequest(NULL_RPC_CONTROLLER, wrapper); + } catch (ServiceException ex) { + throw ProtobufHelper.getRemoteException(ex); + } + return response; + } + /** * Asks SCM where a container should be allocated. SCM responds with the set * of datanodes that should be used creating this container. Ozone/SCM only * supports replication factor of either 1 or 3. - * @param type - Replication Type + * + * @param type - Replication Type * @param factor - Replication Count - * @return - * @throws IOException */ @Override public ContainerWithPipeline allocateContainer( @@ -122,12 +136,11 @@ public final class StorageContainerLocationProtocolClientSideTranslatorPB .setOwner(owner) .build(); - final ContainerResponseProto response; - try { - response = rpcProxy.allocateContainer(NULL_RPC_CONTROLLER, request); - } catch (ServiceException e) { - throw ProtobufHelper.getRemoteException(e); - } + ContainerResponseProto response = + submitRequest(Type.AllocateContainer, + builder -> builder.setContainerRequest(request)) + .getContainerResponse(); + //TODO should be migrated to use the top level status structure. if (response.getErrorCode() != ContainerResponseProto.Error.success) { throw new IOException(response.hasErrorMessage() ? response.getErrorMessage() : "Allocate container failed."); @@ -144,13 +157,12 @@ public final class StorageContainerLocationProtocolClientSideTranslatorPB .setContainerID(containerID) .setTraceID(TracingUtil.exportCurrentSpan()) .build(); - try { - GetContainerResponseProto response = - rpcProxy.getContainer(NULL_RPC_CONTROLLER, request); - return ContainerInfo.fromProtobuf(response.getContainerInfo()); - } catch (ServiceException e) { - throw ProtobufHelper.getRemoteException(e); - } + ScmContainerLocationResponse response = + submitRequest(Type.GetContainer, + (builder) -> builder.setGetContainerRequest(request)); + return ContainerInfo + .fromProtobuf(response.getGetContainerResponse().getContainerInfo()); + } /** @@ -164,14 +176,15 @@ public final class StorageContainerLocationProtocolClientSideTranslatorPB GetContainerWithPipelineRequestProto.newBuilder() .setTraceID(TracingUtil.exportCurrentSpan()) .setContainerID(containerID).build(); - try { - GetContainerWithPipelineResponseProto response = - rpcProxy.getContainerWithPipeline(NULL_RPC_CONTROLLER, request); - return ContainerWithPipeline.fromProtobuf( - response.getContainerWithPipeline()); - } catch (ServiceException e) { - throw ProtobufHelper.getRemoteException(e); - } + + ScmContainerLocationResponse response = + submitRequest(Type.GetContainerWithPipeline, + (builder) -> builder.setGetContainerWithPipelineRequest(request)); + + return ContainerWithPipeline.fromProtobuf( + response.getGetContainerWithPipelineResponse() + .getContainerWithPipeline()); + } /** @@ -191,26 +204,22 @@ public final class StorageContainerLocationProtocolClientSideTranslatorPB builder.setTraceID(TracingUtil.exportCurrentSpan()); SCMListContainerRequestProto request = builder.build(); - try { - SCMListContainerResponseProto response = - rpcProxy.listContainer(NULL_RPC_CONTROLLER, request); - List containerList = new ArrayList<>(); - for (HddsProtos.ContainerInfoProto containerInfoProto : response - .getContainersList()) { - containerList.add(ContainerInfo.fromProtobuf(containerInfoProto)); - } - return containerList; - } catch (ServiceException e) { - throw ProtobufHelper.getRemoteException(e); + SCMListContainerResponseProto response = + submitRequest(Type.ListContainer, + builder1 -> builder1.setScmListContainerRequest(request)) + .getScmListContainerResponse(); + List containerList = new ArrayList<>(); + for (HddsProtos.ContainerInfoProto containerInfoProto : response + .getContainersList()) { + containerList.add(ContainerInfo.fromProtobuf(containerInfoProto)); } + return containerList; + } /** * Ask SCM to delete a container by name. SCM will remove * the container mapping in its database. - * - * @param containerID - * @throws IOException */ @Override public void deleteContainer(long containerID) @@ -222,18 +231,13 @@ public final class StorageContainerLocationProtocolClientSideTranslatorPB .setTraceID(TracingUtil.exportCurrentSpan()) .setContainerID(containerID) .build(); - try { - rpcProxy.deleteContainer(NULL_RPC_CONTROLLER, request); - } catch (ServiceException e) { - throw ProtobufHelper.getRemoteException(e); - } + submitRequest(Type.DeleteContainer, + builder -> builder.setScmDeleteContainerRequest(request)); + } /** * Queries a list of Node Statuses. - * - * @param nodeStatuses - * @return List of Datanodes. */ @Override public List queryNode(HddsProtos.NodeState @@ -246,21 +250,18 @@ public final class StorageContainerLocationProtocolClientSideTranslatorPB .setState(nodeStatuses) .setTraceID(TracingUtil.exportCurrentSpan()) .setScope(queryScope).setPoolName(poolName).build(); - try { - NodeQueryResponseProto response = - rpcProxy.queryNode(NULL_RPC_CONTROLLER, request); - return response.getDatanodesList(); - } catch (ServiceException e) { - throw ProtobufHelper.getRemoteException(e); - } + NodeQueryResponseProto response = submitRequest(Type.QueryNode, + builder -> builder.setNodeQueryRequest(request)).getNodeQueryResponse(); + return response.getDatanodesList(); } /** * Notify from client that creates object on datanodes. - * @param type object type - * @param id object id - * @param op operation type (e.g., create, close, delete) + * + * @param type object type + * @param id object id + * @param op operation type (e.g., create, close, delete) * @param stage object creation stage : begin/complete */ @Override @@ -278,20 +279,17 @@ public final class StorageContainerLocationProtocolClientSideTranslatorPB .setOp(op) .setStage(stage) .build(); - try { - rpcProxy.notifyObjectStageChange(NULL_RPC_CONTROLLER, request); - } catch(ServiceException e){ - throw ProtobufHelper.getRemoteException(e); - } + submitRequest(Type.NotifyObjectStageChange, + builder -> builder.setObjectStageChangeRequest(request)); + } /** * Creates a replication pipeline of a specified type. * * @param replicationType - replication type - * @param factor - factor 1 or 3 - * @param nodePool - optional machine list to build a pipeline. - * @throws IOException + * @param factor - factor 1 or 3 + * @param nodePool - optional machine list to build a pipeline. */ @Override public Pipeline createReplicationPipeline(HddsProtos.ReplicationType @@ -303,87 +301,82 @@ public final class StorageContainerLocationProtocolClientSideTranslatorPB .setReplicationFactor(factor) .setReplicationType(replicationType) .build(); - try { - PipelineResponseProto response = - rpcProxy.allocatePipeline(NULL_RPC_CONTROLLER, request); - if (response.getErrorCode() == - PipelineResponseProto.Error.success) { - Preconditions.checkState(response.hasPipeline(), "With success, " + - "must come a pipeline"); - return Pipeline.getFromProtobuf(response.getPipeline()); - } else { - String errorMessage = String.format("create replication pipeline " + - "failed. code : %s Message: %s", response.getErrorCode(), - response.hasErrorMessage() ? response.getErrorMessage() : ""); - throw new IOException(errorMessage); - } - } catch (ServiceException e) { - throw ProtobufHelper.getRemoteException(e); + + PipelineResponseProto response = + submitRequest(Type.AllocatePipeline, + builder -> builder.setPipelineRequest(request)) + .getPipelineResponse(); + if (response.getErrorCode() == + PipelineResponseProto.Error.success) { + Preconditions.checkState(response.hasPipeline(), "With success, " + + "must come a pipeline"); + return Pipeline.getFromProtobuf(response.getPipeline()); + } else { + String errorMessage = String.format("create replication pipeline " + + "failed. code : %s Message: %s", response.getErrorCode(), + response.hasErrorMessage() ? response.getErrorMessage() : ""); + throw new IOException(errorMessage); } + } @Override public List listPipelines() throws IOException { - try { - ListPipelineRequestProto request = ListPipelineRequestProto - .newBuilder().setTraceID(TracingUtil.exportCurrentSpan()) - .build(); - ListPipelineResponseProto response = rpcProxy.listPipelines( - NULL_RPC_CONTROLLER, request); - List list = new ArrayList<>(); - for (HddsProtos.Pipeline pipeline : response.getPipelinesList()) { - Pipeline fromProtobuf = Pipeline.getFromProtobuf(pipeline); - list.add(fromProtobuf); - } - return list; - } catch (ServiceException e) { - throw ProtobufHelper.getRemoteException(e); + ListPipelineRequestProto request = ListPipelineRequestProto + .newBuilder().setTraceID(TracingUtil.exportCurrentSpan()) + .build(); + + ListPipelineResponseProto response = submitRequest(Type.ListPipelines, + builder -> builder.setListPipelineRequest(request)) + .getListPipelineResponse(); + + List list = new ArrayList<>(); + for (HddsProtos.Pipeline pipeline : response.getPipelinesList()) { + Pipeline fromProtobuf = Pipeline.getFromProtobuf(pipeline); + list.add(fromProtobuf); } + return list; + } @Override public void activatePipeline(HddsProtos.PipelineID pipelineID) throws IOException { - try { - ActivatePipelineRequestProto request = - ActivatePipelineRequestProto.newBuilder() - .setTraceID(TracingUtil.exportCurrentSpan()) - .setPipelineID(pipelineID) - .build(); - rpcProxy.activatePipeline(NULL_RPC_CONTROLLER, request); - } catch (ServiceException e) { - throw ProtobufHelper.getRemoteException(e); - } + ActivatePipelineRequestProto request = + ActivatePipelineRequestProto.newBuilder() + .setTraceID(TracingUtil.exportCurrentSpan()) + .setPipelineID(pipelineID) + .build(); + submitRequest(Type.ActivatePipeline, + builder -> builder.setActivatePipelineRequest(request)); + } @Override public void deactivatePipeline(HddsProtos.PipelineID pipelineID) throws IOException { - try { - DeactivatePipelineRequestProto request = - DeactivatePipelineRequestProto.newBuilder() - .setTraceID(TracingUtil.exportCurrentSpan()) - .setPipelineID(pipelineID) - .build(); - rpcProxy.deactivatePipeline(NULL_RPC_CONTROLLER, request); - } catch (ServiceException e) { - throw ProtobufHelper.getRemoteException(e); - } + + DeactivatePipelineRequestProto request = + DeactivatePipelineRequestProto.newBuilder() + .setTraceID(TracingUtil.exportCurrentSpan()) + .setPipelineID(pipelineID) + .build(); + submitRequest(Type.DeactivatePipeline, + builder -> builder.setDeactivatePipelineRequest(request)); } @Override public void closePipeline(HddsProtos.PipelineID pipelineID) throws IOException { - try { - ClosePipelineRequestProto request = - ClosePipelineRequestProto.newBuilder() - .setTraceID(TracingUtil.exportCurrentSpan()) - .setPipelineID(pipelineID) - .build(); - rpcProxy.closePipeline(NULL_RPC_CONTROLLER, request); - } catch (ServiceException e) { - throw ProtobufHelper.getRemoteException(e); - } + + ClosePipelineRequestProto request = + ClosePipelineRequestProto.newBuilder() + .setTraceID(TracingUtil.exportCurrentSpan()) + .setPipelineID(pipelineID) + .build(); + submitRequest(Type.ClosePipeline, + builder -> builder.setClosePipelineRequest(request)); + } @Override @@ -392,16 +385,14 @@ public final class StorageContainerLocationProtocolClientSideTranslatorPB HddsProtos.GetScmInfoRequestProto.newBuilder() .setTraceID(TracingUtil.exportCurrentSpan()) .build(); - try { - HddsProtos.GetScmInfoResponseProto resp = rpcProxy.getScmInfo( - NULL_RPC_CONTROLLER, request); - ScmInfo.Builder builder = new ScmInfo.Builder() - .setClusterId(resp.getClusterId()) - .setScmId(resp.getScmId()); - return builder.build(); - } catch (ServiceException e) { - throw ProtobufHelper.getRemoteException(e); - } + + GetScmInfoResponseProto resp = submitRequest(Type.GetScmInfo, + builder -> builder.setGetScmInfoRequest(request)) + .getGetScmInfoResponse(); + ScmInfo.Builder builder = new ScmInfo.Builder() + .setClusterId(resp.getClusterId()) + .setScmId(resp.getScmId()); + return builder.build(); } @@ -409,73 +400,67 @@ public final class StorageContainerLocationProtocolClientSideTranslatorPB * Check if SCM is in safe mode. * * @return Returns true if SCM is in safe mode else returns false. - * @throws IOException */ @Override public boolean inSafeMode() throws IOException { InSafeModeRequestProto request = InSafeModeRequestProto.getDefaultInstance(); - try { - InSafeModeResponseProto resp = rpcProxy.inSafeMode( - NULL_RPC_CONTROLLER, request); - return resp.getInSafeMode(); - } catch (ServiceException e) { - throw ProtobufHelper.getRemoteException(e); - } + + return submitRequest(Type.InSafeMode, + builder -> builder.setInSafeModeRequest(request)) + .getInSafeModeResponse().getInSafeMode(); + } /** * Force SCM out of Safe mode. * * @return returns true if operation is successful. - * @throws IOException */ @Override public boolean forceExitSafeMode() throws IOException { ForceExitSafeModeRequestProto request = ForceExitSafeModeRequestProto.getDefaultInstance(); - try { - ForceExitSafeModeResponseProto resp = rpcProxy - .forceExitSafeMode(NULL_RPC_CONTROLLER, request); - return resp.getExitedSafeMode(); - } catch (ServiceException e) { - throw ProtobufHelper.getRemoteException(e); - } + ForceExitSafeModeResponseProto resp = + submitRequest(Type.ForceExitSafeMode, + builder -> builder.setForceExitSafeModeRequest(request)) + .getForceExitSafeModeResponse(); + + return resp.getExitedSafeMode(); + } @Override public void startReplicationManager() throws IOException { - try { - StartReplicationManagerRequestProto request = - StartReplicationManagerRequestProto.getDefaultInstance(); - rpcProxy.startReplicationManager(NULL_RPC_CONTROLLER, request); - } catch (ServiceException e) { - throw ProtobufHelper.getRemoteException(e); - } + + StartReplicationManagerRequestProto request = + StartReplicationManagerRequestProto.getDefaultInstance(); + submitRequest(Type.StartReplicationManager, + builder -> builder.setStartReplicationManagerRequest(request)); + } @Override public void stopReplicationManager() throws IOException { - try { - StopReplicationManagerRequestProto request = - StopReplicationManagerRequestProto.getDefaultInstance(); - rpcProxy.stopReplicationManager(NULL_RPC_CONTROLLER, request); - } catch (ServiceException e) { - throw ProtobufHelper.getRemoteException(e); - } + + StopReplicationManagerRequestProto request = + StopReplicationManagerRequestProto.getDefaultInstance(); + submitRequest(Type.StopReplicationManager, + builder -> builder.setStopReplicationManagerRequest(request)); + } @Override public boolean getReplicationManagerStatus() throws IOException { - try { - ReplicationManagerStatusRequestProto request = - ReplicationManagerStatusRequestProto.getDefaultInstance(); - ReplicationManagerStatusResponseProto response = - rpcProxy.getReplicationManagerStatus(NULL_RPC_CONTROLLER, request); - return response.getIsRunning(); - } catch (ServiceException e) { - throw ProtobufHelper.getRemoteException(e); - } + + ReplicationManagerStatusRequestProto request = + ReplicationManagerStatusRequestProto.getDefaultInstance(); + ReplicationManagerStatusResponseProto response = + submitRequest(Type.GetReplicationManagerStatus, + builder -> builder.setSeplicationManagerStatusRequest(request)) + .getReplicationManagerStatusResponse(); + return response.getIsRunning(); + } @Override diff --git a/hadoop-hdds/common/src/main/proto/ScmBlockLocationProtocol.proto b/hadoop-hdds/common/src/main/proto/ScmBlockLocationProtocol.proto index ded0d027f6b..fc7a5988ce6 100644 --- a/hadoop-hdds/common/src/main/proto/ScmBlockLocationProtocol.proto +++ b/hadoop-hdds/common/src/main/proto/ScmBlockLocationProtocol.proto @@ -26,7 +26,7 @@ option java_package = "org.apache.hadoop.hdds.protocol.proto"; option java_outer_classname = "ScmBlockLocationProtocolProtos"; option java_generic_services = true; option java_generate_equals_and_hash = true; -package hadoop.hdds; +package hadoop.hdds.block; import "hdds.proto"; diff --git a/hadoop-hdds/common/src/main/proto/StorageContainerLocationProtocol.proto b/hadoop-hdds/common/src/main/proto/StorageContainerLocationProtocol.proto index 0c358760360..8ea72b6cd17 100644 --- a/hadoop-hdds/common/src/main/proto/StorageContainerLocationProtocol.proto +++ b/hadoop-hdds/common/src/main/proto/StorageContainerLocationProtocol.proto @@ -26,10 +26,100 @@ option java_package = "org.apache.hadoop.hdds.protocol.proto"; option java_outer_classname = "StorageContainerLocationProtocolProtos"; option java_generic_services = true; option java_generate_equals_and_hash = true; -package hadoop.hdds; +package hadoop.hdds.container; import "hdds.proto"; +/** + All functions are dispatched as Request/Response under Ozone. + if you add newe functions, please add them using same pattern. +*/ +message ScmContainerLocationRequest { + required Type cmdType = 1; // Type of the command + + // A string that identifies this command, we generate Trace ID in Ozone + // frontend and this allows us to trace that command all over ozone. + optional string traceID = 2; + + optional ContainerRequestProto containerRequest = 6; + optional GetContainerRequestProto getContainerRequest = 7; + optional GetContainerWithPipelineRequestProto getContainerWithPipelineRequest = 8; + optional SCMListContainerRequestProto scmListContainerRequest = 9; + optional SCMDeleteContainerRequestProto scmDeleteContainerRequest = 10; + optional NodeQueryRequestProto nodeQueryRequest = 11; + optional ObjectStageChangeRequestProto objectStageChangeRequest = 12; + optional PipelineRequestProto pipelineRequest = 13; + optional ListPipelineRequestProto listPipelineRequest = 14; + optional ActivatePipelineRequestProto activatePipelineRequest = 15; + optional DeactivatePipelineRequestProto deactivatePipelineRequest = 16; + optional ClosePipelineRequestProto closePipelineRequest = 17; + optional GetScmInfoRequestProto getScmInfoRequest = 18; + optional InSafeModeRequestProto inSafeModeRequest = 19; + optional ForceExitSafeModeRequestProto forceExitSafeModeRequest = 20; + optional StartReplicationManagerRequestProto startReplicationManagerRequest = 21; + optional StopReplicationManagerRequestProto stopReplicationManagerRequest = 22; + optional ReplicationManagerStatusRequestProto seplicationManagerStatusRequest = 23; + +} + +message ScmContainerLocationResponse { + required Type cmdType = 1; // Type of the command + + optional string traceID = 2; + + optional bool success = 3 [default = true]; + + optional string message = 4; + + required Status status = 5; + + optional ContainerResponseProto containerResponse = 6; + optional GetContainerResponseProto getContainerResponse = 7; + optional GetContainerWithPipelineResponseProto getContainerWithPipelineResponse = 8; + optional SCMListContainerResponseProto scmListContainerResponse = 9; + optional SCMDeleteContainerResponseProto scmDeleteContainerResponse = 10; + optional NodeQueryResponseProto nodeQueryResponse = 11; + optional ObjectStageChangeResponseProto objectStageChangeResponse = 12; + optional PipelineResponseProto pipelineResponse = 13; + optional ListPipelineResponseProto listPipelineResponse = 14; + optional ActivatePipelineResponseProto activatePipelineResponse = 15; + optional DeactivatePipelineResponseProto deactivatePipelineResponse = 16; + optional ClosePipelineResponseProto closePipelineResponse = 17; + optional GetScmInfoResponseProto getScmInfoResponse = 18; + optional InSafeModeResponseProto inSafeModeResponse = 19; + optional ForceExitSafeModeResponseProto forceExitSafeModeResponse = 20; + optional StartReplicationManagerResponseProto startReplicationManagerResponse = 21; + optional StopReplicationManagerResponseProto stopReplicationManagerResponse = 22; + optional ReplicationManagerStatusResponseProto replicationManagerStatusResponse = 23; + enum Status { + OK = 1; + CONTAINER_ALREADY_EXISTS = 2; + CONTAINER_IS_MISSING = 3; + } +} + +enum Type { + + AllocateContainer = 1; + GetContainer = 2; + GetContainerWithPipeline = 3; + ListContainer = 4; + DeleteContainer = 5; + QueryNode = 6; + NotifyObjectStageChange = 7; + AllocatePipeline = 8; + ListPipelines = 9; + ActivatePipeline = 10; + DeactivatePipeline = 11; + ClosePipeline = 12; + GetScmInfo = 13; + InSafeMode = 14; + ForceExitSafeMode = 15; + StartReplicationManager = 16; + StopReplicationManager = 17; + GetReplicationManagerStatus = 18; +} + /** * Request send to SCM asking where the container should be created. */ @@ -235,97 +325,6 @@ message ReplicationManagerStatusResponseProto { * and response messages for details of the RPC calls. */ service StorageContainerLocationProtocolService { + rpc submitRequest (ScmContainerLocationRequest) returns (ScmContainerLocationResponse); - /** - * Creates a container entry in SCM. - */ - rpc allocateContainer(ContainerRequestProto) returns (ContainerResponseProto); - - /** - * Returns the pipeline for a given container. - */ - rpc getContainer(GetContainerRequestProto) returns (GetContainerResponseProto); - - /** - * Returns the pipeline for a given container. - */ - rpc getContainerWithPipeline(GetContainerWithPipelineRequestProto) returns (GetContainerWithPipelineResponseProto); - - rpc listContainer(SCMListContainerRequestProto) returns (SCMListContainerResponseProto); - - /** - * Deletes a container in SCM. - */ - rpc deleteContainer(SCMDeleteContainerRequestProto) returns (SCMDeleteContainerResponseProto); - - /** - * Returns a set of Nodes that meet a criteria. - */ - rpc queryNode(NodeQueryRequestProto) returns (NodeQueryResponseProto); - - /** - * Notify from client when begin or finish container or pipeline operations on datanodes. - */ - rpc notifyObjectStageChange(ObjectStageChangeRequestProto) returns (ObjectStageChangeResponseProto); - - /* - * Apis that Manage Pipelines. - * - * Pipelines are abstractions offered by SCM and Datanode that allows users - * to create a replication pipeline. - * - * These following APIs allow command line programs like SCM CLI to list - * and manage pipelines. - */ - - /** - * Creates a replication pipeline. - */ - rpc allocatePipeline(PipelineRequestProto) - returns (PipelineResponseProto); - - /** - * Returns the list of Pipelines managed by SCM. - */ - rpc listPipelines(ListPipelineRequestProto) - returns (ListPipelineResponseProto); - - rpc activatePipeline(ActivatePipelineRequestProto) - returns (ActivatePipelineResponseProto); - - rpc deactivatePipeline(DeactivatePipelineRequestProto) - returns (DeactivatePipelineResponseProto); - - /** - * Closes a pipeline. - */ - rpc closePipeline(ClosePipelineRequestProto) - returns (ClosePipelineResponseProto); - - /** - * Returns information about SCM. - */ - rpc getScmInfo(GetScmInfoRequestProto) - returns (GetScmInfoResponseProto); - - /** - * Checks if SCM is in SafeMode. - */ - rpc inSafeMode(InSafeModeRequestProto) - returns (InSafeModeResponseProto); - - /** - * Returns information about SCM. - */ - rpc forceExitSafeMode(ForceExitSafeModeRequestProto) - returns (ForceExitSafeModeResponseProto); - - rpc startReplicationManager(StartReplicationManagerRequestProto) - returns (StartReplicationManagerResponseProto); - - rpc stopReplicationManager(StopReplicationManagerRequestProto) - returns (StopReplicationManagerResponseProto); - - rpc getReplicationManagerStatus(ReplicationManagerStatusRequestProto) - returns (ReplicationManagerStatusResponseProto); } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocolServerSideTranslatorPB.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocolServerSideTranslatorPB.java index 9d53dbf7d3b..0d2f4700003 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocolServerSideTranslatorPB.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocolServerSideTranslatorPB.java @@ -1,4 +1,3 @@ - /** * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file @@ -7,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + *

* http://www.apache.org/licenses/LICENSE-2.0 - * + *

* Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -42,16 +41,18 @@ import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolPro import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.InSafeModeResponseProto; import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ListPipelineRequestProto; import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ListPipelineResponseProto; +import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.NodeQueryResponseProto; import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ObjectStageChangeRequestProto; import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ObjectStageChangeResponseProto; -import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.PipelineRequestProto; -import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.PipelineResponseProto; import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ReplicationManagerStatusRequestProto; import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ReplicationManagerStatusResponseProto; import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.SCMDeleteContainerRequestProto; import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.SCMDeleteContainerResponseProto; import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.SCMListContainerRequestProto; import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.SCMListContainerResponseProto; +import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ScmContainerLocationRequest; +import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ScmContainerLocationResponse; +import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ScmContainerLocationResponse.Status; import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.StartReplicationManagerRequestProto; import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.StartReplicationManagerResponseProto; import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.StopReplicationManagerRequestProto; @@ -61,11 +62,13 @@ import org.apache.hadoop.hdds.scm.container.ContainerInfo; import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline; import org.apache.hadoop.hdds.scm.pipeline.Pipeline; import org.apache.hadoop.hdds.scm.protocolPB.StorageContainerLocationProtocolPB; -import org.apache.hadoop.hdds.tracing.TracingUtil; +import org.apache.hadoop.hdds.server.OzoneProtocolMessageDispatcher; +import org.apache.hadoop.ozone.protocolPB.ProtocolMessageMetrics; import com.google.protobuf.RpcController; import com.google.protobuf.ServiceException; -import io.opentracing.Scope; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * This class is the server-side translator that forwards requests received on @@ -76,288 +79,315 @@ import io.opentracing.Scope; public final class StorageContainerLocationProtocolServerSideTranslatorPB implements StorageContainerLocationProtocolPB { + private static final Logger LOG = + LoggerFactory.getLogger( + StorageContainerLocationProtocolServerSideTranslatorPB.class); + private final StorageContainerLocationProtocol impl; + private OzoneProtocolMessageDispatcher + dispatcher; + /** * Creates a new StorageContainerLocationProtocolServerSideTranslatorPB. * - * @param impl {@link StorageContainerLocationProtocol} server implementation + * @param impl {@link StorageContainerLocationProtocol} server + * implementation + * @param protocolMetrics */ public StorageContainerLocationProtocolServerSideTranslatorPB( - StorageContainerLocationProtocol impl) throws IOException { + StorageContainerLocationProtocol impl, + ProtocolMessageMetrics protocolMetrics) throws IOException { this.impl = impl; + this.dispatcher = + new OzoneProtocolMessageDispatcher<>("ScmContainerLocation", + protocolMetrics, LOG); } @Override - public ContainerResponseProto allocateContainer(RpcController unused, - ContainerRequestProto request) throws ServiceException { - try (Scope scope = TracingUtil - .importAndCreateScope("allocateContainer", request.getTraceID())) { - ContainerWithPipeline containerWithPipeline = impl - .allocateContainer(request.getReplicationType(), - request.getReplicationFactor(), request.getOwner()); - return ContainerResponseProto.newBuilder() - .setContainerWithPipeline(containerWithPipeline.getProtobuf()) - .setErrorCode(ContainerResponseProto.Error.success) - .build(); + public ScmContainerLocationResponse submitRequest(RpcController controller, + ScmContainerLocationRequest request) throws ServiceException { + return dispatcher + .processRequest(request, this::processRequest, request.getCmdType(), + request.getTraceID()); + } + + public ScmContainerLocationResponse processRequest( + ScmContainerLocationRequest request) throws ServiceException { + try { + switch (request.getCmdType()) { + case AllocateContainer: + return ScmContainerLocationResponse.newBuilder() + .setCmdType(request.getCmdType()) + .setStatus(Status.OK) + .setContainerResponse( + allocateContainer(request.getContainerRequest())) + .build(); + case GetContainer: + return ScmContainerLocationResponse.newBuilder() + .setCmdType(request.getCmdType()) + .setStatus(Status.OK) + .setGetContainerResponse( + getContainer(request.getGetContainerRequest())) + .build(); + case GetContainerWithPipeline: + return ScmContainerLocationResponse.newBuilder() + .setCmdType(request.getCmdType()) + .setStatus(Status.OK) + .setGetContainerWithPipelineResponse(getContainerWithPipeline( + request.getGetContainerWithPipelineRequest())) + .build(); + case ListContainer: + return ScmContainerLocationResponse.newBuilder() + .setCmdType(request.getCmdType()) + .setStatus(Status.OK) + .setScmListContainerResponse(listContainer( + request.getScmListContainerRequest())) + .build(); + case QueryNode: + return ScmContainerLocationResponse.newBuilder() + .setCmdType(request.getCmdType()) + .setStatus(Status.OK) + .setNodeQueryResponse(queryNode(request.getNodeQueryRequest())) + .build(); + case NotifyObjectStageChange: + return ScmContainerLocationResponse.newBuilder() + .setCmdType(request.getCmdType()) + .setStatus(Status.OK) + .setObjectStageChangeResponse(notifyObjectStageChange( + request.getObjectStageChangeRequest())) + .build(); + case ListPipelines: + return ScmContainerLocationResponse.newBuilder() + .setCmdType(request.getCmdType()) + .setStatus(Status.OK) + .setListPipelineResponse(listPipelines( + request.getListPipelineRequest())) + .build(); + case ActivatePipeline: + return ScmContainerLocationResponse.newBuilder() + .setCmdType(request.getCmdType()) + .setStatus(Status.OK) + .setActivatePipelineResponse(activatePipeline( + request.getActivatePipelineRequest())) + .build(); + case GetScmInfo: + return ScmContainerLocationResponse.newBuilder() + .setCmdType(request.getCmdType()) + .setStatus(Status.OK) + .setGetScmInfoResponse(getScmInfo( + request.getGetScmInfoRequest())) + .build(); + case InSafeMode: + return ScmContainerLocationResponse.newBuilder() + .setCmdType(request.getCmdType()) + .setStatus(Status.OK) + .setInSafeModeResponse(inSafeMode( + request.getInSafeModeRequest())) + .build(); + case ForceExitSafeMode: + return ScmContainerLocationResponse.newBuilder() + .setCmdType(request.getCmdType()) + .setStatus(Status.OK) + .setForceExitSafeModeResponse(forceExitSafeMode( + request.getForceExitSafeModeRequest())) + .build(); + case StartReplicationManager: + return ScmContainerLocationResponse.newBuilder() + .setCmdType(request.getCmdType()) + .setStatus(Status.OK) + .setStartReplicationManagerResponse(startReplicationManager( + request.getStartReplicationManagerRequest())) + .build(); + case StopReplicationManager: + return ScmContainerLocationResponse.newBuilder() + .setCmdType(request.getCmdType()) + .setStatus(Status.OK) + .setStopReplicationManagerResponse(stopReplicationManager( + request.getStopReplicationManagerRequest())) + .build(); + case GetReplicationManagerStatus: + return ScmContainerLocationResponse.newBuilder() + .setCmdType(request.getCmdType()) + .setStatus(Status.OK) + .setReplicationManagerStatusResponse(getReplicationManagerStatus( + request.getSeplicationManagerStatusRequest())) + .build(); + default: + throw new IllegalArgumentException( + "Unknown command type: " + request.getCmdType()); + } } catch (IOException e) { throw new ServiceException(e); } } - @Override + public ContainerResponseProto allocateContainer(ContainerRequestProto request) + throws IOException { + ContainerWithPipeline containerWithPipeline = impl + .allocateContainer(request.getReplicationType(), + request.getReplicationFactor(), request.getOwner()); + return ContainerResponseProto.newBuilder() + .setContainerWithPipeline(containerWithPipeline.getProtobuf()) + .setErrorCode(ContainerResponseProto.Error.success) + .build(); + + } + public GetContainerResponseProto getContainer( - RpcController controller, GetContainerRequestProto request) - throws ServiceException { - try (Scope scope = TracingUtil - .importAndCreateScope("getContainer", request.getTraceID())) { - ContainerInfo container = impl.getContainer(request.getContainerID()); - return GetContainerResponseProto.newBuilder() - .setContainerInfo(container.getProtobuf()) - .build(); - } catch (IOException e) { - throw new ServiceException(e); - } + GetContainerRequestProto request) throws IOException { + ContainerInfo container = impl.getContainer(request.getContainerID()); + return GetContainerResponseProto.newBuilder() + .setContainerInfo(container.getProtobuf()) + .build(); } - @Override public GetContainerWithPipelineResponseProto getContainerWithPipeline( - RpcController controller, GetContainerWithPipelineRequestProto request) - throws ServiceException { - try (Scope scope = TracingUtil - .importAndCreateScope("getContainerWithPipeline", - request.getTraceID())) { - ContainerWithPipeline container = impl - .getContainerWithPipeline(request.getContainerID()); - return GetContainerWithPipelineResponseProto.newBuilder() - .setContainerWithPipeline(container.getProtobuf()) - .build(); - } catch (IOException e) { - throw new ServiceException(e); - } + GetContainerWithPipelineRequestProto request) + throws IOException { + ContainerWithPipeline container = impl + .getContainerWithPipeline(request.getContainerID()); + return GetContainerWithPipelineResponseProto.newBuilder() + .setContainerWithPipeline(container.getProtobuf()) + .build(); } - @Override - public SCMListContainerResponseProto listContainer(RpcController controller, - SCMListContainerRequestProto request) throws ServiceException { - try (Scope scope = TracingUtil - .importAndCreateScope("listContainer", request.getTraceID())) { - long startContainerID = 0; - int count = -1; + public SCMListContainerResponseProto listContainer( + SCMListContainerRequestProto request) throws IOException { - // Arguments check. - if (request.hasStartContainerID()) { - // End container name is given. - startContainerID = request.getStartContainerID(); - } - count = request.getCount(); - List containerList = - impl.listContainer(startContainerID, count); - SCMListContainerResponseProto.Builder builder = - SCMListContainerResponseProto.newBuilder(); - for (ContainerInfo container : containerList) { - builder.addContainers(container.getProtobuf()); - } - return builder.build(); - } catch (IOException e) { - throw new ServiceException(e); + long startContainerID = 0; + int count = -1; + + // Arguments check. + if (request.hasStartContainerID()) { + // End container name is given. + startContainerID = request.getStartContainerID(); } + count = request.getCount(); + List containerList = + impl.listContainer(startContainerID, count); + SCMListContainerResponseProto.Builder builder = + SCMListContainerResponseProto.newBuilder(); + for (ContainerInfo container : containerList) { + builder.addContainers(container.getProtobuf()); + } + return builder.build(); } - @Override public SCMDeleteContainerResponseProto deleteContainer( - RpcController controller, SCMDeleteContainerRequestProto request) - throws ServiceException { - try (Scope scope = TracingUtil - .importAndCreateScope("deleteContainer", request.getTraceID())) { - impl.deleteContainer(request.getContainerID()); - return SCMDeleteContainerResponseProto.newBuilder().build(); - } catch (IOException e) { - throw new ServiceException(e); - } + SCMDeleteContainerRequestProto request) + throws IOException { + impl.deleteContainer(request.getContainerID()); + return SCMDeleteContainerResponseProto.newBuilder().build(); + } - @Override - public StorageContainerLocationProtocolProtos.NodeQueryResponseProto - queryNode(RpcController controller, + public NodeQueryResponseProto queryNode( StorageContainerLocationProtocolProtos.NodeQueryRequestProto request) - throws ServiceException { - try (Scope scope = TracingUtil - .importAndCreateScope("queryNode", request.getTraceID())) { - HddsProtos.NodeState nodeState = request.getState(); - List datanodes = impl.queryNode(nodeState, - request.getScope(), request.getPoolName()); - return StorageContainerLocationProtocolProtos - .NodeQueryResponseProto.newBuilder() - .addAllDatanodes(datanodes) - .build(); - } catch (Exception e) { - throw new ServiceException(e); - } + throws IOException { + + HddsProtos.NodeState nodeState = request.getState(); + List datanodes = impl.queryNode(nodeState, + request.getScope(), request.getPoolName()); + return NodeQueryResponseProto.newBuilder() + .addAllDatanodes(datanodes) + .build(); + } - @Override public ObjectStageChangeResponseProto notifyObjectStageChange( - RpcController controller, ObjectStageChangeRequestProto request) - throws ServiceException { - try (Scope scope = TracingUtil - .importAndCreateScope("notifyObjectStageChange", - request.getTraceID())) { - impl.notifyObjectStageChange(request.getType(), request.getId(), - request.getOp(), request.getStage()); - return ObjectStageChangeResponseProto.newBuilder().build(); - } catch (IOException e) { - throw new ServiceException(e); - } + ObjectStageChangeRequestProto request) + throws IOException { + impl.notifyObjectStageChange(request.getType(), request.getId(), + request.getOp(), request.getStage()); + return ObjectStageChangeResponseProto.newBuilder().build(); } - @Override - public PipelineResponseProto allocatePipeline( - RpcController controller, PipelineRequestProto request) - throws ServiceException { - // TODO : Wiring this up requires one more patch. - return null; - } - - @Override public ListPipelineResponseProto listPipelines( - RpcController controller, ListPipelineRequestProto request) - throws ServiceException { - try (Scope scope = TracingUtil - .importAndCreateScope("listPipelines", request.getTraceID())) { - ListPipelineResponseProto.Builder builder = ListPipelineResponseProto - .newBuilder(); - List pipelines = impl.listPipelines(); - for (Pipeline pipeline : pipelines) { - HddsProtos.Pipeline protobufMessage = pipeline.getProtobufMessage(); - builder.addPipelines(protobufMessage); - } - return builder.build(); - } catch (IOException e) { - throw new ServiceException(e); + ListPipelineRequestProto request) + throws IOException { + ListPipelineResponseProto.Builder builder = ListPipelineResponseProto + .newBuilder(); + List pipelines = impl.listPipelines(); + for (Pipeline pipeline : pipelines) { + HddsProtos.Pipeline protobufMessage = pipeline.getProtobufMessage(); + builder.addPipelines(protobufMessage); } + return builder.build(); } - @Override public ActivatePipelineResponseProto activatePipeline( - RpcController controller, ActivatePipelineRequestProto request) - throws ServiceException { - try (Scope ignored = TracingUtil - .importAndCreateScope("activatePipeline", request.getTraceID())) { - impl.activatePipeline(request.getPipelineID()); - return ActivatePipelineResponseProto.newBuilder().build(); - } catch (IOException e) { - throw new ServiceException(e); - } + ActivatePipelineRequestProto request) + throws IOException { + impl.activatePipeline(request.getPipelineID()); + return ActivatePipelineResponseProto.newBuilder().build(); } - @Override public DeactivatePipelineResponseProto deactivatePipeline( - RpcController controller, DeactivatePipelineRequestProto request) - throws ServiceException { - try (Scope ignored = TracingUtil - .importAndCreateScope("deactivatePipeline", request.getTraceID())) { - impl.deactivatePipeline(request.getPipelineID()); - return DeactivatePipelineResponseProto.newBuilder().build(); - } catch (IOException e) { - throw new ServiceException(e); - } + DeactivatePipelineRequestProto request) + throws IOException { + impl.deactivatePipeline(request.getPipelineID()); + return DeactivatePipelineResponseProto.newBuilder().build(); } - @Override public ClosePipelineResponseProto closePipeline( RpcController controller, ClosePipelineRequestProto request) - throws ServiceException { - try (Scope scope = TracingUtil - .importAndCreateScope("closePipeline", request.getTraceID())) { - impl.closePipeline(request.getPipelineID()); - return ClosePipelineResponseProto.newBuilder().build(); - } catch (IOException e) { - throw new ServiceException(e); - } + throws IOException { + + impl.closePipeline(request.getPipelineID()); + return ClosePipelineResponseProto.newBuilder().build(); + } - @Override public HddsProtos.GetScmInfoResponseProto getScmInfo( - RpcController controller, HddsProtos.GetScmInfoRequestProto req) - throws ServiceException { - try (Scope scope = TracingUtil - .importAndCreateScope("getScmInfo", req.getTraceID())) { - ScmInfo scmInfo = impl.getScmInfo(); - return HddsProtos.GetScmInfoResponseProto.newBuilder() - .setClusterId(scmInfo.getClusterId()) - .setScmId(scmInfo.getScmId()) - .build(); - } catch (IOException ex) { - throw new ServiceException(ex); - } + HddsProtos.GetScmInfoRequestProto req) + throws IOException { + ScmInfo scmInfo = impl.getScmInfo(); + return HddsProtos.GetScmInfoResponseProto.newBuilder() + .setClusterId(scmInfo.getClusterId()) + .setScmId(scmInfo.getScmId()) + .build(); } - @Override public InSafeModeResponseProto inSafeMode( - RpcController controller, - InSafeModeRequestProto request) throws ServiceException { - try (Scope scope = TracingUtil - .importAndCreateScope("inSafeMode", request.getTraceID())) { - return InSafeModeResponseProto.newBuilder() - .setInSafeMode(impl.inSafeMode()).build(); - } catch (IOException ex) { - throw new ServiceException(ex); - } + InSafeModeRequestProto request) throws IOException { + + return InSafeModeResponseProto.newBuilder() + .setInSafeMode(impl.inSafeMode()).build(); + } - @Override public ForceExitSafeModeResponseProto forceExitSafeMode( - RpcController controller, ForceExitSafeModeRequestProto request) - throws ServiceException { - try (Scope scope = TracingUtil - .importAndCreateScope("forceExitSafeMode", request.getTraceID())) { - return ForceExitSafeModeResponseProto.newBuilder() - .setExitedSafeMode(impl.forceExitSafeMode()).build(); - } catch (IOException ex) { - throw new ServiceException(ex); - } + ForceExitSafeModeRequestProto request) + throws IOException { + return ForceExitSafeModeResponseProto.newBuilder() + .setExitedSafeMode(impl.forceExitSafeMode()).build(); + } - @Override public StartReplicationManagerResponseProto startReplicationManager( - RpcController controller, StartReplicationManagerRequestProto request) - throws ServiceException { - try (Scope ignored = TracingUtil.importAndCreateScope( - "startReplicationManager", request.getTraceID())) { - impl.startReplicationManager(); - return StartReplicationManagerResponseProto.newBuilder().build(); - } catch (IOException ex) { - throw new ServiceException(ex); - } + StartReplicationManagerRequestProto request) + throws IOException { + impl.startReplicationManager(); + return StartReplicationManagerResponseProto.newBuilder().build(); } - @Override public StopReplicationManagerResponseProto stopReplicationManager( - RpcController controller, StopReplicationManagerRequestProto request) - throws ServiceException { - try (Scope ignored = TracingUtil.importAndCreateScope( - "stopReplicationManager", request.getTraceID())) { - impl.stopReplicationManager(); - return StopReplicationManagerResponseProto.newBuilder().build(); - } catch (IOException ex) { - throw new ServiceException(ex); - } + StopReplicationManagerRequestProto request) + throws IOException { + impl.stopReplicationManager(); + return StopReplicationManagerResponseProto.newBuilder().build(); + } - @Override public ReplicationManagerStatusResponseProto getReplicationManagerStatus( - RpcController controller, ReplicationManagerStatusRequestProto request) - throws ServiceException { - try (Scope ignored = TracingUtil.importAndCreateScope( - "getReplicationManagerStatus", request.getTraceID())) { - return ReplicationManagerStatusResponseProto.newBuilder() - .setIsRunning(impl.getReplicationManagerStatus()).build(); - } catch (IOException ex) { - throw new ServiceException(ex); - } + ReplicationManagerStatusRequestProto request) + throws IOException { + return ReplicationManagerStatusResponseProto.newBuilder() + .setIsRunning(impl.getReplicationManagerStatus()).build(); } } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java index e0136e81e59..9c27f6a64d6 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java @@ -61,6 +61,7 @@ import org.apache.hadoop.ozone.audit.AuditMessage; import org.apache.hadoop.ozone.audit.Auditor; import org.apache.hadoop.ozone.audit.SCMAction; import org.apache.hadoop.hdds.scm.protocol.StorageContainerLocationProtocolServerSideTranslatorPB; +import org.apache.hadoop.ozone.protocolPB.ProtocolMessageMetrics; import org.apache.hadoop.security.UserGroupInformation; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -103,6 +104,7 @@ public class SCMClientProtocolServer implements private final StorageContainerManager scm; private final OzoneConfiguration conf; private SafeModePrecheck safeModePrecheck; + private final ProtocolMessageMetrics protocolMetrics; public SCMClientProtocolServer(OzoneConfiguration conf, StorageContainerManager scm) throws IOException { @@ -115,10 +117,16 @@ public class SCMClientProtocolServer implements RPC.setProtocolEngine(conf, StorageContainerLocationProtocolPB.class, ProtobufRpcEngine.class); + protocolMetrics = ProtocolMessageMetrics + .create("ScmContainerLocationProtocol", + "SCM ContainerLocation protocol metrics", + StorageContainerLocationProtocolProtos.Type.values()); + // SCM Container Service RPC BlockingService storageProtoPbService = newReflectiveBlockingService( - new StorageContainerLocationProtocolServerSideTranslatorPB(this)); + new StorageContainerLocationProtocolServerSideTranslatorPB(this, + protocolMetrics)); final InetSocketAddress scmAddress = HddsServerUtil .getScmClientBindAddress(conf); @@ -147,6 +155,7 @@ public class SCMClientProtocolServer implements } public void start() { + protocolMetrics.register(); LOG.info( StorageContainerManager.buildRpcServerStartMessage( "RPC server for Client ", getClientRpcAddress())); @@ -154,6 +163,7 @@ public class SCMClientProtocolServer implements } public void stop() { + protocolMetrics.unregister(); try { LOG.info("Stopping the RPC server for Client Protocol"); getClientRpcServer().stop(); diff --git a/hadoop-ozone/insight/src/main/java/org/apache/hadoop/ozone/insight/BaseInsightSubCommand.java b/hadoop-ozone/insight/src/main/java/org/apache/hadoop/ozone/insight/BaseInsightSubCommand.java index a9f4b949f62..4c3875c3ac0 100644 --- a/hadoop-ozone/insight/src/main/java/org/apache/hadoop/ozone/insight/BaseInsightSubCommand.java +++ b/hadoop-ozone/insight/src/main/java/org/apache/hadoop/ozone/insight/BaseInsightSubCommand.java @@ -31,6 +31,7 @@ import org.apache.hadoop.ozone.insight.scm.EventQueueInsight; import org.apache.hadoop.ozone.insight.scm.NodeManagerInsight; import org.apache.hadoop.ozone.insight.scm.ReplicaManagerInsight; import org.apache.hadoop.ozone.insight.scm.ScmProtocolBlockLocationInsight; +import org.apache.hadoop.ozone.insight.scm.ScmProtocolContainerLocationInsight; import org.apache.hadoop.ozone.insight.scm.ScmProtocolSecurityInsight; import org.apache.hadoop.ozone.om.OMConfigKeys; @@ -89,6 +90,8 @@ public class BaseInsightSubCommand { insights.put("scm.event-queue", new EventQueueInsight()); insights.put("scm.protocol.block-location", new ScmProtocolBlockLocationInsight()); + insights.put("scm.protocol.container-location", + new ScmProtocolContainerLocationInsight()); insights.put("scm.protocol.security", new ScmProtocolSecurityInsight()); insights.put("om.key-manager", new KeyManagerInsight()); diff --git a/hadoop-ozone/insight/src/main/java/org/apache/hadoop/ozone/insight/scm/ScmProtocolBlockLocationInsight.java b/hadoop-ozone/insight/src/main/java/org/apache/hadoop/ozone/insight/scm/ScmProtocolBlockLocationInsight.java index 31c73c04435..f67f64194b3 100644 --- a/hadoop-ozone/insight/src/main/java/org/apache/hadoop/ozone/insight/scm/ScmProtocolBlockLocationInsight.java +++ b/hadoop-ozone/insight/src/main/java/org/apache/hadoop/ozone/insight/scm/ScmProtocolBlockLocationInsight.java @@ -23,12 +23,12 @@ import java.util.List; import java.util.Map; import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos; -import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocolServerSideTranslatorPB; import org.apache.hadoop.hdds.scm.server.SCMBlockProtocolServer; import org.apache.hadoop.ozone.insight.BaseInsightPoint; import org.apache.hadoop.ozone.insight.Component.Type; import org.apache.hadoop.ozone.insight.LoggerSource; import org.apache.hadoop.ozone.insight.MetricGroupDisplay; +import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocolServerSideTranslatorPB; /** * Insight metric to check the SCM block location protocol behaviour. diff --git a/hadoop-ozone/insight/src/main/java/org/apache/hadoop/ozone/insight/scm/ScmProtocolContainerLocationInsight.java b/hadoop-ozone/insight/src/main/java/org/apache/hadoop/ozone/insight/scm/ScmProtocolContainerLocationInsight.java new file mode 100644 index 00000000000..d6db589ed82 --- /dev/null +++ b/hadoop-ozone/insight/src/main/java/org/apache/hadoop/ozone/insight/scm/ScmProtocolContainerLocationInsight.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.ozone.insight.scm; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos; +import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.StorageContainerLocationProtocolService; +import org.apache.hadoop.hdds.scm.protocol.StorageContainerLocationProtocolServerSideTranslatorPB; +import org.apache.hadoop.ozone.insight.BaseInsightPoint; +import org.apache.hadoop.ozone.insight.Component.Type; +import org.apache.hadoop.ozone.insight.LoggerSource; +import org.apache.hadoop.ozone.insight.MetricGroupDisplay; + +/** + * Insight metric to check the SCM block location protocol behaviour. + */ +public class ScmProtocolContainerLocationInsight extends BaseInsightPoint { + + @Override + public List getRelatedLoggers(boolean verbose) { + List loggers = new ArrayList<>(); + loggers.add( + new LoggerSource(Type.SCM, + StorageContainerLocationProtocolServerSideTranslatorPB.class, + defaultLevel(verbose))); + new LoggerSource(Type.SCM, + StorageContainerLocationProtocolService.class, + defaultLevel(verbose)); + return loggers; + } + + @Override + public List getMetrics() { + List metrics = new ArrayList<>(); + + Map filter = new HashMap<>(); + filter.put("servername", "StorageContainerLocationProtocolService"); + + addRpcMetrics(metrics, Type.SCM, filter); + + addProtocolMessageMetrics(metrics, "scm_container_location_protocol", + Type.SCM, StorageContainerLocationProtocolProtos.Type.values()); + + return metrics; + } + + @Override + public String getDescription() { + return "SCM Container location protocol endpoint"; + } + +}