HBASE-26140 Backport HBASE-25778 "The tracinig implementation for AsyncConnectionImpl.getHbck is incorrect" to branch-2 (#3631)

17/17 commits of HBASE-22120, original commit f36e153964

Co-authored-by: Duo Zhang <zhangduo@apache.org>

Signed-off-by: Peter Somogyi <psomogyi@apache.org>
This commit is contained in:
Tak Lon (Stephen) Wu 2021-08-26 11:29:30 -07:00
parent 65b5b9b2a8
commit ab431fc8a9
3 changed files with 145 additions and 18 deletions

View File

@ -28,7 +28,6 @@ import static org.apache.hadoop.hbase.client.NonceGenerator.CLIENT_NONCES_ENABLE
import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.context.Scope;
import java.io.IOException;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
@ -37,6 +36,7 @@ import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.AuthUtil;
@ -370,6 +370,15 @@ class AsyncConnectionImpl implements AsyncConnection {
RETRY_TIMER);
}
private Hbck getHbckInternal(ServerName masterServer) {
Span.current().setAttribute(TraceUtil.SERVER_NAME_KEY, masterServer.getServerName());
// we will not create a new connection when creating a new protobuf stub, and for hbck there
// will be no performance consideration, so for simplification we will create a new stub every
// time instead of caching the stub here.
return new HBaseHbck(MasterProtos.HbckService.newBlockingStub(
rpcClient.createBlockingRpcChannel(masterServer, user, rpcTimeout)), rpcControllerFactory);
}
@Override
public CompletableFuture<Hbck> getHbck() {
return TraceUtil.tracedFuture(() -> {
@ -378,11 +387,7 @@ class AsyncConnectionImpl implements AsyncConnection {
if (error != null) {
future.completeExceptionally(error);
} else {
try {
future.complete(getHbck(sn));
} catch (IOException e) {
future.completeExceptionally(e);
}
future.complete(getHbckInternal(sn));
}
});
return future;
@ -390,18 +395,14 @@ class AsyncConnectionImpl implements AsyncConnection {
}
@Override
public Hbck getHbck(ServerName masterServer) throws IOException {
Span span = TraceUtil.createSpan("AsyncConnection.getHbck")
.setAttribute(TraceUtil.SERVER_NAME_KEY, masterServer.getServerName());
try (Scope scope = span.makeCurrent()) {
// we will not create a new connection when creating a new protobuf stub, and for hbck there
// will be no performance consideration, so for simplification we will create a new stub every
// time instead of caching the stub here.
return new HBaseHbck(
MasterProtos.HbckService
.newBlockingStub(rpcClient.createBlockingRpcChannel(masterServer, user, rpcTimeout)),
rpcControllerFactory);
}
public Hbck getHbck(ServerName masterServer) {
return TraceUtil.trace(new Supplier<Hbck>() {
@Override
public Hbck get() {
return getHbckInternal(masterServer);
}
}, "AsyncConnection.getHbck");
}
@Override

View File

@ -0,0 +1,112 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;
import static org.junit.Assert.assertEquals;
import io.opentelemetry.api.trace.SpanKind;
import io.opentelemetry.api.trace.StatusCode;
import io.opentelemetry.sdk.testing.junit4.OpenTelemetryRule;
import io.opentelemetry.sdk.trace.data.SpanData;
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.trace.TraceUtil;
import org.junit.After;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
@Category({ ClientTests.class, MediumTests.class })
public class TestAsyncConnectionTracing {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestAsyncConnectionTracing.class);
private static Configuration CONF = HBaseConfiguration.create();
private ServerName masterServer =
ServerName.valueOf("localhost", 12345, System.currentTimeMillis());
private AsyncConnection conn;
@Rule
public OpenTelemetryRule traceRule = OpenTelemetryRule.create();
@Before
public void setUp() throws IOException {
ConnectionRegistry registry = new DoNothingConnectionRegistry(CONF) {
@Override
public CompletableFuture<ServerName> getActiveMaster() {
return CompletableFuture.completedFuture(masterServer);
}
};
conn = new AsyncConnectionImpl(CONF, registry, "test",
UserProvider.instantiate(CONF).getCurrent());
}
@After
public void tearDown() throws IOException {
Closeables.close(conn, true);
}
private void assertTrace(String methodName, ServerName serverName) {
Waiter.waitFor(CONF, 1000,
() -> traceRule.getSpans().stream()
.anyMatch(span -> span.getName().equals("AsyncConnection." + methodName) &&
span.getKind() == SpanKind.INTERNAL && span.hasEnded()));
SpanData data = traceRule.getSpans().stream()
.filter(s -> s.getName().equals("AsyncConnection." + methodName)).findFirst().get();
assertEquals(StatusCode.OK, data.getStatus().getStatusCode());
if (serverName != null) {
assertEquals(serverName.getServerName(), data.getAttributes().get(TraceUtil.SERVER_NAME_KEY));
}
}
@Test
public void testHbck() {
conn.getHbck().join();
assertTrace("getHbck", masterServer);
}
@Test
public void testHbckWithServerName() throws IOException {
ServerName serverName = ServerName.valueOf("localhost", 23456, System.currentTimeMillis());
conn.getHbck(serverName);
assertTrace("getHbck", serverName);
}
@Test
public void testClose() throws IOException {
conn.close();
assertTrace("close", null);
}
}

View File

@ -189,6 +189,20 @@ public final class TraceUtil {
}
}
public static <T> T trace(Supplier<T> action, String spanName) {
Span span = createSpan(spanName);
try (Scope scope = span.makeCurrent()) {
T ret = action.get();
span.setStatus(StatusCode.OK);
return ret;
} catch (Throwable e) {
setError(span, e);
throw e;
} finally {
span.end();
}
}
@FunctionalInterface
public interface IOExceptionCallable<V> {
V call() throws IOException;