HBASE-26764 Implement generic exception support for TraceUtil methods over Callables and Runnables
For the `TraceUtil` methods that accept `Callable` and `Runnable` types, make them generic over a child of `Throwable`. This allows us to consolidate the two method signatures into a single more flexible definition. Signed-off-by: Duo Zhang <zhangduo@apache.org>
This commit is contained in:
parent
dbaa68ab58
commit
9c037b1be7
|
@ -27,7 +27,6 @@ import static org.apache.hadoop.hbase.client.MetricsConnection.CLIENT_SIDE_METRI
|
||||||
import static org.apache.hadoop.hbase.client.NonceGenerator.CLIENT_NONCES_ENABLED_KEY;
|
import static org.apache.hadoop.hbase.client.NonceGenerator.CLIENT_NONCES_ENABLED_KEY;
|
||||||
import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.SERVER_NAME_KEY;
|
import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.SERVER_NAME_KEY;
|
||||||
import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
|
import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
|
||||||
|
|
||||||
import io.opentelemetry.api.trace.Span;
|
import io.opentelemetry.api.trace.Span;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.net.SocketAddress;
|
import java.net.SocketAddress;
|
||||||
|
@ -40,7 +39,6 @@ import java.util.concurrent.ExecutorService;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.concurrent.atomic.AtomicBoolean;
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
import java.util.concurrent.atomic.AtomicReference;
|
import java.util.concurrent.atomic.AtomicReference;
|
||||||
import java.util.function.Supplier;
|
|
||||||
import org.apache.commons.io.IOUtils;
|
import org.apache.commons.io.IOUtils;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.hbase.AuthUtil;
|
import org.apache.hadoop.hbase.AuthUtil;
|
||||||
|
@ -61,10 +59,8 @@ import org.apache.hadoop.security.UserGroupInformation;
|
||||||
import org.apache.yetus.audience.InterfaceAudience;
|
import org.apache.yetus.audience.InterfaceAudience;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
|
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
|
||||||
import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer;
|
import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer;
|
||||||
|
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos;
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos;
|
||||||
|
@ -444,13 +440,7 @@ public class AsyncConnectionImpl implements AsyncConnection {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Hbck getHbck(ServerName masterServer) {
|
public Hbck getHbck(ServerName masterServer) {
|
||||||
return TraceUtil.trace(new Supplier<Hbck>() {
|
return TraceUtil.trace(() -> getHbckInternal(masterServer), "AsyncConnection.getHbck");
|
||||||
|
|
||||||
@Override
|
|
||||||
public Hbck get() {
|
|
||||||
return getHbckInternal(masterServer);
|
|
||||||
}
|
|
||||||
}, "AsyncConnection.getHbck");
|
|
||||||
}
|
}
|
||||||
|
|
||||||
Optional<MetricsConnection> getConnectionMetrics() {
|
Optional<MetricsConnection> getConnectionMetrics() {
|
||||||
|
|
|
@ -24,8 +24,8 @@ import io.opentelemetry.api.trace.StatusCode;
|
||||||
import io.opentelemetry.api.trace.Tracer;
|
import io.opentelemetry.api.trace.Tracer;
|
||||||
import io.opentelemetry.context.Context;
|
import io.opentelemetry.context.Context;
|
||||||
import io.opentelemetry.context.Scope;
|
import io.opentelemetry.context.Scope;
|
||||||
import java.io.IOException;
|
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
import java.util.concurrent.Callable;
|
||||||
import java.util.concurrent.CompletableFuture;
|
import java.util.concurrent.CompletableFuture;
|
||||||
import java.util.function.Supplier;
|
import java.util.function.Supplier;
|
||||||
import org.apache.hadoop.hbase.Version;
|
import org.apache.hadoop.hbase.Version;
|
||||||
|
@ -84,7 +84,7 @@ public final class TraceUtil {
|
||||||
Supplier<Span> spanSupplier
|
Supplier<Span> spanSupplier
|
||||||
) {
|
) {
|
||||||
Span span = spanSupplier.get();
|
Span span = spanSupplier.get();
|
||||||
try (Scope scope = span.makeCurrent()) {
|
try (Scope ignored = span.makeCurrent()) {
|
||||||
CompletableFuture<T> future = action.get();
|
CompletableFuture<T> future = action.get();
|
||||||
endSpan(future, span);
|
endSpan(future, span);
|
||||||
return future;
|
return future;
|
||||||
|
@ -97,7 +97,7 @@ public final class TraceUtil {
|
||||||
public static <T> CompletableFuture<T> tracedFuture(Supplier<CompletableFuture<T>> action,
|
public static <T> CompletableFuture<T> tracedFuture(Supplier<CompletableFuture<T>> action,
|
||||||
String spanName) {
|
String spanName) {
|
||||||
Span span = createSpan(spanName);
|
Span span = createSpan(spanName);
|
||||||
try (Scope scope = span.makeCurrent()) {
|
try (Scope ignored = span.makeCurrent()) {
|
||||||
CompletableFuture<T> future = action.get();
|
CompletableFuture<T> future = action.get();
|
||||||
endSpan(future, span);
|
endSpan(future, span);
|
||||||
return future;
|
return future;
|
||||||
|
@ -113,7 +113,7 @@ public final class TraceUtil {
|
||||||
Supplier<Span> spanSupplier
|
Supplier<Span> spanSupplier
|
||||||
) {
|
) {
|
||||||
Span span = spanSupplier.get();
|
Span span = spanSupplier.get();
|
||||||
try (Scope scope = span.makeCurrent()) {
|
try (Scope ignored = span.makeCurrent()) {
|
||||||
List<CompletableFuture<T>> futures = action.get();
|
List<CompletableFuture<T>> futures = action.get();
|
||||||
endSpan(CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])), span);
|
endSpan(CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])), span);
|
||||||
return futures;
|
return futures;
|
||||||
|
@ -139,51 +139,61 @@ public final class TraceUtil {
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
public static void trace(Runnable action, String spanName) {
|
/**
|
||||||
trace(action, () -> createSpan(spanName));
|
* A {@link Runnable} that may also throw.
|
||||||
}
|
* @param <T> the type of {@link Throwable} that can be produced.
|
||||||
|
*/
|
||||||
public static void trace(Runnable action, Supplier<Span> creator) {
|
|
||||||
Span span = creator.get();
|
|
||||||
try (Scope scope = span.makeCurrent()) {
|
|
||||||
action.run();
|
|
||||||
span.setStatus(StatusCode.OK);
|
|
||||||
} catch (Throwable e) {
|
|
||||||
setError(span, e);
|
|
||||||
throw e;
|
|
||||||
} finally {
|
|
||||||
span.end();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public static <T> T trace(Supplier<T> action, String spanName) {
|
|
||||||
Span span = createSpan(spanName);
|
|
||||||
try (Scope scope = span.makeCurrent()) {
|
|
||||||
T ret = action.get();
|
|
||||||
span.setStatus(StatusCode.OK);
|
|
||||||
return ret;
|
|
||||||
} catch (Throwable e) {
|
|
||||||
setError(span, e);
|
|
||||||
throw e;
|
|
||||||
} finally {
|
|
||||||
span.end();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@FunctionalInterface
|
@FunctionalInterface
|
||||||
public interface IOExceptionCallable<V> {
|
public interface ThrowingRunnable<T extends Throwable> {
|
||||||
V call() throws IOException;
|
void run() throws T;
|
||||||
}
|
}
|
||||||
|
|
||||||
public static <T> T trace(IOExceptionCallable<T> callable, String spanName) throws IOException {
|
public static <T extends Throwable> void trace(
|
||||||
|
final ThrowingRunnable<T> runnable,
|
||||||
|
final String spanName) throws T {
|
||||||
|
trace(runnable, () -> createSpan(spanName));
|
||||||
|
}
|
||||||
|
|
||||||
|
public static <T extends Throwable> void trace(
|
||||||
|
final ThrowingRunnable<T> runnable,
|
||||||
|
final Supplier<Span> spanSupplier
|
||||||
|
) throws T {
|
||||||
|
Span span = spanSupplier.get();
|
||||||
|
try (Scope ignored = span.makeCurrent()) {
|
||||||
|
runnable.run();
|
||||||
|
span.setStatus(StatusCode.OK);
|
||||||
|
} catch (Throwable e) {
|
||||||
|
setError(span, e);
|
||||||
|
throw e;
|
||||||
|
} finally {
|
||||||
|
span.end();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A {@link Callable} that may also throw.
|
||||||
|
* @param <R> the result type of method call.
|
||||||
|
* @param <T> the type of {@link Throwable} that can be produced.
|
||||||
|
*/
|
||||||
|
@FunctionalInterface
|
||||||
|
public interface ThrowingCallable<R, T extends Throwable> {
|
||||||
|
R call() throws T;
|
||||||
|
}
|
||||||
|
|
||||||
|
public static <R, T extends Throwable> R trace(
|
||||||
|
final ThrowingCallable<R, T> callable,
|
||||||
|
final String spanName
|
||||||
|
) throws T {
|
||||||
return trace(callable, () -> createSpan(spanName));
|
return trace(callable, () -> createSpan(spanName));
|
||||||
}
|
}
|
||||||
|
|
||||||
public static <T> T trace(IOExceptionCallable<T> callable, Supplier<Span> creator)
|
public static <R, T extends Throwable> R trace(
|
||||||
throws IOException {
|
final ThrowingCallable<R, T> callable,
|
||||||
Span span = creator.get();
|
final Supplier<Span> spanSupplier
|
||||||
try (Scope scope = span.makeCurrent()) {
|
) throws T {
|
||||||
T ret = callable.call();
|
Span span = spanSupplier.get();
|
||||||
|
try (Scope ignored = span.makeCurrent()) {
|
||||||
|
final R ret = callable.call();
|
||||||
span.setStatus(StatusCode.OK);
|
span.setStatus(StatusCode.OK);
|
||||||
return ret;
|
return ret;
|
||||||
} catch (Throwable e) {
|
} catch (Throwable e) {
|
||||||
|
|
|
@ -590,18 +590,12 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public final void sync(boolean forceSync) throws IOException {
|
public final void sync(boolean forceSync) throws IOException {
|
||||||
TraceUtil.trace(() -> {
|
TraceUtil.trace(() -> doSync(forceSync), () -> createSpan("WAL.sync"));
|
||||||
doSync(forceSync);
|
|
||||||
return null;
|
|
||||||
}, () -> createSpan("WAL.sync"));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public final void sync(long txid, boolean forceSync) throws IOException {
|
public final void sync(long txid, boolean forceSync) throws IOException {
|
||||||
TraceUtil.trace(() -> {
|
TraceUtil.trace(() -> doSync(txid, forceSync), () -> createSpan("WAL.sync"));
|
||||||
doSync(txid, forceSync);
|
|
||||||
return null;
|
|
||||||
}, () -> createSpan("WAL.sync"));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
protected abstract void doSync(boolean forceSync) throws IOException;
|
protected abstract void doSync(boolean forceSync) throws IOException;
|
||||||
|
|
Loading…
Reference in New Issue