HDDS-2067. Create generic service facade with tracing/metrics/logging support
Signed-off-by: Anu Engineer <aengineer@apache.org> Co-Authored-By: Doroszlai, Attila <6454655+adoroszlai@users.noreply.github.com>
This commit is contained in:
parent
bdaaa3bbf2
commit
f647185905
|
@ -0,0 +1,36 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
* <p>
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* <p>
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hdds.function;
|
||||
|
||||
import com.google.protobuf.ServiceException;
|
||||
|
||||
/**
|
||||
* Functional interface like java.util.function.Function but with
|
||||
* checked exception.
|
||||
*/
|
||||
@FunctionalInterface
|
||||
public interface FunctionWithServiceException<T, R> {
|
||||
|
||||
/**
|
||||
* Applies this function to the given argument.
|
||||
*
|
||||
* @param t the function argument
|
||||
* @return the function result
|
||||
*/
|
||||
R apply(T t) throws ServiceException;
|
||||
}
|
|
@ -0,0 +1,22 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
/**
|
||||
* Functional interfaces for ozone, similar to java.util.function.
|
||||
*/
|
||||
package org.apache.hadoop.hdds.function;
|
|
@ -0,0 +1,88 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hdds.server;
|
||||
|
||||
import org.apache.hadoop.hdds.function.FunctionWithServiceException;
|
||||
import org.apache.hadoop.hdds.tracing.TracingUtil;
|
||||
import org.apache.hadoop.ozone.protocolPB.ProtocolMessageMetrics;
|
||||
|
||||
import com.google.protobuf.ProtocolMessageEnum;
|
||||
import com.google.protobuf.ServiceException;
|
||||
import io.opentracing.Scope;
|
||||
import org.slf4j.Logger;
|
||||
|
||||
/**
|
||||
* Dispatch message after tracing and message logging for insight.
|
||||
* <p>
|
||||
* This is a generic utility to dispatch message in ServerSide translators.
|
||||
* <p>
|
||||
* It logs the message type/content on DEBUG/TRACING log for insight and create
|
||||
* a new span based on the tracing information.
|
||||
*/
|
||||
public class OzoneProtocolMessageDispatcher<REQUEST, RESPONSE> {
|
||||
|
||||
private String serviceName;
|
||||
|
||||
private final ProtocolMessageMetrics protocolMessageMetrics;
|
||||
|
||||
private Logger logger;
|
||||
|
||||
public OzoneProtocolMessageDispatcher(String serviceName,
|
||||
ProtocolMessageMetrics protocolMessageMetrics, Logger logger) {
|
||||
this.serviceName = serviceName;
|
||||
this.protocolMessageMetrics = protocolMessageMetrics;
|
||||
this.logger = logger;
|
||||
}
|
||||
|
||||
public RESPONSE processRequest(
|
||||
REQUEST request,
|
||||
FunctionWithServiceException<REQUEST, RESPONSE> methodCall,
|
||||
ProtocolMessageEnum type,
|
||||
String traceId) throws ServiceException {
|
||||
Scope scope = TracingUtil
|
||||
.importAndCreateScope(type.toString(), traceId);
|
||||
try {
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace(
|
||||
"{} {} request is received: <json>{}</json>",
|
||||
serviceName,
|
||||
type.toString(),
|
||||
request.toString().replaceAll("\n", "\\\\n"));
|
||||
} else if (logger.isDebugEnabled()) {
|
||||
logger.debug("{} {} request is received",
|
||||
serviceName, type.toString());
|
||||
}
|
||||
protocolMessageMetrics.increment(type);
|
||||
|
||||
RESPONSE response = methodCall.apply(request);
|
||||
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace(
|
||||
"{} {} request is processed. Response: "
|
||||
+ "<json>{}</json>",
|
||||
serviceName,
|
||||
type.toString(),
|
||||
response.toString().replaceAll("\n", "\\\\n"));
|
||||
}
|
||||
return response;
|
||||
|
||||
} finally {
|
||||
scope.close();
|
||||
}
|
||||
}
|
||||
}
|
|
@ -15,7 +15,7 @@
|
|||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.ozone.protocolPB;
|
||||
package org.apache.hadoop.hdds.scm.protocol;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
|
@ -40,17 +40,15 @@ import org.apache.hadoop.hdds.scm.ScmInfo;
|
|||
import org.apache.hadoop.hdds.scm.container.common.helpers.AllocatedBlock;
|
||||
import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
|
||||
import org.apache.hadoop.hdds.scm.exceptions.SCMException;
|
||||
import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol;
|
||||
import org.apache.hadoop.hdds.scm.protocol.StorageContainerLocationProtocol;
|
||||
import org.apache.hadoop.hdds.scm.protocolPB.ScmBlockLocationProtocolPB;
|
||||
import org.apache.hadoop.hdds.scm.protocolPB.StorageContainerLocationProtocolPB;
|
||||
import org.apache.hadoop.hdds.tracing.TracingUtil;
|
||||
import org.apache.hadoop.hdds.server.OzoneProtocolMessageDispatcher;
|
||||
import org.apache.hadoop.ozone.common.BlockGroup;
|
||||
import org.apache.hadoop.ozone.common.DeleteBlockGroupResult;
|
||||
import org.apache.hadoop.ozone.protocolPB.ProtocolMessageMetrics;
|
||||
|
||||
import com.google.protobuf.RpcController;
|
||||
import com.google.protobuf.ServiceException;
|
||||
import io.opentracing.Scope;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
|
@ -68,8 +66,9 @@ public final class ScmBlockLocationProtocolServerSideTranslatorPB
|
|||
private static final Logger LOG = LoggerFactory
|
||||
.getLogger(ScmBlockLocationProtocolServerSideTranslatorPB.class);
|
||||
|
||||
private final ProtocolMessageMetrics
|
||||
protocolMessageMetrics;
|
||||
private final OzoneProtocolMessageDispatcher<SCMBlockLocationRequest,
|
||||
SCMBlockLocationResponse>
|
||||
dispatcher;
|
||||
|
||||
/**
|
||||
* Creates a new ScmBlockLocationProtocolServerSideTranslatorPB.
|
||||
|
@ -80,7 +79,9 @@ public final class ScmBlockLocationProtocolServerSideTranslatorPB
|
|||
ScmBlockLocationProtocol impl,
|
||||
ProtocolMessageMetrics metrics) throws IOException {
|
||||
this.impl = impl;
|
||||
this.protocolMessageMetrics = metrics;
|
||||
dispatcher = new OzoneProtocolMessageDispatcher<>(
|
||||
"BlockLocationProtocol", metrics, LOG);
|
||||
|
||||
}
|
||||
|
||||
private SCMBlockLocationResponse.Builder createSCMBlockResponse(
|
||||
|
@ -94,43 +95,18 @@ public final class ScmBlockLocationProtocolServerSideTranslatorPB
|
|||
@Override
|
||||
public SCMBlockLocationResponse send(RpcController controller,
|
||||
SCMBlockLocationRequest request) throws ServiceException {
|
||||
String traceId = request.getTraceID();
|
||||
|
||||
if (LOG.isTraceEnabled()) {
|
||||
LOG.trace("BlockLocationProtocol {} request is received: <json>{}</json>",
|
||||
request.getCmdType().toString(),
|
||||
request.toString().replaceAll("\n", "\\\\n"));
|
||||
|
||||
} else if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("BlockLocationProtocol {} request is received",
|
||||
request.getCmdType().toString());
|
||||
}
|
||||
|
||||
protocolMessageMetrics.increment(request.getCmdType());
|
||||
|
||||
try (Scope scope = TracingUtil
|
||||
.importAndCreateScope(
|
||||
"ScmBlockLocationProtocol." + request.getCmdType(),
|
||||
request.getTraceID())) {
|
||||
SCMBlockLocationResponse response =
|
||||
processMessage(request, traceId);
|
||||
|
||||
if (LOG.isTraceEnabled()) {
|
||||
LOG.trace(
|
||||
"BlockLocationProtocol {} request is processed. Response: "
|
||||
+ "<json>{}</json>",
|
||||
request.getCmdType().toString(),
|
||||
response.toString().replaceAll("\n", "\\\\n"));
|
||||
}
|
||||
return response;
|
||||
}
|
||||
return dispatcher.processRequest(
|
||||
request,
|
||||
this::processMessage,
|
||||
request.getCmdType(),
|
||||
request.getTraceID());
|
||||
}
|
||||
|
||||
private SCMBlockLocationResponse processMessage(
|
||||
SCMBlockLocationRequest request, String traceId) throws ServiceException {
|
||||
SCMBlockLocationRequest request) throws ServiceException {
|
||||
SCMBlockLocationResponse.Builder response = createSCMBlockResponse(
|
||||
request.getCmdType(),
|
||||
traceId);
|
||||
request.getTraceID());
|
||||
response.setSuccess(true);
|
||||
response.setStatus(Status.OK);
|
||||
|
|
@ -16,82 +16,56 @@
|
|||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.ozone.protocolPB;
|
||||
package org.apache.hadoop.hdds.scm.protocol;
|
||||
|
||||
import com.google.protobuf.RpcController;
|
||||
import com.google.protobuf.ServiceException;
|
||||
import io.opentracing.Scope;
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.hdds.protocol.proto
|
||||
.StorageContainerLocationProtocolProtos.InSafeModeRequestProto;
|
||||
import org.apache.hadoop.hdds.protocol.proto
|
||||
.StorageContainerLocationProtocolProtos.InSafeModeResponseProto;
|
||||
import org.apache.hadoop.hdds.protocol.proto
|
||||
.StorageContainerLocationProtocolProtos.ForceExitSafeModeRequestProto;
|
||||
import org.apache.hadoop.hdds.protocol.proto
|
||||
.StorageContainerLocationProtocolProtos.ForceExitSafeModeResponseProto;
|
||||
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
|
||||
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos;
|
||||
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ActivatePipelineRequestProto;
|
||||
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ActivatePipelineResponseProto;
|
||||
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ClosePipelineRequestProto;
|
||||
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ClosePipelineResponseProto;
|
||||
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ContainerRequestProto;
|
||||
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ContainerResponseProto;
|
||||
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.DeactivatePipelineRequestProto;
|
||||
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.DeactivatePipelineResponseProto;
|
||||
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ForceExitSafeModeRequestProto;
|
||||
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ForceExitSafeModeResponseProto;
|
||||
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.GetContainerRequestProto;
|
||||
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.GetContainerResponseProto;
|
||||
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.GetContainerWithPipelineRequestProto;
|
||||
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.GetContainerWithPipelineResponseProto;
|
||||
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.InSafeModeRequestProto;
|
||||
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.InSafeModeResponseProto;
|
||||
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ListPipelineRequestProto;
|
||||
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ListPipelineResponseProto;
|
||||
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ObjectStageChangeRequestProto;
|
||||
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ObjectStageChangeResponseProto;
|
||||
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.PipelineRequestProto;
|
||||
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.PipelineResponseProto;
|
||||
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ReplicationManagerStatusRequestProto;
|
||||
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ReplicationManagerStatusResponseProto;
|
||||
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.SCMDeleteContainerRequestProto;
|
||||
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.SCMDeleteContainerResponseProto;
|
||||
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.SCMListContainerRequestProto;
|
||||
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.SCMListContainerResponseProto;
|
||||
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.StartReplicationManagerRequestProto;
|
||||
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.StartReplicationManagerResponseProto;
|
||||
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.StopReplicationManagerRequestProto;
|
||||
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.StopReplicationManagerResponseProto;
|
||||
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ReplicationManagerStatusRequestProto;
|
||||
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ReplicationManagerStatusResponseProto;
|
||||
import org.apache.hadoop.hdds.scm.ScmInfo;
|
||||
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
|
||||
import org.apache.hadoop.hdds.scm.container.ContainerInfo;
|
||||
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
|
||||
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
|
||||
import org.apache.hadoop.hdds.scm.protocol.StorageContainerLocationProtocol;
|
||||
import org.apache.hadoop.hdds.scm.protocolPB.StorageContainerLocationProtocolPB;
|
||||
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
|
||||
import org.apache.hadoop.hdds.protocol.proto
|
||||
.StorageContainerLocationProtocolProtos;
|
||||
import org.apache.hadoop.hdds.protocol.proto
|
||||
.StorageContainerLocationProtocolProtos.ActivatePipelineRequestProto;
|
||||
import org.apache.hadoop.hdds.protocol.proto
|
||||
.StorageContainerLocationProtocolProtos.ActivatePipelineResponseProto;
|
||||
import org.apache.hadoop.hdds.protocol.proto
|
||||
.StorageContainerLocationProtocolProtos.DeactivatePipelineRequestProto;
|
||||
import org.apache.hadoop.hdds.protocol.proto
|
||||
.StorageContainerLocationProtocolProtos.DeactivatePipelineResponseProto;
|
||||
import org.apache.hadoop.hdds.protocol.proto
|
||||
.StorageContainerLocationProtocolProtos.ContainerRequestProto;
|
||||
import org.apache.hadoop.hdds.protocol.proto
|
||||
.StorageContainerLocationProtocolProtos.ContainerResponseProto;
|
||||
import org.apache.hadoop.hdds.protocol.proto
|
||||
.StorageContainerLocationProtocolProtos.ClosePipelineRequestProto;
|
||||
import org.apache.hadoop.hdds.protocol.proto
|
||||
.StorageContainerLocationProtocolProtos.ClosePipelineResponseProto;
|
||||
import org.apache.hadoop.hdds.protocol.proto
|
||||
.StorageContainerLocationProtocolProtos.ListPipelineRequestProto;
|
||||
import org.apache.hadoop.hdds.protocol.proto
|
||||
.StorageContainerLocationProtocolProtos.ListPipelineResponseProto;
|
||||
import org.apache.hadoop.hdds.protocol.proto
|
||||
.StorageContainerLocationProtocolProtos.GetContainerRequestProto;
|
||||
import org.apache.hadoop.hdds.protocol.proto
|
||||
.StorageContainerLocationProtocolProtos.GetContainerResponseProto;
|
||||
import org.apache.hadoop.hdds.protocol.proto
|
||||
.StorageContainerLocationProtocolProtos.ObjectStageChangeRequestProto;
|
||||
import org.apache.hadoop.hdds.protocol.proto
|
||||
.StorageContainerLocationProtocolProtos.ObjectStageChangeResponseProto;
|
||||
import org.apache.hadoop.hdds.protocol.proto
|
||||
.StorageContainerLocationProtocolProtos.PipelineRequestProto;
|
||||
import org.apache.hadoop.hdds.protocol.proto
|
||||
.StorageContainerLocationProtocolProtos.PipelineResponseProto;
|
||||
import org.apache.hadoop.hdds.protocol.proto
|
||||
.StorageContainerLocationProtocolProtos.SCMDeleteContainerRequestProto;
|
||||
import org.apache.hadoop.hdds.protocol.proto
|
||||
.StorageContainerLocationProtocolProtos.SCMDeleteContainerResponseProto;
|
||||
import org.apache.hadoop.hdds.protocol.proto
|
||||
.StorageContainerLocationProtocolProtos.SCMListContainerRequestProto;
|
||||
import org.apache.hadoop.hdds.protocol.proto
|
||||
.StorageContainerLocationProtocolProtos.SCMListContainerResponseProto;
|
||||
import org.apache.hadoop.hdds.tracing.TracingUtil;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
import com.google.protobuf.RpcController;
|
||||
import com.google.protobuf.ServiceException;
|
||||
import io.opentracing.Scope;
|
||||
|
||||
/**
|
||||
* This class is the server-side translator that forwards requests received on
|
|
@ -0,0 +1,21 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
* <p>
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* <p>
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hdds.scm.protocol;
|
||||
/**
|
||||
* RPC/protobuf specific translator classes for SCM protocol.
|
||||
*/
|
|
@ -57,7 +57,7 @@ import org.apache.hadoop.ozone.audit.SCMAction;
|
|||
import org.apache.hadoop.ozone.common.BlockGroup;
|
||||
import org.apache.hadoop.ozone.common.DeleteBlockGroupResult;
|
||||
import org.apache.hadoop.ozone.protocolPB.ProtocolMessageMetrics;
|
||||
import org.apache.hadoop.ozone.protocolPB.ScmBlockLocationProtocolServerSideTranslatorPB;
|
||||
import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocolServerSideTranslatorPB;
|
||||
|
||||
import com.google.common.collect.Maps;
|
||||
import com.google.protobuf.BlockingService;
|
||||
|
|
|
@ -60,8 +60,7 @@ import org.apache.hadoop.ozone.audit.AuditLoggerType;
|
|||
import org.apache.hadoop.ozone.audit.AuditMessage;
|
||||
import org.apache.hadoop.ozone.audit.Auditor;
|
||||
import org.apache.hadoop.ozone.audit.SCMAction;
|
||||
import org.apache.hadoop.ozone.protocolPB
|
||||
.StorageContainerLocationProtocolServerSideTranslatorPB;
|
||||
import org.apache.hadoop.hdds.scm.protocol.StorageContainerLocationProtocolServerSideTranslatorPB;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
|
|
@ -25,8 +25,7 @@ import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos;
|
|||
import org.apache.hadoop.hdds.scm.TestUtils;
|
||||
import org.apache.hadoop.hdds.scm.node.NodeManager;
|
||||
import org.apache.hadoop.ozone.protocolPB.ProtocolMessageMetrics;
|
||||
import org.apache.hadoop.ozone.protocolPB
|
||||
.ScmBlockLocationProtocolServerSideTranslatorPB;
|
||||
import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocolServerSideTranslatorPB;
|
||||
import org.apache.hadoop.test.GenericTestUtils;
|
||||
|
||||
import org.junit.After;
|
||||
|
|
|
@ -28,7 +28,7 @@ import org.apache.hadoop.ozone.insight.BaseInsightPoint;
|
|||
import org.apache.hadoop.ozone.insight.Component.Type;
|
||||
import org.apache.hadoop.ozone.insight.LoggerSource;
|
||||
import org.apache.hadoop.ozone.insight.MetricGroupDisplay;
|
||||
import org.apache.hadoop.ozone.protocolPB.ScmBlockLocationProtocolServerSideTranslatorPB;
|
||||
import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocolServerSideTranslatorPB;
|
||||
|
||||
/**
|
||||
* Insight metric to check the SCM block location protocol behaviour.
|
||||
|
|
|
@ -17,7 +17,8 @@
|
|||
package org.apache.hadoop.ozone.protocolPB;
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
import org.apache.hadoop.hdds.tracing.TracingUtil;
|
||||
|
||||
import org.apache.hadoop.hdds.server.OzoneProtocolMessageDispatcher;
|
||||
import org.apache.hadoop.ozone.OmUtils;
|
||||
import org.apache.hadoop.ozone.om.OzoneManager;
|
||||
import org.apache.hadoop.ozone.om.exceptions.NotLeaderException;
|
||||
|
@ -33,7 +34,6 @@ import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRespo
|
|||
|
||||
import com.google.protobuf.RpcController;
|
||||
import com.google.protobuf.ServiceException;
|
||||
import io.opentracing.Scope;
|
||||
import org.apache.ratis.protocol.RaftPeerId;
|
||||
import org.apache.ratis.util.ExitUtils;
|
||||
import org.slf4j.Logger;
|
||||
|
@ -58,8 +58,9 @@ public class OzoneManagerProtocolServerSideTranslatorPB implements
|
|||
private final boolean isRatisEnabled;
|
||||
private final OzoneManager ozoneManager;
|
||||
private final OzoneManagerDoubleBuffer ozoneManagerDoubleBuffer;
|
||||
private final ProtocolMessageMetrics protocolMessageMetrics;
|
||||
private final AtomicLong transactionIndex = new AtomicLong(0L);
|
||||
private final OzoneProtocolMessageDispatcher<OMRequest, OMResponse>
|
||||
dispatcher;
|
||||
|
||||
/**
|
||||
* Constructs an instance of the server handler.
|
||||
|
@ -75,7 +76,6 @@ public class OzoneManagerProtocolServerSideTranslatorPB implements
|
|||
handler = new OzoneManagerRequestHandler(impl);
|
||||
this.omRatisServer = ratisServer;
|
||||
this.isRatisEnabled = enableRatis;
|
||||
this.protocolMessageMetrics = metrics;
|
||||
this.ozoneManagerDoubleBuffer =
|
||||
new OzoneManagerDoubleBuffer(ozoneManager.getMetadataManager(), (i) -> {
|
||||
// Do nothing.
|
||||
|
@ -83,6 +83,9 @@ public class OzoneManagerProtocolServerSideTranslatorPB implements
|
|||
// As we wait until the double buffer flushes DB to disk.
|
||||
}, isRatisEnabled);
|
||||
|
||||
dispatcher = new OzoneProtocolMessageDispatcher<>("OzoneProtocol",
|
||||
metrics, LOG);
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -93,35 +96,9 @@ public class OzoneManagerProtocolServerSideTranslatorPB implements
|
|||
@Override
|
||||
public OMResponse submitRequest(RpcController controller,
|
||||
OMRequest request) throws ServiceException {
|
||||
Scope scope = TracingUtil
|
||||
.importAndCreateScope(request.getCmdType().name(),
|
||||
request.getTraceID());
|
||||
try {
|
||||
if (LOG.isTraceEnabled()) {
|
||||
LOG.trace(
|
||||
"OzoneManagerProtocol {} request is received: <json>{}</json>",
|
||||
request.getCmdType().toString(),
|
||||
request.toString().replaceAll("\n", "\\\\n"));
|
||||
} else if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("OzoneManagerProtocol {} request is received",
|
||||
request.getCmdType().toString());
|
||||
}
|
||||
protocolMessageMetrics.increment(request.getCmdType());
|
||||
|
||||
OMResponse omResponse = processRequest(request);
|
||||
|
||||
if (LOG.isTraceEnabled()) {
|
||||
LOG.trace(
|
||||
"OzoneManagerProtocol {} request is processed. Response: "
|
||||
+ "<json>{}</json>",
|
||||
request.getCmdType().toString(),
|
||||
omResponse.toString().replaceAll("\n", "\\\\n"));
|
||||
}
|
||||
return omResponse;
|
||||
|
||||
} finally {
|
||||
scope.close();
|
||||
}
|
||||
return dispatcher.processRequest(request, this::processRequest,
|
||||
request.getCmdType(), request.getTraceID());
|
||||
}
|
||||
|
||||
private OMResponse processRequest(OMRequest request) throws
|
||||
|
|
Loading…
Reference in New Issue