From c11a3e1b398e250553dc6f97b1d4f9f834a316ed Mon Sep 17 00:00:00 2001
From: "Tak Lon (Stephen) Wu"
Date: Wed, 4 Aug 2021 15:37:08 -0700
Subject: [PATCH] Revert "HBASE-26124 Backport HBASE-25373 "Remove HTrace
 completely in code base and try to make use of OpenTelemetry" to branch-2
 (#3529)"

This reverts commit f0493016062267fc37e14659d9183673d42a8f1d.
---
 .../hbase/io/asyncfs/AsyncFSTestBase.java     |   3 +
 hbase-client/pom.xml                          |   4 +-
 .../hbase/client/AsyncRequestFutureImpl.java  |  14 +-
 .../ResultBoundedCompletionService.java       |   4 +-
 .../hbase/ipc/BlockingRpcConnection.java      |  11 +-
 .../org/apache/hadoop/hbase/ipc/Call.java     |   7 +-
 hbase-common/pom.xml                          |   4 +-
 .../hbase/trace/HBaseHTraceConfiguration.java |  80 ++++++++++
 .../hadoop/hbase/trace/SpanReceiverHost.java  | 120 ++++++++++++++
 .../apache/hadoop/hbase/trace/TraceUtil.java  | 105 ++++++++++++-
 hbase-external-blockcache/pom.xml             |   4 +
 .../hbase/io/hfile/MemcachedBlockCache.java   |  14 +-
 hbase-it/pom.xml                              |   4 +-
 .../IntegrationTestTableMapReduceUtil.java    |   1 +
 .../hbase/mttr/IntegrationTestMTTR.java       |  21 ++-
 .../IntegrationTestSendTraceRequests.java     | 133 ++++++++--------
 hbase-mapreduce/pom.xml                       |   4 +-
 .../hbase/mapreduce/TableMapReduceUtil.java   |   1 +
 .../hadoop/hbase/PerformanceEvaluation.java   |  29 ++--
 hbase-protocol-shaded/pom.xml                 |   4 +
 hbase-server/pom.xml                          |   4 +-
 .../hadoop/hbase/executor/EventHandler.java   |  16 +-
 .../hbase/io/hfile/HFileReaderImpl.java       |  11 +-
 .../apache/hadoop/hbase/ipc/CallRunner.java   |  31 ++--
 .../apache/hadoop/hbase/master/HMaster.java   |   2 +
 .../hbase/master/HMasterCommandLine.java      |   6 +-
 .../hadoop/hbase/regionserver/HRegion.java    |  15 +-
 .../hbase/regionserver/HRegionServer.java     |   9 ++
 .../HRegionServerCommandLine.java             |   8 +-
 .../hbase/regionserver/MemStoreFlusher.java   |  14 +-
 .../hbase/regionserver/wal/AbstractFSWAL.java |  19 +--
 .../hbase/regionserver/wal/AsyncFSWAL.java    |  21 +--
 .../hadoop/hbase/regionserver/wal/FSHLog.java |  22 ++-
 .../hadoop/hbase/HBaseTestingUtility.java     |  13 +-
 .../hbase/executor/TestExecutorService.java   |   4 +-
 .../hadoop/hbase/trace/TestHTraceHooks.java   | 134 ++++++++++++++++
 .../apache/hadoop/hbase/trace/TraceTree.java  | 148 ++++++++++++++++++
 .../hbase/wal/WALPerformanceEvaluation.java   |  73 ++++++---
 hbase-shaded/hbase-shaded-client/pom.xml      |   1 -
 .../hbase-shaded-testing-util/pom.xml         |   1 -
 hbase-shaded/pom.xml                          |   1 -
 .../src/main/ruby/shell/commands/trace.rb     |  43 +++--
 hbase-zookeeper/pom.xml                       |   4 -
 .../hbase/zookeeper/RecoverableZooKeeper.java |  77 +++------
 pom.xml                                       |  30 +---
 45 files changed, 913 insertions(+), 361 deletions(-)
 create mode 100644 hbase-common/src/main/java/org/apache/hadoop/hbase/trace/HBaseHTraceConfiguration.java
 create mode 100644 hbase-common/src/main/java/org/apache/hadoop/hbase/trace/SpanReceiverHost.java
 create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/trace/TestHTraceHooks.java
 create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/trace/TraceTree.java

diff --git a/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSTestBase.java b/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSTestBase.java
index fc148e8de79..9b276aca078 100644
--- a/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSTestBase.java
+++ b/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSTestBase.java
@@ -19,9 +19,11 @@ package org.apache.hadoop.hbase.io.asyncfs;
 
 import java.io.File;
 import java.io.IOException;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseCommonTestingUtility;
+import org.apache.hadoop.hbase.trace.TraceUtil;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -102,6 +104,7 @@ public abstract class AsyncFSTestBase {
     org.apache.log4j.Logger.getLogger(org.apache.hadoop.metrics2.impl.MetricsSystemImpl.class)
       .setLevel(org.apache.log4j.Level.ERROR);
 
+    TraceUtil.initTracer(conf);
     CLUSTER = new MiniDFSCluster.Builder(conf).numDataNodes(servers).build();
     CLUSTER.waitClusterUp();
   }
diff --git a/hbase-client/pom.xml b/hbase-client/pom.xml
index e62a7d1d30b..08e91742787 100644
--- a/hbase-client/pom.xml
+++ b/hbase-client/pom.xml
@@ -145,8 +145,8 @@
       zookeeper
-      io.opentelemetry
-      opentelemetry-api
+      org.apache.htrace
+      htrace-core4
       org.jruby.jcodings
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRequestFutureImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRequestFutureImpl.java
index 8cfcf0c1dbb..d2486cc1cf0 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRequestFutureImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRequestFutureImpl.java
@@ -46,8 +46,10 @@ import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.backoff.ServerStatistics;
 import org.apache.hadoop.hbase.client.coprocessor.Batch;
 import org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil;
+import org.apache.hadoop.hbase.trace.TraceUtil;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.htrace.core.Tracer;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -570,9 +572,13 @@ class AsyncRequestFutureImpl implements AsyncRequestFuture {
       asyncProcess.incTaskCounters(multiAction.getRegions(), server);
       SingleServerRequestRunnable runnable = createSingleServerRequest(
         multiAction, numAttempt, server, callsInProgress);
+      Tracer tracer = Tracer.curThreadTracer();
 
-      // remove trace for runnable because HBASE-25373 and OpenTelemetry do not cover TraceRunnable
-      return Collections.singletonList(runnable);
+      if (tracer == null) {
+        return Collections.singletonList(runnable);
+      } else {
+        return Collections.singletonList(tracer.wrap(runnable, "AsyncProcess.sendMultiAction"));
+      }
     }
 
     // group the actions by the amount of delay
@@ -592,10 +598,12 @@ class AsyncRequestFutureImpl implements AsyncRequestFuture {
     List toReturn = new ArrayList<>(actions.size());
     for (DelayingRunner runner : actions.values()) {
       asyncProcess.incTaskCounters(runner.getActions().getRegions(), server);
+      String traceText = "AsyncProcess.sendMultiAction";
       Runnable runnable = createSingleServerRequest(runner.getActions(), numAttempt, server,
         callsInProgress);
       // use a delay runner only if we need to sleep for some time
       if (runner.getSleepTime() > 0) {
         runner.setRunner(runnable);
+        traceText = "AsyncProcess.clientBackoff.sendMultiAction";
         runnable = runner;
         if (asyncProcess.connection.getConnectionMetrics() != null) {
           asyncProcess.connection.getConnectionMetrics()
@@ -606,7 +614,7 @@ class AsyncRequestFutureImpl implements AsyncRequestFuture {
           asyncProcess.connection.getConnectionMetrics().incrNormalRunners();
         }
       }
-      // remove trace for runnable because HBASE-25373 and OpenTelemetry do not cover TraceRunnable
+      runnable = TraceUtil.wrap(runnable, traceText);
       toReturn.add(runnable);
     }
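The hunks above restore HTrace's runnable wrapping on the client batch path. For orientation, here is a minimal, hypothetical sketch (not part of the patch) of that pattern: `TraceUtil.wrap()` snapshots the span current on the submitting thread and re-activates it around `run()` on the pool thread. The class name and span names are illustrative.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.trace.TraceUtil;
import org.apache.htrace.core.TraceScope;

public class SendMultiActionSketch {
  public static void main(String[] args) {
    // Build the process-wide tracer; effectively inert unless hbase.htrace.* keys enable sampling.
    TraceUtil.initTracer(new Configuration());
    ExecutorService pool = Executors.newSingleThreadExecutor();
    // createTrace() may return null when no tracer was initialized;
    // try-with-resources tolerates a null resource.
    try (TraceScope scope = TraceUtil.createTrace("client.batch")) {
      Runnable task = () -> System.out.println("multi-action sent");
      // TraceUtil.wrap() hands back the original runnable when tracing is off; otherwise
      // the wrapped runnable re-activates the caller's span on the pool thread.
      pool.execute(TraceUtil.wrap(task, "AsyncProcess.sendMultiAction"));
    }
    pool.shutdown();
  }
}
```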
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ResultBoundedCompletionService.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ResultBoundedCompletionService.java
index 4a96954b21a..965b13c2134 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ResultBoundedCompletionService.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ResultBoundedCompletionService.java
@@ -26,6 +26,7 @@ import java.util.concurrent.RunnableFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
+import org.apache.hadoop.hbase.trace.TraceUtil;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -167,8 +168,7 @@ public class ResultBoundedCompletionService {
 
   public void submit(RetryingCallable task, int callTimeout, int id) {
     QueueingFuture newFuture = new QueueingFuture<>(task, callTimeout, id);
-    // remove trace for runnable because HBASE-25373 and OpenTelemetry do not cover TraceRunnable
-    executor.execute(newFuture);
+    executor.execute(TraceUtil.wrap(newFuture, "ResultBoundedCompletionService.submit"));
     tasks[id] = newFuture;
   }
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcConnection.java
index 1a5cb73bccf..cd8035fd58e 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcConnection.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcConnection.java
@@ -24,9 +24,6 @@ import static org.apache.hadoop.hbase.ipc.IPCUtil.isFatalConnectionException;
 import static org.apache.hadoop.hbase.ipc.IPCUtil.setCancelled;
 import static org.apache.hadoop.hbase.ipc.IPCUtil.write;
 
-import io.opentelemetry.api.trace.Span;
-import io.opentelemetry.context.Context;
-import io.opentelemetry.context.Scope;
 import java.io.BufferedInputStream;
 import java.io.BufferedOutputStream;
 import java.io.DataInputStream;
@@ -65,6 +62,7 @@ import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.StringUtils;
+import org.apache.htrace.core.TraceScope;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -595,12 +593,9 @@ class BlockingRpcConnection extends RpcConnection implements Runnable {
   }
 
   private void tracedWriteRequest(Call call) throws IOException {
-    Span span = TraceUtil.getGlobalTracer().spanBuilder("RpcClientImpl.tracedWriteRequest")
-      .setParent(Context.current().with(call.span)).startSpan();
-    try (Scope scope = span.makeCurrent()) {
+    try (TraceScope ignored = TraceUtil.createTrace("RpcClientImpl.tracedWriteRequest",
+        call.span)) {
       writeRequest(call);
-    } finally {
-      span.end();
     }
   }
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/Call.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/Call.java
index 113f731aaa2..7793680ca54 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/Call.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/Call.java
@@ -17,7 +17,6 @@
  */
 package org.apache.hadoop.hbase.ipc;
 
-import io.opentelemetry.api.trace.Span;
 import java.io.IOException;
 import java.util.Optional;
 import org.apache.commons.lang3.builder.ToStringBuilder;
@@ -25,13 +24,13 @@ import org.apache.commons.lang3.builder.ToStringStyle;
 import org.apache.hadoop.hbase.CellScanner;
 import org.apache.hadoop.hbase.client.MetricsConnection;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.htrace.core.Span;
+import org.apache.htrace.core.Tracer;
 import org.apache.yetus.audience.InterfaceAudience;
-
 import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors;
 import org.apache.hbase.thirdparty.com.google.protobuf.Message;
 import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
 import org.apache.hbase.thirdparty.io.netty.util.Timeout;
-
 import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
 
 /** A call waiting for a value. */
@@ -74,7 +73,7 @@ class Call {
     this.timeout = timeout;
     this.priority = priority;
     this.callback = callback;
-    this.span = Span.current();
+    this.span = Tracer.getCurrentSpan();
   }
 
   /**
diff --git a/hbase-common/pom.xml b/hbase-common/pom.xml
index 62661b80f6b..64007c0bf5b 100644
--- a/hbase-common/pom.xml
+++ b/hbase-common/pom.xml
@@ -192,8 +192,8 @@
-      io.opentelemetry
-      opentelemetry-api
+      org.apache.htrace
+      htrace-core4
       org.apache.commons
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/trace/HBaseHTraceConfiguration.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/trace/HBaseHTraceConfiguration.java
new file mode 100644
index 00000000000..03d03d9fe4e
--- /dev/null
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/trace/HBaseHTraceConfiguration.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.trace;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.htrace.core.HTraceConfiguration;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@InterfaceAudience.Private
+public class HBaseHTraceConfiguration extends HTraceConfiguration {
+  private static final Logger LOG = LoggerFactory.getLogger(HBaseHTraceConfiguration.class);
+
+  public static final String KEY_PREFIX = "hbase.htrace.";
+
+  private Configuration conf;
+
+  private void handleDeprecation(String key) {
+    String oldKey = "hbase." + key;
+    String newKey = KEY_PREFIX + key;
+    String oldValue = conf.get(oldKey);
+    if (oldValue != null) {
+      LOG.warn("Warning: using deprecated configuration key " + oldKey +
+        ".  Please use " + newKey + " instead.");
+      String newValue = conf.get(newKey);
+      if (newValue == null) {
+        conf.set(newKey, oldValue);
+      } else {
+        LOG.warn("Conflicting values for " + newKey + " and " + oldKey +
Using " + newValue); + } + } + } + + public HBaseHTraceConfiguration(Configuration conf) { + this.conf = conf; + handleDeprecation("local-file-span-receiver.path"); + handleDeprecation("local-file-span-receiver.capacity"); + handleDeprecation("sampler.frequency"); + handleDeprecation("sampler.fraction"); + handleDeprecation("zipkin.collector-hostname"); + handleDeprecation("zipkin.collector-port"); + handleDeprecation("zipkin.num-threads"); + handleDeprecation("zipkin.traced-service-hostname"); + handleDeprecation("zipkin.traced-service-port"); + } + + @Override + public String get(String key) { + return conf.get(KEY_PREFIX + key); + } + + @Override + public String get(String key, String defaultValue) { + return conf.get(KEY_PREFIX + key,defaultValue); + + } + + @Override + public boolean getBoolean(String key, boolean defaultValue) { + return conf.getBoolean(KEY_PREFIX + key, defaultValue); + } +} diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/trace/SpanReceiverHost.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/trace/SpanReceiverHost.java new file mode 100644 index 00000000000..b967db7f27d --- /dev/null +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/trace/SpanReceiverHost.java @@ -0,0 +1,120 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.trace; + +import java.io.IOException; +import java.util.Collection; +import java.util.HashSet; + +import org.apache.hadoop.conf.Configuration; +import org.apache.htrace.core.SpanReceiver; +import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * This class provides functions for reading the names of SpanReceivers from + * hbase-site.xml, adding those SpanReceivers to the Tracer, and closing those + * SpanReceivers when appropriate. 
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/trace/SpanReceiverHost.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/trace/SpanReceiverHost.java
new file mode 100644
index 00000000000..b967db7f27d
--- /dev/null
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/trace/SpanReceiverHost.java
@@ -0,0 +1,120 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.trace;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.HashSet;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.htrace.core.SpanReceiver;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class provides functions for reading the names of SpanReceivers from
+ * hbase-site.xml, adding those SpanReceivers to the Tracer, and closing those
+ * SpanReceivers when appropriate.
+ */
+@InterfaceAudience.Private
+public class SpanReceiverHost {
+  public static final String SPAN_RECEIVERS_CONF_KEY = "hbase.trace.spanreceiver.classes";
+  private static final Logger LOG = LoggerFactory.getLogger(SpanReceiverHost.class);
+  private Collection receivers;
+  private Configuration conf;
+  private boolean closed = false;
+
+  private enum SingletonHolder {
+    INSTANCE;
+    final transient Object lock = new Object();
+    transient SpanReceiverHost host = null;
+  }
+
+  public static SpanReceiverHost getInstance(Configuration conf) {
+    synchronized (SingletonHolder.INSTANCE.lock) {
+      if (SingletonHolder.INSTANCE.host != null) {
+        return SingletonHolder.INSTANCE.host;
+      }
+
+      SpanReceiverHost host = new SpanReceiverHost(conf);
+      host.loadSpanReceivers();
+      SingletonHolder.INSTANCE.host = host;
+      return SingletonHolder.INSTANCE.host;
+    }
+
+  }
+
+  public static Configuration getConfiguration(){
+    synchronized (SingletonHolder.INSTANCE.lock) {
+      if (SingletonHolder.INSTANCE.host == null || SingletonHolder.INSTANCE.host.conf == null) {
+        return null;
+      }
+
+      return SingletonHolder.INSTANCE.host.conf;
+    }
+  }
+
+  SpanReceiverHost(Configuration conf) {
+    receivers = new HashSet<>();
+    this.conf = conf;
+  }
+
+  /**
+   * Reads the names of classes specified in the {@code hbase.trace.spanreceiver.classes} property
+   * and instantiates and registers them with the Tracer.
+   */
+  public void loadSpanReceivers() {
+    String[] receiverNames = conf.getStrings(SPAN_RECEIVERS_CONF_KEY);
+    if (receiverNames == null || receiverNames.length == 0) {
+      return;
+    }
+
+    SpanReceiver.Builder builder = new SpanReceiver.Builder(new HBaseHTraceConfiguration(conf));
+    for (String className : receiverNames) {
+      className = className.trim();
+
+      SpanReceiver receiver = builder.className(className).build();
+      if (receiver != null) {
+        receivers.add(receiver);
+        LOG.info("SpanReceiver {} was loaded successfully.", className);
+      }
+    }
+    for (SpanReceiver rcvr : receivers) {
+      TraceUtil.addReceiver(rcvr);
+    }
+  }
+
+  /**
+   * Calls close() on all SpanReceivers created by this SpanReceiverHost.
+   */
+  public synchronized void closeReceivers() {
+    if (closed) {
+      return;
+    }
+
+    closed = true;
+    for (SpanReceiver rcvr : receivers) {
+      try {
+        rcvr.close();
+      } catch (IOException e) {
+        LOG.warn("Unable to close SpanReceiver correctly: " + e.getMessage(), e);
+      }
+    }
+  }
+}
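The restored `SpanReceiverHost` reads receiver class names from `hbase.trace.spanreceiver.classes`. A hedged sketch of wiring it up (not part of the patch; `LocalFileSpanReceiver` ships with htrace-core4, and the path key suffix is an assumption taken from the deprecation list handled above):

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.trace.SpanReceiverHost;

public class ReceiverHostSketch {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    conf.set(SpanReceiverHost.SPAN_RECEIVERS_CONF_KEY,
        "org.apache.htrace.core.LocalFileSpanReceiver");
    // Assumed key: the "local-file-span-receiver.path" suffix mirrors the
    // handleDeprecation() entries in HBaseHTraceConfiguration above.
    conf.set("hbase.htrace.local-file-span-receiver.path", "/tmp/htrace.out");
    // Instantiates every listed receiver and registers it with the tracer pool.
    SpanReceiverHost host = SpanReceiverHost.getInstance(conf);
    // ... traced work happens here ...
    host.closeReceivers(); // flushes and closes every receiver exactly once
  }
}
```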
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/trace/TraceUtil.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/trace/TraceUtil.java
index f7a111f5901..10665d89826 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/trace/TraceUtil.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/trace/TraceUtil.java
@@ -17,19 +17,112 @@
  */
 package org.apache.hadoop.hbase.trace;
 
-import io.opentelemetry.api.OpenTelemetry;
-import io.opentelemetry.api.trace.Tracer;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.htrace.core.HTraceConfiguration;
+import org.apache.htrace.core.Sampler;
+import org.apache.htrace.core.Span;
+import org.apache.htrace.core.SpanReceiver;
+import org.apache.htrace.core.TraceScope;
+import org.apache.htrace.core.Tracer;
 import org.apache.yetus.audience.InterfaceAudience;
 
+/**
+ * This wrapper class provides functions for accessing htrace 4+ functionality in a simplified way.
+ */
 @InterfaceAudience.Private
 public final class TraceUtil {
-
-  private static final String INSTRUMENTATION_NAME = "io.opentelemetry.contrib.hbase";
+  private static HTraceConfiguration conf;
+  private static Tracer tracer;
 
   private TraceUtil() {
   }
 
-  public static Tracer getGlobalTracer() {
-    return OpenTelemetry.getGlobalTracer(INSTRUMENTATION_NAME);
+  public static void initTracer(Configuration c) {
+    if (c != null) {
+      conf = new HBaseHTraceConfiguration(c);
+    }
+
+    if (tracer == null && conf != null) {
+      tracer = new Tracer.Builder("Tracer").conf(conf).build();
+    }
+  }
+
+  /**
+   * Wrapper method to create new TraceScope with the given description
+   * @return TraceScope or null when not tracing
+   */
+  public static TraceScope createTrace(String description) {
+    return (tracer == null) ? null : tracer.newScope(description);
+  }
+
+  /**
+   * Wrapper method to create new child TraceScope with the given description
+   * and parent scope's spanId
+   * @param span parent span
+   * @return TraceScope or null when not tracing
+   */
+  public static TraceScope createTrace(String description, Span span) {
+    if (span == null) {
+      return createTrace(description);
+    }
+
+    return (tracer == null) ? null : tracer.newScope(description, span.getSpanId());
+  }
+
+  /**
+   * Wrapper method to add new sampler to the default tracer
+   * @return true if added, false if it was already added
+   */
+  public static boolean addSampler(Sampler sampler) {
+    if (sampler == null) {
+      return false;
+    }
+
+    return (tracer == null) ? false : tracer.addSampler(sampler);
+  }
+
+  /**
+   * Wrapper method to add key-value pair to TraceInfo of actual span
+   */
+  public static void addKVAnnotation(String key, String value){
+    Span span = Tracer.getCurrentSpan();
+    if (span != null) {
+      span.addKVAnnotation(key, value);
+    }
+  }
+
+  /**
+   * Wrapper method to add receiver to actual tracerpool
+   * @return true if successfull, false if it was already added
+   */
+  public static boolean addReceiver(SpanReceiver rcvr) {
+    return (tracer == null) ? false : tracer.getTracerPool().addReceiver(rcvr);
+  }
+
+  /**
+   * Wrapper method to remove receiver from actual tracerpool
+   * @return true if removed, false if doesn't exist
+   */
+  public static boolean removeReceiver(SpanReceiver rcvr) {
+    return (tracer == null) ? false : tracer.getTracerPool().removeReceiver(rcvr);
+  }
+
+  /**
+   * Wrapper method to add timeline annotiation to current span with given message
+   */
+  public static void addTimelineAnnotation(String msg) {
+    Span span = Tracer.getCurrentSpan();
+    if (span != null) {
+      span.addTimelineAnnotation(msg);
+    }
+  }
+
+  /**
+   * Wrap runnable with current tracer and description
+   * @param runnable to wrap
+   * @return wrapped runnable or original runnable when not tracing
+   */
+  public static Runnable wrap(Runnable runnable, String description) {
+    return (tracer == null) ? runnable : tracer.wrap(runnable, description);
   }
 }
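Taken together, the restored `TraceUtil` is used in a fixed sequence: `initTracer()` builds the singleton tracer once, `addSampler()` turns sampling on, and `createTrace()` opens a scope that try-with-resources closes. A minimal sketch (not part of the patch; the span name is illustrative):

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.trace.TraceUtil;
import org.apache.htrace.core.Sampler;
import org.apache.htrace.core.TraceScope;

public class TraceUtilSketch {
  public static void main(String[] args) {
    TraceUtil.initTracer(new Configuration()); // builds the singleton tracer once; later calls are no-ops
    TraceUtil.addSampler(Sampler.ALWAYS);      // sample every request, as the integration tests below do
    // A null scope (tracer never initialized) is fine: try-with-resources skips closing null.
    try (TraceScope scope = TraceUtil.createTrace("demo.operation")) {
      // traced work goes here
    }
  }
}
```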
diff --git a/hbase-external-blockcache/pom.xml b/hbase-external-blockcache/pom.xml
index 1cdffc873b1..0b37d5dc035 100644
--- a/hbase-external-blockcache/pom.xml
+++ b/hbase-external-blockcache/pom.xml
@@ -109,6 +109,10 @@
       org.slf4j
       slf4j-api
+
+      org.apache.htrace
+      htrace-core4
+
       junit
       junit
diff --git a/hbase-external-blockcache/src/main/java/org/apache/hadoop/hbase/io/hfile/MemcachedBlockCache.java b/hbase-external-blockcache/src/main/java/org/apache/hadoop/hbase/io/hfile/MemcachedBlockCache.java
index 350d2c72f79..246d7e0a138 100644
--- a/hbase-external-blockcache/src/main/java/org/apache/hadoop/hbase/io/hfile/MemcachedBlockCache.java
+++ b/hbase-external-blockcache/src/main/java/org/apache/hadoop/hbase/io/hfile/MemcachedBlockCache.java
@@ -19,8 +19,6 @@
 
 package org.apache.hadoop.hbase.io.hfile;
 
-import io.opentelemetry.api.trace.Span;
-import io.opentelemetry.context.Scope;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.nio.ByteBuffer;
@@ -29,11 +27,13 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.NoSuchElementException;
 import java.util.concurrent.ExecutionException;
+
 import net.spy.memcached.CachedData;
 import net.spy.memcached.ConnectionFactoryBuilder;
 import net.spy.memcached.FailureMode;
 import net.spy.memcached.MemcachedClient;
 import net.spy.memcached.transcoders.Transcoder;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.io.ByteBuffAllocator;
@@ -41,6 +41,7 @@ import org.apache.hadoop.hbase.nio.ByteBuff;
 import org.apache.hadoop.hbase.nio.SingleByteBuff;
 import org.apache.hadoop.hbase.trace.TraceUtil;
 import org.apache.hadoop.hbase.util.Addressing;
+import org.apache.htrace.core.TraceScope;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -128,12 +129,12 @@ public class MemcachedBlockCache implements BlockCache {
   }
 
   @Override
-  public Cacheable getBlock(BlockCacheKey cacheKey, boolean caching, boolean repeat,
-      boolean updateCacheMetrics) {
+  public Cacheable getBlock(BlockCacheKey cacheKey, boolean caching,
+                            boolean repeat, boolean updateCacheMetrics) {
     // Assume that nothing is the block cache
     HFileBlock result = null;
-    Span span = TraceUtil.getGlobalTracer().spanBuilder("MemcachedBlockCache.getBlock").startSpan();
-    try (Scope traceScope = span.makeCurrent()) {
+
+    try (TraceScope traceScope = TraceUtil.createTrace("MemcachedBlockCache.getBlock")) {
       result = client.get(cacheKey.toString(), tc);
     } catch (Exception e) {
       // Catch a pretty broad set of exceptions to limit any changes in the memecache client
@@ -145,7 +146,6 @@ public class MemcachedBlockCache implements BlockCache {
       }
       result = null;
     } finally {
-      span.end();
       // Update stats if this request doesn't have it turned off 100% of the time
       if (updateCacheMetrics) {
        if (result == null) {
diff --git a/hbase-it/pom.xml b/hbase-it/pom.xml
index d4ef4ec67c8..d1213a009d2 100644
--- a/hbase-it/pom.xml
+++ b/hbase-it/pom.xml
@@ -247,8 +247,8 @@
       commons-lang3
-      io.opentelemetry
-      opentelemetry-api
+      org.apache.htrace
+      htrace-core4
diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/mapreduce/IntegrationTestTableMapReduceUtil.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/mapreduce/IntegrationTestTableMapReduceUtil.java
index 1cd43047b57..e21dfecab98 100644
--- a/hbase-it/src/test/java/org/apache/hadoop/hbase/mapreduce/IntegrationTestTableMapReduceUtil.java
+++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/mapreduce/IntegrationTestTableMapReduceUtil.java
@@ -78,6 +78,7 @@ public class IntegrationTestTableMapReduceUtil implements Configurable, Tool {
     assertTrue(tmpjars.contains("netty"));
     assertTrue(tmpjars.contains("protobuf"));
     assertTrue(tmpjars.contains("guava"));
+    assertTrue(tmpjars.contains("htrace"));
   }
 
   @Override
diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/mttr/IntegrationTestMTTR.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/mttr/IntegrationTestMTTR.java
index 5e390d28bcf..d946045d628 100644
--- a/hbase-it/src/test/java/org/apache/hadoop/hbase/mttr/IntegrationTestMTTR.java
+++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/mttr/IntegrationTestMTTR.java
@@ -21,8 +21,6 @@ package org.apache.hadoop.hbase.mttr;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assume.assumeFalse;
 
-import io.opentelemetry.api.trace.Span;
-import io.opentelemetry.context.Scope;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.concurrent.Callable;
@@ -66,6 +64,9 @@ import org.apache.hadoop.hbase.testclassification.IntegrationTests;
 import org.apache.hadoop.hbase.trace.TraceUtil;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.LoadTestTool;
+import org.apache.htrace.core.AlwaysSampler;
+import org.apache.htrace.core.Span;
+import org.apache.htrace.core.TraceScope;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -375,9 +376,12 @@ public class IntegrationTestMTTR {
      * @param span Span.  To be kept if the time taken was over 1 second
      */
     public void addResult(long time, Span span) {
+      if (span == null) {
+        return;
+      }
       stats.addValue(TimeUnit.MILLISECONDS.convert(time, TimeUnit.NANOSECONDS));
       if (TimeUnit.SECONDS.convert(time, TimeUnit.NANOSECONDS) >= 1) {
-        traces.add(span.getSpanContext().getTraceIdAsHexString());
+        traces.add(span.getTracerId());
       }
     }
 
@@ -417,11 +421,15 @@ public class IntegrationTestMTTR {
       final int maxIterations = 10;
       int numAfterDone = 0;
       int resetCount = 0;
+      TraceUtil.addSampler(AlwaysSampler.INSTANCE);
       // Keep trying until the rs is back up and we've gotten a put through
       while (numAfterDone < maxIterations) {
         long start = System.nanoTime();
-        Span span = TraceUtil.getGlobalTracer().spanBuilder(getSpanName()).startSpan();
-        try (Scope scope = span.makeCurrent()) {
+        Span span = null;
+        try (TraceScope scope = TraceUtil.createTrace(getSpanName())) {
+          if (scope != null) {
+            span = scope.getSpan();
+          }
           boolean actionResult = doAction();
           if (actionResult && future.isDone()) {
             numAfterDone++;
@@ -452,6 +460,7 @@ public class IntegrationTestMTTR {
           throw e;
         } catch (RetriesExhaustedException e){
           throw e;
+
         // Everything else is potentially recoverable on the application side. For instance, a CM
         // action kills the RS that hosted a scanner the client was using.  Continued use of that
         // scanner should be terminated, but a new scanner can be created and the read attempted
@@ -466,8 +475,6 @@ public class IntegrationTestMTTR {
           LOG.info("Too many unexpected Exceptions. Aborting.", e);
Aborting.", e); throw e; } - } finally { - span.end(); } result.addResult(System.nanoTime() - start, span); } diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/trace/IntegrationTestSendTraceRequests.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/trace/IntegrationTestSendTraceRequests.java index e9f3aa062e3..ea219db9b02 100644 --- a/hbase-it/src/test/java/org/apache/hadoop/hbase/trace/IntegrationTestSendTraceRequests.java +++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/trace/IntegrationTestSendTraceRequests.java @@ -18,19 +18,10 @@ package org.apache.hadoop.hbase.trace; -import io.opentelemetry.api.common.AttributeKey; -import io.opentelemetry.api.common.Attributes; -import io.opentelemetry.api.trace.Span; -import io.opentelemetry.context.Scope; -import java.io.IOException; -import java.util.Random; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.TimeUnit; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.IntegrationTestingUtility; +import org.apache.hadoop.hbase.testclassification.IntegrationTests; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.client.BufferedMutator; @@ -40,21 +31,26 @@ import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.Table; -import org.apache.hadoop.hbase.testclassification.IntegrationTests; import org.apache.hadoop.hbase.util.AbstractHBaseTool; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.util.ToolRunner; +import org.apache.htrace.core.Sampler; +import org.apache.htrace.core.TraceScope; import org.junit.Test; import org.junit.experimental.categories.Category; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine; +import java.io.IOException; +import java.util.Random; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; + @Category(IntegrationTests.class) public class IntegrationTestSendTraceRequests extends AbstractHBaseTool { - private static final Logger LOG = - LoggerFactory.getLogger(IntegrationTestSendTraceRequests.class); + public static final String TABLE_ARG = "t"; public static final String CF_ARG = "f"; @@ -65,6 +61,7 @@ public class IntegrationTestSendTraceRequests extends AbstractHBaseTool { private IntegrationTestingUtility util; private Random random = new Random(); private Admin admin; + private SpanReceiverHost receiverHost; public static void main(String[] args) throws Exception { Configuration configuration = HBaseConfiguration.create(); @@ -98,6 +95,7 @@ public class IntegrationTestSendTraceRequests extends AbstractHBaseTool { public void internalDoWork() throws Exception { util = createUtil(); admin = util.getAdmin(); + setupReceiver(); deleteTable(); createTable(); @@ -110,53 +108,51 @@ public class IntegrationTestSendTraceRequests extends AbstractHBaseTool { service.shutdown(); service.awaitTermination(100, TimeUnit.SECONDS); Thread.sleep(90000); + receiverHost.closeReceivers(); util.restoreCluster(); util = null; } - @SuppressWarnings("FutureReturnValueIgnored") private void doScans(ExecutorService service, final 
   private void doScans(ExecutorService service, final LinkedBlockingQueue rks) {
-    for (int i = 0; i < 100; i++) {
-      Runnable runnable = new Runnable() {
-        private final LinkedBlockingQueue rowKeyQueue = rks;
-        @Override public void run() {
-          ResultScanner rs = null;
-          Span span = TraceUtil.getGlobalTracer().spanBuilder("Scan").startSpan();
-          try (Scope scope = span.makeCurrent()) {
-            Table ht = util.getConnection().getTable(tableName);
-            Scan s = new Scan();
-            s.withStartRow(Bytes.toBytes(rowKeyQueue.take()));
-            s.setBatch(7);
-            rs = ht.getScanner(s);
-            // Something to keep the jvm from removing the loop.
-            long accum = 0;
+    for (int i = 0; i < 100; i++) {
+      Runnable runnable = new Runnable() {
+        private final LinkedBlockingQueue rowKeyQueue = rks;
+        @Override
+        public void run() {
+          ResultScanner rs = null;
+          TraceUtil.addSampler(Sampler.ALWAYS);
+          try (TraceScope scope = TraceUtil.createTrace("Scan")){
+            Table ht = util.getConnection().getTable(tableName);
+            Scan s = new Scan();
+            s.setStartRow(Bytes.toBytes(rowKeyQueue.take()));
+            s.setBatch(7);
+            rs = ht.getScanner(s);
+            // Something to keep the jvm from removing the loop.
+            long accum = 0;
-            for (int x = 0; x < 1000; x++) {
-              Result r = rs.next();
-              accum |= Bytes.toLong(r.getRow());
+            for(int x = 0; x < 1000; x++) {
+              Result r = rs.next();
+              accum |= Bytes.toLong(r.getRow());
+            }
+
+            TraceUtil.addTimelineAnnotation("Accum result = " + accum);
+
+            ht.close();
+            ht = null;
+          } catch (IOException e) {
+            e.printStackTrace();
+            TraceUtil.addKVAnnotation("exception", e.getClass().getSimpleName());
+          } catch (Exception e) {
+          } finally {
+            if (rs != null) rs.close();
            }
-            span.addEvent("Accum result = " + accum);
-
-            ht.close();
-            ht = null;
-          } catch (IOException e) {
-            LOG.warn("Exception occurred while scanning table", e);
-            span.addEvent("exception",
-              Attributes.of(AttributeKey.stringKey("exception"), e.getClass().getSimpleName()));
-          } catch (Exception e) {
-            LOG.warn("Exception occurred while scanning table", e);
-          } finally {
-            span.end();
-            if (rs != null) {
-              rs.close();
-            }
          }
-        }
-      };
-      service.submit(runnable);
-    }
+      };
+      service.submit(runnable);
+    }
+  }
 
   private void doGets(ExecutorService service, final LinkedBlockingQueue rowKeys)
@@ -177,9 +173,9 @@
       }
 
       long accum = 0;
+      TraceUtil.addSampler(Sampler.ALWAYS);
       for (int x = 0; x < 5; x++) {
-        Span span = TraceUtil.getGlobalTracer().spanBuilder("gets").startSpan();
-        try (Scope scope = span.makeCurrent()) {
+        try (TraceScope scope = TraceUtil.createTrace("gets")) {
           long rk = rowKeyQueue.take();
           Result r1 = ht.get(new Get(Bytes.toBytes(rk)));
           if (r1 != null) {
@@ -189,12 +185,10 @@
           if (r2 != null) {
             accum |= Bytes.toLong(r2.getRow());
           }
-          span.addEvent("Accum = " + accum);
+          TraceUtil.addTimelineAnnotation("Accum = " + accum);
 
         } catch (IOException|InterruptedException ie) {
           // IGNORED
-        } finally {
-          span.end();
         }
       }
 
@@ -205,22 +199,18 @@
   }
 
   private void createTable() throws IOException {
-    Span span = TraceUtil.getGlobalTracer().spanBuilder("createTable").startSpan();
-    try (Scope scope = span.makeCurrent()) {
+    TraceUtil.addSampler(Sampler.ALWAYS);
+    try (TraceScope scope = TraceUtil.createTrace("createTable")) {
       util.createTable(tableName, familyName);
-    } finally {
-      span.end();
     }
   }
 
   private void deleteTable() throws IOException {
-    Span span = TraceUtil.getGlobalTracer().spanBuilder("deleteTable").startSpan();
-    try (Scope scope = span.makeCurrent()) {
+    TraceUtil.addSampler(Sampler.ALWAYS);
+    try (TraceScope scope = TraceUtil.createTrace("deleteTable")) {
       if (admin.tableExists(tableName)) {
         util.deleteTable(tableName);
       }
-    } finally {
-      span.end();
     }
   }
 
@@ -228,9 +218,9 @@
     LinkedBlockingQueue rowKeys = new LinkedBlockingQueue<>(25000);
     BufferedMutator ht = util.getConnection().getBufferedMutator(this.tableName);
     byte[] value = new byte[300];
+    TraceUtil.addSampler(Sampler.ALWAYS);
     for (int x = 0; x < 5000; x++) {
-      Span span = TraceUtil.getGlobalTracer().spanBuilder("insertData").startSpan();
-      try (Scope scope = span.makeCurrent()) {
+      try (TraceScope traceScope = TraceUtil.createTrace("insertData")) {
         for (int i = 0; i < 5; i++) {
           long rk = random.nextLong();
           rowKeys.add(rk);
@@ -244,8 +234,6 @@
         if ((x % 1000) == 0) {
           admin.flush(tableName);
         }
-      } finally {
-        span.end();
       }
     }
     admin.flush(tableName);
@@ -267,4 +255,11 @@
     }
     return this.util;
   }
+
+  private void setupReceiver() {
+    Configuration conf = new Configuration(util.getConfiguration());
+    conf.setBoolean("hbase.zipkin.is-in-client-mode", true);
+
+    this.receiverHost = SpanReceiverHost.getInstance(conf);
+  }
 }
diff --git a/hbase-mapreduce/pom.xml b/hbase-mapreduce/pom.xml
index 9f2d93373f9..52fd92f2495 100644
--- a/hbase-mapreduce/pom.xml
+++ b/hbase-mapreduce/pom.xml
@@ -155,8 +155,8 @@
       slf4j-api
-      io.opentelemetry
-      opentelemetry-api
+      org.apache.htrace
+      htrace-core4
       org.apache.hbase
diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java
index 7910a5f3fac..fe8031cf31b 100644
--- a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java
+++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java
@@ -831,6 +831,7 @@ public class TableMapReduceUtil {
       org.apache.hbase.thirdparty.io.netty.channel.Channel.class, // hbase-shaded-netty
       org.apache.zookeeper.ZooKeeper.class, // zookeeper
       com.google.protobuf.Message.class, // protobuf
+      org.apache.htrace.core.Tracer.class, // htrace
       com.codahale.metrics.MetricRegistry.class, // metrics-core
       org.apache.commons.lang3.ArrayUtils.class); // commons-lang
   }
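The `PerformanceEvaluation` diff below restores sampler selection from the `traceRate` option. For readability, here is the same logic extracted into a standalone, hypothetical helper (not part of the patch). Note that `hbase.sampler.fraction` works because the deprecation handling shown earlier rewrites it to `hbase.htrace.sampler.fraction`:

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.trace.HBaseHTraceConfiguration;
import org.apache.htrace.core.ProbabilitySampler;
import org.apache.htrace.core.Sampler;

public class SamplerSelectionSketch {
  // Mirrors the TestBase constructor restored below.
  static Sampler choose(Configuration conf, double traceRate) {
    if (traceRate >= 1.0) {
      return Sampler.ALWAYS;            // trace every operation
    } else if (traceRate > 0.0) {
      conf.setDouble("hbase.sampler.fraction", traceRate);
      // ProbabilitySampler reads "sampler.fraction" through the prefixing wrapper.
      return new ProbabilitySampler(new HBaseHTraceConfiguration(conf));
    }
    return Sampler.NEVER;               // tracing stays off
  }
}
```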
diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/PerformanceEvaluation.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/PerformanceEvaluation.java
index 68967d63f2d..9cbae3377a1 100644
--- a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/PerformanceEvaluation.java
+++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/PerformanceEvaluation.java
@@ -20,8 +20,6 @@ package org.apache.hadoop.hbase;
 
 import com.codahale.metrics.Histogram;
 import com.codahale.metrics.UniformReservoir;
-import io.opentelemetry.api.trace.Span;
-import io.opentelemetry.context.Scope;
 import java.io.IOException;
 import java.io.PrintStream;
 import java.lang.reflect.Constructor;
@@ -86,6 +84,8 @@ import org.apache.hadoop.hbase.io.hfile.RandomDistribution;
 import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
 import org.apache.hadoop.hbase.regionserver.BloomType;
 import org.apache.hadoop.hbase.regionserver.CompactingMemStore;
+import org.apache.hadoop.hbase.trace.HBaseHTraceConfiguration;
+import org.apache.hadoop.hbase.trace.SpanReceiverHost;
 import org.apache.hadoop.hbase.trace.TraceUtil;
 import org.apache.hadoop.hbase.util.ByteArrayHashKey;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -104,6 +104,9 @@ import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
 import org.apache.hadoop.mapreduce.lib.reduce.LongSumReducer;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
+import org.apache.htrace.core.ProbabilitySampler;
+import org.apache.htrace.core.Sampler;
+import org.apache.htrace.core.TraceScope;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -694,10 +697,6 @@ public class PerformanceEvaluation extends Configured implements Tool {
     int totalRows = DEFAULT_ROWS_PER_GB;
     int measureAfter = 0;
     float sampleRate = 1.0f;
-    /**
-     * @deprecated Useless after switching to OpenTelemetry
-     */
-    @Deprecated
     double traceRate = 0.0;
     String tableName = TABLE_NAME;
     boolean flushCommits = true;
@@ -1148,6 +1147,8 @@ public class PerformanceEvaluation extends Configured implements Tool {
     protected final TestOptions opts;
 
     private final Status status;
+    private final Sampler traceSampler;
+    private final SpanReceiverHost receiverHost;
 
     private String testName;
     private Histogram latencyHistogram;
@@ -1169,9 +1170,18 @@ public class PerformanceEvaluation extends Configured implements Tool {
      */
     TestBase(final Configuration conf, final TestOptions options, final Status status) {
       this.conf = conf;
+      this.receiverHost = this.conf == null? null: SpanReceiverHost.getInstance(conf);
       this.opts = options;
       this.status = status;
       this.testName = this.getClass().getSimpleName();
+      if (options.traceRate >= 1.0) {
+        this.traceSampler = Sampler.ALWAYS;
+      } else if (options.traceRate > 0.0) {
+        conf.setDouble("hbase.sampler.fraction", options.traceRate);
+        this.traceSampler = new ProbabilitySampler(new HBaseHTraceConfiguration(conf));
+      } else {
+        this.traceSampler = Sampler.NEVER;
+      }
       everyN = (int) (opts.totalRows / (opts.totalRows * opts.sampleRate));
       if (options.isValueZipf()) {
         this.zipf = new RandomDistribution.Zipf(this.rand, 1, options.getValueSize(), 1.2);
@@ -1341,6 +1351,7 @@ public class PerformanceEvaluation extends Configured implements Tool {
           YammerHistogramUtils.getHistogramReport(bytesInRemoteResultsHistogram));
       }
     }
+    receiverHost.closeReceivers();
   }
 
   abstract void onTakedown() throws IOException;
@@ -1377,6 +1388,7 @@ public class PerformanceEvaluation extends Configured implements Tool {
   void testTimed() throws IOException, InterruptedException {
     int startRow = getStartRow();
     int lastRow = getLastRow();
+    TraceUtil.addSampler(traceSampler);
     // Report on completion of 1/10th of total.
     for (int ii = 0; ii < opts.cycles; ii++) {
       if (opts.cycles > 1) LOG.info("Cycle=" + ii + " of " + opts.cycles);
       for (int i = startRow; i < lastRow; i++) {
         if (i % everyN != 0) continue;
         long startTime = System.nanoTime();
         boolean requestSent = false;
-        Span span = TraceUtil.getGlobalTracer().spanBuilder("test row").startSpan();
-        try (Scope scope = span.makeCurrent()){
+        try (TraceScope scope = TraceUtil.createTrace("test row");){
           requestSent = testRow(i, startTime);
-        } finally {
-          span.end();
         }
         if ( (i - startRow) > opts.measureAfter) {
           // If multiget or multiput is enabled, say set to 10, testRow() returns immediately
diff --git a/hbase-protocol-shaded/pom.xml b/hbase-protocol-shaded/pom.xml
index b9fd6063f80..9a0b55311d9 100644
--- a/hbase-protocol-shaded/pom.xml
+++ b/hbase-protocol-shaded/pom.xml
@@ -201,6 +201,10 @@
       junit
       test
+
+      org.apache.htrace
+      htrace-core4
+
diff --git a/hbase-server/pom.xml b/hbase-server/pom.xml
index a9a7b72efa8..b0ee3e78f34 100644
--- a/hbase-server/pom.xml
+++ b/hbase-server/pom.xml
@@ -427,8 +427,8 @@
-      io.opentelemetry
-      opentelemetry-api
+      org.apache.htrace
+      htrace-core4
       com.lmax
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/EventHandler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/EventHandler.java
index 17054a5c409..df84e004503 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/EventHandler.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/EventHandler.java
@@ -18,14 +18,14 @@
  */
 package org.apache.hadoop.hbase.executor;
 
-import io.opentelemetry.api.trace.Span;
-import io.opentelemetry.context.Context;
-import io.opentelemetry.context.Scope;
 import java.io.IOException;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.hadoop.hbase.Server;
 import org.apache.hadoop.hbase.trace.TraceUtil;
+import org.apache.htrace.core.Span;
+import org.apache.htrace.core.TraceScope;
+import org.apache.htrace.core.Tracer;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -75,7 +75,7 @@ public abstract class EventHandler implements Runnable, Comparable
    * Default base class constructor.
    */
   public EventHandler(Server server, EventType eventType) {
-    this.parent = Span.current();
+    this.parent = Tracer.getCurrentSpan();
     this.server = server;
     this.eventType = eventType;
     seqid = seqids.incrementAndGet();
@@ -100,14 +100,10 @@ public abstract class EventHandler implements Runnable, Comparable
 
   @Override
   public void run() {
-    Span span = TraceUtil.getGlobalTracer().spanBuilder(getClass().getSimpleName())
-      .setParent(Context.current().with(parent)).startSpan();
-    try (Scope scope = span.makeCurrent()) {
+    try (TraceScope scope = TraceUtil.createTrace(this.getClass().getSimpleName(), parent)) {
       process();
-    } catch (Throwable t) {
+    } catch(Throwable t) {
       handleException(t);
-    } finally {
-      span.end();
     }
   }
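The `EventHandler` diff above restores a parent/child handoff: the span current at construction time is saved and later passed to `TraceUtil.createTrace(description, parent)` so the handler's work joins the submitter's trace. A minimal sketch (not part of the patch):

```java
import org.apache.hadoop.hbase.trace.TraceUtil;
import org.apache.htrace.core.Span;
import org.apache.htrace.core.TraceScope;
import org.apache.htrace.core.Tracer;

public class HandlerSketch implements Runnable {
  // Span current on the submitting thread; may be null when nothing is being traced.
  private final Span parent = Tracer.getCurrentSpan();

  @Override
  public void run() {
    // createTrace(description, span) falls back to a parentless scope when parent is null.
    try (TraceScope scope = TraceUtil.createTrace(getClass().getSimpleName(), parent)) {
      // process() would run here on the executor thread
    }
  }
}
```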
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java
index 0bb8d23348f..7375fa8ddb5 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java
@@ -17,8 +17,6 @@
  */
 package org.apache.hadoop.hbase.io.hfile;
 
-import io.opentelemetry.api.trace.Span;
-import io.opentelemetry.context.Scope;
 import java.io.DataInput;
 import java.io.IOException;
 import java.nio.ByteBuffer;
@@ -50,6 +48,7 @@ import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.IdLock;
 import org.apache.hadoop.hbase.util.ObjectIntPair;
 import org.apache.hadoop.io.WritableUtils;
+import org.apache.htrace.core.TraceScope;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -1288,8 +1287,7 @@ public abstract class HFileReaderImpl implements HFile.Reader, Configurable {
     boolean useLock = false;
     IdLock.Entry lockEntry = null;
-    Span span = TraceUtil.getGlobalTracer().spanBuilder("HFileReaderImpl.readBlock").startSpan();
-    try (Scope traceScope = span.makeCurrent()) {
+    try (TraceScope traceScope = TraceUtil.createTrace("HFileReaderImpl.readBlock")) {
       while (true) {
         // Check cache for block. If found return.
         if (cacheConf.shouldReadBlockFromCache(expectedBlockType)) {
@@ -1304,7 +1302,7 @@
           if (LOG.isTraceEnabled()) {
             LOG.trace("From Cache " + cachedBlock);
           }
-          span.addEvent("blockCacheHit");
+          TraceUtil.addTimelineAnnotation("blockCacheHit");
           assert cachedBlock.isUnpacked() : "Packed block leak.";
           if (cachedBlock.getBlockType().isData()) {
             if (updateCacheMetrics) {
@@ -1334,7 +1332,7 @@
           // Carry on, please load.
         }
 
-        span.addEvent("blockCacheMiss");
+        TraceUtil.addTimelineAnnotation("blockCacheMiss");
         // Load block from filesystem.
         HFileBlock hfileBlock = fsBlockReader.readBlockData(dataBlockOffset, onDiskBlockSize,
           pread, !isCompaction, shouldUseHeap(expectedBlockType));
@@ -1364,7 +1362,6 @@
       if (lockEntry != null) {
         offsetLock.releaseLockEntry(lockEntry);
       }
-      span.end();
     }
   }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java
index 3ae089e7845..e5354d7db8d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java
@@ -17,24 +17,23 @@
  */
 package org.apache.hadoop.hbase.ipc;
 
-import io.opentelemetry.api.trace.Span;
-import io.opentelemetry.context.Scope;
 import java.net.InetSocketAddress;
 import java.nio.channels.ClosedChannelException;
 import java.util.Optional;
+
 import org.apache.hadoop.hbase.CallDroppedException;
 import org.apache.hadoop.hbase.CellScanner;
 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import org.apache.hadoop.hbase.trace.TraceUtil;
 import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
 import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
 import org.apache.hadoop.hbase.security.User;
-import org.apache.hadoop.hbase.trace.TraceUtil;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.util.StringUtils;
+import org.apache.htrace.core.TraceScope;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.yetus.audience.InterfaceStability;
-
 import org.apache.hbase.thirdparty.com.google.protobuf.Message;
 
 /**
@@ -95,14 +94,6 @@ public class CallRunner {
     this.rpcServer = null;
   }
 
-  private String getServiceName() {
-    return call.getService() != null ? call.getService().getDescriptorForType().getName() : "";
-  }
-
-  private String getMethodName() {
-    return call.getMethod() != null ? call.getMethod().getName() : "";
-  }
-
   public void run() {
     try {
       if (call.disconnectSince() >= 0) {
@@ -127,16 +118,18 @@ public class CallRunner {
       String error = null;
       Pair resultPair = null;
       RpcServer.CurCall.set(call);
-      String serviceName = getServiceName();
-      String methodName = getMethodName();
-      String traceString = serviceName + "." + methodName;
-      Span span = TraceUtil.getGlobalTracer().spanBuilder(traceString).startSpan();
-      try (Scope traceScope = span.makeCurrent()) {
+      TraceScope traceScope = null;
+      try {
        if (!this.rpcServer.isStarted()) {
          InetSocketAddress address = rpcServer.getListenerAddress();
          throw new ServerNotRunningYetException("Server " +
              (address != null ? address : "(channel closed)") + " is not running yet");
        }
+        String serviceName =
+            call.getService() != null ? call.getService().getDescriptorForType().getName() : "";
+        String methodName = (call.getMethod() != null) ? call.getMethod().getName() : "";
+        String traceString = serviceName + "." + methodName;
+        traceScope = TraceUtil.createTrace(traceString);
         // make the call
         resultPair = this.rpcServer.call(call, this.status);
       } catch (TimeoutIOException e){
@@ -158,12 +151,14 @@ public class CallRunner {
           throw (Error)e;
         }
       } finally {
+        if (traceScope != null) {
+          traceScope.close();
+        }
         RpcServer.CurCall.set(null);
         if (resultPair != null) {
           this.rpcServer.addCallSize(call.getSize() * -1);
           sucessful = true;
         }
-        span.end();
       }
       this.status.markComplete("To send response");
       // return back the RPC request read BB we can do here. It is done by now.
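The `CallRunner` diff above names server-side scopes `Service.method` and closes them manually in a `finally` block. A hedged sketch of that structure (not part of the patch; names are illustrative):

```java
import org.apache.hadoop.hbase.trace.TraceUtil;
import org.apache.htrace.core.TraceScope;

public class RpcScopeSketch {
  static void runCall(String serviceName, String methodName) {
    // Scope named "Service.method" so receivers can group spans per RPC endpoint.
    TraceScope scope = TraceUtil.createTrace(serviceName + "." + methodName);
    try {
      // dispatch the call here
    } finally {
      if (scope != null) {
        scope.close(); // manual, null-checked close mirrors the restored CallRunner
      }
    }
  }
}
```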
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
index 8b1638ab240..09050bb0017 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
@@ -193,6 +193,7 @@ import org.apache.hadoop.hbase.replication.regionserver.ReplicationStatus;
 import org.apache.hadoop.hbase.security.AccessDeniedException;
 import org.apache.hadoop.hbase.security.SecurityConstants;
 import org.apache.hadoop.hbase.security.UserProvider;
+import org.apache.hadoop.hbase.trace.TraceUtil;
 import org.apache.hadoop.hbase.util.Addressing;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
@@ -413,6 +414,7 @@ public class HMaster extends HRegionServer implements MasterServices {
    */
   public HMaster(final Configuration conf) throws IOException {
     super(conf);
+    TraceUtil.initTracer(conf);
     try {
       if (conf.getBoolean(MAINTENANCE_MODE, false)) {
         LOG.info("Detected {}=true via configuration.", MAINTENANCE_MODE);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMasterCommandLine.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMasterCommandLine.java
index 0f0a2b655b1..6c3ee1db652 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMasterCommandLine.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMasterCommandLine.java
@@ -21,12 +21,15 @@ package org.apache.hadoop.hbase.master;
 import java.io.File;
 import java.io.IOException;
 import java.util.List;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.LocalHBaseCluster;
 import org.apache.hadoop.hbase.MasterNotRunningException;
 import org.apache.hadoop.hbase.ZNodeClearer;
 import org.apache.hadoop.hbase.ZooKeeperConnectionException;
+import org.apache.hadoop.hbase.trace.TraceUtil;
+import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.ConnectionFactory;
@@ -36,7 +39,6 @@ import org.apache.hadoop.hbase.util.ServerCommandLine;
 import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
 import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
-import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.zookeeper.KeeperException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -165,6 +167,8 @@ public class HMasterCommandLine extends ServerCommandLine {
 
   private int startMaster() {
     Configuration conf = getConf();
+    TraceUtil.initTracer(conf);
+
     try {
       // If 'local', defer to LocalHBaseCluster instance.  Starts master
       // and regionserver both in the one JVM.
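Both entry points above call `TraceUtil.initTracer(conf)` before anything else, so every later `createTrace()` call has a tracer to attach to. A minimal bootstrap sketch (not part of the patch):

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.trace.TraceUtil;

public class BootstrapSketch {
  public static void main(String[] args) {
    Configuration conf = HBaseConfiguration.create();
    // Idempotent: the tracer is only built on the first call, as the guard
    // in the restored initTracer() shows.
    TraceUtil.initTracer(conf);
    // ... start master / region server here ...
  }
}
```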
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index 4a01a9acfd0..6628328a883 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -21,8 +21,6 @@ import static org.apache.hadoop.hbase.HConstants.REPLICATION_SCOPE_LOCAL;
 import static org.apache.hadoop.hbase.regionserver.HStoreFile.MAJOR_COMPACTION_KEY;
 import static org.apache.hadoop.hbase.util.ConcurrentMapUtils.computeIfAbsent;
 
 import edu.umd.cs.findbugs.annotations.Nullable;
-import io.opentelemetry.api.trace.Span;
-import io.opentelemetry.context.Scope;
 import java.io.EOFException;
 import java.io.FileNotFoundException;
 import java.io.IOException;
@@ -193,6 +191,7 @@ import org.apache.hadoop.hbase.wal.WALKeyImpl;
 import org.apache.hadoop.hbase.wal.WALSplitUtil;
 import org.apache.hadoop.hbase.wal.WALSplitUtil.MutationReplay;
 import org.apache.hadoop.util.StringUtils;
+import org.apache.htrace.core.TraceScope;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -6585,9 +6584,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     RowLockImpl result = null;
 
     boolean success = false;
-    Span span = TraceUtil.getGlobalTracer().spanBuilder("HRegion.getRowLock").startSpan();
-    try (Scope scope = span.makeCurrent()) {
-      span.addEvent("Getting a " + (readLock ? "readLock" : "writeLock"));
+    try (TraceScope scope = TraceUtil.createTrace("HRegion.getRowLock")) {
+      TraceUtil.addTimelineAnnotation("Getting a " + (readLock?"readLock":"writeLock"));
       // Keep trying until we have a lock or error out.
       // TODO: do we need to add a time component here?
       while (result == null) {
@@ -6624,7 +6622,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
       }
 
       if (timeout <= 0 || !result.getLock().tryLock(timeout, TimeUnit.MILLISECONDS)) {
-        span.addEvent("Failed to get row lock");
+        TraceUtil.addTimelineAnnotation("Failed to get row lock");
         String message = "Timed out waiting for lock for row: " + rowKey + " in region "
             + getRegionInfo().getEncodedName();
         if (reachDeadlineFirst) {
@@ -6642,7 +6640,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
         LOG.debug("Thread interrupted waiting for lock on row: {}, in region {}", rowKey,
           getRegionInfo().getRegionNameAsString());
       }
-      span.addEvent("Interrupted exception getting row lock");
+      TraceUtil.addTimelineAnnotation("Interrupted exception getting row lock");
       throw throwOnInterrupt(ie);
     } catch (Error error) {
       // The maximum lock count for read lock is 64K (hardcoded), when this maximum count
@@ -6651,14 +6649,13 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
       LOG.warn("Error to get row lock for {}, in region {}, cause: {}", Bytes.toStringBinary(row),
         getRegionInfo().getRegionNameAsString(), error);
       IOException ioe = new IOException(error);
-      span.addEvent("Error getting row lock");
+      TraceUtil.addTimelineAnnotation("Error getting row lock");
       throw ioe;
     } finally {
       // Clean up the counts just in case this was the thing keeping the context alive.
if (!success && rowLockContext != null) { rowLockContext.cleanUp(); } - span.end(); } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index f30a8cc1793..8a7728ffd32 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -159,6 +159,8 @@ import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.security.UserProvider; import org.apache.hadoop.hbase.security.access.AccessChecker; import org.apache.hadoop.hbase.security.access.ZKPermissionWatcher; +import org.apache.hadoop.hbase.trace.SpanReceiverHost; +import org.apache.hadoop.hbase.trace.TraceUtil; import org.apache.hadoop.hbase.util.Addressing; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.CommonFSUtils; @@ -397,6 +399,7 @@ public class HRegionServer extends Thread implements private MetricsRegionServer metricsRegionServer; MetricsRegionServerWrapperImpl metricsRegionServerImpl; + private SpanReceiverHost spanReceiverHost; /** * ChoreService used to schedule tasks that we want to run periodically @@ -593,6 +596,7 @@ public class HRegionServer extends Thread implements */ public HRegionServer(final Configuration conf) throws IOException { super("RegionServer"); // thread name + TraceUtil.initTracer(conf); try { this.startcode = EnvironmentEdgeManager.currentTime(); this.conf = conf; @@ -664,6 +668,7 @@ public class HRegionServer extends Thread implements (t, e) -> abort("Uncaught exception in executorService thread " + t.getName(), e); initializeFileSystem(); + spanReceiverHost = SpanReceiverHost.getInstance(getConfiguration()); this.configurationManager = new ConfigurationManager(); setupWindows(getConfiguration(), getConfigurationManager()); @@ -2712,6 +2717,10 @@ public class HRegionServer extends Thread implements if (this.cacheFlusher != null) { this.cacheFlusher.join(); } + + if (this.spanReceiverHost != null) { + this.spanReceiverHost.closeReceivers(); + } if (this.walRoller != null) { this.walRoller.close(); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServerCommandLine.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServerCommandLine.java index 5fd12333ad9..afd85f8d789 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServerCommandLine.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServerCommandLine.java @@ -18,13 +18,14 @@ */ package org.apache.hadoop.hbase.regionserver; +import org.apache.hadoop.hbase.trace.TraceUtil; +import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.LocalHBaseCluster; import org.apache.hadoop.hbase.util.ServerCommandLine; -import org.apache.yetus.audience.InterfaceAudience; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** * Class responsible for parsing the command line and starting the @@ -50,6 +51,7 @@ public class HRegionServerCommandLine extends ServerCommandLine { private int start() throws Exception { Configuration conf = getConf(); + TraceUtil.initTracer(conf); try { // If 'local', don't start a region server here. Defer to // LocalHBaseCluster. 
It manages 'local' clusters. diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java index 08563570dbe..77755bd3060 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java @@ -18,8 +18,6 @@ */ package org.apache.hadoop.hbase.regionserver; -import io.opentelemetry.api.trace.Span; -import io.opentelemetry.context.Scope; import java.io.IOException; import java.lang.Thread.UncaughtExceptionHandler; import java.util.ArrayList; @@ -38,6 +36,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.LongAdder; import java.util.concurrent.locks.ReentrantReadWriteLock; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.DroppedSnapshotException; import org.apache.hadoop.hbase.HConstants; @@ -50,12 +49,12 @@ import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil; import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.util.StringUtils.TraditionalBinaryPrefix; +import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; +import org.apache.htrace.core.TraceScope; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; - /** * Thread that flushes cache on request * @@ -713,12 +712,10 @@ public class MemStoreFlusher implements FlushRequester { * amount of memstore consumption. */ public void reclaimMemStoreMemory() { - Span span = - TraceUtil.getGlobalTracer().spanBuilder("MemStoreFluser.reclaimMemStoreMemory").startSpan(); - try (Scope scope = span.makeCurrent()) { + try (TraceScope scope = TraceUtil.createTrace("MemStoreFluser.reclaimMemStoreMemory")) { FlushType flushType = isAboveHighWaterMark(); if (flushType != FlushType.NORMAL) { - span.addEvent("Force Flush. We're above high water mark."); + TraceUtil.addTimelineAnnotation("Force Flush. 
We're above high water mark."); long start = EnvironmentEdgeManager.currentTime(); long nextLogTimeMs = start; synchronized (this.blockSignal) { @@ -787,7 +784,6 @@ public class MemStoreFlusher implements FlushRequester { if (flushType != FlushType.NORMAL) { wakeupFlushThread(); } - span.end(); } } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java index 5c247d10140..0d9c14048bf 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java @@ -25,8 +25,6 @@ import static org.apache.hbase.thirdparty.com.google.common.base.Preconditions.c import static org.apache.hbase.thirdparty.com.google.common.base.Preconditions.checkNotNull; import com.lmax.disruptor.RingBuffer; -import io.opentelemetry.api.trace.Span; -import io.opentelemetry.context.Scope; import java.io.FileNotFoundException; import java.io.IOException; import java.io.InterruptedIOException; @@ -85,6 +83,7 @@ import org.apache.hadoop.hbase.wal.WALProvider.WriterBase; import org.apache.hadoop.hbase.wal.WALSplitter; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.util.StringUtils; +import org.apache.htrace.core.TraceScope; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -785,12 +784,9 @@ public abstract class AbstractFSWAL implements WAL { * @throws IOException if there is a problem flushing or closing the underlying FS */ Path replaceWriter(Path oldPath, Path newPath, W nextWriter) throws IOException { - Span span = TraceUtil.getGlobalTracer().spanBuilder("FSHFile.replaceWriter").startSpan(); - try (Scope scope = span.makeCurrent()) { + try (TraceScope scope = TraceUtil.createTrace("FSHFile.replaceWriter")) { doReplaceWriter(oldPath, newPath, nextWriter); return newPath; - } finally { - span.end(); } } @@ -838,8 +834,7 @@ public abstract class AbstractFSWAL implements WAL { LOG.debug("WAL closed. Skipping rolling of writer"); return regionsToFlush; } - Span span = TraceUtil.getGlobalTracer().spanBuilder("FSHLog.rollWriter").startSpan(); - try (Scope scope = span.makeCurrent()) { + try (TraceScope scope = TraceUtil.createTrace("FSHLog.rollWriter")) { Path oldPath = getOldPath(); Path newPath = getNewPath(); // Any exception from here on is catastrophic, non-recoverable so we currently abort. @@ -866,8 +861,6 @@ public abstract class AbstractFSWAL implements WAL { throw new IOException( "Underlying FileSystem can't meet stream requirements. See RS log " + "for details.", exception); - } finally { - span.end(); } return regionsToFlush; } finally { @@ -1059,7 +1052,7 @@ public abstract class AbstractFSWAL implements WAL { .append(TimeUnit.NANOSECONDS.toMillis(timeInNanos)) .append(" ms, current pipeline: ") .append(Arrays.toString(getPipeline())).toString(); - Span.current().addEvent(msg); + TraceUtil.addTimelineAnnotation(msg); LOG.info(msg); // A single sync took too long. 
// Elsewhere in checkSlowSync, called from checkLogRoll, we will look at cumulative @@ -1095,14 +1088,12 @@ public abstract class AbstractFSWAL implements WAL { long txid = txidHolder.longValue(); ServerCall rpcCall = RpcServer.getCurrentCall().filter(c -> c instanceof ServerCall) .filter(c -> c.getCellScanner() != null).map(c -> (ServerCall) c).orElse(null); - Span span = TraceUtil.getGlobalTracer().spanBuilder(implClassName + ".append").startSpan(); - try (Scope scope = span.makeCurrent()) { + try (TraceScope scope = TraceUtil.createTrace(implClassName + ".append")) { FSWALEntry entry = new FSWALEntry(txid, key, edits, hri, inMemstore, rpcCall); entry.stampRegionSequenceId(we); ringBuffer.get(txid).load(entry); } finally { ringBuffer.publish(txid); - span.end(); } return txid; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java index eef0575da3b..ae26a47a494 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java @@ -20,11 +20,10 @@ package org.apache.hadoop.hbase.regionserver.wal; import static org.apache.hadoop.hbase.regionserver.wal.WALActionsListener.RollRequestReason.ERROR; import static org.apache.hadoop.hbase.regionserver.wal.WALActionsListener.RollRequestReason.SIZE; import static org.apache.hadoop.hbase.util.FutureUtils.addListener; + import com.lmax.disruptor.RingBuffer; import com.lmax.disruptor.Sequence; import com.lmax.disruptor.Sequencer; -import io.opentelemetry.api.trace.Span; -import io.opentelemetry.context.Scope; import java.io.IOException; import java.lang.reflect.Field; import java.util.ArrayDeque; @@ -45,6 +44,7 @@ import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import java.util.function.Supplier; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -58,11 +58,12 @@ import org.apache.hadoop.hbase.wal.WALEdit; import org.apache.hadoop.hbase.wal.WALKeyImpl; import org.apache.hadoop.hbase.wal.WALProvider.AsyncWriter; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; +import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; +import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; +import org.apache.htrace.core.TraceScope; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; -import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; import org.apache.hbase.thirdparty.io.netty.channel.Channel; import org.apache.hbase.thirdparty.io.netty.channel.EventLoop; import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup; @@ -400,7 +401,7 @@ public class AsyncFSWAL extends AbstractFSWAL { } private void addTimeAnnotation(SyncFuture future, String annotation) { - Span.current().addEvent(annotation); + TraceUtil.addTimelineAnnotation(annotation); // TODO handle htrace API change, see HBASE-18895 // future.setSpan(scope.getSpan()); } @@ -623,8 +624,7 @@ public class AsyncFSWAL extends AbstractFSWAL { @Override public void sync(boolean forceSync) throws IOException { - Span span = 
TraceUtil.getGlobalTracer().spanBuilder("AsyncFSWAL.sync").startSpan(); - try (Scope scope = span.makeCurrent()) { + try (TraceScope scope = TraceUtil.createTrace("AsyncFSWAL.sync")) { long txid = waitingConsumePayloads.next(); SyncFuture future; try { @@ -638,8 +638,6 @@ public class AsyncFSWAL extends AbstractFSWAL { consumeExecutor.execute(consumer); } blockOnSync(future); - } finally { - span.end(); } } @@ -648,8 +646,7 @@ public class AsyncFSWAL extends AbstractFSWAL { if (highestSyncedTxid.get() >= txid) { return; } - Span span = TraceUtil.getGlobalTracer().spanBuilder("AsyncFSWAL.sync").startSpan(); - try (Scope scope = span.makeCurrent()) { + try (TraceScope scope = TraceUtil.createTrace("AsyncFSWAL.sync")) { // here we do not use ring buffer sequence as txid long sequence = waitingConsumePayloads.next(); SyncFuture future; @@ -664,8 +661,6 @@ public class AsyncFSWAL extends AbstractFSWAL { consumeExecutor.execute(consumer); } blockOnSync(future); - } finally { - span.end(); } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java index 762f1a1d000..690f54520ae 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java @@ -29,8 +29,6 @@ import com.lmax.disruptor.LifecycleAware; import com.lmax.disruptor.TimeoutException; import com.lmax.disruptor.dsl.Disruptor; import com.lmax.disruptor.dsl.ProducerType; -import io.opentelemetry.api.trace.Span; -import io.opentelemetry.context.Scope; import java.io.IOException; import java.io.OutputStream; import java.util.Arrays; @@ -61,6 +59,7 @@ import org.apache.hadoop.hbase.wal.WALProvider.Writer; import org.apache.hadoop.hdfs.DFSOutputStream; import org.apache.hadoop.hdfs.client.HdfsDataOutputStream; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; +import org.apache.htrace.core.TraceScope; import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; @@ -365,7 +364,7 @@ public class FSHLog extends AbstractFSWAL { // use assert to make sure no change breaks the logic that // sequence and zigzagLatch will be set together assert sequence > 0L : "Failed to get sequence from ring buffer"; - Span.current().addEvent("awaiting safepoint"); + TraceUtil.addTimelineAnnotation("awaiting safepoint"); syncFuture = zigzagLatch.waitSafePoint(publishSyncOnRingBuffer(sequence, false)); } } catch (FailedSyncBeforeLogCloseException e) { @@ -437,11 +436,10 @@ public class FSHLog extends AbstractFSWAL { } private void closeWriter(Writer writer, Path path, boolean syncCloseCall) throws IOException { - Span span = Span.current(); try { - span.addEvent("closing writer"); + TraceUtil.addTimelineAnnotation("closing writer"); writer.close(); - span.addEvent("writer closed"); + TraceUtil.addTimelineAnnotation("writer closed"); } catch (IOException ioe) { int errors = closeErrorCount.incrementAndGet(); boolean hasUnflushedEntries = isUnflushedEntries(); @@ -651,10 +649,10 @@ public class FSHLog extends AbstractFSWAL { long start = System.nanoTime(); Throwable lastException = null; try { - Span.current().addEvent("syncing writer"); + TraceUtil.addTimelineAnnotation("syncing writer"); long unSyncedFlushSeq = highestUnsyncedTxid; writer.sync(sf.isForceSync()); - Span.current().addEvent("writer synced"); + 
TraceUtil.addTimelineAnnotation("writer synced"); if (unSyncedFlushSeq > currentSequence) { currentSequence = unSyncedFlushSeq; } @@ -793,7 +791,7 @@ public class FSHLog extends AbstractFSWAL { } // Sync all known transactions - private void publishSyncThenBlockOnCompletion(Scope scope, boolean forceSync) throws IOException { + private void publishSyncThenBlockOnCompletion(TraceScope scope, boolean forceSync) throws IOException { SyncFuture syncFuture = publishSyncOnRingBuffer(forceSync); blockOnSync(syncFuture); } @@ -825,8 +823,7 @@ public class FSHLog extends AbstractFSWAL { @Override public void sync(boolean forceSync) throws IOException { - Span span = TraceUtil.getGlobalTracer().spanBuilder("FSHLog.sync").startSpan(); - try (Scope scope = span.makeCurrent()) { + try (TraceScope scope = TraceUtil.createTrace("FSHLog.sync")) { publishSyncThenBlockOnCompletion(scope, forceSync); } } @@ -842,8 +839,7 @@ public class FSHLog extends AbstractFSWAL { // Already sync'd. return; } - Span span = TraceUtil.getGlobalTracer().spanBuilder("FSHLog.sync").startSpan(); - try (Scope scope = span.makeCurrent()) { + try (TraceScope scope = TraceUtil.createTrace("FSHLog.sync")) { publishSyncThenBlockOnCompletion(scope, forceSync); } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java index f67fece8aee..1c4636eb00e 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java @@ -20,6 +20,7 @@ package org.apache.hadoop.hbase; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; + import edu.umd.cs.findbugs.annotations.Nullable; import java.io.File; import java.io.IOException; @@ -50,8 +51,8 @@ import java.util.Set; import java.util.TreeSet; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; -import java.util.function.BooleanSupplier; import java.util.stream.Collectors; +import java.util.function.BooleanSupplier; import org.apache.commons.io.FileUtils; import org.apache.commons.lang3.RandomStringUtils; import org.apache.hadoop.conf.Configuration; @@ -118,10 +119,12 @@ import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException; import org.apache.hadoop.hbase.security.HBaseKerberosUtils; import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.security.visibility.VisibilityLabelsCache; +import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.trace.TraceUtil; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.CommonFSUtils; -import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.FSTableDescriptors; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.util.JVMClusterUtil; import org.apache.hadoop.hbase.util.JVMClusterUtil.MasterThread; @@ -144,12 +147,11 @@ import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.MiniMRCluster; import org.apache.hadoop.mapred.TaskLog; import org.apache.hadoop.minikdc.MiniKdc; +import org.apache.hbase.thirdparty.com.google.common.io.Closeables; import org.apache.yetus.audience.InterfaceAudience; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.ZooKeeper; import 
org.apache.zookeeper.ZooKeeper.States; -import org.apache.hbase.thirdparty.com.google.common.io.Closeables; -import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; /** * Facility for testing HBase. Replacement for @@ -661,6 +663,8 @@ public class HBaseTestingUtility extends HBaseZKTestingUtility { Log4jUtils.setLogLevel(org.apache.hadoop.metrics2.impl.MetricsSystemImpl.class.getName(), "ERROR"); + TraceUtil.initTracer(conf); + this.dfsCluster = new MiniDFSCluster(0, this.conf, servers, true, true, true, null, racks, hosts, null); @@ -1168,6 +1172,7 @@ public class HBaseTestingUtility extends HBaseZKTestingUtility { Log4jUtils.setLogLevel(org.apache.hadoop.hbase.ScheduledChore.class.getName(), "INFO"); Configuration c = new Configuration(this.conf); + TraceUtil.initTracer(c); this.hbaseCluster = new MiniHBaseCluster(c, option.getNumMasters(), option.getNumAlwaysStandByMasters(), option.getNumRegionServers(), option.getRsPorts(), option.getMasterClass(), option.getRsClass()); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/executor/TestExecutorService.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/executor/TestExecutorService.java index 5df089b597e..6b58d073e03 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/executor/TestExecutorService.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/executor/TestExecutorService.java @@ -165,8 +165,8 @@ public class TestExecutorService { private final AtomicBoolean lock; private AtomicInteger counter; - public TestEventHandler(Server server, EventType eventType, AtomicBoolean lock, - AtomicInteger counter) { + public TestEventHandler(Server server, EventType eventType, + AtomicBoolean lock, AtomicInteger counter) { super(server, eventType); this.lock = lock; this.counter = counter; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/trace/TestHTraceHooks.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/trace/TestHTraceHooks.java new file mode 100644 index 00000000000..b1fc5b98be3 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/trace/TestHTraceHooks.java @@ -0,0 +1,134 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.trace; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.util.Collection; +import java.util.LinkedList; +import java.util.List; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.StartMiniClusterOption; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.Waiter; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.testclassification.MiscTests; +import org.apache.htrace.core.POJOSpanReceiver; +import org.apache.htrace.core.Sampler; +import org.apache.htrace.core.Span; +import org.apache.htrace.core.TraceScope; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Ignore; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.TestName; + +import org.apache.hbase.thirdparty.com.google.common.collect.Sets; + +@Ignore // We don't support htrace in hbase-2.0.0 and this flakey is a little flakey. +@Category({MiscTests.class, MediumTests.class}) +public class TestHTraceHooks { + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestHTraceHooks.class); + + private static final byte[] FAMILY_BYTES = "family".getBytes(); + private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + private static POJOSpanReceiver rcvr; + + @Rule + public TestName name = new TestName(); + + @BeforeClass + public static void before() throws Exception { + StartMiniClusterOption option = StartMiniClusterOption.builder() + .numMasters(2).numRegionServers(3).numDataNodes(3).build(); + TEST_UTIL.startMiniCluster(option); + rcvr = new POJOSpanReceiver(new HBaseHTraceConfiguration(TEST_UTIL.getConfiguration())); + TraceUtil.addReceiver(rcvr); + TraceUtil.addSampler(new Sampler() { + @Override + public boolean next() { + return true; + } + }); + } + + @AfterClass + public static void after() throws Exception { + TEST_UTIL.shutdownMiniCluster(); + TraceUtil.removeReceiver(rcvr); + rcvr = null; + } + + @Test + public void testTraceCreateTable() throws Exception { + Table table; + Span createTableSpan; + try (TraceScope scope = TraceUtil.createTrace("creating table")) { + createTableSpan = scope.getSpan(); + table = TEST_UTIL.createTable(TableName.valueOf(name.getMethodName()), FAMILY_BYTES); + } + + // Some table creation is async. Need to make sure that everything is full in before + // checking to see if the spans are there. + TEST_UTIL.waitFor(10000, new Waiter.Predicate() { + @Override public boolean evaluate() throws Exception { + return (rcvr == null) ? true : rcvr.getSpans().size() >= 5; + } + }); + + Collection spans = Sets.newHashSet(rcvr.getSpans()); + List roots = new LinkedList<>(); + TraceTree traceTree = new TraceTree(spans); + roots.addAll(traceTree.getSpansByParent().find(createTableSpan.getSpanId())); + + // Roots was made 3 in hbase2. It used to be 1. We changed it back to 1 on upgrade to + // htrace-4.2 just to get the test to pass (traces are not wholesome in hbase2; TODO). 
+ assertEquals(1, roots.size()); + assertEquals("creating table", createTableSpan.getDescription()); + + if (spans != null) { + assertTrue(spans.size() > 5); + } + + Put put = new Put("row".getBytes()); + put.addColumn(FAMILY_BYTES, "col".getBytes(), "value".getBytes()); + + Span putSpan; + + try (TraceScope scope = TraceUtil.createTrace("doing put")) { + putSpan = scope.getSpan(); + table.put(put); + } + + spans = rcvr.getSpans(); + traceTree = new TraceTree(spans); + roots.clear(); + roots.addAll(traceTree.getSpansByParent().find(putSpan.getSpanId())); + assertEquals(1, roots.size()); + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/trace/TraceTree.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/trace/TraceTree.java new file mode 100644 index 00000000000..eb209d0ee15 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/trace/TraceTree.java @@ -0,0 +1,148 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.trace; + +import org.apache.htrace.core.Span; +import org.apache.htrace.core.SpanId; + +import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.TreeSet; + +/** + * Used to create the graph formed by spans. 
+ */ +public class TraceTree { + + public static class SpansByParent { + private static Comparator COMPARATOR = + new Comparator() { + @Override + public int compare(Span a, Span b) { + return a.getSpanId().compareTo(b.getSpanId()); + } + }; + + private final TreeSet treeSet; + + private final HashMap> parentToSpans; + + SpansByParent(Collection spans) { + TreeSet treeSet = new TreeSet(COMPARATOR); + parentToSpans = new HashMap>(); + for (Span span : spans) { + treeSet.add(span); + for (SpanId parent : span.getParents()) { + LinkedList list = parentToSpans.get(parent); + if (list == null) { + list = new LinkedList(); + parentToSpans.put(parent, list); + } + list.add(span); + } + if (span.getParents().length == 0) { + LinkedList list = parentToSpans.get(SpanId.INVALID); + if (list == null) { + list = new LinkedList(); + parentToSpans.put(SpanId.INVALID, list); + } + list.add(span); + } + } + this.treeSet = treeSet; + } + + public List find(SpanId parentId) { + LinkedList spans = parentToSpans.get(parentId); + if (spans == null) { + return new LinkedList(); + } + return spans; + } + + public Iterator iterator() { + return Collections.unmodifiableSortedSet(treeSet).iterator(); + } + } + + public static class SpansByProcessId { + private static Comparator COMPARATOR = + new Comparator() { + @Override + public int compare(Span a, Span b) { + return a.getSpanId().compareTo(b.getSpanId()); + } + }; + + private final TreeSet treeSet; + + SpansByProcessId(Collection spans) { + TreeSet treeSet = new TreeSet(COMPARATOR); + for (Span span : spans) { + treeSet.add(span); + } + this.treeSet = treeSet; + } + + public Iterator iterator() { + return Collections.unmodifiableSortedSet(treeSet).iterator(); + } + } + + private final SpansByParent spansByParent; + private final SpansByProcessId spansByProcessId; + + /** + * Create a new TraceTree + * + * @param spans The collection of spans to use to create this TraceTree. Should + * have at least one root span. 
+ */ + public TraceTree(Collection spans) { + if (spans == null) { + spans = Collections.emptySet(); + } + this.spansByParent = new SpansByParent(spans); + this.spansByProcessId = new SpansByProcessId(spans); + } + + public SpansByParent getSpansByParent() { + return spansByParent; + } + + public SpansByProcessId getSpansByProcessId() { + return spansByProcessId; + } + + @Override + public String toString() { + StringBuilder bld = new StringBuilder(); + String prefix = ""; + for (Iterator iter = spansByParent.iterator(); iter.hasNext();) { + Span span = iter.next(); + bld.append(prefix).append(span.toString()); + prefix = "\n"; + } + return bld.toString(); + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/WALPerformanceEvaluation.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/WALPerformanceEvaluation.java index 6b6175115be..9f02d2e4ed9 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/WALPerformanceEvaluation.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/WALPerformanceEvaluation.java @@ -25,8 +25,6 @@ import com.codahale.metrics.Histogram; import com.codahale.metrics.Meter; import com.codahale.metrics.MetricFilter; import com.codahale.metrics.MetricRegistry; -import io.opentelemetry.api.trace.Span; -import io.opentelemetry.context.Scope; import java.io.IOException; import java.util.HashMap; import java.util.HashSet; @@ -60,6 +58,8 @@ import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl; import org.apache.hadoop.hbase.regionserver.wal.SecureProtobufLogReader; import org.apache.hadoop.hbase.regionserver.wal.SecureProtobufLogWriter; import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener; +import org.apache.hadoop.hbase.trace.HBaseHTraceConfiguration; +import org.apache.hadoop.hbase.trace.SpanReceiverHost; import org.apache.hadoop.hbase.trace.TraceUtil; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.CommonFSUtils; @@ -68,6 +68,10 @@ import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.hbase.wal.WALProvider.Writer; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; +import org.apache.htrace.core.ProbabilitySampler; +import org.apache.htrace.core.Sampler; +import org.apache.htrace.core.TraceScope; +import org.apache.htrace.core.Tracer; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -125,10 +129,12 @@ public final class WALPerformanceEvaluation extends Configured implements Tool { private final boolean noSync; private final HRegion region; private final int syncInterval; + private final Sampler loopSampler; private final NavigableMap scopes; WALPutBenchmark(final HRegion region, final TableDescriptor htd, - final long numIterations, final boolean noSync, final int syncInterval) { + final long numIterations, final boolean noSync, final int syncInterval, + final double traceFreq) { this.numIterations = numIterations; this.noSync = noSync; this.syncInterval = syncInterval; @@ -138,6 +144,24 @@ public final class WALPerformanceEvaluation extends Configured implements Tool { for(byte[] fam : htd.getColumnFamilyNames()) { scopes.put(fam, 0); } + String spanReceivers = getConf().get("hbase.trace.spanreceiver.classes"); + if (spanReceivers == null || spanReceivers.isEmpty()) { + loopSampler = Sampler.NEVER; + } else { + if (traceFreq <= 0.0) { + LOG.warn("Tracing enabled but traceFreq=0."); + loopSampler = Sampler.NEVER; + } else if (traceFreq >= 1.0) { + 
loopSampler = Sampler.ALWAYS; + if (numIterations > 1000) { + LOG.warn("Full tracing of all iterations will produce a lot of data. Be sure your" + + " SpanReceiver can keep up."); + } + } else { + getConf().setDouble("hbase.sampler.fraction", traceFreq); + loopSampler = new ProbabilitySampler(new HBaseHTraceConfiguration(getConf())); + } + } } @Override @@ -146,14 +170,13 @@ public final class WALPerformanceEvaluation extends Configured implements Tool { byte[] value = new byte[valueSize]; Random rand = new Random(Thread.currentThread().getId()); WAL wal = region.getWAL(); - Span threadSpan = TraceUtil.getGlobalTracer() - .spanBuilder("WALPerfEval." + Thread.currentThread().getName()).startSpan(); - try (Scope threadScope = threadSpan.makeCurrent()) { + + try (TraceScope threadScope = TraceUtil.createTrace("WALPerfEval." + Thread.currentThread().getName())) { int lastSync = 0; + TraceUtil.addSampler(loopSampler); for (int i = 0; i < numIterations; ++i) { - assert Span.current() == threadSpan : "Span leak detected."; - Span loopSpan = TraceUtil.getGlobalTracer().spanBuilder("runLoopIter" + i).startSpan(); - try (Scope loopScope = loopSpan.makeCurrent()) { + assert Tracer.getCurrentSpan() == threadScope.getSpan() : "Span leak detected."; + try (TraceScope loopScope = TraceUtil.createTrace("runLoopIter" + i)) { long now = System.nanoTime(); Put put = setupPut(rand, key, value, numFamilies); WALEdit walEdit = new WALEdit(); @@ -169,14 +192,10 @@ public final class WALPerformanceEvaluation extends Configured implements Tool { } } latencyHistogram.update(System.nanoTime() - now); - } finally { - loopSpan.end(); } } } catch (Exception e) { LOG.error(getClass().getSimpleName() + " Thread failed", e); - } finally { - threadSpan.end(); } } } @@ -197,6 +216,9 @@ public final class WALPerformanceEvaluation extends Configured implements Tool { boolean compress = false; String cipher = null; int numRegions = 1; + String spanReceivers = getConf().get("hbase.trace.spanreceiver.classes"); + boolean trace = spanReceivers != null && !spanReceivers.isEmpty(); + double traceFreq = 1.0; // Process command line args for (int i = 0; i < args.length; i++) { String cmd = args[i]; @@ -236,8 +258,7 @@ public final class WALPerformanceEvaluation extends Configured implements Tool { } else if (cmd.equals("-regions")) { numRegions = Integer.parseInt(args[++i]); } else if (cmd.equals("-traceFreq")) { - // keep it here for compatible - System.err.println("-traceFreq is not supported any more"); + traceFreq = Double.parseDouble(args[++i]); } else if (cmd.equals("-h")) { printUsageAndExit(); } else if (cmd.equals("--help")) { @@ -286,8 +307,13 @@ public final class WALPerformanceEvaluation extends Configured implements Tool { CommonFSUtils.setFsDefault(getConf(), CommonFSUtils.getRootDir(getConf())); FileSystem fs = FileSystem.get(getConf()); LOG.info("FileSystem={}, rootDir={}", fs, rootRegionDir); - Span span = TraceUtil.getGlobalTracer().spanBuilder("WALPerfEval").startSpan(); - try (Scope scope = span.makeCurrent()){ + + SpanReceiverHost receiverHost = trace ? SpanReceiverHost.getInstance(getConf()) : null; + final Sampler sampler = trace ? 
Sampler.ALWAYS : Sampler.NEVER; + TraceUtil.addSampler(sampler); + TraceScope scope = TraceUtil.createTrace("WALPerfEval"); + + try { rootRegionDir = rootRegionDir.makeQualified(fs.getUri(), fs.getWorkingDirectory()); cleanRegionRootDir(fs, rootRegionDir); CommonFSUtils.setRootDir(getConf(), rootRegionDir); @@ -304,8 +330,8 @@ public final class WALPerformanceEvaluation extends Configured implements Tool { // a table per desired region means we can avoid carving up the key space final TableDescriptor htd = createHTableDescriptor(i, numFamilies); regions[i] = openRegion(fs, rootRegionDir, htd, wals, roll, roller); - benchmarks[i] = - new WALPutBenchmark(regions[i], htd, numIterations, noSync, syncInterval); + benchmarks[i] = TraceUtil.wrap(new WALPutBenchmark(regions[i], htd, numIterations, noSync, + syncInterval, traceFreq), ""); } ConsoleReporter reporter = ConsoleReporter.forRegistry(metrics). outputTo(System.out).convertRatesTo(TimeUnit.SECONDS).filter(MetricFilter.ALL).build(); @@ -354,14 +380,19 @@ public final class WALPerformanceEvaluation extends Configured implements Tool { if (cleanup) cleanRegionRootDir(fs, rootRegionDir); } } finally { - span.end(); // We may be called inside a test that wants to keep on using the fs. if (!noclosefs) { fs.close(); } + if (scope != null) { + scope.close(); + } + if (receiverHost != null) { + receiverHost.closeReceivers(); + } } - return 0; + return(0); } private static TableDescriptor createHTableDescriptor(final int regionNum, diff --git a/hbase-shaded/hbase-shaded-client/pom.xml b/hbase-shaded/hbase-shaded-client/pom.xml index 2d06a9d487d..44c8bf6d77f 100644 --- a/hbase-shaded/hbase-shaded-client/pom.xml +++ b/hbase-shaded/hbase-shaded-client/pom.xml @@ -79,7 +79,6 @@ log4j:* commons-logging:* org.javassist:* - io.opentelemetry:* diff --git a/hbase-shaded/hbase-shaded-testing-util/pom.xml b/hbase-shaded/hbase-shaded-testing-util/pom.xml index ac6ab4941b8..6ee31e24dfa 100644 --- a/hbase-shaded/hbase-shaded-testing-util/pom.xml +++ b/hbase-shaded/hbase-shaded-testing-util/pom.xml @@ -236,7 +236,6 @@ log4j:* commons-logging:* org.javassist:* - io.opentelemetry:* diff --git a/hbase-shaded/pom.xml b/hbase-shaded/pom.xml index d1dfb06f374..2a44f435d34 100644 --- a/hbase-shaded/pom.xml +++ b/hbase-shaded/pom.xml @@ -159,7 +159,6 @@ log4j:* commons-logging:* org.javassist:* - io.opentelemetry:* diff --git a/hbase-shell/src/main/ruby/shell/commands/trace.rb b/hbase-shell/src/main/ruby/shell/commands/trace.rb index f52f474b837..f2a8ee0b086 100644 --- a/hbase-shell/src/main/ruby/shell/commands/trace.rb +++ b/hbase-shell/src/main/ruby/shell/commands/trace.rb @@ -17,17 +17,16 @@ # limitations under the License. 
# -# Disable tracing for now as HTrace does not work any more -# java_import org.apache.hadoop.hbase.trace.SpanReceiverHost +java_import org.apache.hadoop.hbase.trace.SpanReceiverHost module Shell module Commands class Trace < Command - # @@conf = org.apache.htrace.core.HTraceConfiguration.fromKeyValuePairs( - # 'sampler.classes', 'org.apache.htrace.core.AlwaysSampler' - # ) - # @@tracer = org.apache.htrace.core.Tracer::Builder.new('HBaseShell').conf(@@conf).build() - # @@tracescope = nil + @@conf = org.apache.htrace.core.HTraceConfiguration.fromKeyValuePairs( + 'sampler.classes', 'org.apache.htrace.core.AlwaysSampler' + ) + @@tracer = org.apache.htrace.core.Tracer::Builder.new('HBaseShell').conf(@@conf).build() + @@tracescope = nil def help <<-EOF @@ -58,23 +57,23 @@ EOF end def trace(startstop, spanname) - # @@receiver ||= SpanReceiverHost.getInstance(@shell.hbase.configuration) - # if startstop == 'start' - # unless tracing? - # @@tracescope = @@tracer.newScope(spanname) - # end - # elsif startstop == 'stop' - # if tracing? - # @@tracescope.close - # @@tracescope = nil - # end - # end - # tracing? + @@receiver ||= SpanReceiverHost.getInstance(@shell.hbase.configuration) + if startstop == 'start' + unless tracing? + @@tracescope = @@tracer.newScope(spanname) + end + elsif startstop == 'stop' + if tracing? + @@tracescope.close + @@tracescope = nil + end + end + tracing? end - # def tracing? - # @@tracescope != nil - # end + def tracing? + @@tracescope != nil + end end end end diff --git a/hbase-zookeeper/pom.xml b/hbase-zookeeper/pom.xml index 18140318121..dd8ab248a76 100644 --- a/hbase-zookeeper/pom.xml +++ b/hbase-zookeeper/pom.xml @@ -148,10 +148,6 @@ org.apache.zookeeper zookeeper - - io.opentelemetry - opentelemetry-api - junit diff --git a/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/RecoverableZooKeeper.java b/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/RecoverableZooKeeper.java index f5cc1b20d77..90cb123c76f 100644 --- a/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/RecoverableZooKeeper.java +++ b/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/RecoverableZooKeeper.java @@ -18,18 +18,18 @@ */ package org.apache.hadoop.hbase.zookeeper; -import io.opentelemetry.api.trace.Span; -import io.opentelemetry.context.Scope; import java.io.IOException; import java.lang.management.ManagementFactory; import java.util.ArrayList; import java.util.LinkedList; import java.util.List; + import org.apache.hadoop.hbase.trace.TraceUtil; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.RetryCounter; import org.apache.hadoop.hbase.util.RetryCounterFactory; +import org.apache.htrace.core.TraceScope; import org.apache.yetus.audience.InterfaceAudience; import org.apache.zookeeper.AsyncCallback; import org.apache.zookeeper.CreateMode; @@ -164,8 +164,7 @@ public class RecoverableZooKeeper { * exist. */ public void delete(String path, int version) throws InterruptedException, KeeperException { - Span span = TraceUtil.getGlobalTracer().spanBuilder("RecoverableZookeeper.delete").startSpan(); - try (Scope scope = span.makeCurrent()) { + try (TraceScope scope = TraceUtil.createTrace("RecoverableZookeeper.delete")) { RetryCounter retryCounter = retryCounterFactory.create(); boolean isRetry = false; // False for first attempt, true for all retries. 
while (true) { @@ -197,8 +196,6 @@ public class RecoverableZooKeeper { retryCounter.sleepUntilNextRetry(); isRetry = true; } - } finally { - span.end(); } } @@ -207,8 +204,7 @@ public class RecoverableZooKeeper { * @return A Stat instance */ public Stat exists(String path, Watcher watcher) throws KeeperException, InterruptedException { - Span span = TraceUtil.getGlobalTracer().spanBuilder("RecoverableZookeeper.exists").startSpan(); - try (Scope scope = span.makeCurrent()) { + try (TraceScope scope = TraceUtil.createTrace("RecoverableZookeeper.exists")) { RetryCounter retryCounter = retryCounterFactory.create(); while (true) { try { @@ -229,8 +225,6 @@ public class RecoverableZooKeeper { } retryCounter.sleepUntilNextRetry(); } - } finally { - span.end(); } } @@ -239,9 +233,7 @@ public class RecoverableZooKeeper { * @return A Stat instance */ public Stat exists(String path, boolean watch) throws KeeperException, InterruptedException { - Span span = - TraceUtil.getGlobalTracer().spanBuilder("RecoverableZookeeper.exists").startSpan(); - try (Scope scope = span.makeCurrent()) { + try (TraceScope scope = TraceUtil.createTrace("RecoverableZookeeper.exists")) { RetryCounter retryCounter = retryCounterFactory.create(); while (true) { try { @@ -263,8 +255,6 @@ public class RecoverableZooKeeper { } retryCounter.sleepUntilNextRetry(); } - } finally { - span.end(); } } @@ -283,9 +273,7 @@ public class RecoverableZooKeeper { */ public List getChildren(String path, Watcher watcher) throws KeeperException, InterruptedException { - Span span = - TraceUtil.getGlobalTracer().spanBuilder("RecoverableZookeeper.getChildren").startSpan(); - try (Scope scope = span.makeCurrent()) { + try (TraceScope scope = TraceUtil.createTrace("RecoverableZookeeper.getChildren")) { RetryCounter retryCounter = retryCounterFactory.create(); while (true) { try { @@ -306,8 +294,6 @@ public class RecoverableZooKeeper { } retryCounter.sleepUntilNextRetry(); } - } finally { - span.end(); } } @@ -317,9 +303,7 @@ public class RecoverableZooKeeper { */ public List getChildren(String path, boolean watch) throws KeeperException, InterruptedException { - Span span = - TraceUtil.getGlobalTracer().spanBuilder("RecoverableZookeeper.getChildren").startSpan(); - try (Scope scope = span.makeCurrent()) { + try (TraceScope scope = TraceUtil.createTrace("RecoverableZookeeper.getChildren")) { RetryCounter retryCounter = retryCounterFactory.create(); while (true) { try { @@ -341,8 +325,6 @@ public class RecoverableZooKeeper { } retryCounter.sleepUntilNextRetry(); } - } finally { - span.end(); } } @@ -352,8 +334,7 @@ public class RecoverableZooKeeper { */ public byte[] getData(String path, Watcher watcher, Stat stat) throws KeeperException, InterruptedException { - Span span = TraceUtil.getGlobalTracer().spanBuilder("RecoverableZookeeper.getData").startSpan(); - try (Scope scope = span.makeCurrent()) { + try (TraceScope scope = TraceUtil.createTrace("RecoverableZookeeper.getData")) { RetryCounter retryCounter = retryCounterFactory.create(); while (true) { try { @@ -374,8 +355,6 @@ public class RecoverableZooKeeper { } retryCounter.sleepUntilNextRetry(); } - } finally { - span.end(); } } @@ -385,9 +364,7 @@ public class RecoverableZooKeeper { */ public byte[] getData(String path, boolean watch, Stat stat) throws KeeperException, InterruptedException { - Span span = - TraceUtil.getGlobalTracer().spanBuilder("RecoverableZookeeper.getData").startSpan(); - try (Scope scope = span.makeCurrent()) { + try (TraceScope scope = 
TraceUtil.createTrace("RecoverableZookeeper.getData")) { RetryCounter retryCounter = retryCounterFactory.create(); while (true) { try { @@ -409,8 +386,6 @@ public class RecoverableZooKeeper { } retryCounter.sleepUntilNextRetry(); } - } finally { - span.end(); } } @@ -422,8 +397,7 @@ public class RecoverableZooKeeper { */ public Stat setData(String path, byte[] data, int version) throws KeeperException, InterruptedException { - Span span = TraceUtil.getGlobalTracer().spanBuilder("RecoverableZookeeper.setData").startSpan(); - try (Scope scope = span.makeCurrent()) { + try (TraceScope scope = TraceUtil.createTrace("RecoverableZookeeper.setData")) { RetryCounter retryCounter = retryCounterFactory.create(); byte[] newData = ZKMetadata.appendMetaData(id, data); boolean isRetry = false; @@ -463,8 +437,6 @@ public class RecoverableZooKeeper { retryCounter.sleepUntilNextRetry(); isRetry = true; } - } finally { - span.end(); } } @@ -472,9 +444,9 @@ public class RecoverableZooKeeper { * getAcl is an idempotent operation. Retry before throwing exception * @return list of ACLs */ - public List getAcl(String path, Stat stat) throws KeeperException, InterruptedException { - Span span = TraceUtil.getGlobalTracer().spanBuilder("RecoverableZookeeper.getAcl").startSpan(); - try (Scope scope = span.makeCurrent()) { + public List getAcl(String path, Stat stat) + throws KeeperException, InterruptedException { + try (TraceScope scope = TraceUtil.createTrace("RecoverableZookeeper.getAcl")) { RetryCounter retryCounter = retryCounterFactory.create(); while (true) { try { @@ -495,8 +467,6 @@ public class RecoverableZooKeeper { } retryCounter.sleepUntilNextRetry(); } - } finally { - span.end(); } } @@ -506,8 +476,7 @@ public class RecoverableZooKeeper { */ public Stat setAcl(String path, List acls, int version) throws KeeperException, InterruptedException { - Span span = TraceUtil.getGlobalTracer().spanBuilder("RecoverableZookeeper.setAcl").startSpan(); - try (Scope scope = span.makeCurrent()) { + try (TraceScope scope = TraceUtil.createTrace("RecoverableZookeeper.setAcl")) { RetryCounter retryCounter = retryCounterFactory.create(); while (true) { try { @@ -527,8 +496,6 @@ public class RecoverableZooKeeper { } retryCounter.sleepUntilNextRetry(); } - } finally { - span.end(); } } @@ -547,10 +514,10 @@ public class RecoverableZooKeeper { * * @return Path */ - public String create(String path, byte[] data, List acl, CreateMode createMode) + public String create(String path, byte[] data, List acl, + CreateMode createMode) throws KeeperException, InterruptedException { - Span span = TraceUtil.getGlobalTracer().spanBuilder("RecoverableZookeeper.create").startSpan(); - try (Scope scope = span.makeCurrent()) { + try (TraceScope scope = TraceUtil.createTrace("RecoverableZookeeper.create")) { byte[] newData = ZKMetadata.appendMetaData(id, data); switch (createMode) { case EPHEMERAL: @@ -565,8 +532,6 @@ public class RecoverableZooKeeper { throw new IllegalArgumentException("Unrecognized CreateMode: " + createMode); } - } finally { - span.end(); } } @@ -682,9 +647,9 @@ public class RecoverableZooKeeper { /** * Run multiple operations in a transactional manner. 
Retry before throwing exception */ - public List multi(Iterable ops) throws KeeperException, InterruptedException { - Span span = TraceUtil.getGlobalTracer().spanBuilder("RecoverableZookeeper.multi").startSpan(); - try (Scope scope = span.makeCurrent()) { + public List multi(Iterable ops) + throws KeeperException, InterruptedException { + try (TraceScope scope = TraceUtil.createTrace("RecoverableZookeeper.multi")) { RetryCounter retryCounter = retryCounterFactory.create(); Iterable multiOps = prepareZKMulti(ops); while (true) { @@ -706,8 +671,6 @@ public class RecoverableZooKeeper { } retryCounter.sleepUntilNextRetry(); } - } finally { - span.end(); } } diff --git a/pom.xml b/pom.xml index 7803e88cb7f..87e3b475d1c 100755 --- a/pom.xml +++ b/pom.xml @@ -1025,25 +1025,6 @@ - - banned-htrace - - enforce - - - - - - org.apache.htrace:** - - - Use OpenTelemetry instead - - false - - - - check-aggregate-license @@ -1153,10 +1134,9 @@ true 512 - Do not use htrace + Do not use htrace v3 org.htrace.** - org.apache.htrace.** @@ -1482,7 +1462,7 @@ 9.2.13.0 4.13 1.3 - 0.12.0 + 4.2.0-incubating 1.2.17 2.28.2 @@ -2174,9 +2154,9 @@ test - io.opentelemetry - opentelemetry-api - ${opentelemetry.version} + org.apache.htrace + htrace-core4 + ${htrace.version} com.lmax
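
Taken together, the hunks above restore the htrace-core4 lifecycle that the server and tooling changes depend on: a SpanReceiverHost is obtained once per process, spans are opened through TraceUtil.createTrace(...), a Sampler decides what gets recorded, and the receivers are closed on shutdown. A rough end-to-end sketch under those assumptions follows; only the TraceUtil, SpanReceiverHost, and Sampler calls visible in the hunks above are assumed, and the class and span names are illustrative.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.trace.SpanReceiverHost;
    import org.apache.hadoop.hbase.trace.TraceUtil;
    import org.apache.htrace.core.Sampler;
    import org.apache.htrace.core.TraceScope;

    public class TraceLifecycleSketch {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        TraceUtil.initTracer(conf);
        // Receivers ship finished spans to their configured sinks;
        // HRegionServer holds one for its lifetime after this revert.
        SpanReceiverHost receiverHost = SpanReceiverHost.getInstance(conf);
        // Sample everything for the demo, as WALPerformanceEvaluation does
        // when span receivers are configured and tracing is enabled.
        TraceUtil.addSampler(Sampler.ALWAYS);
        try (TraceScope scope = TraceUtil.createTrace("Sketch.operation")) {
          TraceUtil.addTimelineAnnotation("operation started");
          // ... traced work ...
        } finally {
          // Mirror of the HRegionServer shutdown hunk: close receivers so
          // buffered spans are flushed rather than dropped.
          receiverHost.closeReceivers();
        }
      }
    }
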