diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java index 98e78258dca..76b1208f528 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java @@ -28,7 +28,6 @@ import static org.apache.hadoop.hbase.client.NonceGenerator.CLIENT_NONCES_ENABLE import static org.apache.hadoop.hbase.util.FutureUtils.addListener; import io.opentelemetry.api.trace.Span; -import io.opentelemetry.context.Scope; import java.io.IOException; import java.util.Optional; import java.util.concurrent.CompletableFuture; @@ -37,6 +36,7 @@ import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Supplier; import org.apache.commons.io.IOUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.AuthUtil; @@ -370,6 +370,15 @@ class AsyncConnectionImpl implements AsyncConnection { RETRY_TIMER); } + private Hbck getHbckInternal(ServerName masterServer) { + Span.current().setAttribute(TraceUtil.SERVER_NAME_KEY, masterServer.getServerName()); + // we will not create a new connection when creating a new protobuf stub, and for hbck there + // will be no performance consideration, so for simplification we will create a new stub every + // time instead of caching the stub here. + return new HBaseHbck(MasterProtos.HbckService.newBlockingStub( + rpcClient.createBlockingRpcChannel(masterServer, user, rpcTimeout)), rpcControllerFactory); + } + @Override public CompletableFuture getHbck() { return TraceUtil.tracedFuture(() -> { @@ -378,11 +387,7 @@ class AsyncConnectionImpl implements AsyncConnection { if (error != null) { future.completeExceptionally(error); } else { - try { - future.complete(getHbck(sn)); - } catch (IOException e) { - future.completeExceptionally(e); - } + future.complete(getHbckInternal(sn)); } }); return future; @@ -390,18 +395,14 @@ class AsyncConnectionImpl implements AsyncConnection { } @Override - public Hbck getHbck(ServerName masterServer) throws IOException { - Span span = TraceUtil.createSpan("AsyncConnection.getHbck") - .setAttribute(TraceUtil.SERVER_NAME_KEY, masterServer.getServerName()); - try (Scope scope = span.makeCurrent()) { - // we will not create a new connection when creating a new protobuf stub, and for hbck there - // will be no performance consideration, so for simplification we will create a new stub every - // time instead of caching the stub here. - return new HBaseHbck( - MasterProtos.HbckService - .newBlockingStub(rpcClient.createBlockingRpcChannel(masterServer, user, rpcTimeout)), - rpcControllerFactory); - } + public Hbck getHbck(ServerName masterServer) { + return TraceUtil.trace(new Supplier() { + + @Override + public Hbck get() { + return getHbckInternal(masterServer); + } + }, "AsyncConnection.getHbck"); } @Override diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncConnectionTracing.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncConnectionTracing.java new file mode 100644 index 00000000000..fec5f6d1e91 --- /dev/null +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncConnectionTracing.java @@ -0,0 +1,112 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import static org.junit.Assert.assertEquals; + +import io.opentelemetry.api.trace.SpanKind; +import io.opentelemetry.api.trace.StatusCode; +import io.opentelemetry.sdk.testing.junit4.OpenTelemetryRule; +import io.opentelemetry.sdk.trace.data.SpanData; +import java.io.IOException; +import java.util.concurrent.CompletableFuture; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.Waiter; +import org.apache.hadoop.hbase.security.UserProvider; +import org.apache.hadoop.hbase.testclassification.ClientTests; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.trace.TraceUtil; +import org.junit.After; +import org.junit.Before; +import org.junit.ClassRule; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import org.apache.hbase.thirdparty.com.google.common.io.Closeables; + +@Category({ ClientTests.class, MediumTests.class }) +public class TestAsyncConnectionTracing { + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestAsyncConnectionTracing.class); + + private static Configuration CONF = HBaseConfiguration.create(); + + private ServerName masterServer = + ServerName.valueOf("localhost", 12345, System.currentTimeMillis()); + + private AsyncConnection conn; + + @Rule + public OpenTelemetryRule traceRule = OpenTelemetryRule.create(); + + @Before + public void setUp() throws IOException { + ConnectionRegistry registry = new DoNothingConnectionRegistry(CONF) { + + @Override + public CompletableFuture getActiveMaster() { + return CompletableFuture.completedFuture(masterServer); + } + }; + conn = new AsyncConnectionImpl(CONF, registry, "test", + UserProvider.instantiate(CONF).getCurrent()); + } + + @After + public void tearDown() throws IOException { + Closeables.close(conn, true); + } + + private void assertTrace(String methodName, ServerName serverName) { + Waiter.waitFor(CONF, 1000, + () -> traceRule.getSpans().stream() + .anyMatch(span -> span.getName().equals("AsyncConnection." + methodName) && + span.getKind() == SpanKind.INTERNAL && span.hasEnded())); + SpanData data = traceRule.getSpans().stream() + .filter(s -> s.getName().equals("AsyncConnection." + methodName)).findFirst().get(); + assertEquals(StatusCode.OK, data.getStatus().getStatusCode()); + if (serverName != null) { + assertEquals(serverName.getServerName(), data.getAttributes().get(TraceUtil.SERVER_NAME_KEY)); + } + } + + @Test + public void testHbck() { + conn.getHbck().join(); + assertTrace("getHbck", masterServer); + } + + @Test + public void testHbckWithServerName() throws IOException { + ServerName serverName = ServerName.valueOf("localhost", 23456, System.currentTimeMillis()); + conn.getHbck(serverName); + assertTrace("getHbck", serverName); + } + + @Test + public void testClose() throws IOException { + conn.close(); + assertTrace("close", null); + } +} diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/trace/TraceUtil.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/trace/TraceUtil.java index 43c2e048686..fb37080a5fb 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/trace/TraceUtil.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/trace/TraceUtil.java @@ -189,6 +189,20 @@ public final class TraceUtil { } } + public static T trace(Supplier action, String spanName) { + Span span = createSpan(spanName); + try (Scope scope = span.makeCurrent()) { + T ret = action.get(); + span.setStatus(StatusCode.OK); + return ret; + } catch (Throwable e) { + setError(span, e); + throw e; + } finally { + span.end(); + } + } + @FunctionalInterface public interface IOExceptionCallable { V call() throws IOException;