diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/function/FunctionWithServiceException.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/function/FunctionWithServiceException.java new file mode 100644 index 00000000000..b9d7bceb48f --- /dev/null +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/function/FunctionWithServiceException.java @@ -0,0 +1,36 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdds.function; + +import com.google.protobuf.ServiceException; + +/** + * Functional interface like java.util.function.Function but with + * checked exception. + */ +@FunctionalInterface +public interface FunctionWithServiceException<T, R> { + + /** + * Applies this function to the given argument. + * + * @param t the function argument + * @return the function result + */ + R apply(T t) throws ServiceException; +} diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/function/package-info.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/function/package-info.java new file mode 100644 index 00000000000..915fe3557e2 --- /dev/null +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/function/package-info.java @@ -0,0 +1,22 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +/** + * Functional interfaces for ozone, similar to java.util.function. + */ +package org.apache.hadoop.hdds.function; diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/OzoneProtocolMessageDispatcher.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/OzoneProtocolMessageDispatcher.java new file mode 100644 index 00000000000..d67a759f8d8 --- /dev/null +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/OzoneProtocolMessageDispatcher.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdds.server; + +import org.apache.hadoop.hdds.function.FunctionWithServiceException; +import org.apache.hadoop.hdds.tracing.TracingUtil; +import org.apache.hadoop.ozone.protocolPB.ProtocolMessageMetrics; + +import com.google.protobuf.ProtocolMessageEnum; +import com.google.protobuf.ServiceException; +import io.opentracing.Scope; +import org.slf4j.Logger; + +/** + * Dispatch message after tracing and message logging for insight. + *

+ * This is a generic utility to dispatch message in ServerSide translators. + *

+ * It logs the message type/content on DEBUG/TRACING log for insight and create + * a new span based on the tracing information. + */ +public class OzoneProtocolMessageDispatcher<REQUEST, RESPONSE> { + + private String serviceName; + + private final ProtocolMessageMetrics protocolMessageMetrics; + + private Logger logger; + + public OzoneProtocolMessageDispatcher(String serviceName, + ProtocolMessageMetrics protocolMessageMetrics, Logger logger) { + this.serviceName = serviceName; + this.protocolMessageMetrics = protocolMessageMetrics; + this.logger = logger; + } + + public RESPONSE processRequest( + REQUEST request, + FunctionWithServiceException<REQUEST, RESPONSE> methodCall, + ProtocolMessageEnum type, + String traceId) throws ServiceException { + Scope scope = TracingUtil + .importAndCreateScope(type.toString(), traceId); + try { + if (logger.isTraceEnabled()) { + logger.trace( + "{} {} request is received: {}", + serviceName, + type.toString(), + request.toString().replaceAll("\n", "\\\\n")); + } else if (logger.isDebugEnabled()) { + logger.debug("{} {} request is received", + serviceName, type.toString()); + } + protocolMessageMetrics.increment(type); + + RESPONSE response = methodCall.apply(request); + + if (logger.isTraceEnabled()) { + logger.trace( + "{} {} request is processed.
Response: " + + "{}", + serviceName, + type.toString(), + response.toString().replaceAll("\n", "\\\\n")); + } + return response; + + } finally { + scope.close(); + } + } +} diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/protocolPB/ScmBlockLocationProtocolServerSideTranslatorPB.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/ScmBlockLocationProtocolServerSideTranslatorPB.java similarity index 85% rename from hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/protocolPB/ScmBlockLocationProtocolServerSideTranslatorPB.java rename to hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/ScmBlockLocationProtocolServerSideTranslatorPB.java index bad24cffe58..b6ce067c00c 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/protocolPB/ScmBlockLocationProtocolServerSideTranslatorPB.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/ScmBlockLocationProtocolServerSideTranslatorPB.java @@ -15,7 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ -package org.apache.hadoop.ozone.protocolPB; +package org.apache.hadoop.hdds.scm.protocol; import java.io.IOException; import java.util.List; @@ -40,17 +40,15 @@ import org.apache.hadoop.hdds.scm.ScmInfo; import org.apache.hadoop.hdds.scm.container.common.helpers.AllocatedBlock; import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList; import org.apache.hadoop.hdds.scm.exceptions.SCMException; -import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol; -import org.apache.hadoop.hdds.scm.protocol.StorageContainerLocationProtocol; import org.apache.hadoop.hdds.scm.protocolPB.ScmBlockLocationProtocolPB; import org.apache.hadoop.hdds.scm.protocolPB.StorageContainerLocationProtocolPB; -import org.apache.hadoop.hdds.tracing.TracingUtil; +import org.apache.hadoop.hdds.server.OzoneProtocolMessageDispatcher; import org.apache.hadoop.ozone.common.BlockGroup; import org.apache.hadoop.ozone.common.DeleteBlockGroupResult; +import org.apache.hadoop.ozone.protocolPB.ProtocolMessageMetrics; import com.google.protobuf.RpcController; import com.google.protobuf.ServiceException; -import io.opentracing.Scope; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -68,8 +66,9 @@ public final class ScmBlockLocationProtocolServerSideTranslatorPB private static final Logger LOG = LoggerFactory .getLogger(ScmBlockLocationProtocolServerSideTranslatorPB.class); - private final ProtocolMessageMetrics - protocolMessageMetrics; + private final OzoneProtocolMessageDispatcher + dispatcher; /** * Creates a new ScmBlockLocationProtocolServerSideTranslatorPB. 
@@ -80,7 +79,9 @@ public final class ScmBlockLocationProtocolServerSideTranslatorPB ScmBlockLocationProtocol impl, ProtocolMessageMetrics metrics) throws IOException { this.impl = impl; - this.protocolMessageMetrics = metrics; + dispatcher = new OzoneProtocolMessageDispatcher<>( + "BlockLocationProtocol", metrics, LOG); + } private SCMBlockLocationResponse.Builder createSCMBlockResponse( @@ -94,43 +95,18 @@ public final class ScmBlockLocationProtocolServerSideTranslatorPB @Override public SCMBlockLocationResponse send(RpcController controller, SCMBlockLocationRequest request) throws ServiceException { - String traceId = request.getTraceID(); - - if (LOG.isTraceEnabled()) { - LOG.trace("BlockLocationProtocol {} request is received: {}", - request.getCmdType().toString(), - request.toString().replaceAll("\n", "\\\\n")); - - } else if (LOG.isDebugEnabled()) { - LOG.debug("BlockLocationProtocol {} request is received", - request.getCmdType().toString()); - } - - protocolMessageMetrics.increment(request.getCmdType()); - - try (Scope scope = TracingUtil - .importAndCreateScope( - "ScmBlockLocationProtocol." + request.getCmdType(), - request.getTraceID())) { - SCMBlockLocationResponse response = - processMessage(request, traceId); - - if (LOG.isTraceEnabled()) { - LOG.trace( - "BlockLocationProtocol {} request is processed. 
Response: " - + "{}", - request.getCmdType().toString(), - response.toString().replaceAll("\n", "\\\\n")); - } - return response; - } + return dispatcher.processRequest( + request, + this::processMessage, + request.getCmdType(), + request.getTraceID()); } private SCMBlockLocationResponse processMessage( - SCMBlockLocationRequest request, String traceId) throws ServiceException { + SCMBlockLocationRequest request) throws ServiceException { SCMBlockLocationResponse.Builder response = createSCMBlockResponse( request.getCmdType(), - traceId); + request.getTraceID()); response.setSuccess(true); response.setStatus(Status.OK); diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerLocationProtocolServerSideTranslatorPB.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocolServerSideTranslatorPB.java similarity index 81% rename from hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerLocationProtocolServerSideTranslatorPB.java rename to hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocolServerSideTranslatorPB.java index 99c9e8d7c31..9d53dbf7d3b 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerLocationProtocolServerSideTranslatorPB.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocolServerSideTranslatorPB.java @@ -16,82 +16,56 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ -package org.apache.hadoop.ozone.protocolPB; +package org.apache.hadoop.hdds.scm.protocol; -import com.google.protobuf.RpcController; -import com.google.protobuf.ServiceException; -import io.opentracing.Scope; +import java.io.IOException; +import java.util.List; import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.hdds.protocol.proto - .StorageContainerLocationProtocolProtos.InSafeModeRequestProto; -import org.apache.hadoop.hdds.protocol.proto - .StorageContainerLocationProtocolProtos.InSafeModeResponseProto; -import org.apache.hadoop.hdds.protocol.proto - .StorageContainerLocationProtocolProtos.ForceExitSafeModeRequestProto; -import org.apache.hadoop.hdds.protocol.proto - .StorageContainerLocationProtocolProtos.ForceExitSafeModeResponseProto; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos; +import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos; +import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ActivatePipelineRequestProto; +import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ActivatePipelineResponseProto; +import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ClosePipelineRequestProto; +import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ClosePipelineResponseProto; +import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ContainerRequestProto; +import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ContainerResponseProto; +import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.DeactivatePipelineRequestProto; +import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.DeactivatePipelineResponseProto; +import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ForceExitSafeModeRequestProto; +import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ForceExitSafeModeResponseProto; +import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.GetContainerRequestProto; +import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.GetContainerResponseProto; import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.GetContainerWithPipelineRequestProto; import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.GetContainerWithPipelineResponseProto; +import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.InSafeModeRequestProto; +import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.InSafeModeResponseProto; +import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ListPipelineRequestProto; +import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ListPipelineResponseProto; +import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ObjectStageChangeRequestProto; +import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ObjectStageChangeResponseProto; +import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.PipelineRequestProto; +import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.PipelineResponseProto; +import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ReplicationManagerStatusRequestProto; +import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ReplicationManagerStatusResponseProto; +import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.SCMDeleteContainerRequestProto; +import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.SCMDeleteContainerResponseProto; +import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.SCMListContainerRequestProto; +import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.SCMListContainerResponseProto; import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.StartReplicationManagerRequestProto; import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.StartReplicationManagerResponseProto; import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.StopReplicationManagerRequestProto; import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.StopReplicationManagerResponseProto; -import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ReplicationManagerStatusRequestProto; -import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ReplicationManagerStatusResponseProto; import org.apache.hadoop.hdds.scm.ScmInfo; -import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline; import org.apache.hadoop.hdds.scm.container.ContainerInfo; +import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline; import org.apache.hadoop.hdds.scm.pipeline.Pipeline; -import org.apache.hadoop.hdds.scm.protocol.StorageContainerLocationProtocol; import org.apache.hadoop.hdds.scm.protocolPB.StorageContainerLocationProtocolPB; -import org.apache.hadoop.hdds.protocol.proto.HddsProtos; -import org.apache.hadoop.hdds.protocol.proto - .StorageContainerLocationProtocolProtos; -import org.apache.hadoop.hdds.protocol.proto - .StorageContainerLocationProtocolProtos.ActivatePipelineRequestProto; -import org.apache.hadoop.hdds.protocol.proto - .StorageContainerLocationProtocolProtos.ActivatePipelineResponseProto; -import org.apache.hadoop.hdds.protocol.proto - .StorageContainerLocationProtocolProtos.DeactivatePipelineRequestProto; -import org.apache.hadoop.hdds.protocol.proto - 
.StorageContainerLocationProtocolProtos.DeactivatePipelineResponseProto; -import org.apache.hadoop.hdds.protocol.proto - .StorageContainerLocationProtocolProtos.ContainerRequestProto; -import org.apache.hadoop.hdds.protocol.proto - .StorageContainerLocationProtocolProtos.ContainerResponseProto; -import org.apache.hadoop.hdds.protocol.proto - .StorageContainerLocationProtocolProtos.ClosePipelineRequestProto; -import org.apache.hadoop.hdds.protocol.proto - .StorageContainerLocationProtocolProtos.ClosePipelineResponseProto; -import org.apache.hadoop.hdds.protocol.proto - .StorageContainerLocationProtocolProtos.ListPipelineRequestProto; -import org.apache.hadoop.hdds.protocol.proto - .StorageContainerLocationProtocolProtos.ListPipelineResponseProto; -import org.apache.hadoop.hdds.protocol.proto - .StorageContainerLocationProtocolProtos.GetContainerRequestProto; -import org.apache.hadoop.hdds.protocol.proto - .StorageContainerLocationProtocolProtos.GetContainerResponseProto; -import org.apache.hadoop.hdds.protocol.proto - .StorageContainerLocationProtocolProtos.ObjectStageChangeRequestProto; -import org.apache.hadoop.hdds.protocol.proto - .StorageContainerLocationProtocolProtos.ObjectStageChangeResponseProto; -import org.apache.hadoop.hdds.protocol.proto - .StorageContainerLocationProtocolProtos.PipelineRequestProto; -import org.apache.hadoop.hdds.protocol.proto - .StorageContainerLocationProtocolProtos.PipelineResponseProto; -import org.apache.hadoop.hdds.protocol.proto - .StorageContainerLocationProtocolProtos.SCMDeleteContainerRequestProto; -import org.apache.hadoop.hdds.protocol.proto - .StorageContainerLocationProtocolProtos.SCMDeleteContainerResponseProto; -import org.apache.hadoop.hdds.protocol.proto - .StorageContainerLocationProtocolProtos.SCMListContainerRequestProto; -import org.apache.hadoop.hdds.protocol.proto - .StorageContainerLocationProtocolProtos.SCMListContainerResponseProto; import org.apache.hadoop.hdds.tracing.TracingUtil; -import 
java.io.IOException; -import java.util.List; +import com.google.protobuf.RpcController; +import com.google.protobuf.ServiceException; +import io.opentracing.Scope; /** * This class is the server-side translator that forwards requests received on diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/package-info.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/package-info.java new file mode 100644 index 00000000000..411f22e6188 --- /dev/null +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/package-info.java @@ -0,0 +1,21 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * RPC/protobuf specific translator classes for SCM protocol. + */ +package org.apache.hadoop.hdds.scm.protocol; \ No newline at end of file diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMBlockProtocolServer.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMBlockProtocolServer.java index 500a8cd8aab..5500891d98e 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMBlockProtocolServer.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMBlockProtocolServer.java @@ -57,7 +57,7 @@ import org.apache.hadoop.ozone.audit.SCMAction; import org.apache.hadoop.ozone.common.BlockGroup; import org.apache.hadoop.ozone.common.DeleteBlockGroupResult; import org.apache.hadoop.ozone.protocolPB.ProtocolMessageMetrics; -import org.apache.hadoop.ozone.protocolPB.ScmBlockLocationProtocolServerSideTranslatorPB; +import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocolServerSideTranslatorPB; import com.google.common.collect.Maps; import com.google.protobuf.BlockingService; diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java index 7d9cb3e2464..e0136e81e59 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java @@ -60,8 +60,7 @@ import org.apache.hadoop.ozone.audit.AuditLoggerType; import
org.apache.hadoop.ozone.audit.AuditMessage; import org.apache.hadoop.ozone.audit.Auditor; import org.apache.hadoop.ozone.audit.SCMAction; -import org.apache.hadoop.ozone.protocolPB - .StorageContainerLocationProtocolServerSideTranslatorPB; +import org.apache.hadoop.hdds.scm.protocol.StorageContainerLocationProtocolServerSideTranslatorPB; import org.apache.hadoop.security.UserGroupInformation; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/server/TestSCMBlockProtocolServer.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/server/TestSCMBlockProtocolServer.java index e08fdc17ff3..d2044f59369 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/server/TestSCMBlockProtocolServer.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/server/TestSCMBlockProtocolServer.java @@ -25,8 +25,7 @@ import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos; import org.apache.hadoop.hdds.scm.TestUtils; import org.apache.hadoop.hdds.scm.node.NodeManager; import org.apache.hadoop.ozone.protocolPB.ProtocolMessageMetrics; -import org.apache.hadoop.ozone.protocolPB - .ScmBlockLocationProtocolServerSideTranslatorPB; +import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocolServerSideTranslatorPB; import org.apache.hadoop.test.GenericTestUtils; import org.junit.After; diff --git a/hadoop-ozone/insight/src/main/java/org/apache/hadoop/ozone/insight/scm/ScmProtocolBlockLocationInsight.java b/hadoop-ozone/insight/src/main/java/org/apache/hadoop/ozone/insight/scm/ScmProtocolBlockLocationInsight.java index 73f151228f8..5ca0945238b 100644 --- a/hadoop-ozone/insight/src/main/java/org/apache/hadoop/ozone/insight/scm/ScmProtocolBlockLocationInsight.java +++ b/hadoop-ozone/insight/src/main/java/org/apache/hadoop/ozone/insight/scm/ScmProtocolBlockLocationInsight.java @@ -28,7 +28,7 @@ import 
org.apache.hadoop.ozone.insight.BaseInsightPoint; import org.apache.hadoop.ozone.insight.Component.Type; import org.apache.hadoop.ozone.insight.LoggerSource; import org.apache.hadoop.ozone.insight.MetricGroupDisplay; -import org.apache.hadoop.ozone.protocolPB.ScmBlockLocationProtocolServerSideTranslatorPB; +import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocolServerSideTranslatorPB; /** * Insight metric to check the SCM block location protocol behaviour. diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerProtocolServerSideTranslatorPB.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerProtocolServerSideTranslatorPB.java index 6f8e9df938d..d4c029b8b3b 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerProtocolServerSideTranslatorPB.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerProtocolServerSideTranslatorPB.java @@ -17,7 +17,8 @@ package org.apache.hadoop.ozone.protocolPB; import com.google.common.base.Preconditions; -import org.apache.hadoop.hdds.tracing.TracingUtil; + +import org.apache.hadoop.hdds.server.OzoneProtocolMessageDispatcher; import org.apache.hadoop.ozone.OmUtils; import org.apache.hadoop.ozone.om.OzoneManager; import org.apache.hadoop.ozone.om.exceptions.NotLeaderException; @@ -33,7 +34,6 @@ import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRespo import com.google.protobuf.RpcController; import com.google.protobuf.ServiceException; -import io.opentracing.Scope; import org.apache.ratis.protocol.RaftPeerId; import org.apache.ratis.util.ExitUtils; import org.slf4j.Logger; @@ -58,8 +58,9 @@ public class OzoneManagerProtocolServerSideTranslatorPB implements private final boolean isRatisEnabled; private final OzoneManager ozoneManager; private final OzoneManagerDoubleBuffer ozoneManagerDoubleBuffer; - private final 
ProtocolMessageMetrics protocolMessageMetrics; private final AtomicLong transactionIndex = new AtomicLong(0L); + private final OzoneProtocolMessageDispatcher + dispatcher; /** * Constructs an instance of the server handler. @@ -75,7 +76,6 @@ public class OzoneManagerProtocolServerSideTranslatorPB implements handler = new OzoneManagerRequestHandler(impl); this.omRatisServer = ratisServer; this.isRatisEnabled = enableRatis; - this.protocolMessageMetrics = metrics; this.ozoneManagerDoubleBuffer = new OzoneManagerDoubleBuffer(ozoneManager.getMetadataManager(), (i) -> { // Do nothing. @@ -83,6 +83,9 @@ public class OzoneManagerProtocolServerSideTranslatorPB implements // As we wait until the double buffer flushes DB to disk. }, isRatisEnabled); + dispatcher = new OzoneProtocolMessageDispatcher<>("OzoneProtocol", + metrics, LOG); + } /** @@ -93,35 +96,9 @@ public class OzoneManagerProtocolServerSideTranslatorPB implements @Override public OMResponse submitRequest(RpcController controller, OMRequest request) throws ServiceException { - Scope scope = TracingUtil - .importAndCreateScope(request.getCmdType().name(), - request.getTraceID()); - try { - if (LOG.isTraceEnabled()) { - LOG.trace( - "OzoneManagerProtocol {} request is received: {}", - request.getCmdType().toString(), - request.toString().replaceAll("\n", "\\\\n")); - } else if (LOG.isDebugEnabled()) { - LOG.debug("OzoneManagerProtocol {} request is received", - request.getCmdType().toString()); - } - protocolMessageMetrics.increment(request.getCmdType()); - OMResponse omResponse = processRequest(request); - - if (LOG.isTraceEnabled()) { - LOG.trace( - "OzoneManagerProtocol {} request is processed. 
Response: " - + "{}", - request.getCmdType().toString(), - omResponse.toString().replaceAll("\n", "\\\\n")); - } - return omResponse; - - } finally { - scope.close(); - } + return dispatcher.processRequest(request, this::processRequest, + request.getCmdType(), request.getTraceID()); } private OMResponse processRequest(OMRequest request) throws