From e7d1ae52d24eef7ea471602f1489e7123a1337c1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?M=C3=A1rton=20Elek?= Date: Mon, 11 Feb 2019 12:11:54 +0100 Subject: [PATCH] HDDS-1017. Use distributed tracing to identify performance problems in Ozone. Contributed by Elek, Marton. --- .../hadoop/hdds/scm/XceiverClientRatis.java | 12 +- hadoop-hdds/common/pom.xml | 10 + ...ocationProtocolClientSideTranslatorPB.java | 19 +- .../hdds/tracing/GrpcClientInterceptor.java | 57 ++++ .../hdds/tracing/GrpcServerInterceptor.java | 51 ++++ .../hadoop/hdds/tracing/StringCodec.java | 89 +++++++ .../hadoop/hdds/tracing/TraceAllMethod.java | 86 ++++++ .../hadoop/hdds/tracing/TracingUtil.java | 112 ++++++++ .../hadoop/hdds/tracing/package-info.java | 23 ++ ...ocationProtocolServerSideTranslatorPB.java | 41 ++- .../java/org/apache/ratis/RatisHelper.java | 54 ++-- .../StorageContainerLocationProtocol.proto | 19 +- hadoop-hdds/common/src/main/proto/hdds.proto | 1 + .../hadoop/ozone/HddsDatanodeService.java | 5 + .../transport/server/XceiverServerGrpc.java | 6 +- .../server/ratis/ContainerStateMachine.java | 106 ++++---- .../server/ratis/XceiverServerRatis.java | 25 +- hadoop-hdds/pom.xml | 6 + .../scm/server/StorageContainerManager.java | 2 + .../hadoop/ozone/client/ObjectStore.java | 3 +- ...ManagerProtocolClientSideTranslatorPB.java | 250 +++++++----------- .../compose/ozonetrace/docker-compose.yaml | 65 +++++ .../src/main/compose/ozonetrace/docker-config | 84 ++++++ hadoop-ozone/objectstore-service/pom.xml | 6 + .../apache/hadoop/ozone/om/OzoneManager.java | 2 + ...ManagerProtocolServerSideTranslatorPB.java | 28 +- .../hadoop/ozone/web/ozShell/Shell.java | 13 +- hadoop-ozone/ozonefs/pom.xml | 5 - hadoop-ozone/pom.xml | 4 - hadoop-ozone/tools/pom.xml | 5 - .../org/apache/hadoop/ozone/freon/Freon.java | 7 + .../ozone/freon/RandomKeyGenerator.java | 58 ++-- 32 files changed, 945 insertions(+), 309 deletions(-) create mode 100644 hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/tracing/GrpcClientInterceptor.java create mode 100644 hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/tracing/GrpcServerInterceptor.java create mode 100644 hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/tracing/StringCodec.java create mode 100644 hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/tracing/TraceAllMethod.java create mode 100644 hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/tracing/TracingUtil.java create mode 100644 hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/tracing/package-info.java create mode 100644 hadoop-ozone/dist/src/main/compose/ozonetrace/docker-compose.yaml create mode 100644 hadoop-ozone/dist/src/main/compose/ozonetrace/docker-config diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java index c697b098aab..f68b28f9114 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java @@ -36,6 +36,8 @@ import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos .ContainerCommandResponseProto; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; +import org.apache.hadoop.hdds.tracing.TracingUtil; + import org.apache.ratis.RatisHelper; import org.apache.ratis.client.RaftClient; import org.apache.ratis.protocol.RaftClientReply; @@ -189,9 +191,13 @@ private RaftClient getClient() { private CompletableFuture<RaftClientReply>
sendRequestAsync( ContainerCommandRequestProto request) { - boolean isReadOnlyRequest = HddsUtils.isReadOnly(request); - ByteString byteString = request.toByteString(); - LOG.debug("sendCommandAsync {} {}", isReadOnlyRequest, request); + ContainerCommandRequestProto finalPayload = + ContainerCommandRequestProto.newBuilder(request) + .setTraceID(TracingUtil.exportCurrentSpan()) + .build(); + boolean isReadOnlyRequest = HddsUtils.isReadOnly(finalPayload); + ByteString byteString = finalPayload.toByteString(); + LOG.debug("sendCommandAsync {} {}", isReadOnlyRequest, finalPayload); return isReadOnlyRequest ? getClient().sendReadOnlyAsync(() -> byteString) : getClient().sendAsync(() -> byteString); } diff --git a/hadoop-hdds/common/pom.xml b/hadoop-hdds/common/pom.xml index cb34d27558d..218ef17a733 100644 --- a/hadoop-hdds/common/pom.xml +++ b/hadoop-hdds/common/pom.xml @@ -127,6 +127,16 @@ http://maven.apache.org/xsd/maven-4.0.0.xsd"> org.junit.jupiter junit-jupiter-api + + io.jaegertracing + jaeger-client + 0.33.1 + + + io.opentracing + opentracing-util + 0.31.0 + diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java index 44f86948dbc..4a1442e1e74 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java @@ -59,6 +59,7 @@ .StorageContainerLocationProtocolProtos.SCMListContainerRequestProto; import org.apache.hadoop.hdds.protocol.proto .StorageContainerLocationProtocolProtos.SCMListContainerResponseProto; +import org.apache.hadoop.hdds.tracing.TracingUtil; import org.apache.hadoop.ipc.ProtobufHelper; import org.apache.hadoop.ipc.ProtocolTranslator; import org.apache.hadoop.ipc.RPC; @@ -109,6 +110,7 @@ public ContainerWithPipeline allocateContainer( String owner) throws IOException { ContainerRequestProto request = ContainerRequestProto.newBuilder() + .setTraceID(TracingUtil.exportCurrentSpan()) .setReplicationFactor(factor) .setReplicationType(type) .setOwner(owner) @@ -134,6 +136,7 @@ public ContainerInfo getContainer(long containerID) throws IOException { GetContainerRequestProto request = GetContainerRequestProto .newBuilder() .setContainerID(containerID) + .setTraceID(TracingUtil.exportCurrentSpan()) .build(); try { GetContainerResponseProto response = @@ -153,6 +156,7 @@ public ContainerWithPipeline getContainerWithPipeline(long containerID) "Container ID cannot be negative"); GetContainerWithPipelineRequestProto request = GetContainerWithPipelineRequestProto.newBuilder() + .setTraceID(TracingUtil.exportCurrentSpan()) .setContainerID(containerID).build(); try { GetContainerWithPipelineResponseProto response = @@ -178,6 +182,7 @@ public List listContainer(long startContainerID, int count) .newBuilder(); builder.setStartContainerID(startContainerID); builder.setCount(count); + builder.setTraceID(TracingUtil.exportCurrentSpan()); SCMListContainerRequestProto request = builder.build(); try { @@ -208,6 +213,7 @@ public void deleteContainer(long containerID) "Container ID cannot be negative"); SCMDeleteContainerRequestProto request = SCMDeleteContainerRequestProto .newBuilder() + .setTraceID(TracingUtil.exportCurrentSpan()) .setContainerID(containerID) 
.build(); try { @@ -232,6 +238,7 @@ public List queryNode(HddsProtos.NodeState Preconditions.checkNotNull(nodeStatuses); NodeQueryRequestProto request = NodeQueryRequestProto.newBuilder() .setState(nodeStatuses) + .setTraceID(TracingUtil.exportCurrentSpan()) .setScope(queryScope).setPoolName(poolName).build(); try { NodeQueryResponseProto response = @@ -259,6 +266,7 @@ public void notifyObjectStageChange( "Object id cannot be negative."); ObjectStageChangeRequestProto request = ObjectStageChangeRequestProto.newBuilder() + .setTraceID(TracingUtil.exportCurrentSpan()) .setType(type) .setId(id) .setOp(op) @@ -284,6 +292,7 @@ public Pipeline createReplicationPipeline(HddsProtos.ReplicationType replicationType, HddsProtos.ReplicationFactor factor, HddsProtos .NodePool nodePool) throws IOException { PipelineRequestProto request = PipelineRequestProto.newBuilder() + .setTraceID(TracingUtil.exportCurrentSpan()) .setNodePool(nodePool) .setReplicationFactor(factor) .setReplicationType(replicationType) @@ -311,7 +320,8 @@ public Pipeline createReplicationPipeline(HddsProtos.ReplicationType public List listPipelines() throws IOException { try { ListPipelineRequestProto request = ListPipelineRequestProto - .newBuilder().build(); + .newBuilder().setTraceID(TracingUtil.exportCurrentSpan()) + .build(); ListPipelineResponseProto response = rpcProxy.listPipelines( NULL_RPC_CONTROLLER, request); List list = new ArrayList<>(); @@ -331,7 +341,8 @@ public void closePipeline(HddsProtos.PipelineID pipelineID) try { ClosePipelineRequestProto request = ClosePipelineRequestProto.newBuilder() - .setPipelineID(pipelineID) + .setTraceID(TracingUtil.exportCurrentSpan()) + .setPipelineID(pipelineID) .build(); rpcProxy.closePipeline(NULL_RPC_CONTROLLER, request); } catch (ServiceException e) { @@ -342,7 +353,9 @@ public void closePipeline(HddsProtos.PipelineID pipelineID) @Override public ScmInfo getScmInfo() throws IOException { HddsProtos.GetScmInfoRequestProto request = - HddsProtos.GetScmInfoRequestProto.getDefaultInstance(); + HddsProtos.GetScmInfoRequestProto.newBuilder() + .setTraceID(TracingUtil.exportCurrentSpan()) + .build(); try { HddsProtos.GetScmInfoRespsonseProto resp = rpcProxy.getScmInfo( NULL_RPC_CONTROLLER, request); diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/tracing/GrpcClientInterceptor.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/tracing/GrpcClientInterceptor.java new file mode 100644 index 00000000000..58270baabcc --- /dev/null +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/tracing/GrpcClientInterceptor.java @@ -0,0 +1,57 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdds.tracing; + +import org.apache.ratis.thirdparty.io.grpc.CallOptions; +import org.apache.ratis.thirdparty.io.grpc.Channel; +import org.apache.ratis.thirdparty.io.grpc.ClientCall; +import org.apache.ratis.thirdparty.io.grpc.ClientInterceptor; +import org.apache.ratis.thirdparty.io.grpc.ForwardingClientCall.SimpleForwardingClientCall; +import org.apache.ratis.thirdparty.io.grpc.Metadata; +import org.apache.ratis.thirdparty.io.grpc.Metadata.Key; +import org.apache.ratis.thirdparty.io.grpc.MethodDescriptor; + +/** + * Interceptor to add the tracing id to the outgoing call header. + */ +public class GrpcClientInterceptor implements ClientInterceptor { + + public static final Key<String> TRACING_HEADER = + Key.of("Tracing", Metadata.ASCII_STRING_MARSHALLER); + + @Override + public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall( + MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, + Channel next) { + + return new SimpleForwardingClientCall<ReqT, RespT>( + next.newCall(method, callOptions)) { + + @Override + public void start(Listener<RespT> responseListener, Metadata headers) { + + Metadata tracingHeaders = new Metadata(); + tracingHeaders.put(TRACING_HEADER, TracingUtil.exportCurrentSpan()); + + headers.merge(tracingHeaders); + + super.start(responseListener, headers); + } + }; + } +} diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/tracing/GrpcServerInterceptor.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/tracing/GrpcServerInterceptor.java new file mode 100644 index 00000000000..b63af12b3fa --- /dev/null +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/tracing/GrpcServerInterceptor.java @@ -0,0 +1,51 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdds.tracing; + +import io.opentracing.Scope; +import org.apache.ratis.thirdparty.io.grpc.ForwardingServerCallListener.SimpleForwardingServerCallListener; +import org.apache.ratis.thirdparty.io.grpc.Metadata; +import org.apache.ratis.thirdparty.io.grpc.ServerCall; +import org.apache.ratis.thirdparty.io.grpc.ServerCall.Listener; +import org.apache.ratis.thirdparty.io.grpc.ServerCallHandler; +import org.apache.ratis.thirdparty.io.grpc.ServerInterceptor; + +/** + * Interceptor to read the tracing id from the incoming call header and + * create a new tracing scope for the processed message. + */ +public class GrpcServerInterceptor implements ServerInterceptor { + + @Override + public <ReqT, RespT> Listener<ReqT> interceptCall( + ServerCall<ReqT, RespT> call, Metadata headers, + ServerCallHandler<ReqT, RespT> next) { + + return new SimpleForwardingServerCallListener<ReqT>( + next.startCall(call, headers)) { + @Override + public void onMessage(ReqT message) { + try (Scope scope = TracingUtil + .importAndCreateScope( + call.getMethodDescriptor().getFullMethodName(), + headers.get(GrpcClientInterceptor.TRACING_HEADER))) { + super.onMessage(message); + } + } + }; + } +} diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/tracing/StringCodec.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/tracing/StringCodec.java new file mode 100644 index 00000000000..ea88a7f0cdd --- /dev/null +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/tracing/StringCodec.java @@ -0,0 +1,89 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdds.tracing; + +import java.math.BigInteger; + +import io.jaegertracing.internal.JaegerSpanContext; +import io.jaegertracing.internal.exceptions.EmptyTracerStateStringException; +import io.jaegertracing.internal.exceptions.MalformedTracerStateStringException; +import io.jaegertracing.internal.exceptions.TraceIdOutOfBoundException; +import io.jaegertracing.spi.Codec; +import io.opentracing.propagation.Format; + +/** + * A Jaeger codec to save the current tracing context to a string. + */ +public class StringCodec implements Codec<StringBuilder> { + + public static final StringFormat FORMAT = new StringFormat(); + + @Override + public JaegerSpanContext extract(StringBuilder s) { + String value = s.toString(); + if (value != null && !value.equals("")) { + String[] parts = value.split(":"); + if (parts.length != 4) { + throw new MalformedTracerStateStringException(value); + } else { + String traceId = parts[0]; + if (traceId.length() <= 32 && traceId.length() >= 1) { + return new JaegerSpanContext(high(traceId), + (new BigInteger(traceId, 16)).longValue(), + (new BigInteger(parts[1], 16)).longValue(), + (new BigInteger(parts[2], 16)).longValue(), + (new BigInteger(parts[3], 16)).byteValue()); + } else { + throw new TraceIdOutOfBoundException( + "Trace id [" + traceId + "] length is not within 1 and 32"); + } + } + } else { + throw new EmptyTracerStateStringException(); + } + } + + @Override + public void inject(JaegerSpanContext context, + StringBuilder string) { + int intFlag = context.getFlags() & 255; + string.append( + context.getTraceId() + ":" + Long.toHexString(context.getSpanId()) + + ":" + Long.toHexString(context.getParentId()) + ":" + Integer + .toHexString(intFlag)); + } + + private static long high(String hexString) { + if (hexString.length() > 16) { + int highLength = hexString.length() - 16; + String highString = hexString.substring(0, highLength); + return (new BigInteger(highString, 16)).longValue(); + } else { + return 0L; + } + } + + /** + * The format to save the context as text. + *

+ * Using the mutable StringBuilder instead of plain String. + */ + public static final class StringFormat implements Format<StringBuilder> { + } + +} diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/tracing/TraceAllMethod.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/tracing/TraceAllMethod.java new file mode 100644 index 00000000000..8bdf638acfc --- /dev/null +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/tracing/TraceAllMethod.java @@ -0,0 +1,86 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdds.tracing; + +import java.lang.reflect.InvocationHandler; +import java.lang.reflect.Method; +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; +import java.util.Map.Entry; + +import io.opentracing.Scope; +import io.opentracing.util.GlobalTracer; + +/** + * A Java proxy invocation handler to trace all the methods of the delegate + * class. + * + * @param <T> + */ +public class TraceAllMethod<T> implements InvocationHandler { + + /** + * Cache for all the method objects of the delegate class. + */ + private final Map<String, Map<Class<?>[], Method>> methods = new HashMap<>(); + + private T delegate; + + private String name; + + public TraceAllMethod(T delegate, String name) { + this.delegate = delegate; + this.name = name; + for (Method method : delegate.getClass().getDeclaredMethods()) { + if (!methods.containsKey(method.getName())) { + methods.put(method.getName(), new HashMap<>()); + } + methods.get(method.getName()).put(method.getParameterTypes(), method); + } + } + + @Override + public Object invoke(Object proxy, Method method, Object[] args) + throws Throwable { + Method delegateMethod = findDelegatedMethod(method); + try (Scope scope = GlobalTracer.get().buildSpan( + name + "." + method.getName()) + .startActive(true)) { + try { + return delegateMethod.invoke(delegate, args); + } catch (Exception ex) { + if (ex.getCause() != null) { + throw ex.getCause(); + } else { + throw ex; + } + } + } + } + + private Method findDelegatedMethod(Method method) { + for (Entry<Class<?>[], Method> entry : methods.get(method.getName()) + .entrySet()) { + if (Arrays.equals(entry.getKey(), method.getParameterTypes())) { + return entry.getValue(); + } + } + return null; + } +} diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/tracing/TracingUtil.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/tracing/TracingUtil.java new file mode 100644 index 00000000000..fd1ca959bf9 --- /dev/null +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/tracing/TracingUtil.java @@ -0,0 +1,112 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdds.tracing; + +import java.lang.reflect.Proxy; + +import io.jaegertracing.Configuration; +import io.jaegertracing.internal.JaegerTracer; +import io.opentracing.Scope; +import io.opentracing.SpanContext; +import io.opentracing.Tracer; +import io.opentracing.util.GlobalTracer; + +/** + * Utility class to collect all the tracing helper methods. + */ +public final class TracingUtil { + + private TracingUtil() { + } + + /** + * Initialize the tracing with the given service name. + * + * @param serviceName + */ + public static void initTracing(String serviceName) { + if (!GlobalTracer.isRegistered()) { + Configuration config = Configuration.fromEnv(serviceName); + JaegerTracer tracer = config.getTracerBuilder() + .registerExtractor(StringCodec.FORMAT, new StringCodec()) + .registerInjector(StringCodec.FORMAT, new StringCodec()) + .build(); + GlobalTracer.register(tracer); + } + } + + /** + * Export the active tracing span as a string. + * + * @return encoded tracing context. + */ + public static String exportCurrentSpan() { + StringBuilder builder = new StringBuilder(); + if (GlobalTracer.get().activeSpan() != null) { + GlobalTracer.get().inject(GlobalTracer.get().activeSpan().context(), + StringCodec.FORMAT, builder); + } + return builder.toString(); + } + + /** + * Create a new scope and use the imported span as the parent. + * + * @param name name of the newly created scope + * @param encodedParent Encoded parent span (could be null or empty) + * + * @return OpenTracing scope. + */ + public static Scope importAndCreateScope(String name, String encodedParent) { + Tracer.SpanBuilder spanBuilder; + Tracer tracer = GlobalTracer.get(); + SpanContext parentSpan = null; + if (encodedParent != null && encodedParent.length() > 0) { + StringBuilder builder = new StringBuilder(); + builder.append(encodedParent); + parentSpan = tracer.extract(StringCodec.FORMAT, builder); + + } + + if (parentSpan == null) { + spanBuilder = tracer.buildSpan(name); + } else { + spanBuilder = + tracer.buildSpan(name).asChildOf(parentSpan); + } + return spanBuilder.startActive(true); + } + + /** + * Creates a proxy of the implementation and traces all the method calls. + * + * @param delegate the original class instance + * @param interfce the interface which should be implemented by the proxy + * @param <T> the type of the interface + * + * @return A new interface which implements interfce but delegates all the + * calls to the delegate and also enables tracing. + */ + public static <T> T createProxy(T delegate, Class<T> interfce) { + Class<?> aClass = delegate.getClass(); + return (T) Proxy.newProxyInstance(aClass.getClassLoader(), + new Class<?>[] {interfce}, + new TraceAllMethod<T>(delegate, interfce.getSimpleName())); + } + +} diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/tracing/package-info.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/tracing/package-info.java new file mode 100644 index 00000000000..3ead03b6f6f --- /dev/null +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/tracing/package-info.java @@ -0,0 +1,23 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdds.tracing; + +/** + * Helper classes to use distributed tracing in Ozone components. + */ \ No newline at end of file diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerLocationProtocolServerSideTranslatorPB.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerLocationProtocolServerSideTranslatorPB.java index 1630875b7e4..0b681c56180 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerLocationProtocolServerSideTranslatorPB.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerLocationProtocolServerSideTranslatorPB.java @@ -20,6 +20,8 @@ import com.google.protobuf.RpcController; import com.google.protobuf.ServiceException; +import io.opentracing.Scope; + import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.hdds.protocol.proto .StorageContainerLocationProtocolProtos.InChillModeRequestProto; @@ -72,6 +74,7 @@ .StorageContainerLocationProtocolProtos.SCMListContainerRequestProto; import org.apache.hadoop.hdds.protocol.proto .StorageContainerLocationProtocolProtos.SCMListContainerResponseProto; +import org.apache.hadoop.hdds.tracing.TracingUtil; import java.io.IOException; import java.util.List; @@ -100,7 +103,8 @@ public StorageContainerLocationProtocolServerSideTranslatorPB( @Override public ContainerResponseProto allocateContainer(RpcController unused, ContainerRequestProto request) throws ServiceException { - try { + try (Scope scope = TracingUtil + .importAndCreateScope("allocateContainer", request.getTraceID())) { ContainerWithPipeline containerWithPipeline = impl .allocateContainer(request.getReplicationType(), request.getReplicationFactor(), request.getOwner()); @@ -118,7 +122,8 @@ public ContainerResponseProto allocateContainer(RpcController unused, public GetContainerResponseProto getContainer( RpcController controller, GetContainerRequestProto request) throws ServiceException { - try { + try (Scope scope = TracingUtil + .importAndCreateScope("getContainer", request.getTraceID())) { ContainerInfo container = impl.getContainer(request.getContainerID()); return GetContainerResponseProto.newBuilder() .setContainerInfo(container.getProtobuf()) @@ -132,7 +137,9 @@ public GetContainerResponseProto getContainer( public GetContainerWithPipelineResponseProto getContainerWithPipeline( RpcController controller, GetContainerWithPipelineRequestProto request) throws ServiceException { - try { + try (Scope scope = TracingUtil + .importAndCreateScope("getContainerWithPipeline", + request.getTraceID())) { ContainerWithPipeline container = impl .getContainerWithPipeline(request.getContainerID()); return GetContainerWithPipelineResponseProto.newBuilder() @@ -146,7 +153,8 @@ public GetContainerWithPipelineResponseProto getContainerWithPipeline( @Override public SCMListContainerResponseProto listContainer(RpcController controller, SCMListContainerRequestProto request) throws ServiceException { - try { + try (Scope scope = TracingUtil + .importAndCreateScope("listContainer", request.getTraceID())) { long startContainerID = 0; int count = -1; @@ -173,7 
+181,8 @@ public SCMListContainerResponseProto listContainer(RpcController controller, public SCMDeleteContainerResponseProto deleteContainer( RpcController controller, SCMDeleteContainerRequestProto request) throws ServiceException { - try { + try (Scope scope = TracingUtil + .importAndCreateScope("deleteContainer", request.getTraceID())) { impl.deleteContainer(request.getContainerID()); return SCMDeleteContainerResponseProto.newBuilder().build(); } catch (IOException e) { @@ -186,7 +195,8 @@ public SCMDeleteContainerResponseProto deleteContainer( queryNode(RpcController controller, StorageContainerLocationProtocolProtos.NodeQueryRequestProto request) throws ServiceException { - try { + try (Scope scope = TracingUtil + .importAndCreateScope("queryNode", request.getTraceID())) { HddsProtos.NodeState nodeState = request.getState(); List datanodes = impl.queryNode(nodeState, request.getScope(), request.getPoolName()); @@ -203,7 +213,9 @@ public SCMDeleteContainerResponseProto deleteContainer( public ObjectStageChangeResponseProto notifyObjectStageChange( RpcController controller, ObjectStageChangeRequestProto request) throws ServiceException { - try { + try (Scope scope = TracingUtil + .importAndCreateScope("notifyObjectStageChange", + request.getTraceID())) { impl.notifyObjectStageChange(request.getType(), request.getId(), request.getOp(), request.getStage()); return ObjectStageChangeResponseProto.newBuilder().build(); @@ -224,7 +236,8 @@ public PipelineResponseProto allocatePipeline( public ListPipelineResponseProto listPipelines( RpcController controller, ListPipelineRequestProto request) throws ServiceException { - try { + try (Scope scope = TracingUtil + .importAndCreateScope("listPipelines", request.getTraceID())) { ListPipelineResponseProto.Builder builder = ListPipelineResponseProto .newBuilder(); List pipelines = impl.listPipelines(); @@ -242,7 +255,8 @@ public ListPipelineResponseProto listPipelines( public ClosePipelineResponseProto closePipeline( RpcController controller, ClosePipelineRequestProto request) throws ServiceException { - try { + try (Scope scope = TracingUtil + .importAndCreateScope("closePipeline", request.getTraceID())) { impl.closePipeline(request.getPipelineID()); return ClosePipelineResponseProto.newBuilder().build(); } catch (IOException e) { @@ -254,7 +268,8 @@ public ClosePipelineResponseProto closePipeline( public HddsProtos.GetScmInfoRespsonseProto getScmInfo( RpcController controller, HddsProtos.GetScmInfoRequestProto req) throws ServiceException { - try { + try (Scope scope = TracingUtil + .importAndCreateScope("getScmInfo", req.getTraceID())) { ScmInfo scmInfo = impl.getScmInfo(); return HddsProtos.GetScmInfoRespsonseProto.newBuilder() .setClusterId(scmInfo.getClusterId()) @@ -270,7 +285,8 @@ public HddsProtos.GetScmInfoRespsonseProto getScmInfo( public InChillModeResponseProto inChillMode( RpcController controller, InChillModeRequestProto request) throws ServiceException { - try { + try (Scope scope = TracingUtil + .importAndCreateScope("inChillMode", request.getTraceID())) { return InChillModeResponseProto.newBuilder() .setInChillMode(impl.inChillMode()).build(); } catch (IOException ex) { @@ -282,7 +298,8 @@ public InChillModeResponseProto inChillMode( public ForceExitChillModeResponseProto forceExitChillMode( RpcController controller, ForceExitChillModeRequestProto request) throws ServiceException { - try { + try (Scope scope = TracingUtil + .importAndCreateScope("forceExitChillMode", request.getTraceID())) { return 
ForceExitChillModeResponseProto.newBuilder() .setExitedChillMode(impl.forceExitChillMode()).build(); } catch (IOException ex) { diff --git a/hadoop-hdds/common/src/main/java/org/apache/ratis/RatisHelper.java b/hadoop-hdds/common/src/main/java/org/apache/ratis/RatisHelper.java index 31b9beed680..3713d7af58c 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/ratis/RatisHelper.java +++ b/hadoop-hdds/common/src/main/java/org/apache/ratis/RatisHelper.java @@ -18,32 +18,6 @@ package org.apache.ratis; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hdds.scm.pipeline.Pipeline; -import org.apache.hadoop.hdds.protocol.DatanodeDetails; -import org.apache.hadoop.hdds.security.x509.SecurityConfig; -import org.apache.hadoop.ozone.OzoneConfigKeys; -import org.apache.hadoop.ozone.OzoneConsts; -import org.apache.ratis.client.RaftClient; -import org.apache.ratis.conf.RaftProperties; -import org.apache.ratis.grpc.GrpcConfigKeys; -import org.apache.ratis.grpc.GrpcFactory; -import org.apache.ratis.grpc.GrpcTlsConfig; -import org.apache.ratis.protocol.RaftGroup; -import org.apache.ratis.protocol.RaftGroupId; -import org.apache.ratis.protocol.RaftPeer; -import org.apache.ratis.protocol.RaftPeerId; -import org.apache.ratis.retry.RetryPolicies; -import org.apache.ratis.retry.RetryPolicy; -import org.apache.ratis.rpc.RpcType; -import org.apache.ratis.rpc.SupportedRpcType; -import org.apache.ratis.thirdparty.com.google.protobuf.ByteString; -import org.apache.ratis.proto.RaftProtos; -import org.apache.ratis.util.SizeInBytes; -import org.apache.ratis.util.TimeDuration; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; @@ -54,6 +28,33 @@ import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.scm.pipeline.Pipeline; +import org.apache.hadoop.hdds.security.x509.SecurityConfig; +import org.apache.hadoop.ozone.OzoneConfigKeys; +import org.apache.hadoop.ozone.OzoneConsts; + +import org.apache.ratis.client.RaftClient; +import org.apache.ratis.conf.RaftProperties; +import org.apache.ratis.grpc.GrpcConfigKeys; +import org.apache.ratis.grpc.GrpcFactory; +import org.apache.ratis.grpc.GrpcTlsConfig; +import org.apache.ratis.proto.RaftProtos; +import org.apache.ratis.protocol.RaftGroup; +import org.apache.ratis.protocol.RaftGroupId; +import org.apache.ratis.protocol.RaftPeer; +import org.apache.ratis.protocol.RaftPeerId; +import org.apache.ratis.retry.RetryPolicies; +import org.apache.ratis.retry.RetryPolicy; +import org.apache.ratis.rpc.RpcType; +import org.apache.ratis.rpc.SupportedRpcType; +import org.apache.ratis.thirdparty.com.google.protobuf.ByteString; +import org.apache.ratis.util.SizeInBytes; +import org.apache.ratis.util.TimeDuration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + /** * Ratis helper methods. 
*/ @@ -165,6 +166,7 @@ static RaftClient newRaftClient(RpcType rpcType, RaftPeerId leader, SizeInBytes.valueOf(OzoneConsts.OZONE_SCM_CHUNK_MAX_SIZE)); GrpcConfigKeys.OutputStream.setOutstandingAppendsMax(properties, maxOutStandingRequest); + RaftClient.Builder builder = RaftClient.newBuilder() .setRaftGroup(group) .setLeaderId(leader) diff --git a/hadoop-hdds/common/src/main/proto/StorageContainerLocationProtocol.proto b/hadoop-hdds/common/src/main/proto/StorageContainerLocationProtocol.proto index fe34fc0e38d..d0f6c13cd4d 100644 --- a/hadoop-hdds/common/src/main/proto/StorageContainerLocationProtocol.proto +++ b/hadoop-hdds/common/src/main/proto/StorageContainerLocationProtocol.proto @@ -38,7 +38,7 @@ message ContainerRequestProto { required ReplicationFactor replicationFactor = 2; required ReplicationType replicationType = 3; required string owner = 4; - + optional string traceID = 5; } /** @@ -57,6 +57,8 @@ message ContainerResponseProto { message GetContainerRequestProto { required int64 containerID = 1; + optional string traceID = 2; + } message GetContainerResponseProto { @@ -65,6 +67,8 @@ message GetContainerResponseProto { message GetContainerWithPipelineRequestProto { required int64 containerID = 1; + optional string traceID = 2; + } message GetContainerWithPipelineResponseProto { @@ -74,7 +78,8 @@ message GetContainerWithPipelineResponseProto { message SCMListContainerRequestProto { required uint32 count = 1; optional uint64 startContainerID = 2; - } + optional string traceID = 3; +} message SCMListContainerResponseProto { repeated ContainerInfoProto containers = 1; @@ -82,6 +87,8 @@ message SCMListContainerResponseProto { message SCMDeleteContainerRequestProto { required int64 containerID = 1; + optional string traceID = 2; + } message SCMDeleteContainerResponseProto { @@ -106,6 +113,7 @@ message ObjectStageChangeRequestProto { required Type type = 2; required Op op= 3; required Stage stage = 4; + optional string traceID = 5; } message ObjectStageChangeResponseProto { @@ -120,6 +128,7 @@ message NodeQueryRequestProto { required NodeState state = 1; required QueryScope scope = 2; optional string poolName = 3; // if scope is pool, then pool name is needed. + optional string traceID = 4; } message NodeQueryResponseProto { @@ -137,6 +146,7 @@ message PipelineRequestProto { // datanodes. 
optional NodePool nodePool = 3; optional string pipelineID = 4; + optional string traceID = 5; } message PipelineResponseProto { @@ -150,6 +160,7 @@ message PipelineResponseProto { } message ListPipelineRequestProto { + optional string traceID = 1; } message ListPipelineResponseProto { @@ -158,12 +169,15 @@ message ListPipelineResponseProto { message ClosePipelineRequestProto { required PipelineID pipelineID = 1; + optional string traceID = 2; + } message ClosePipelineResponseProto { } message InChillModeRequestProto { + optional string traceID = 1; } message InChillModeResponseProto { @@ -171,6 +185,7 @@ message InChillModeResponseProto { } message ForceExitChillModeRequestProto { + optional string traceID = 1; } message ForceExitChillModeResponseProto { diff --git a/hadoop-hdds/common/src/main/proto/hdds.proto b/hadoop-hdds/common/src/main/proto/hdds.proto index ab09669f3c9..cc8b3179e5c 100644 --- a/hadoop-hdds/common/src/main/proto/hdds.proto +++ b/hadoop-hdds/common/src/main/proto/hdds.proto @@ -165,6 +165,7 @@ message ContainerWithPipeline { } message GetScmInfoRequestProto { + optional string traceID = 1; } message GetScmInfoRespsonseProto { diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/HddsDatanodeService.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/HddsDatanodeService.java index b5140632244..3a92a4adf1d 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/HddsDatanodeService.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/HddsDatanodeService.java @@ -27,6 +27,7 @@ import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.scm.ScmConfigKeys; +import org.apache.hadoop.hdds.tracing.TracingUtil; import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils; import org.apache.hadoop.ozone.container.common.statemachine.DatanodeStateMachine; @@ -139,6 +140,7 @@ public static void main(String[] args) { */ @Override public void start(Object service) { + DefaultMetricsSystem.initialize("HddsDatanode"); OzoneConfiguration.activate(); if (service instanceof Configurable) { @@ -151,6 +153,9 @@ public void start(Object service) { datanodeDetails = initializeDatanodeDetails(); datanodeDetails.setHostName(hostname); datanodeDetails.setIpAddress(ip); + TracingUtil.initTracing( + "HddsDatanodeService." + datanodeDetails.getUuidString() + .substring(0, 8)); LOG.info("HddsDatanodeService host:{} ip:{}", hostname, ip); // Authenticate Hdds Datanode service if security is enabled if (conf.getBoolean(OzoneConfigKeys.OZONE_SECURITY_ENABLED_KEY, diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerGrpc.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerGrpc.java index b38d99f1b4b..048db83fa68 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerGrpc.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerGrpc.java @@ -30,6 +30,7 @@ import org.apache.hadoop.hdds.scm.pipeline.PipelineID; import org.apache.hadoop.hdds.scm.container.common.helpers. 
StorageContainerException; +import org.apache.hadoop.hdds.tracing.GrpcServerInterceptor; import org.apache.hadoop.ozone.OzoneConfigKeys; import org.apache.hadoop.ozone.OzoneConsts; import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher; @@ -102,11 +103,12 @@ public XceiverServerGrpc(DatanodeDetails datanodeDetails, Configuration conf, ServerCredentialInterceptor credInterceptor = new ServerCredentialInterceptor(getBlockTokenVerifier()); + GrpcServerInterceptor tracingInterceptor = new GrpcServerInterceptor(); nettyServerBuilder.addService(ServerInterceptors.intercept( new GrpcXceiverService(dispatcher, getSecurityConfig().isBlockTokenEnabled(), - getBlockTokenVerifier()), credInterceptor)); - + getBlockTokenVerifier()), credInterceptor, + tracingInterceptor)); for (BindableService service : additionalServices) { nettyServerBuilder.addService(service); diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java index ea8a15f74a4..127f15bead9 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java @@ -23,6 +23,8 @@ import com.google.common.cache.CacheBuilder; import org.apache.hadoop.hdds.HddsUtils; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; + +import io.opentracing.Scope; import org.apache.ratis.proto.RaftProtos.RaftPeerRole; import org.apache.ratis.protocol.RaftGroup; import org.apache.ratis.protocol.RaftGroupId; @@ -44,6 +46,7 @@ .ReadChunkRequestProto; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos .ReadChunkResponseProto; +import org.apache.hadoop.hdds.tracing.TracingUtil; import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher; import org.apache.hadoop.hdds.security.token.TokenVerifier; import org.apache.hadoop.security.UserGroupInformation; @@ -225,58 +228,61 @@ public TransactionContext startTransaction(RaftClientRequest request) final ContainerCommandRequestProto proto = getRequestProto(request.getMessage().getContent()); Preconditions.checkArgument(request.getRaftGroupId().equals(gid)); - try { - dispatcher.validateContainerCommand(proto); - } catch (IOException ioe) { - TransactionContext ctxt = TransactionContext.newBuilder() - .setClientRequest(request) - .setStateMachine(this) - .setServerRole(RaftPeerRole.LEADER) - .build(); - ctxt.setException(ioe); - return ctxt; - } - if (proto.getCmdType() == Type.WriteChunk) { - final WriteChunkRequestProto write = proto.getWriteChunk(); - // create the state machine data proto - final WriteChunkRequestProto dataWriteChunkProto = - WriteChunkRequestProto - .newBuilder(write) - .build(); - ContainerCommandRequestProto dataContainerCommandProto = - ContainerCommandRequestProto - .newBuilder(proto) - .setWriteChunk(dataWriteChunkProto) - .build(); + try (Scope scope = TracingUtil + .importAndCreateScope(proto.getCmdType().name(), proto.getTraceID())) { + try { + dispatcher.validateContainerCommand(proto); + } catch (IOException ioe) { + TransactionContext ctxt = TransactionContext.newBuilder() + .setClientRequest(request) + .setStateMachine(this) + .setServerRole(RaftPeerRole.LEADER) + .build(); 
+ ctxt.setException(ioe); + return ctxt; + } + if (proto.getCmdType() == Type.WriteChunk) { + final WriteChunkRequestProto write = proto.getWriteChunk(); + // create the state machine data proto + final WriteChunkRequestProto dataWriteChunkProto = + WriteChunkRequestProto + .newBuilder(write) + .build(); + ContainerCommandRequestProto dataContainerCommandProto = + ContainerCommandRequestProto + .newBuilder(proto) + .setWriteChunk(dataWriteChunkProto) + .build(); - // create the log entry proto - final WriteChunkRequestProto commitWriteChunkProto = - WriteChunkRequestProto.newBuilder() - .setBlockID(write.getBlockID()) - .setChunkData(write.getChunkData()) - // skipping the data field as it is - // already set in statemachine data proto - .build(); - ContainerCommandRequestProto commitContainerCommandProto = - ContainerCommandRequestProto - .newBuilder(proto) - .setWriteChunk(commitWriteChunkProto) - .build(); + // create the log entry proto + final WriteChunkRequestProto commitWriteChunkProto = + WriteChunkRequestProto.newBuilder() + .setBlockID(write.getBlockID()) + .setChunkData(write.getChunkData()) + // skipping the data field as it is + // already set in statemachine data proto + .build(); + ContainerCommandRequestProto commitContainerCommandProto = + ContainerCommandRequestProto + .newBuilder(proto) + .setWriteChunk(commitWriteChunkProto) + .build(); - return TransactionContext.newBuilder() - .setClientRequest(request) - .setStateMachine(this) - .setServerRole(RaftPeerRole.LEADER) - .setStateMachineData(dataContainerCommandProto.toByteString()) - .setLogData(commitContainerCommandProto.toByteString()) - .build(); - } else { - return TransactionContext.newBuilder() - .setClientRequest(request) - .setStateMachine(this) - .setServerRole(RaftPeerRole.LEADER) - .setLogData(request.getMessage().getContent()) - .build(); + return TransactionContext.newBuilder() + .setClientRequest(request) + .setStateMachine(this) + .setServerRole(RaftPeerRole.LEADER) + .setStateMachineData(dataContainerCommandProto.toByteString()) + .setLogData(commitContainerCommandProto.toByteString()) + .build(); + } else { + return TransactionContext.newBuilder() + .setClientRequest(request) + .setStateMachine(this) + .setServerRole(RaftPeerRole.LEADER) + .setLogData(request.getMessage().getContent()) + .build(); + } } } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java index 18253197bf2..c89d4f5ab50 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java @@ -34,11 +34,14 @@ import org.apache.hadoop.hdds.scm.HddsServerUtil; import org.apache.hadoop.hdds.scm.pipeline.PipelineID; import org.apache.hadoop.hdds.security.x509.SecurityConfig; +import org.apache.hadoop.hdds.tracing.TracingUtil; import org.apache.hadoop.ozone.OzoneConfigKeys; import org.apache.hadoop.ozone.OzoneConsts; import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher; import org.apache.hadoop.ozone.container.common.statemachine.StateContext; import org.apache.hadoop.ozone.container.common.transport.server.XceiverServer; + +import io.opentracing.Scope; import 
org.apache.ratis.RaftConfigKeys; import org.apache.ratis.RatisHelper; import org.apache.ratis.client.RaftClientConfigKeys; @@ -142,6 +145,7 @@ private XceiverServerRatis(DatanodeDetails dd, int port, for (int i = 0; i < numContainerOpExecutors; i++) { executors.add(Executors.newSingleThreadExecutor()); } + RaftServer.Builder builder = RaftServer.newBuilder() .setServerId(RatisHelper.toRaftPeerId(dd)) .setProperties(serverProperties) @@ -481,15 +485,20 @@ public void submitRequest(ContainerCommandRequestProto request, HddsProtos.PipelineID pipelineID) throws IOException { super.submitRequest(request, pipelineID); RaftClientReply reply; - RaftClientRequest raftClientRequest = - createRaftClientRequest(request, pipelineID, - RaftClientRequest.writeRequestType()); - try { - reply = server.submitClientRequestAsync(raftClientRequest).get(); - } catch (Exception e) { - throw new IOException(e.getMessage(), e); + try (Scope scope = TracingUtil + .importAndCreateScope(request.getCmdType().name(), + request.getTraceID())) { + + RaftClientRequest raftClientRequest = + createRaftClientRequest(request, pipelineID, + RaftClientRequest.writeRequestType()); + try { + reply = server.submitClientRequestAsync(raftClientRequest).get(); + } catch (Exception e) { + throw new IOException(e.getMessage(), e); + } + processReply(reply); } - processReply(reply); } private RaftClientRequest createRaftClientRequest( diff --git a/hadoop-hdds/pom.xml b/hadoop-hdds/pom.xml index 35cfd0b0751..1d39476a4d3 100644 --- a/hadoop-hdds/pom.xml +++ b/hadoop-hdds/pom.xml @@ -203,6 +203,12 @@ http://maven.apache.org/xsd/maven-4.0.0.xsd"> org.apache.hadoop hadoop-hdfs-client + + + com.squareup.okhttp + okhttp + + org.apache.hadoop diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java index 2fdc245faa5..bc81c84b345 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java @@ -75,6 +75,7 @@ import org.apache.hadoop.hdds.server.ServiceRuntimeInfoImpl; import org.apache.hadoop.hdds.server.events.EventPublisher; import org.apache.hadoop.hdds.server.events.EventQueue; +import org.apache.hadoop.hdds.tracing.TracingUtil; import org.apache.hadoop.hdfs.DFSUtil; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.ipc.RPC; @@ -453,6 +454,7 @@ public static void main(String[] argv) throws IOException { System.exit(0); } try { + TracingUtil.initTracing("StorageContainerManager"); OzoneConfiguration conf = new OzoneConfiguration(); GenericOptionsParser hParser = new GenericOptionsParser(conf, argv); if (!hParser.isParseSuccessful()) { diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/ObjectStore.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/ObjectStore.java index 290a31cd3fa..8370bbc0650 100644 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/ObjectStore.java +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/ObjectStore.java @@ -27,6 +27,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdds.scm.client.HddsClientUtils; +import org.apache.hadoop.hdds.tracing.TracingUtil; import org.apache.hadoop.io.Text; import org.apache.hadoop.ozone.client.protocol.ClientProtocol; import 
org.apache.hadoop.ozone.om.helpers.S3SecretValue; @@ -60,7 +61,7 @@ public class ObjectStore { * @param proxy ClientProtocol proxy. */ public ObjectStore(Configuration conf, ClientProtocol proxy) { - this.proxy = proxy; + this.proxy = TracingUtil.createProxy(proxy, ClientProtocol.class); this.listCacheSize = HddsClientUtils.getListCacheSize(conf); } diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java index 63bffed4a21..6eefcc042d4 100644 --- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java +++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java @@ -17,15 +17,18 @@ */ package org.apache.hadoop.ozone.om.protocolPB; -import com.google.common.base.Preconditions; -import com.google.common.base.Strings; -import com.google.protobuf.RpcController; -import com.google.protobuf.ServiceException; +import java.io.Closeable; +import java.io.IOException; import java.util.ArrayList; +import java.util.List; +import java.util.stream.Collectors; + import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.hdds.tracing.TracingUtil; import org.apache.hadoop.io.Text; import org.apache.hadoop.ipc.ProtobufHelper; import org.apache.hadoop.ipc.ProtocolTranslator; +import org.apache.hadoop.ozone.om.exceptions.OMException; import org.apache.hadoop.ozone.om.helpers.KeyValueUtil; import org.apache.hadoop.ozone.om.helpers.OmBucketArgs; import org.apache.hadoop.ozone.om.helpers.OmBucketInfo; @@ -42,163 +45,95 @@ import org.apache.hadoop.ozone.om.helpers.S3SecretValue; import org.apache.hadoop.ozone.om.helpers.ServiceInfo; import org.apache.hadoop.ozone.om.protocol.OzoneManagerProtocol; -import org.apache.hadoop.ozone.om.exceptions.OMException; -import org.apache.hadoop.ozone.protocol.proto - .OzoneManagerProtocolProtos.AllocateBlockRequest; -import org.apache.hadoop.ozone.protocol.proto - .OzoneManagerProtocolProtos.AllocateBlockResponse; -import org.apache.hadoop.ozone.protocol.proto - .OzoneManagerProtocolProtos.CreateKeyRequest; -import org.apache.hadoop.ozone.protocol.proto - .OzoneManagerProtocolProtos.CreateKeyResponse; -import org.apache.hadoop.ozone.protocol.proto - .OzoneManagerProtocolProtos.CommitKeyRequest; -import org.apache.hadoop.ozone.protocol.proto - .OzoneManagerProtocolProtos.CommitKeyResponse; -import org.apache.hadoop.ozone.protocol.proto - .OzoneManagerProtocolProtos.DeleteKeyRequest; -import org.apache.hadoop.ozone.protocol.proto - .OzoneManagerProtocolProtos.DeleteKeyResponse; -import org.apache.hadoop.ozone.protocol.proto - .OzoneManagerProtocolProtos.BucketArgs; -import org.apache.hadoop.ozone.protocol.proto - .OzoneManagerProtocolProtos.BucketInfo; -import org.apache.hadoop.ozone.protocol.proto - .OzoneManagerProtocolProtos.CreateBucketRequest; -import org.apache.hadoop.ozone.protocol.proto - .OzoneManagerProtocolProtos.CreateBucketResponse; -import org.apache.hadoop.ozone.protocol.proto - .OzoneManagerProtocolProtos.InfoBucketRequest; -import org.apache.hadoop.ozone.protocol.proto - .OzoneManagerProtocolProtos.InfoBucketResponse; -import org.apache.hadoop.ozone.protocol.proto - .OzoneManagerProtocolProtos.SetBucketPropertyRequest; -import org.apache.hadoop.ozone.protocol.proto - 
.OzoneManagerProtocolProtos.SetBucketPropertyResponse; -import org.apache.hadoop.ozone.protocol.proto - .OzoneManagerProtocolProtos.DeleteBucketRequest; -import org.apache.hadoop.ozone.protocol.proto - .OzoneManagerProtocolProtos.DeleteBucketResponse; -import org.apache.hadoop.ozone.protocol.proto - .OzoneManagerProtocolProtos.CreateVolumeRequest; -import org.apache.hadoop.ozone.protocol.proto - .OzoneManagerProtocolProtos.CreateVolumeResponse; -import org.apache.hadoop.ozone.protocol.proto - .OzoneManagerProtocolProtos.LookupKeyRequest; -import org.apache.hadoop.ozone.protocol.proto - .OzoneManagerProtocolProtos.LookupKeyResponse; -import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos - .MultipartCommitUploadPartRequest; -import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos - .MultipartCommitUploadPartResponse; -import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos - .MultipartInfoInitiateRequest; -import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos - .MultipartInfoInitiateResponse; -import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos. - MultipartUploadAbortRequest; -import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos - .MultipartUploadAbortResponse; -import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos - .MultipartUploadCompleteRequest; -import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos - .MultipartUploadCompleteResponse; -import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos - .MultipartUploadListPartsRequest; -import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos - .MultipartUploadListPartsResponse; -import org.apache.hadoop.ozone.protocol.proto - .OzoneManagerProtocolProtos.RenameKeyRequest; -import org.apache.hadoop.ozone.protocol.proto - .OzoneManagerProtocolProtos.RenameKeyResponse; -import org.apache.hadoop.ozone.protocol.proto - .OzoneManagerProtocolProtos.KeyArgs; -import org.apache.hadoop.ozone.protocol.proto - .OzoneManagerProtocolProtos.SetVolumePropertyRequest; -import org.apache.hadoop.ozone.protocol.proto - .OzoneManagerProtocolProtos.SetVolumePropertyResponse; -import org.apache.hadoop.ozone.protocol.proto - .OzoneManagerProtocolProtos.DeleteVolumeRequest; -import org.apache.hadoop.ozone.protocol.proto - .OzoneManagerProtocolProtos.DeleteVolumeResponse; -import org.apache.hadoop.ozone.protocol.proto - .OzoneManagerProtocolProtos.InfoVolumeRequest; -import org.apache.hadoop.ozone.protocol.proto - .OzoneManagerProtocolProtos.InfoVolumeResponse; -import org.apache.hadoop.ozone.protocol.proto - .OzoneManagerProtocolProtos.CheckVolumeAccessRequest; -import org.apache.hadoop.ozone.protocol.proto - .OzoneManagerProtocolProtos.CheckVolumeAccessResponse; -import org.apache.hadoop.ozone.protocol.proto - .OzoneManagerProtocolProtos.ListBucketsRequest; -import org.apache.hadoop.ozone.protocol.proto - .OzoneManagerProtocolProtos.ListBucketsResponse; -import org.apache.hadoop.ozone.protocol.proto - .OzoneManagerProtocolProtos.ListKeysRequest; -import org.apache.hadoop.ozone.protocol.proto - .OzoneManagerProtocolProtos.ListKeysResponse; -import org.apache.hadoop.ozone.protocol.proto - .OzoneManagerProtocolProtos.VolumeInfo; -import org.apache.hadoop.ozone.protocol.proto - .OzoneManagerProtocolProtos.Status; -import org.apache.hadoop.ozone.protocol.proto - .OzoneManagerProtocolProtos.OzoneAclInfo; -import org.apache.hadoop.ozone.protocol.proto - 
.OzoneManagerProtocolProtos.ListVolumeRequest; -import org.apache.hadoop.ozone.protocol.proto - .OzoneManagerProtocolProtos.ListVolumeResponse; -import org.apache.hadoop.ozone.protocol.proto - .OzoneManagerProtocolProtos.ServiceListRequest; -import org.apache.hadoop.ozone.protocol.proto - .OzoneManagerProtocolProtos.ServiceListResponse; -import org.apache.hadoop.ozone.protocol.proto - .OzoneManagerProtocolProtos.S3CreateBucketRequest; -import org.apache.hadoop.ozone.protocol.proto - .OzoneManagerProtocolProtos.S3CreateBucketResponse; -import org.apache.hadoop.ozone.protocol.proto - .OzoneManagerProtocolProtos.S3DeleteBucketRequest; -import org.apache.hadoop.ozone.protocol.proto - .OzoneManagerProtocolProtos.S3DeleteBucketResponse; -import org.apache.hadoop.ozone.protocol.proto - .OzoneManagerProtocolProtos.S3BucketInfoRequest; -import org.apache.hadoop.ozone.protocol.proto - .OzoneManagerProtocolProtos.S3BucketInfoResponse; -import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos - .S3ListBucketsRequest; -import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos - .S3ListBucketsResponse; -import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos - .GetS3SecretRequest; -import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos - .GetS3SecretResponse; -import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos - .OMRequest; -import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos - .OMResponse; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.AllocateBlockRequest; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.AllocateBlockResponse; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.BucketArgs; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.BucketInfo; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.CancelDelegationTokenResponseProto; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.CheckVolumeAccessRequest; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.CheckVolumeAccessResponse; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.CommitKeyRequest; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.CommitKeyResponse; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.CreateBucketRequest; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.CreateBucketResponse; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.CreateKeyRequest; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.CreateKeyResponse; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.CreateVolumeRequest; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.CreateVolumeResponse; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.DeleteBucketRequest; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.DeleteBucketResponse; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.DeleteKeyRequest; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.DeleteKeyResponse; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.DeleteVolumeRequest; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.DeleteVolumeResponse; +import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.GetDelegationTokenResponseProto; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.GetS3SecretRequest; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.GetS3SecretResponse; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.InfoBucketRequest; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.InfoBucketResponse; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.InfoVolumeRequest; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.InfoVolumeResponse; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.KeyArgs; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.ListBucketsRequest; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.ListBucketsResponse; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.ListKeysRequest; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.ListKeysResponse; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.ListVolumeRequest; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.ListVolumeResponse; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.LookupKeyRequest; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.LookupKeyResponse; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.MultipartCommitUploadPartRequest; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.MultipartCommitUploadPartResponse; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.MultipartInfoInitiateRequest; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.MultipartInfoInitiateResponse; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.MultipartUploadAbortRequest; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.MultipartUploadAbortResponse; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.MultipartUploadCompleteRequest; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.MultipartUploadCompleteResponse; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.MultipartUploadListPartsRequest; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.MultipartUploadListPartsResponse; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OzoneAclInfo; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.RenameKeyRequest; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.RenameKeyResponse; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.RenewDelegationTokenResponseProto; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.S3BucketInfoRequest; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.S3BucketInfoResponse; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.S3CreateBucketRequest; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.S3CreateBucketResponse; +import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.S3DeleteBucketRequest; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.S3DeleteBucketResponse; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.S3ListBucketsRequest; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.S3ListBucketsResponse; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.ServiceListRequest; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.ServiceListResponse; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.SetBucketPropertyRequest; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.SetBucketPropertyResponse; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.SetVolumePropertyRequest; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.SetVolumePropertyResponse; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Status; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Type; - -import java.io.Closeable; -import java.io.IOException; -import java.util.List; -import java.util.stream.Collectors; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.VolumeInfo; import org.apache.hadoop.ozone.protocolPB.OMPBHelper; import org.apache.hadoop.ozone.security.OzoneTokenIdentifier; -import org.apache.hadoop.security.proto.SecurityProtos.GetDelegationTokenRequestProto; -import org.apache.hadoop.ozone.protocol.proto - .OzoneManagerProtocolProtos.GetDelegationTokenResponseProto; -import org.apache.hadoop.security.proto.SecurityProtos.RenewDelegationTokenRequestProto; -import org.apache.hadoop.ozone.protocol.proto - .OzoneManagerProtocolProtos.RenewDelegationTokenResponseProto; import org.apache.hadoop.security.proto.SecurityProtos.CancelDelegationTokenRequestProto; -import org.apache.hadoop.ozone.protocol.proto - .OzoneManagerProtocolProtos.CancelDelegationTokenResponseProto; +import org.apache.hadoop.security.proto.SecurityProtos.GetDelegationTokenRequestProto; +import org.apache.hadoop.security.proto.SecurityProtos.RenewDelegationTokenRequestProto; import org.apache.hadoop.security.token.Token; -import static org.apache.hadoop.ozone.om.exceptions.OMException.*; +import com.google.common.base.Preconditions; +import com.google.common.base.Strings; +import com.google.protobuf.RpcController; +import com.google.protobuf.ServiceException; +import io.opentracing.Scope; +import io.opentracing.util.GlobalTracer; +import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes; import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.TOKEN_ERROR_OTHER; import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.UNKNOWN; -import static org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Status.OK; import static org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Status.ACCESS_DENIED; +import static org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Status.OK; + /** * The client side implementation of OzoneManagerProtocol. 
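The client-side change in the next hunk exports the active span into the OMRequest traceID field, and the matching server-side translator (OzoneManagerProtocolServerSideTranslatorPB, further down in this patch) re-creates a child scope from that field via TracingUtil.importAndCreateScope(). The TracingUtil bodies are not part of this section; a minimal sketch of the standard OpenTracing 0.31 propagation pattern they are assumed to follow is shown below (the class name, carrier key and method signatures are illustrative only, not the patch's actual implementation):

import java.util.HashMap;
import java.util.Map;

import io.opentracing.Scope;
import io.opentracing.Span;
import io.opentracing.SpanContext;
import io.opentracing.Tracer;
import io.opentracing.propagation.Format;
import io.opentracing.propagation.TextMapExtractAdapter;
import io.opentracing.propagation.TextMapInjectAdapter;
import io.opentracing.util.GlobalTracer;

/**
 * Illustrative sketch only: serialize the active span context to a single
 * string (suitable for a protobuf string field) and re-create a child scope
 * from it on the receiving side.
 */
public final class SpanPropagationSketch {

  // Hypothetical carrier key; the real codec may use a different one.
  private static final String KEY = "uber-trace-id";

  private SpanPropagationSketch() {
  }

  /** Export the currently active span context as a string, or "" if none. */
  public static String exportCurrentSpan() {
    Tracer tracer = GlobalTracer.get();
    Span active = tracer.activeSpan();
    if (active == null) {
      return "";
    }
    Map<String, String> carrier = new HashMap<>();
    tracer.inject(active.context(), Format.Builtin.TEXT_MAP,
        new TextMapInjectAdapter(carrier));
    return carrier.getOrDefault(KEY, "");
  }

  /** Start a new active scope, attached to the exported parent if present. */
  public static Scope importAndCreateScope(String name, String encodedParent) {
    Tracer tracer = GlobalTracer.get();
    Tracer.SpanBuilder builder = tracer.buildSpan(name);
    if (encodedParent != null && !encodedParent.isEmpty()) {
      Map<String, String> carrier = new HashMap<>();
      carrier.put(KEY, encodedParent);
      SpanContext parent = tracer.extract(Format.Builtin.TEXT_MAP,
          new TextMapExtractAdapter(carrier));
      if (parent != null) {
        builder.asChildOf(parent);
      }
    }
    return builder.startActive(true);
  }
}

With helpers of this shape, a single string field on the RPC request is enough to stitch the client span and the server span into one Jaeger trace, which is exactly how the traceID field is used throughout the hunks below.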
@@ -259,6 +194,7 @@ public Object getUnderlyingProxyObject() { * @param cmdType type of the request */ private OMRequest.Builder createOMRequest(Type cmdType) { + return OMRequest.newBuilder() .setCmdType(cmdType) .setClientId(clientID); @@ -272,10 +208,18 @@ private OMRequest.Builder createOMRequest(Type cmdType) { */ private OMResponse submitRequest(OMRequest omRequest) throws IOException { + Scope scope = + GlobalTracer.get().buildSpan(omRequest.getCmdType().name()) + .startActive(true); try { - return rpcProxy.submitRequest(NULL_RPC_CONTROLLER, omRequest); + OMRequest payload = OMRequest.newBuilder(omRequest) + .setTraceID(TracingUtil.exportCurrentSpan()) + .build(); + return rpcProxy.submitRequest(NULL_RPC_CONTROLLER, payload); } catch (ServiceException e) { throw ProtobufHelper.getRemoteException(e); + } finally { + scope.close(); } } diff --git a/hadoop-ozone/dist/src/main/compose/ozonetrace/docker-compose.yaml b/hadoop-ozone/dist/src/main/compose/ozonetrace/docker-compose.yaml new file mode 100644 index 00000000000..5fadd72223b --- /dev/null +++ b/hadoop-ozone/dist/src/main/compose/ozonetrace/docker-compose.yaml @@ -0,0 +1,65 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +version: "3" +services: + jaeger: + image: jaegertracing/all-in-one:latest + environment: + COLLECTOR_ZIPKIN_HTTP_PORT: 9411 + ports: + - 16686:16686 + datanode: + image: apache/hadoop-runner + volumes: + - ../..:/opt/hadoop + ports: + - 9864 + command: ["ozone","datanode"] + env_file: + - ./docker-config + ozoneManager: + image: apache/hadoop-runner + volumes: + - ../..:/opt/hadoop + ports: + - 9874:9874 + environment: + ENSURE_OM_INITIALIZED: /data/metadata/ozoneManager/current/VERSION + WAITFOR: scm:9876 + env_file: + - ./docker-config + command: ["ozone","om"] + scm: + image: apache/hadoop-runner + volumes: + - ../..:/opt/hadoop + ports: + - 9876:9876 + env_file: + - ./docker-config + environment: + ENSURE_SCM_INITIALIZED: /data/metadata/scm/current/VERSION + command: ["ozone","scm"] + s3g: + image: apache/hadoop-runner + volumes: + - ../..:/opt/hadoop + ports: + - 9878:9878 + env_file: + - ./docker-config + command: ["ozone","s3g"] diff --git a/hadoop-ozone/dist/src/main/compose/ozonetrace/docker-config b/hadoop-ozone/dist/src/main/compose/ozonetrace/docker-config new file mode 100644 index 00000000000..39a55dcf6b1 --- /dev/null +++ b/hadoop-ozone/dist/src/main/compose/ozonetrace/docker-config @@ -0,0 +1,84 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. 
The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +OZONE-SITE.XML_ozone.om.address=ozoneManager +OZONE-SITE.XML_ozone.scm.names=scm +OZONE-SITE.XML_ozone.enabled=true +OZONE-SITE.XML_ozone.scm.datanode.id=/data/datanode.id +OZONE-SITE.XML_ozone.scm.block.client.address=scm +OZONE-SITE.XML_ozone.metadata.dirs=/data/metadata +OZONE-SITE.XML_ozone.scm.client.address=scm +OZONE-SITE.XML_ozone.replication=1 +OZONE-SITE.XML_hdds.datanode.dir=/data/hdds + +HDFS-SITE.XML_rpc.metrics.quantile.enable=true +HDFS-SITE.XML_rpc.metrics.percentiles.intervals=60,300 + +JAEGER_SAMPLER_PARAM=1 +JAEGER_SAMPLER_TYPE=const +JAEGER_AGENT_HOST=jaeger + +LOG4J.PROPERTIES_log4j.rootLogger=INFO, stdout +LOG4J.PROPERTIES_log4j.appender.stdout=org.apache.log4j.ConsoleAppender +LOG4J.PROPERTIES_log4j.appender.stdout.layout=org.apache.log4j.PatternLayout +LOG4J.PROPERTIES_log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n +LOG4J.PROPERTIES_log4j.logger.org.apache.hadoop.util.NativeCodeLoader=ERROR +LOG4J.PROPERTIES_log4j.logger.org.apache.ratis.conf.ConfUtils=WARN +LOG4J.PROPERTIES_log4j.logger.org.apache.hadoop.security.ShellBasedUnixGroupsMapping=ERROR +LOG4J.PROPERTIES_log4j.logger.http.requests.s3gateway=INFO,s3gatewayrequestlog +LOG4J.PROPERTIES_log4j.appender.s3gatewayrequestlog=org.apache.hadoop.http.HttpRequestLogAppender +LOG4J.PROPERTIES_log4j.appender.s3gatewayrequestlog.Filename=/tmp/jetty-s3gateway-yyyy_mm_dd.log +LOG4J.PROPERTIES_log4j.appender.s3gatewayrequestlog.RetainDays=3 + +#Enable this variable to print out all hadoop rpc traffic to the stdout. See http://byteman.jboss.org/ to define your own instrumentation. 
+#BYTEMAN_SCRIPT_URL=https://raw.githubusercontent.com/apache/hadoop/trunk/dev-support/byteman/hadooprpc.btm + +#LOG4J2.PROPERTIES_* are for Ozone Audit Logging +LOG4J2.PROPERTIES_monitorInterval=30 +LOG4J2.PROPERTIES_filter=read,write +LOG4J2.PROPERTIES_filter.read.type=MarkerFilter +LOG4J2.PROPERTIES_filter.read.marker=READ +LOG4J2.PROPERTIES_filter.read.onMatch=DENY +LOG4J2.PROPERTIES_filter.read.onMismatch=NEUTRAL +LOG4J2.PROPERTIES_filter.write.type=MarkerFilter +LOG4J2.PROPERTIES_filter.write.marker=WRITE +LOG4J2.PROPERTIES_filter.write.onMatch=NEUTRAL +LOG4J2.PROPERTIES_filter.write.onMismatch=NEUTRAL +LOG4J2.PROPERTIES_appenders=console, rolling +LOG4J2.PROPERTIES_appender.console.type=Console +LOG4J2.PROPERTIES_appender.console.name=STDOUT +LOG4J2.PROPERTIES_appender.console.layout.type=PatternLayout +LOG4J2.PROPERTIES_appender.console.layout.pattern=%d{DEFAULT} | %-5level | %c{1} | %msg | %throwable{3} %n +LOG4J2.PROPERTIES_appender.rolling.type=RollingFile +LOG4J2.PROPERTIES_appender.rolling.name=RollingFile +LOG4J2.PROPERTIES_appender.rolling.fileName =${sys:hadoop.log.dir}/om-audit-${hostName}.log +LOG4J2.PROPERTIES_appender.rolling.filePattern=${sys:hadoop.log.dir}/om-audit-${hostName}-%d{yyyy-MM-dd-HH-mm-ss}-%i.log.gz +LOG4J2.PROPERTIES_appender.rolling.layout.type=PatternLayout +LOG4J2.PROPERTIES_appender.rolling.layout.pattern=%d{DEFAULT} | %-5level | %c{1} | %msg | %throwable{3} %n +LOG4J2.PROPERTIES_appender.rolling.policies.type=Policies +LOG4J2.PROPERTIES_appender.rolling.policies.time.type=TimeBasedTriggeringPolicy +LOG4J2.PROPERTIES_appender.rolling.policies.time.interval=86400 +LOG4J2.PROPERTIES_appender.rolling.policies.size.type=SizeBasedTriggeringPolicy +LOG4J2.PROPERTIES_appender.rolling.policies.size.size=64MB +LOG4J2.PROPERTIES_loggers=audit +LOG4J2.PROPERTIES_logger.audit.type=AsyncLogger +LOG4J2.PROPERTIES_logger.audit.name=OMAudit +LOG4J2.PROPERTIES_logger.audit.level=INFO +LOG4J2.PROPERTIES_logger.audit.appenderRefs=rolling +LOG4J2.PROPERTIES_logger.audit.appenderRef.file.ref=RollingFile +LOG4J2.PROPERTIES_rootLogger.level=INFO +LOG4J2.PROPERTIES_rootLogger.appenderRefs=stdout +LOG4J2.PROPERTIES_rootLogger.appenderRef.stdout.ref=STDOUT diff --git a/hadoop-ozone/objectstore-service/pom.xml b/hadoop-ozone/objectstore-service/pom.xml index 474a7dfda00..edb1903d13d 100644 --- a/hadoop-ozone/objectstore-service/pom.xml +++ b/hadoop-ozone/objectstore-service/pom.xml @@ -56,6 +56,12 @@ http://maven.apache.org/xsd/maven-4.0.0.xsd"> org.apache.hadoop hadoop-hdfs-client provided + + + com.squareup.okhttp + okhttp + + diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java index a01ad0c42f4..5b0ff7674ba 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java @@ -42,6 +42,7 @@ import org.apache.hadoop.hdds.security.x509.SecurityConfig; import org.apache.hadoop.hdds.security.x509.certificate.client.CertificateClient; import org.apache.hadoop.hdds.server.ServiceRuntimeInfoImpl; +import org.apache.hadoop.hdds.tracing.TracingUtil; import org.apache.hadoop.hdfs.DFSUtil; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.retry.RetryPolicy; @@ -586,6 +587,7 @@ public static void main(String[] argv) throws IOException { System.exit(0); } try { + TracingUtil.initTracing("OzoneManager"); OzoneConfiguration 
conf = new OzoneConfiguration(); GenericOptionsParser hParser = new GenericOptionsParser(conf, argv); if (!hParser.isParseSuccessful()) { diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerProtocolServerSideTranslatorPB.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerProtocolServerSideTranslatorPB.java index 9522d76f900..5684fa561b3 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerProtocolServerSideTranslatorPB.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerProtocolServerSideTranslatorPB.java @@ -16,17 +16,16 @@ */ package org.apache.hadoop.ozone.protocolPB; -import com.google.protobuf.RpcController; -import com.google.protobuf.ServiceException; - +import org.apache.hadoop.hdds.tracing.TracingUtil; import org.apache.hadoop.ozone.om.protocol.OzoneManagerProtocol; import org.apache.hadoop.ozone.om.protocolPB.OzoneManagerProtocolPB; import org.apache.hadoop.ozone.om.ratis.OzoneManagerRatisClient; -import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos - .OMRequest; -import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos - .OMResponse; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse; +import com.google.protobuf.RpcController; +import com.google.protobuf.ServiceException; +import io.opentracing.Scope; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -64,10 +63,17 @@ public OzoneManagerProtocolServerSideTranslatorPB( @Override public OMResponse submitRequest(RpcController controller, OMRequest request) throws ServiceException { - if (isRatisEnabled) { - return submitRequestToRatis(request); - } else { - return submitRequestDirectlyToOM(request); + Scope scope = TracingUtil + .importAndCreateScope(request.getCmdType().name(), + request.getTraceID()); + try { + if (isRatisEnabled) { + return submitRequestToRatis(request); + } else { + return submitRequestDirectlyToOM(request); + } + } finally { + scope.close(); } } diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/web/ozShell/Shell.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/web/ozShell/Shell.java index 4fcea633268..f16eb8ce0ad 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/web/ozShell/Shell.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/web/ozShell/Shell.java @@ -20,12 +20,15 @@ import org.apache.hadoop.hdds.cli.GenericCli; import org.apache.hadoop.hdds.cli.HddsVersionProvider; +import org.apache.hadoop.hdds.tracing.TracingUtil; import org.apache.hadoop.ozone.web.ozShell.bucket.BucketCommands; import org.apache.hadoop.ozone.web.ozShell.keys.KeyCommands; import org.apache.hadoop.ozone.web.ozShell.s3.S3Commands; -import org.apache.hadoop.ozone.web.ozShell.volume.VolumeCommands; import org.apache.hadoop.ozone.web.ozShell.token.TokenCommands; +import org.apache.hadoop.ozone.web.ozShell.volume.VolumeCommands; +import io.opentracing.Scope; +import io.opentracing.util.GlobalTracer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import picocli.CommandLine.Command; @@ -73,6 +76,14 @@ public class Shell extends GenericCli { // General options public static final int DEFAULT_OZONE_PORT = 50070; + @Override + public void execute(String[] argv) { + 
TracingUtil.initTracing("shell"); + try (Scope scope = GlobalTracer.get().buildSpan("main").startActive(true)) { + super.execute(argv); + } + } + /** * Main for the ozShell Command handling. * diff --git a/hadoop-ozone/ozonefs/pom.xml b/hadoop-ozone/ozonefs/pom.xml index 95a602c1469..7da5efeb7f5 100644 --- a/hadoop-ozone/ozonefs/pom.xml +++ b/hadoop-ozone/ozonefs/pom.xml @@ -95,11 +95,6 @@ hadoop-hdfs compile - - org.apache.hadoop - hadoop-hdfs-client - compile - org.apache.hadoop hadoop-hdds-common diff --git a/hadoop-ozone/pom.xml b/hadoop-ozone/pom.xml index 12250e78cb9..a3f761af236 100644 --- a/hadoop-ozone/pom.xml +++ b/hadoop-ozone/pom.xml @@ -187,10 +187,6 @@ org.apache.hadoop hadoop-hdfs - - org.apache.hadoop - hadoop-hdfs-client - org.apache.hadoop hadoop-hdds-common diff --git a/hadoop-ozone/tools/pom.xml b/hadoop-ozone/tools/pom.xml index 2d273d1349a..1af72c9ece4 100644 --- a/hadoop-ozone/tools/pom.xml +++ b/hadoop-ozone/tools/pom.xml @@ -51,11 +51,6 @@ http://maven.apache.org/xsd/maven-4.0.0.xsd"> hadoop-hdfs compile - - org.apache.hadoop - hadoop-hdfs-client - compile - io.dropwizard.metrics metrics-core diff --git a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/Freon.java b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/Freon.java index f9e8c9b97ce..694373d86d5 100644 --- a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/Freon.java +++ b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/Freon.java @@ -18,6 +18,7 @@ import org.apache.hadoop.hdds.cli.GenericCli; import org.apache.hadoop.hdds.cli.HddsVersionProvider; +import org.apache.hadoop.hdds.tracing.TracingUtil; import picocli.CommandLine.Command; @@ -32,6 +33,12 @@ mixinStandardHelpOptions = true) public class Freon extends GenericCli { + @Override + public void execute(String[] argv) { + TracingUtil.initTracing("freon"); + super.execute(argv); + } + public static void main(String[] args) { new Freon().run(args); } diff --git a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/RandomKeyGenerator.java b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/RandomKeyGenerator.java index 46ab9eba0e0..67df0f923aa 100644 --- a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/RandomKeyGenerator.java +++ b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/RandomKeyGenerator.java @@ -36,6 +36,8 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.function.Supplier; +import io.opentracing.Scope; +import io.opentracing.util.GlobalTracer; import org.apache.commons.codec.digest.DigestUtils; import org.apache.hadoop.hdds.cli.HddsVersionProvider; import org.apache.hadoop.hdds.client.OzoneQuota; @@ -554,7 +556,8 @@ public void run() { LOG.trace("Creating volume: {}", volumeName); long start = System.nanoTime(); OzoneVolume volume; - try { + try (Scope scope = GlobalTracer.get().buildSpan("createVolume") + .startActive(true)) { objectStore.createVolume(volumeName); long volumeCreationDuration = System.nanoTime() - start; volumeCreationTime.getAndAdd(volumeCreationDuration); @@ -576,12 +579,15 @@ public void run() { LOG.trace("Creating bucket: {} in volume: {}", bucketName, volume.getName()); start = System.nanoTime(); - volume.createBucket(bucketName); - long bucketCreationDuration = System.nanoTime() - start; - histograms.get(FreonOps.BUCKET_CREATE.ordinal()) - .update(bucketCreationDuration); - bucketCreationTime.getAndAdd(bucketCreationDuration); - numberOfBucketsCreated.getAndIncrement(); + try (Scope scope = 
GlobalTracer.get().buildSpan("createBucket") + .startActive(true)) { + volume.createBucket(bucketName); + long bucketCreationDuration = System.nanoTime() - start; + histograms.get(FreonOps.BUCKET_CREATE.ordinal()) + .update(bucketCreationDuration); + bucketCreationTime.getAndAdd(bucketCreationDuration); + numberOfBucketsCreated.getAndIncrement(); + } OzoneBucket bucket = volume.getBucket(bucketName); for (int k = 0; k < totalKeys; k++) { String key = "key-" + k + "-" + @@ -592,22 +598,28 @@ public void run() { LOG.trace("Adding key: {} in bucket: {} of volume: {}", key, bucket, volume); long keyCreateStart = System.nanoTime(); - OzoneOutputStream os = - bucket.createKey(key, keySize, type, factor, new HashMap<>()); - long keyCreationDuration = System.nanoTime() - keyCreateStart; - histograms.get(FreonOps.KEY_CREATE.ordinal()) - .update(keyCreationDuration); - keyCreationTime.getAndAdd(keyCreationDuration); - long keyWriteStart = System.nanoTime(); - os.write(keyValue); - os.write(randomValue); - os.close(); - long keyWriteDuration = System.nanoTime() - keyWriteStart; - threadKeyWriteTime += keyWriteDuration; - histograms.get(FreonOps.KEY_WRITE.ordinal()) - .update(keyWriteDuration); - totalBytesWritten.getAndAdd(keySize); - numberOfKeysAdded.getAndIncrement(); + try (Scope scope = GlobalTracer.get().buildSpan("createKey") + .startActive(true)) { + OzoneOutputStream os = + bucket + .createKey(key, keySize, type, factor, new HashMap<>()); + long keyCreationDuration = System.nanoTime() - keyCreateStart; + histograms.get(FreonOps.KEY_CREATE.ordinal()) + .update(keyCreationDuration); + keyCreationTime.getAndAdd(keyCreationDuration); + long keyWriteStart = System.nanoTime(); + os.write(keyValue); + os.write(randomValue); + os.close(); + + long keyWriteDuration = System.nanoTime() - keyWriteStart; + + threadKeyWriteTime += keyWriteDuration; + histograms.get(FreonOps.KEY_WRITE.ordinal()) + .update(keyWriteDuration); + totalBytesWritten.getAndAdd(keySize); + numberOfKeysAdded.getAndIncrement(); + } if (validateWrites) { byte[] value = ArrayUtils.addAll(keyValue, randomValue); boolean validate = validationQueue.offer(