Revert "HBASE-26124 Backport HBASE-25373 "Remove HTrace completely in code base and try to make use of OpenTelemetry" to branch-2 (#3529)"

This reverts commit f0493016062267fc37e14659d9183673d42a8f1d.
Author: Tak Lon (Stephen) Wu  2021-08-04 15:37:08 -07:00
Parent: fffdcba5bb
Commit: c11a3e1b39
45 changed files with 913 additions and 361 deletions


@ -19,9 +19,11 @@ package org.apache.hadoop.hbase.io.asyncfs;
import java.io.File; import java.io.File;
import java.io.IOException; import java.io.IOException;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseCommonTestingUtility; import org.apache.hadoop.hbase.HBaseCommonTestingUtility;
import org.apache.hadoop.hbase.trace.TraceUtil;
import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -102,6 +104,7 @@ public abstract class AsyncFSTestBase {
org.apache.log4j.Logger.getLogger(org.apache.hadoop.metrics2.impl.MetricsSystemImpl.class) org.apache.log4j.Logger.getLogger(org.apache.hadoop.metrics2.impl.MetricsSystemImpl.class)
.setLevel(org.apache.log4j.Level.ERROR); .setLevel(org.apache.log4j.Level.ERROR);
TraceUtil.initTracer(conf);
CLUSTER = new MiniDFSCluster.Builder(conf).numDataNodes(servers).build(); CLUSTER = new MiniDFSCluster.Builder(conf).numDataNodes(servers).build();
CLUSTER.waitClusterUp(); CLUSTER.waitClusterUp();
} }


@ -145,8 +145,8 @@
<artifactId>zookeeper</artifactId> <artifactId>zookeeper</artifactId>
</dependency> </dependency>
<dependency> <dependency>
<groupId>io.opentelemetry</groupId> <groupId>org.apache.htrace</groupId>
<artifactId>opentelemetry-api</artifactId> <artifactId>htrace-core4</artifactId>
</dependency> </dependency>
<dependency> <dependency>
<groupId>org.jruby.jcodings</groupId> <groupId>org.jruby.jcodings</groupId>


@ -46,8 +46,10 @@ import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.backoff.ServerStatistics; import org.apache.hadoop.hbase.client.backoff.ServerStatistics;
import org.apache.hadoop.hbase.client.coprocessor.Batch; import org.apache.hadoop.hbase.client.coprocessor.Batch;
import org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil; import org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil;
import org.apache.hadoop.hbase.trace.TraceUtil;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.htrace.core.Tracer;
import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -570,9 +572,13 @@ class AsyncRequestFutureImpl<CResult> implements AsyncRequestFuture {
asyncProcess.incTaskCounters(multiAction.getRegions(), server); asyncProcess.incTaskCounters(multiAction.getRegions(), server);
SingleServerRequestRunnable runnable = createSingleServerRequest( SingleServerRequestRunnable runnable = createSingleServerRequest(
multiAction, numAttempt, server, callsInProgress); multiAction, numAttempt, server, callsInProgress);
Tracer tracer = Tracer.curThreadTracer();
// remove trace for runnable because HBASE-25373 and OpenTelemetry do not cover TraceRunnable if (tracer == null) {
return Collections.singletonList(runnable); return Collections.singletonList(runnable);
} else {
return Collections.singletonList(tracer.wrap(runnable, "AsyncProcess.sendMultiAction"));
}
} }
// group the actions by the amount of delay // group the actions by the amount of delay
@ -592,10 +598,12 @@ class AsyncRequestFutureImpl<CResult> implements AsyncRequestFuture {
List<Runnable> toReturn = new ArrayList<>(actions.size()); List<Runnable> toReturn = new ArrayList<>(actions.size());
for (DelayingRunner runner : actions.values()) { for (DelayingRunner runner : actions.values()) {
asyncProcess.incTaskCounters(runner.getActions().getRegions(), server); asyncProcess.incTaskCounters(runner.getActions().getRegions(), server);
String traceText = "AsyncProcess.sendMultiAction";
Runnable runnable = createSingleServerRequest(runner.getActions(), numAttempt, server, callsInProgress); Runnable runnable = createSingleServerRequest(runner.getActions(), numAttempt, server, callsInProgress);
// use a delay runner only if we need to sleep for some time // use a delay runner only if we need to sleep for some time
if (runner.getSleepTime() > 0) { if (runner.getSleepTime() > 0) {
runner.setRunner(runnable); runner.setRunner(runnable);
traceText = "AsyncProcess.clientBackoff.sendMultiAction";
runnable = runner; runnable = runner;
if (asyncProcess.connection.getConnectionMetrics() != null) { if (asyncProcess.connection.getConnectionMetrics() != null) {
asyncProcess.connection.getConnectionMetrics() asyncProcess.connection.getConnectionMetrics()
@ -606,7 +614,7 @@ class AsyncRequestFutureImpl<CResult> implements AsyncRequestFuture {
asyncProcess.connection.getConnectionMetrics().incrNormalRunners(); asyncProcess.connection.getConnectionMetrics().incrNormalRunners();
} }
} }
// remove trace for runnable because HBASE-25373 and OpenTelemetry do not cover TraceRunnable runnable = TraceUtil.wrap(runnable, traceText);
toReturn.add(runnable); toReturn.add(runnable);
} }


@ -26,6 +26,7 @@ import java.util.concurrent.RunnableFuture;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeoutException;
import org.apache.hadoop.hbase.trace.TraceUtil;
import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -167,8 +168,7 @@ public class ResultBoundedCompletionService<V> {
public void submit(RetryingCallable<V> task, int callTimeout, int id) { public void submit(RetryingCallable<V> task, int callTimeout, int id) {
QueueingFuture<V> newFuture = new QueueingFuture<>(task, callTimeout, id); QueueingFuture<V> newFuture = new QueueingFuture<>(task, callTimeout, id);
// remove trace for runnable because HBASE-25373 and OpenTelemetry do not cover TraceRunnable executor.execute(TraceUtil.wrap(newFuture, "ResultBoundedCompletionService.submit"));
executor.execute(newFuture);
tasks[id] = newFuture; tasks[id] = newFuture;
} }


@ -24,9 +24,6 @@ import static org.apache.hadoop.hbase.ipc.IPCUtil.isFatalConnectionException;
import static org.apache.hadoop.hbase.ipc.IPCUtil.setCancelled; import static org.apache.hadoop.hbase.ipc.IPCUtil.setCancelled;
import static org.apache.hadoop.hbase.ipc.IPCUtil.write; import static org.apache.hadoop.hbase.ipc.IPCUtil.write;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import java.io.BufferedInputStream; import java.io.BufferedInputStream;
import java.io.BufferedOutputStream; import java.io.BufferedOutputStream;
import java.io.DataInputStream; import java.io.DataInputStream;
@ -65,6 +62,7 @@ import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.StringUtils;
import org.apache.htrace.core.TraceScope;
import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -595,12 +593,9 @@ class BlockingRpcConnection extends RpcConnection implements Runnable {
} }
private void tracedWriteRequest(Call call) throws IOException { private void tracedWriteRequest(Call call) throws IOException {
Span span = TraceUtil.getGlobalTracer().spanBuilder("RpcClientImpl.tracedWriteRequest") try (TraceScope ignored = TraceUtil.createTrace("RpcClientImpl.tracedWriteRequest",
.setParent(Context.current().with(call.span)).startSpan(); call.span)) {
try (Scope scope = span.makeCurrent()) {
writeRequest(call); writeRequest(call);
} finally {
span.end();
} }
} }


@ -17,7 +17,6 @@
*/ */
package org.apache.hadoop.hbase.ipc; package org.apache.hadoop.hbase.ipc;
import io.opentelemetry.api.trace.Span;
import java.io.IOException; import java.io.IOException;
import java.util.Optional; import java.util.Optional;
import org.apache.commons.lang3.builder.ToStringBuilder; import org.apache.commons.lang3.builder.ToStringBuilder;
@ -25,13 +24,13 @@ import org.apache.commons.lang3.builder.ToStringStyle;
import org.apache.hadoop.hbase.CellScanner; import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.client.MetricsConnection; import org.apache.hadoop.hbase.client.MetricsConnection;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.htrace.core.Span;
import org.apache.htrace.core.Tracer;
import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors; import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors;
import org.apache.hbase.thirdparty.com.google.protobuf.Message; import org.apache.hbase.thirdparty.com.google.protobuf.Message;
import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback; import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
import org.apache.hbase.thirdparty.io.netty.util.Timeout; import org.apache.hbase.thirdparty.io.netty.util.Timeout;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
/** A call waiting for a value. */ /** A call waiting for a value. */
@ -74,7 +73,7 @@ class Call {
this.timeout = timeout; this.timeout = timeout;
this.priority = priority; this.priority = priority;
this.callback = callback; this.callback = callback;
this.span = Span.current(); this.span = Tracer.getCurrentSpan();
} }
/** /**


@ -192,8 +192,8 @@
</dependency> </dependency>
<!-- tracing Dependencies --> <!-- tracing Dependencies -->
<dependency> <dependency>
<groupId>io.opentelemetry</groupId> <groupId>org.apache.htrace</groupId>
<artifactId>opentelemetry-api</artifactId> <artifactId>htrace-core4</artifactId>
</dependency> </dependency>
<dependency> <dependency>
<groupId>org.apache.commons</groupId> <groupId>org.apache.commons</groupId>


@ -0,0 +1,80 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.trace;
import org.apache.hadoop.conf.Configuration;
import org.apache.htrace.core.HTraceConfiguration;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@InterfaceAudience.Private
public class HBaseHTraceConfiguration extends HTraceConfiguration {
private static final Logger LOG = LoggerFactory.getLogger(HBaseHTraceConfiguration.class);
public static final String KEY_PREFIX = "hbase.htrace.";
private Configuration conf;
private void handleDeprecation(String key) {
String oldKey = "hbase." + key;
String newKey = KEY_PREFIX + key;
String oldValue = conf.get(oldKey);
if (oldValue != null) {
LOG.warn("Warning: using deprecated configuration key " + oldKey +
". Please use " + newKey + " instead.");
String newValue = conf.get(newKey);
if (newValue == null) {
conf.set(newKey, oldValue);
} else {
LOG.warn("Conflicting values for " + newKey + " and " + oldKey +
". Using " + newValue);
}
}
}
public HBaseHTraceConfiguration(Configuration conf) {
this.conf = conf;
handleDeprecation("local-file-span-receiver.path");
handleDeprecation("local-file-span-receiver.capacity");
handleDeprecation("sampler.frequency");
handleDeprecation("sampler.fraction");
handleDeprecation("zipkin.collector-hostname");
handleDeprecation("zipkin.collector-port");
handleDeprecation("zipkin.num-threads");
handleDeprecation("zipkin.traced-service-hostname");
handleDeprecation("zipkin.traced-service-port");
}
@Override
public String get(String key) {
return conf.get(KEY_PREFIX + key);
}
@Override
public String get(String key, String defaultValue) {
return conf.get(KEY_PREFIX + key,defaultValue);
}
@Override
public boolean getBoolean(String key, boolean defaultValue) {
return conf.getBoolean(KEY_PREFIX + key, defaultValue);
}
}
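
For reference, a minimal usage sketch of the restored shim (the class name, key, and values below are illustrative, not taken from this change): callers look keys up without the prefix and HBaseHTraceConfiguration resolves them under "hbase.htrace.".

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.trace.HBaseHTraceConfiguration;
import org.apache.htrace.core.HTraceConfiguration;

public class HTraceConfSketch {
  public static void main(String[] args) {
    Configuration conf = HBaseConfiguration.create();
    // Illustrative setting; the shim only guarantees the "hbase.htrace." prefixing shown above.
    conf.set("hbase.htrace.sampler.fraction", "0.05");

    HTraceConfiguration htraceConf = new HBaseHTraceConfiguration(conf);
    // Lookups omit the prefix; the shim prepends "hbase.htrace." internally.
    String fraction = htraceConf.get("sampler.fraction");             // "0.05"
    boolean enabled = htraceConf.getBoolean("sampler.enabled", true); // falls back to the default
    System.out.println(fraction + " " + enabled);
  }
}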


@ -0,0 +1,120 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.trace;
import java.io.IOException;
import java.util.Collection;
import java.util.HashSet;
import org.apache.hadoop.conf.Configuration;
import org.apache.htrace.core.SpanReceiver;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* This class provides functions for reading the names of SpanReceivers from
* hbase-site.xml, adding those SpanReceivers to the Tracer, and closing those
* SpanReceivers when appropriate.
*/
@InterfaceAudience.Private
public class SpanReceiverHost {
public static final String SPAN_RECEIVERS_CONF_KEY = "hbase.trace.spanreceiver.classes";
private static final Logger LOG = LoggerFactory.getLogger(SpanReceiverHost.class);
private Collection<SpanReceiver> receivers;
private Configuration conf;
private boolean closed = false;
private enum SingletonHolder {
INSTANCE;
final transient Object lock = new Object();
transient SpanReceiverHost host = null;
}
public static SpanReceiverHost getInstance(Configuration conf) {
synchronized (SingletonHolder.INSTANCE.lock) {
if (SingletonHolder.INSTANCE.host != null) {
return SingletonHolder.INSTANCE.host;
}
SpanReceiverHost host = new SpanReceiverHost(conf);
host.loadSpanReceivers();
SingletonHolder.INSTANCE.host = host;
return SingletonHolder.INSTANCE.host;
}
}
public static Configuration getConfiguration(){
synchronized (SingletonHolder.INSTANCE.lock) {
if (SingletonHolder.INSTANCE.host == null || SingletonHolder.INSTANCE.host.conf == null) {
return null;
}
return SingletonHolder.INSTANCE.host.conf;
}
}
SpanReceiverHost(Configuration conf) {
receivers = new HashSet<>();
this.conf = conf;
}
/**
* Reads the names of classes specified in the {@code hbase.trace.spanreceiver.classes} property
* and instantiates and registers them with the Tracer.
*/
public void loadSpanReceivers() {
String[] receiverNames = conf.getStrings(SPAN_RECEIVERS_CONF_KEY);
if (receiverNames == null || receiverNames.length == 0) {
return;
}
SpanReceiver.Builder builder = new SpanReceiver.Builder(new HBaseHTraceConfiguration(conf));
for (String className : receiverNames) {
className = className.trim();
SpanReceiver receiver = builder.className(className).build();
if (receiver != null) {
receivers.add(receiver);
LOG.info("SpanReceiver {} was loaded successfully.", className);
}
}
for (SpanReceiver rcvr : receivers) {
TraceUtil.addReceiver(rcvr);
}
}
/**
* Calls close() on all SpanReceivers created by this SpanReceiverHost.
*/
public synchronized void closeReceivers() {
if (closed) {
return;
}
closed = true;
for (SpanReceiver rcvr : receivers) {
try {
rcvr.close();
} catch (IOException e) {
LOG.warn("Unable to close SpanReceiver correctly: " + e.getMessage(), e);
}
}
}
}
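
A minimal wiring sketch for the restored host (the receiver class named below is an example htrace SpanReceiver, not something this change requires): list the receiver classes, obtain the singleton, and close the receivers on shutdown.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.trace.SpanReceiverHost;

public class SpanReceiverHostSketch {
  public static void main(String[] args) {
    Configuration conf = HBaseConfiguration.create();
    // Example receiver; any org.apache.htrace.core.SpanReceiver implementation can be listed here.
    conf.set(SpanReceiverHost.SPAN_RECEIVERS_CONF_KEY,
        "org.apache.htrace.core.LocalFileSpanReceiver");

    SpanReceiverHost host = SpanReceiverHost.getInstance(conf); // loads and registers the receivers
    try {
      // ... traced work runs here ...
    } finally {
      host.closeReceivers(); // closes every receiver created by this host
    }
  }
}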


@ -17,19 +17,112 @@
*/ */
package org.apache.hadoop.hbase.trace; package org.apache.hadoop.hbase.trace;
import io.opentelemetry.api.OpenTelemetry; import org.apache.hadoop.conf.Configuration;
import io.opentelemetry.api.trace.Tracer; import org.apache.htrace.core.HTraceConfiguration;
import org.apache.htrace.core.Sampler;
import org.apache.htrace.core.Span;
import org.apache.htrace.core.SpanReceiver;
import org.apache.htrace.core.TraceScope;
import org.apache.htrace.core.Tracer;
import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceAudience;
/**
* This wrapper class provides functions for accessing htrace 4+ functionality in a simplified way.
*/
@InterfaceAudience.Private @InterfaceAudience.Private
public final class TraceUtil { public final class TraceUtil {
private static HTraceConfiguration conf;
private static final String INSTRUMENTATION_NAME = "io.opentelemetry.contrib.hbase"; private static Tracer tracer;
private TraceUtil() { private TraceUtil() {
} }
public static Tracer getGlobalTracer() { public static void initTracer(Configuration c) {
return OpenTelemetry.getGlobalTracer(INSTRUMENTATION_NAME); if (c != null) {
conf = new HBaseHTraceConfiguration(c);
}
if (tracer == null && conf != null) {
tracer = new Tracer.Builder("Tracer").conf(conf).build();
}
}
/**
* Wrapper method to create new TraceScope with the given description
* @return TraceScope or null when not tracing
*/
public static TraceScope createTrace(String description) {
return (tracer == null) ? null : tracer.newScope(description);
}
/**
* Wrapper method to create new child TraceScope with the given description
* and parent scope's spanId
* @param span parent span
* @return TraceScope or null when not tracing
*/
public static TraceScope createTrace(String description, Span span) {
if (span == null) {
return createTrace(description);
}
return (tracer == null) ? null : tracer.newScope(description, span.getSpanId());
}
/**
* Wrapper method to add new sampler to the default tracer
* @return true if added, false if it was already added
*/
public static boolean addSampler(Sampler sampler) {
if (sampler == null) {
return false;
}
return (tracer == null) ? false : tracer.addSampler(sampler);
}
/**
* Wrapper method to add key-value pair to TraceInfo of actual span
*/
public static void addKVAnnotation(String key, String value){
Span span = Tracer.getCurrentSpan();
if (span != null) {
span.addKVAnnotation(key, value);
}
}
/**
* Wrapper method to add receiver to actual tracerpool
* @return true if successfull, false if it was already added
*/
public static boolean addReceiver(SpanReceiver rcvr) {
return (tracer == null) ? false : tracer.getTracerPool().addReceiver(rcvr);
}
/**
* Wrapper method to remove receiver from actual tracerpool
* @return true if removed, false if doesn't exist
*/
public static boolean removeReceiver(SpanReceiver rcvr) {
return (tracer == null) ? false : tracer.getTracerPool().removeReceiver(rcvr);
}
/**
* Wrapper method to add timeline annotiation to current span with given message
*/
public static void addTimelineAnnotation(String msg) {
Span span = Tracer.getCurrentSpan();
if (span != null) {
span.addTimelineAnnotation(msg);
}
}
/**
* Wrap runnable with current tracer and description
* @param runnable to wrap
* @return wrapped runnable or original runnable when not tracing
*/
public static Runnable wrap(Runnable runnable, String description) {
return (tracer == null) ? runnable : tracer.wrap(runnable, description);
} }
} }
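
The call sites touched by this revert all follow one pattern against this restored API; a standalone sketch of that pattern (operation name and annotation text are illustrative), assuming tracing may or may not be enabled in the Configuration:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.trace.TraceUtil;
import org.apache.htrace.core.TraceScope;

public class TraceUtilSketch {
  public static void main(String[] args) {
    Configuration conf = HBaseConfiguration.create();
    TraceUtil.initTracer(conf); // builds the shared Tracer; stays inert without hbase.htrace.* settings

    // createTrace returns null when not tracing; try-with-resources skips close() on a null scope.
    try (TraceScope scope = TraceUtil.createTrace("exampleOperation")) {
      TraceUtil.addTimelineAnnotation("doing work"); // no-op unless a span is currently active
      // ... work to be traced ...
    }
  }
}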


@ -109,6 +109,10 @@
<groupId>org.slf4j</groupId> <groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId> <artifactId>slf4j-api</artifactId>
</dependency> </dependency>
<dependency>
<groupId>org.apache.htrace</groupId>
<artifactId>htrace-core4</artifactId>
</dependency>
<dependency> <dependency>
<groupId>junit</groupId> <groupId>junit</groupId>
<artifactId>junit</artifactId> <artifactId>junit</artifactId>


@ -19,8 +19,6 @@
package org.apache.hadoop.hbase.io.hfile; package org.apache.hadoop.hbase.io.hfile;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.context.Scope;
import java.io.IOException; import java.io.IOException;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
@ -29,11 +27,13 @@ import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.NoSuchElementException; import java.util.NoSuchElementException;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import net.spy.memcached.CachedData; import net.spy.memcached.CachedData;
import net.spy.memcached.ConnectionFactoryBuilder; import net.spy.memcached.ConnectionFactoryBuilder;
import net.spy.memcached.FailureMode; import net.spy.memcached.FailureMode;
import net.spy.memcached.MemcachedClient; import net.spy.memcached.MemcachedClient;
import net.spy.memcached.transcoders.Transcoder; import net.spy.memcached.transcoders.Transcoder;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.io.ByteBuffAllocator; import org.apache.hadoop.hbase.io.ByteBuffAllocator;
@ -41,6 +41,7 @@ import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.nio.SingleByteBuff; import org.apache.hadoop.hbase.nio.SingleByteBuff;
import org.apache.hadoop.hbase.trace.TraceUtil; import org.apache.hadoop.hbase.trace.TraceUtil;
import org.apache.hadoop.hbase.util.Addressing; import org.apache.hadoop.hbase.util.Addressing;
import org.apache.htrace.core.TraceScope;
import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -128,12 +129,12 @@ public class MemcachedBlockCache implements BlockCache {
} }
@Override @Override
public Cacheable getBlock(BlockCacheKey cacheKey, boolean caching, boolean repeat, public Cacheable getBlock(BlockCacheKey cacheKey, boolean caching,
boolean updateCacheMetrics) { boolean repeat, boolean updateCacheMetrics) {
// Assume that nothing is the block cache // Assume that nothing is the block cache
HFileBlock result = null; HFileBlock result = null;
Span span = TraceUtil.getGlobalTracer().spanBuilder("MemcachedBlockCache.getBlock").startSpan();
try (Scope traceScope = span.makeCurrent()) { try (TraceScope traceScope = TraceUtil.createTrace("MemcachedBlockCache.getBlock")) {
result = client.get(cacheKey.toString(), tc); result = client.get(cacheKey.toString(), tc);
} catch (Exception e) { } catch (Exception e) {
// Catch a pretty broad set of exceptions to limit any changes in the memecache client // Catch a pretty broad set of exceptions to limit any changes in the memecache client
@ -145,7 +146,6 @@ public class MemcachedBlockCache implements BlockCache {
} }
result = null; result = null;
} finally { } finally {
span.end();
// Update stats if this request doesn't have it turned off 100% of the time // Update stats if this request doesn't have it turned off 100% of the time
if (updateCacheMetrics) { if (updateCacheMetrics) {
if (result == null) { if (result == null) {


@ -247,8 +247,8 @@
<artifactId>commons-lang3</artifactId> <artifactId>commons-lang3</artifactId>
</dependency> </dependency>
<dependency> <dependency>
<groupId>io.opentelemetry</groupId> <groupId>org.apache.htrace</groupId>
<artifactId>opentelemetry-api</artifactId> <artifactId>htrace-core4</artifactId>
</dependency> </dependency>
<!-- Hadoop needs Netty 3.x at test scope for the minicluster --> <!-- Hadoop needs Netty 3.x at test scope for the minicluster -->
<dependency> <dependency>


@ -78,6 +78,7 @@ public class IntegrationTestTableMapReduceUtil implements Configurable, Tool {
assertTrue(tmpjars.contains("netty")); assertTrue(tmpjars.contains("netty"));
assertTrue(tmpjars.contains("protobuf")); assertTrue(tmpjars.contains("protobuf"));
assertTrue(tmpjars.contains("guava")); assertTrue(tmpjars.contains("guava"));
assertTrue(tmpjars.contains("htrace"));
} }
@Override @Override


@ -21,8 +21,6 @@ package org.apache.hadoop.hbase.mttr;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assume.assumeFalse; import static org.junit.Assume.assumeFalse;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.context.Scope;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.concurrent.Callable; import java.util.concurrent.Callable;
@ -66,6 +64,9 @@ import org.apache.hadoop.hbase.testclassification.IntegrationTests;
import org.apache.hadoop.hbase.trace.TraceUtil; import org.apache.hadoop.hbase.trace.TraceUtil;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.LoadTestTool; import org.apache.hadoop.hbase.util.LoadTestTool;
import org.apache.htrace.core.AlwaysSampler;
import org.apache.htrace.core.Span;
import org.apache.htrace.core.TraceScope;
import org.junit.AfterClass; import org.junit.AfterClass;
import org.junit.BeforeClass; import org.junit.BeforeClass;
import org.junit.Test; import org.junit.Test;
@ -375,9 +376,12 @@ public class IntegrationTestMTTR {
* @param span Span. To be kept if the time taken was over 1 second * @param span Span. To be kept if the time taken was over 1 second
*/ */
public void addResult(long time, Span span) { public void addResult(long time, Span span) {
if (span == null) {
return;
}
stats.addValue(TimeUnit.MILLISECONDS.convert(time, TimeUnit.NANOSECONDS)); stats.addValue(TimeUnit.MILLISECONDS.convert(time, TimeUnit.NANOSECONDS));
if (TimeUnit.SECONDS.convert(time, TimeUnit.NANOSECONDS) >= 1) { if (TimeUnit.SECONDS.convert(time, TimeUnit.NANOSECONDS) >= 1) {
traces.add(span.getSpanContext().getTraceIdAsHexString()); traces.add(span.getTracerId());
} }
} }
@ -417,11 +421,15 @@ public class IntegrationTestMTTR {
final int maxIterations = 10; final int maxIterations = 10;
int numAfterDone = 0; int numAfterDone = 0;
int resetCount = 0; int resetCount = 0;
TraceUtil.addSampler(AlwaysSampler.INSTANCE);
// Keep trying until the rs is back up and we've gotten a put through // Keep trying until the rs is back up and we've gotten a put through
while (numAfterDone < maxIterations) { while (numAfterDone < maxIterations) {
long start = System.nanoTime(); long start = System.nanoTime();
Span span = TraceUtil.getGlobalTracer().spanBuilder(getSpanName()).startSpan(); Span span = null;
try (Scope scope = span.makeCurrent()) { try (TraceScope scope = TraceUtil.createTrace(getSpanName())) {
if (scope != null) {
span = scope.getSpan();
}
boolean actionResult = doAction(); boolean actionResult = doAction();
if (actionResult && future.isDone()) { if (actionResult && future.isDone()) {
numAfterDone++; numAfterDone++;
@ -452,6 +460,7 @@ public class IntegrationTestMTTR {
throw e; throw e;
} catch (RetriesExhaustedException e){ } catch (RetriesExhaustedException e){
throw e; throw e;
// Everything else is potentially recoverable on the application side. For instance, a CM // Everything else is potentially recoverable on the application side. For instance, a CM
// action kills the RS that hosted a scanner the client was using. Continued use of that // action kills the RS that hosted a scanner the client was using. Continued use of that
// scanner should be terminated, but a new scanner can be created and the read attempted // scanner should be terminated, but a new scanner can be created and the read attempted
@ -466,8 +475,6 @@ public class IntegrationTestMTTR {
LOG.info("Too many unexpected Exceptions. Aborting.", e); LOG.info("Too many unexpected Exceptions. Aborting.", e);
throw e; throw e;
} }
} finally {
span.end();
} }
result.addResult(System.nanoTime() - start, span); result.addResult(System.nanoTime() - start, span);
} }


@ -18,19 +18,10 @@
package org.apache.hadoop.hbase.trace; package org.apache.hadoop.hbase.trace;
import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.context.Scope;
import java.io.IOException;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.IntegrationTestingUtility; import org.apache.hadoop.hbase.IntegrationTestingUtility;
import org.apache.hadoop.hbase.testclassification.IntegrationTests;
import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.BufferedMutator; import org.apache.hadoop.hbase.client.BufferedMutator;
@ -40,21 +31,26 @@ import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.testclassification.IntegrationTests;
import org.apache.hadoop.hbase.util.AbstractHBaseTool; import org.apache.hadoop.hbase.util.AbstractHBaseTool;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.util.ToolRunner; import org.apache.hadoop.util.ToolRunner;
import org.apache.htrace.core.Sampler;
import org.apache.htrace.core.TraceScope;
import org.junit.Test; import org.junit.Test;
import org.junit.experimental.categories.Category; import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine; import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine;
import java.io.IOException;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
@Category(IntegrationTests.class) @Category(IntegrationTests.class)
public class IntegrationTestSendTraceRequests extends AbstractHBaseTool { public class IntegrationTestSendTraceRequests extends AbstractHBaseTool {
private static final Logger LOG =
LoggerFactory.getLogger(IntegrationTestSendTraceRequests.class);
public static final String TABLE_ARG = "t"; public static final String TABLE_ARG = "t";
public static final String CF_ARG = "f"; public static final String CF_ARG = "f";
@ -65,6 +61,7 @@ public class IntegrationTestSendTraceRequests extends AbstractHBaseTool {
private IntegrationTestingUtility util; private IntegrationTestingUtility util;
private Random random = new Random(); private Random random = new Random();
private Admin admin; private Admin admin;
private SpanReceiverHost receiverHost;
public static void main(String[] args) throws Exception { public static void main(String[] args) throws Exception {
Configuration configuration = HBaseConfiguration.create(); Configuration configuration = HBaseConfiguration.create();
@ -98,6 +95,7 @@ public class IntegrationTestSendTraceRequests extends AbstractHBaseTool {
public void internalDoWork() throws Exception { public void internalDoWork() throws Exception {
util = createUtil(); util = createUtil();
admin = util.getAdmin(); admin = util.getAdmin();
setupReceiver();
deleteTable(); deleteTable();
createTable(); createTable();
@ -110,53 +108,51 @@ public class IntegrationTestSendTraceRequests extends AbstractHBaseTool {
service.shutdown(); service.shutdown();
service.awaitTermination(100, TimeUnit.SECONDS); service.awaitTermination(100, TimeUnit.SECONDS);
Thread.sleep(90000); Thread.sleep(90000);
receiverHost.closeReceivers();
util.restoreCluster(); util.restoreCluster();
util = null; util = null;
} }
@SuppressWarnings("FutureReturnValueIgnored")
private void doScans(ExecutorService service, final LinkedBlockingQueue<Long> rks) { private void doScans(ExecutorService service, final LinkedBlockingQueue<Long> rks) {
for (int i = 0; i < 100; i++) {
Runnable runnable = new Runnable() {
private final LinkedBlockingQueue<Long> rowKeyQueue = rks;
@Override public void run() { for (int i = 0; i < 100; i++) {
ResultScanner rs = null; Runnable runnable = new Runnable() {
Span span = TraceUtil.getGlobalTracer().spanBuilder("Scan").startSpan(); private final LinkedBlockingQueue<Long> rowKeyQueue = rks;
try (Scope scope = span.makeCurrent()) { @Override
Table ht = util.getConnection().getTable(tableName); public void run() {
Scan s = new Scan(); ResultScanner rs = null;
s.withStartRow(Bytes.toBytes(rowKeyQueue.take())); TraceUtil.addSampler(Sampler.ALWAYS);
s.setBatch(7); try (TraceScope scope = TraceUtil.createTrace("Scan")){
rs = ht.getScanner(s); Table ht = util.getConnection().getTable(tableName);
// Something to keep the jvm from removing the loop. Scan s = new Scan();
long accum = 0; s.setStartRow(Bytes.toBytes(rowKeyQueue.take()));
s.setBatch(7);
rs = ht.getScanner(s);
// Something to keep the jvm from removing the loop.
long accum = 0;
for (int x = 0; x < 1000; x++) { for(int x = 0; x < 1000; x++) {
Result r = rs.next(); Result r = rs.next();
accum |= Bytes.toLong(r.getRow()); accum |= Bytes.toLong(r.getRow());
}
TraceUtil.addTimelineAnnotation("Accum result = " + accum);
ht.close();
ht = null;
} catch (IOException e) {
e.printStackTrace();
TraceUtil.addKVAnnotation("exception", e.getClass().getSimpleName());
} catch (Exception e) {
} finally {
if (rs != null) rs.close();
} }
span.addEvent("Accum result = " + accum);
ht.close();
ht = null;
} catch (IOException e) {
LOG.warn("Exception occurred while scanning table", e);
span.addEvent("exception",
Attributes.of(AttributeKey.stringKey("exception"), e.getClass().getSimpleName()));
} catch (Exception e) {
LOG.warn("Exception occurred while scanning table", e);
} finally {
span.end();
if (rs != null) {
rs.close();
}
} }
} };
}; service.submit(runnable);
service.submit(runnable); }
}
} }
private void doGets(ExecutorService service, final LinkedBlockingQueue<Long> rowKeys) private void doGets(ExecutorService service, final LinkedBlockingQueue<Long> rowKeys)
@ -177,9 +173,9 @@ public class IntegrationTestSendTraceRequests extends AbstractHBaseTool {
} }
long accum = 0; long accum = 0;
TraceUtil.addSampler(Sampler.ALWAYS);
for (int x = 0; x < 5; x++) { for (int x = 0; x < 5; x++) {
Span span = TraceUtil.getGlobalTracer().spanBuilder("gets").startSpan(); try (TraceScope scope = TraceUtil.createTrace("gets")) {
try (Scope scope = span.makeCurrent()) {
long rk = rowKeyQueue.take(); long rk = rowKeyQueue.take();
Result r1 = ht.get(new Get(Bytes.toBytes(rk))); Result r1 = ht.get(new Get(Bytes.toBytes(rk)));
if (r1 != null) { if (r1 != null) {
@ -189,12 +185,10 @@ public class IntegrationTestSendTraceRequests extends AbstractHBaseTool {
if (r2 != null) { if (r2 != null) {
accum |= Bytes.toLong(r2.getRow()); accum |= Bytes.toLong(r2.getRow());
} }
span.addEvent("Accum = " + accum); TraceUtil.addTimelineAnnotation("Accum = " + accum);
} catch (IOException|InterruptedException ie) { } catch (IOException|InterruptedException ie) {
// IGNORED // IGNORED
} finally {
span.end();
} }
} }
@ -205,22 +199,18 @@ public class IntegrationTestSendTraceRequests extends AbstractHBaseTool {
} }
private void createTable() throws IOException { private void createTable() throws IOException {
Span span = TraceUtil.getGlobalTracer().spanBuilder("createTable").startSpan(); TraceUtil.addSampler(Sampler.ALWAYS);
try (Scope scope = span.makeCurrent()) { try (TraceScope scope = TraceUtil.createTrace("createTable")) {
util.createTable(tableName, familyName); util.createTable(tableName, familyName);
} finally {
span.end();
} }
} }
private void deleteTable() throws IOException { private void deleteTable() throws IOException {
Span span = TraceUtil.getGlobalTracer().spanBuilder("deleteTable").startSpan(); TraceUtil.addSampler(Sampler.ALWAYS);
try (Scope scope = span.makeCurrent()) { try (TraceScope scope = TraceUtil.createTrace("deleteTable")) {
if (admin.tableExists(tableName)) { if (admin.tableExists(tableName)) {
util.deleteTable(tableName); util.deleteTable(tableName);
} }
} finally {
span.end();
} }
} }
@ -228,9 +218,9 @@ public class IntegrationTestSendTraceRequests extends AbstractHBaseTool {
LinkedBlockingQueue<Long> rowKeys = new LinkedBlockingQueue<>(25000); LinkedBlockingQueue<Long> rowKeys = new LinkedBlockingQueue<>(25000);
BufferedMutator ht = util.getConnection().getBufferedMutator(this.tableName); BufferedMutator ht = util.getConnection().getBufferedMutator(this.tableName);
byte[] value = new byte[300]; byte[] value = new byte[300];
TraceUtil.addSampler(Sampler.ALWAYS);
for (int x = 0; x < 5000; x++) { for (int x = 0; x < 5000; x++) {
Span span = TraceUtil.getGlobalTracer().spanBuilder("insertData").startSpan(); try (TraceScope traceScope = TraceUtil.createTrace("insertData")) {
try (Scope scope = span.makeCurrent()) {
for (int i = 0; i < 5; i++) { for (int i = 0; i < 5; i++) {
long rk = random.nextLong(); long rk = random.nextLong();
rowKeys.add(rk); rowKeys.add(rk);
@ -244,8 +234,6 @@ public class IntegrationTestSendTraceRequests extends AbstractHBaseTool {
if ((x % 1000) == 0) { if ((x % 1000) == 0) {
admin.flush(tableName); admin.flush(tableName);
} }
} finally {
span.end();
} }
} }
admin.flush(tableName); admin.flush(tableName);
@ -267,4 +255,11 @@ public class IntegrationTestSendTraceRequests extends AbstractHBaseTool {
} }
return this.util; return this.util;
} }
private void setupReceiver() {
Configuration conf = new Configuration(util.getConfiguration());
conf.setBoolean("hbase.zipkin.is-in-client-mode", true);
this.receiverHost = SpanReceiverHost.getInstance(conf);
}
} }


@ -155,8 +155,8 @@
<artifactId>slf4j-api</artifactId> <artifactId>slf4j-api</artifactId>
</dependency> </dependency>
<dependency> <dependency>
<groupId>io.opentelemetry</groupId> <groupId>org.apache.htrace</groupId>
<artifactId>opentelemetry-api</artifactId> <artifactId>htrace-core4</artifactId>
</dependency> </dependency>
<dependency> <dependency>
<groupId>org.apache.hbase</groupId> <groupId>org.apache.hbase</groupId>


@ -831,6 +831,7 @@ public class TableMapReduceUtil {
org.apache.hbase.thirdparty.io.netty.channel.Channel.class, // hbase-shaded-netty org.apache.hbase.thirdparty.io.netty.channel.Channel.class, // hbase-shaded-netty
org.apache.zookeeper.ZooKeeper.class, // zookeeper org.apache.zookeeper.ZooKeeper.class, // zookeeper
com.google.protobuf.Message.class, // protobuf com.google.protobuf.Message.class, // protobuf
org.apache.htrace.core.Tracer.class, // htrace
com.codahale.metrics.MetricRegistry.class, // metrics-core com.codahale.metrics.MetricRegistry.class, // metrics-core
org.apache.commons.lang3.ArrayUtils.class); // commons-lang org.apache.commons.lang3.ArrayUtils.class); // commons-lang
} }


@ -20,8 +20,6 @@ package org.apache.hadoop.hbase;
import com.codahale.metrics.Histogram; import com.codahale.metrics.Histogram;
import com.codahale.metrics.UniformReservoir; import com.codahale.metrics.UniformReservoir;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.context.Scope;
import java.io.IOException; import java.io.IOException;
import java.io.PrintStream; import java.io.PrintStream;
import java.lang.reflect.Constructor; import java.lang.reflect.Constructor;
@ -86,6 +84,8 @@ import org.apache.hadoop.hbase.io.hfile.RandomDistribution;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil; import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.regionserver.BloomType; import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.regionserver.CompactingMemStore; import org.apache.hadoop.hbase.regionserver.CompactingMemStore;
import org.apache.hadoop.hbase.trace.HBaseHTraceConfiguration;
import org.apache.hadoop.hbase.trace.SpanReceiverHost;
import org.apache.hadoop.hbase.trace.TraceUtil; import org.apache.hadoop.hbase.trace.TraceUtil;
import org.apache.hadoop.hbase.util.ByteArrayHashKey; import org.apache.hadoop.hbase.util.ByteArrayHashKey;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
@ -104,6 +104,9 @@ import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.lib.reduce.LongSumReducer; import org.apache.hadoop.mapreduce.lib.reduce.LongSumReducer;
import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner; import org.apache.hadoop.util.ToolRunner;
import org.apache.htrace.core.ProbabilitySampler;
import org.apache.htrace.core.Sampler;
import org.apache.htrace.core.TraceScope;
import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -694,10 +697,6 @@ public class PerformanceEvaluation extends Configured implements Tool {
int totalRows = DEFAULT_ROWS_PER_GB; int totalRows = DEFAULT_ROWS_PER_GB;
int measureAfter = 0; int measureAfter = 0;
float sampleRate = 1.0f; float sampleRate = 1.0f;
/**
* @deprecated Useless after switching to OpenTelemetry
*/
@Deprecated
double traceRate = 0.0; double traceRate = 0.0;
String tableName = TABLE_NAME; String tableName = TABLE_NAME;
boolean flushCommits = true; boolean flushCommits = true;
@ -1148,6 +1147,8 @@ public class PerformanceEvaluation extends Configured implements Tool {
protected final TestOptions opts; protected final TestOptions opts;
private final Status status; private final Status status;
private final Sampler traceSampler;
private final SpanReceiverHost receiverHost;
private String testName; private String testName;
private Histogram latencyHistogram; private Histogram latencyHistogram;
@ -1169,9 +1170,18 @@ public class PerformanceEvaluation extends Configured implements Tool {
*/ */
TestBase(final Configuration conf, final TestOptions options, final Status status) { TestBase(final Configuration conf, final TestOptions options, final Status status) {
this.conf = conf; this.conf = conf;
this.receiverHost = this.conf == null? null: SpanReceiverHost.getInstance(conf);
this.opts = options; this.opts = options;
this.status = status; this.status = status;
this.testName = this.getClass().getSimpleName(); this.testName = this.getClass().getSimpleName();
if (options.traceRate >= 1.0) {
this.traceSampler = Sampler.ALWAYS;
} else if (options.traceRate > 0.0) {
conf.setDouble("hbase.sampler.fraction", options.traceRate);
this.traceSampler = new ProbabilitySampler(new HBaseHTraceConfiguration(conf));
} else {
this.traceSampler = Sampler.NEVER;
}
everyN = (int) (opts.totalRows / (opts.totalRows * opts.sampleRate)); everyN = (int) (opts.totalRows / (opts.totalRows * opts.sampleRate));
if (options.isValueZipf()) { if (options.isValueZipf()) {
this.zipf = new RandomDistribution.Zipf(this.rand, 1, options.getValueSize(), 1.2); this.zipf = new RandomDistribution.Zipf(this.rand, 1, options.getValueSize(), 1.2);
@ -1341,6 +1351,7 @@ public class PerformanceEvaluation extends Configured implements Tool {
YammerHistogramUtils.getHistogramReport(bytesInRemoteResultsHistogram)); YammerHistogramUtils.getHistogramReport(bytesInRemoteResultsHistogram));
} }
} }
receiverHost.closeReceivers();
} }
abstract void onTakedown() throws IOException; abstract void onTakedown() throws IOException;
@ -1377,6 +1388,7 @@ public class PerformanceEvaluation extends Configured implements Tool {
void testTimed() throws IOException, InterruptedException { void testTimed() throws IOException, InterruptedException {
int startRow = getStartRow(); int startRow = getStartRow();
int lastRow = getLastRow(); int lastRow = getLastRow();
TraceUtil.addSampler(traceSampler);
// Report on completion of 1/10th of total. // Report on completion of 1/10th of total.
for (int ii = 0; ii < opts.cycles; ii++) { for (int ii = 0; ii < opts.cycles; ii++) {
if (opts.cycles > 1) LOG.info("Cycle=" + ii + " of " + opts.cycles); if (opts.cycles > 1) LOG.info("Cycle=" + ii + " of " + opts.cycles);
@ -1384,11 +1396,8 @@ public class PerformanceEvaluation extends Configured implements Tool {
if (i % everyN != 0) continue; if (i % everyN != 0) continue;
long startTime = System.nanoTime(); long startTime = System.nanoTime();
boolean requestSent = false; boolean requestSent = false;
Span span = TraceUtil.getGlobalTracer().spanBuilder("test row").startSpan(); try (TraceScope scope = TraceUtil.createTrace("test row");){
try (Scope scope = span.makeCurrent()){
requestSent = testRow(i, startTime); requestSent = testRow(i, startTime);
} finally {
span.end();
} }
if ( (i - startRow) > opts.measureAfter) { if ( (i - startRow) > opts.measureAfter) {
// If multiget or multiput is enabled, say set to 10, testRow() returns immediately // If multiget or multiput is enabled, say set to 10, testRow() returns immediately


@ -201,6 +201,10 @@
<artifactId>junit</artifactId> <artifactId>junit</artifactId>
<scope>test</scope> <scope>test</scope>
</dependency> </dependency>
<dependency>
<groupId>org.apache.htrace</groupId>
<artifactId>htrace-core4</artifactId>
</dependency>
</dependencies> </dependencies>
<profiles> <profiles>
<!-- Skip the tests in this module --> <!-- Skip the tests in this module -->


@ -427,8 +427,8 @@
</dependency> </dependency>
<!-- tracing Dependencies --> <!-- tracing Dependencies -->
<dependency> <dependency>
<groupId>io.opentelemetry</groupId> <groupId>org.apache.htrace</groupId>
<artifactId>opentelemetry-api</artifactId> <artifactId>htrace-core4</artifactId>
</dependency> </dependency>
<dependency> <dependency>
<groupId>com.lmax</groupId> <groupId>com.lmax</groupId>


@ -18,14 +18,14 @@
*/ */
package org.apache.hadoop.hbase.executor; package org.apache.hadoop.hbase.executor;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import java.io.IOException; import java.io.IOException;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.hbase.Server; import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.trace.TraceUtil; import org.apache.hadoop.hbase.trace.TraceUtil;
import org.apache.htrace.core.Span;
import org.apache.htrace.core.TraceScope;
import org.apache.htrace.core.Tracer;
import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -75,7 +75,7 @@ public abstract class EventHandler implements Runnable, Comparable<EventHandler>
* Default base class constructor. * Default base class constructor.
*/ */
public EventHandler(Server server, EventType eventType) { public EventHandler(Server server, EventType eventType) {
this.parent = Span.current(); this.parent = Tracer.getCurrentSpan();
this.server = server; this.server = server;
this.eventType = eventType; this.eventType = eventType;
seqid = seqids.incrementAndGet(); seqid = seqids.incrementAndGet();
@ -100,14 +100,10 @@ public abstract class EventHandler implements Runnable, Comparable<EventHandler>
@Override @Override
public void run() { public void run() {
Span span = TraceUtil.getGlobalTracer().spanBuilder(getClass().getSimpleName()) try (TraceScope scope = TraceUtil.createTrace(this.getClass().getSimpleName(), parent)) {
.setParent(Context.current().with(parent)).startSpan();
try (Scope scope = span.makeCurrent()) {
process(); process();
} catch (Throwable t) { } catch(Throwable t) {
handleException(t); handleException(t);
} finally {
span.end();
} }
} }


@ -17,8 +17,6 @@
*/ */
package org.apache.hadoop.hbase.io.hfile; package org.apache.hadoop.hbase.io.hfile;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.context.Scope;
import java.io.DataInput; import java.io.DataInput;
import java.io.IOException; import java.io.IOException;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
@ -50,6 +48,7 @@ import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.IdLock; import org.apache.hadoop.hbase.util.IdLock;
import org.apache.hadoop.hbase.util.ObjectIntPair; import org.apache.hadoop.hbase.util.ObjectIntPair;
import org.apache.hadoop.io.WritableUtils; import org.apache.hadoop.io.WritableUtils;
import org.apache.htrace.core.TraceScope;
import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -1288,8 +1287,7 @@ public abstract class HFileReaderImpl implements HFile.Reader, Configurable {
boolean useLock = false; boolean useLock = false;
IdLock.Entry lockEntry = null; IdLock.Entry lockEntry = null;
Span span = TraceUtil.getGlobalTracer().spanBuilder("HFileReaderImpl.readBlock").startSpan(); try (TraceScope traceScope = TraceUtil.createTrace("HFileReaderImpl.readBlock")) {
try (Scope traceScope = span.makeCurrent()) {
while (true) { while (true) {
// Check cache for block. If found return. // Check cache for block. If found return.
if (cacheConf.shouldReadBlockFromCache(expectedBlockType)) { if (cacheConf.shouldReadBlockFromCache(expectedBlockType)) {
@ -1304,7 +1302,7 @@ public abstract class HFileReaderImpl implements HFile.Reader, Configurable {
if (LOG.isTraceEnabled()) { if (LOG.isTraceEnabled()) {
LOG.trace("From Cache " + cachedBlock); LOG.trace("From Cache " + cachedBlock);
} }
span.addEvent("blockCacheHit"); TraceUtil.addTimelineAnnotation("blockCacheHit");
assert cachedBlock.isUnpacked() : "Packed block leak."; assert cachedBlock.isUnpacked() : "Packed block leak.";
if (cachedBlock.getBlockType().isData()) { if (cachedBlock.getBlockType().isData()) {
if (updateCacheMetrics) { if (updateCacheMetrics) {
@ -1334,7 +1332,7 @@ public abstract class HFileReaderImpl implements HFile.Reader, Configurable {
// Carry on, please load. // Carry on, please load.
} }
span.addEvent("blockCacheMiss"); TraceUtil.addTimelineAnnotation("blockCacheMiss");
// Load block from filesystem. // Load block from filesystem.
HFileBlock hfileBlock = fsBlockReader.readBlockData(dataBlockOffset, onDiskBlockSize, pread, HFileBlock hfileBlock = fsBlockReader.readBlockData(dataBlockOffset, onDiskBlockSize, pread,
!isCompaction, shouldUseHeap(expectedBlockType)); !isCompaction, shouldUseHeap(expectedBlockType));
@ -1364,7 +1362,6 @@ public abstract class HFileReaderImpl implements HFile.Reader, Configurable {
if (lockEntry != null) { if (lockEntry != null) {
offsetLock.releaseLockEntry(lockEntry); offsetLock.releaseLockEntry(lockEntry);
} }
span.end();
} }
} }


@ -17,24 +17,23 @@
*/ */
package org.apache.hadoop.hbase.ipc; package org.apache.hadoop.hbase.ipc;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.context.Scope;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.nio.channels.ClosedChannelException; import java.nio.channels.ClosedChannelException;
import java.util.Optional; import java.util.Optional;
import org.apache.hadoop.hbase.CallDroppedException; import org.apache.hadoop.hbase.CallDroppedException;
import org.apache.hadoop.hbase.CellScanner; import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.HBaseInterfaceAudience; import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.trace.TraceUtil;
import org.apache.hadoop.hbase.exceptions.TimeoutIOException; import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler; import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.trace.TraceUtil;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.StringUtils;
import org.apache.htrace.core.TraceScope;
import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability; import org.apache.yetus.audience.InterfaceStability;
import org.apache.hbase.thirdparty.com.google.protobuf.Message; import org.apache.hbase.thirdparty.com.google.protobuf.Message;
/** /**
@ -95,14 +94,6 @@ public class CallRunner {
this.rpcServer = null; this.rpcServer = null;
} }
private String getServiceName() {
return call.getService() != null ? call.getService().getDescriptorForType().getName() : "";
}
private String getMethodName() {
return call.getMethod() != null ? call.getMethod().getName() : "";
}
public void run() { public void run() {
try { try {
if (call.disconnectSince() >= 0) { if (call.disconnectSince() >= 0) {
@ -127,16 +118,18 @@ public class CallRunner {
String error = null; String error = null;
Pair<Message, CellScanner> resultPair = null; Pair<Message, CellScanner> resultPair = null;
RpcServer.CurCall.set(call); RpcServer.CurCall.set(call);
String serviceName = getServiceName(); TraceScope traceScope = null;
String methodName = getMethodName(); try {
String traceString = serviceName + "." + methodName;
Span span = TraceUtil.getGlobalTracer().spanBuilder(traceString).startSpan();
try (Scope traceScope = span.makeCurrent()) {
if (!this.rpcServer.isStarted()) { if (!this.rpcServer.isStarted()) {
InetSocketAddress address = rpcServer.getListenerAddress(); InetSocketAddress address = rpcServer.getListenerAddress();
throw new ServerNotRunningYetException("Server " + throw new ServerNotRunningYetException("Server " +
(address != null ? address : "(channel closed)") + " is not running yet"); (address != null ? address : "(channel closed)") + " is not running yet");
} }
String serviceName =
call.getService() != null ? call.getService().getDescriptorForType().getName() : "";
String methodName = (call.getMethod() != null) ? call.getMethod().getName() : "";
String traceString = serviceName + "." + methodName;
traceScope = TraceUtil.createTrace(traceString);
// make the call // make the call
resultPair = this.rpcServer.call(call, this.status); resultPair = this.rpcServer.call(call, this.status);
} catch (TimeoutIOException e){ } catch (TimeoutIOException e){
@ -158,12 +151,14 @@ public class CallRunner {
throw (Error)e; throw (Error)e;
} }
} finally { } finally {
if (traceScope != null) {
traceScope.close();
}
RpcServer.CurCall.set(null); RpcServer.CurCall.set(null);
if (resultPair != null) { if (resultPair != null) {
this.rpcServer.addCallSize(call.getSize() * -1); this.rpcServer.addCallSize(call.getSize() * -1);
sucessful = true; sucessful = true;
} }
span.end();
} }
this.status.markComplete("To send response"); this.status.markComplete("To send response");
// return back the RPC request read BB we can do here. It is done by now. // return back the RPC request read BB we can do here. It is done by now.


@ -193,6 +193,7 @@ import org.apache.hadoop.hbase.replication.regionserver.ReplicationStatus;
import org.apache.hadoop.hbase.security.AccessDeniedException; import org.apache.hadoop.hbase.security.AccessDeniedException;
import org.apache.hadoop.hbase.security.SecurityConstants; import org.apache.hadoop.hbase.security.SecurityConstants;
import org.apache.hadoop.hbase.security.UserProvider; import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.trace.TraceUtil;
import org.apache.hadoop.hbase.util.Addressing; import org.apache.hadoop.hbase.util.Addressing;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
@ -413,6 +414,7 @@ public class HMaster extends HRegionServer implements MasterServices {
*/ */
public HMaster(final Configuration conf) throws IOException { public HMaster(final Configuration conf) throws IOException {
super(conf); super(conf);
TraceUtil.initTracer(conf);
try { try {
if (conf.getBoolean(MAINTENANCE_MODE, false)) { if (conf.getBoolean(MAINTENANCE_MODE, false)) {
LOG.info("Detected {}=true via configuration.", MAINTENANCE_MODE); LOG.info("Detected {}=true via configuration.", MAINTENANCE_MODE);
View File
@ -21,12 +21,15 @@ package org.apache.hadoop.hbase.master;
import java.io.File; import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.util.List; import java.util.List;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.LocalHBaseCluster; import org.apache.hadoop.hbase.LocalHBaseCluster;
import org.apache.hadoop.hbase.MasterNotRunningException; import org.apache.hadoop.hbase.MasterNotRunningException;
import org.apache.hadoop.hbase.ZNodeClearer; import org.apache.hadoop.hbase.ZNodeClearer;
import org.apache.hadoop.hbase.ZooKeeperConnectionException; import org.apache.hadoop.hbase.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.trace.TraceUtil;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.ConnectionFactory;
@ -36,7 +39,6 @@ import org.apache.hadoop.hbase.util.ServerCommandLine;
import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster; import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
import org.apache.hadoop.hbase.zookeeper.ZKUtil; import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -165,6 +167,8 @@ public class HMasterCommandLine extends ServerCommandLine {
private int startMaster() { private int startMaster() {
Configuration conf = getConf(); Configuration conf = getConf();
TraceUtil.initTracer(conf);
try { try {
// If 'local', defer to LocalHBaseCluster instance. Starts master // If 'local', defer to LocalHBaseCluster instance. Starts master
// and regionserver both in the one JVM. // and regionserver both in the one JVM.
View File
@ -21,8 +21,6 @@ import static org.apache.hadoop.hbase.HConstants.REPLICATION_SCOPE_LOCAL;
import static org.apache.hadoop.hbase.regionserver.HStoreFile.MAJOR_COMPACTION_KEY; import static org.apache.hadoop.hbase.regionserver.HStoreFile.MAJOR_COMPACTION_KEY;
import static org.apache.hadoop.hbase.util.ConcurrentMapUtils.computeIfAbsent; import static org.apache.hadoop.hbase.util.ConcurrentMapUtils.computeIfAbsent;
import edu.umd.cs.findbugs.annotations.Nullable; import edu.umd.cs.findbugs.annotations.Nullable;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.context.Scope;
import java.io.EOFException; import java.io.EOFException;
import java.io.FileNotFoundException; import java.io.FileNotFoundException;
import java.io.IOException; import java.io.IOException;
@ -193,6 +191,7 @@ import org.apache.hadoop.hbase.wal.WALKeyImpl;
import org.apache.hadoop.hbase.wal.WALSplitUtil; import org.apache.hadoop.hbase.wal.WALSplitUtil;
import org.apache.hadoop.hbase.wal.WALSplitUtil.MutationReplay; import org.apache.hadoop.hbase.wal.WALSplitUtil.MutationReplay;
import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.StringUtils;
import org.apache.htrace.core.TraceScope;
import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -6585,9 +6584,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
RowLockImpl result = null; RowLockImpl result = null;
boolean success = false; boolean success = false;
Span span = TraceUtil.getGlobalTracer().spanBuilder("HRegion.getRowLock").startSpan(); try (TraceScope scope = TraceUtil.createTrace("HRegion.getRowLock")) {
try (Scope scope = span.makeCurrent()) { TraceUtil.addTimelineAnnotation("Getting a " + (readLock?"readLock":"writeLock"));
span.addEvent("Getting a " + (readLock ? "readLock" : "writeLock"));
// Keep trying until we have a lock or error out. // Keep trying until we have a lock or error out.
// TODO: do we need to add a time component here? // TODO: do we need to add a time component here?
while (result == null) { while (result == null) {
@ -6624,7 +6622,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
} }
if (timeout <= 0 || !result.getLock().tryLock(timeout, TimeUnit.MILLISECONDS)) { if (timeout <= 0 || !result.getLock().tryLock(timeout, TimeUnit.MILLISECONDS)) {
span.addEvent("Failed to get row lock"); TraceUtil.addTimelineAnnotation("Failed to get row lock");
String message = "Timed out waiting for lock for row: " + rowKey + " in region " String message = "Timed out waiting for lock for row: " + rowKey + " in region "
+ getRegionInfo().getEncodedName(); + getRegionInfo().getEncodedName();
if (reachDeadlineFirst) { if (reachDeadlineFirst) {
@ -6642,7 +6640,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
LOG.debug("Thread interrupted waiting for lock on row: {}, in region {}", rowKey, LOG.debug("Thread interrupted waiting for lock on row: {}, in region {}", rowKey,
getRegionInfo().getRegionNameAsString()); getRegionInfo().getRegionNameAsString());
} }
span.addEvent("Interrupted exception getting row lock"); TraceUtil.addTimelineAnnotation("Interrupted exception getting row lock");
throw throwOnInterrupt(ie); throw throwOnInterrupt(ie);
} catch (Error error) { } catch (Error error) {
// The maximum lock count for read lock is 64K (hardcoded), when this maximum count // The maximum lock count for read lock is 64K (hardcoded), when this maximum count
@ -6651,14 +6649,13 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
LOG.warn("Error to get row lock for {}, in region {}, cause: {}", Bytes.toStringBinary(row), LOG.warn("Error to get row lock for {}, in region {}, cause: {}", Bytes.toStringBinary(row),
getRegionInfo().getRegionNameAsString(), error); getRegionInfo().getRegionNameAsString(), error);
IOException ioe = new IOException(error); IOException ioe = new IOException(error);
span.addEvent("Error getting row lock"); TraceUtil.addTimelineAnnotation("Error getting row lock");
throw ioe; throw ioe;
} finally { } finally {
// Clean up the counts just in case this was the thing keeping the context alive. // Clean up the counts just in case this was the thing keeping the context alive.
if (!success && rowLockContext != null) { if (!success && rowLockContext != null) {
rowLockContext.cleanUp(); rowLockContext.cleanUp();
} }
span.end();
} }
} }
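In the getRowLock hunk above, span.addEvent calls on a held Span are replaced by TraceUtil.addTimelineAnnotation, which annotates whatever span is current on the thread and quietly does nothing when the request was not sampled. A rough sketch of what such a helper looks like over the HTrace API, relying only on Tracer.getCurrentSpan(); this is an illustration of the shape, not the actual branch-2 TraceUtil implementation.

import org.apache.htrace.core.Span;
import org.apache.htrace.core.Tracer;

public final class TimelineAnnotationSketch {
  private TimelineAnnotationSketch() {
  }

  // Annotate the span that is current on this thread, if any. When tracing is
  // off or the request was not sampled, getCurrentSpan() returns null and the
  // annotation is dropped, which is why call sites need no null checks.
  public static void addTimelineAnnotation(String msg) {
    Span span = Tracer.getCurrentSpan();
    if (span != null) {
      span.addTimelineAnnotation(msg);
    }
  }
}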
View File
@ -159,6 +159,8 @@ import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.UserProvider; import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.security.access.AccessChecker; import org.apache.hadoop.hbase.security.access.AccessChecker;
import org.apache.hadoop.hbase.security.access.ZKPermissionWatcher; import org.apache.hadoop.hbase.security.access.ZKPermissionWatcher;
import org.apache.hadoop.hbase.trace.SpanReceiverHost;
import org.apache.hadoop.hbase.trace.TraceUtil;
import org.apache.hadoop.hbase.util.Addressing; import org.apache.hadoop.hbase.util.Addressing;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils; import org.apache.hadoop.hbase.util.CommonFSUtils;
@ -397,6 +399,7 @@ public class HRegionServer extends Thread implements
private MetricsRegionServer metricsRegionServer; private MetricsRegionServer metricsRegionServer;
MetricsRegionServerWrapperImpl metricsRegionServerImpl; MetricsRegionServerWrapperImpl metricsRegionServerImpl;
private SpanReceiverHost spanReceiverHost;
/** /**
* ChoreService used to schedule tasks that we want to run periodically * ChoreService used to schedule tasks that we want to run periodically
@ -593,6 +596,7 @@ public class HRegionServer extends Thread implements
*/ */
public HRegionServer(final Configuration conf) throws IOException { public HRegionServer(final Configuration conf) throws IOException {
super("RegionServer"); // thread name super("RegionServer"); // thread name
TraceUtil.initTracer(conf);
try { try {
this.startcode = EnvironmentEdgeManager.currentTime(); this.startcode = EnvironmentEdgeManager.currentTime();
this.conf = conf; this.conf = conf;
@ -664,6 +668,7 @@ public class HRegionServer extends Thread implements
(t, e) -> abort("Uncaught exception in executorService thread " + t.getName(), e); (t, e) -> abort("Uncaught exception in executorService thread " + t.getName(), e);
initializeFileSystem(); initializeFileSystem();
spanReceiverHost = SpanReceiverHost.getInstance(getConfiguration());
this.configurationManager = new ConfigurationManager(); this.configurationManager = new ConfigurationManager();
setupWindows(getConfiguration(), getConfigurationManager()); setupWindows(getConfiguration(), getConfigurationManager());
@ -2712,6 +2717,10 @@ public class HRegionServer extends Thread implements
if (this.cacheFlusher != null) { if (this.cacheFlusher != null) {
this.cacheFlusher.join(); this.cacheFlusher.join();
} }
if (this.spanReceiverHost != null) {
this.spanReceiverHost.closeReceivers();
}
if (this.walRoller != null) { if (this.walRoller != null) {
this.walRoller.close(); this.walRoller.close();
} }
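The HRegionServer changes above restore a SpanReceiverHost field with a simple lifecycle: build it from configuration during initialization and close the receivers on shutdown. A trimmed-down sketch of that lifecycle, assuming only the SpanReceiverHost calls shown in this diff; TracedService and its method names are invented for illustration.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.trace.SpanReceiverHost;

public class TracedService {
  private final Configuration conf;
  private SpanReceiverHost spanReceiverHost;

  public TracedService(Configuration conf) {
    this.conf = conf;
  }

  public void start() {
    // Attaches the span receivers named in the configuration (typically via
    // hbase.trace.spanreceiver.classes, the key read elsewhere in this diff).
    spanReceiverHost = SpanReceiverHost.getInstance(conf);
  }

  public void stop() {
    // Mirrors the shutdown hunk above: stop shipping spans to the sinks.
    if (spanReceiverHost != null) {
      spanReceiverHost.closeReceivers();
    }
  }
}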
View File
@ -18,13 +18,14 @@
*/ */
package org.apache.hadoop.hbase.regionserver; package org.apache.hadoop.hbase.regionserver;
import org.apache.hadoop.hbase.trace.TraceUtil;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.LocalHBaseCluster; import org.apache.hadoop.hbase.LocalHBaseCluster;
import org.apache.hadoop.hbase.util.ServerCommandLine; import org.apache.hadoop.hbase.util.ServerCommandLine;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/** /**
* Class responsible for parsing the command line and starting the * Class responsible for parsing the command line and starting the
@ -50,6 +51,7 @@ public class HRegionServerCommandLine extends ServerCommandLine {
private int start() throws Exception { private int start() throws Exception {
Configuration conf = getConf(); Configuration conf = getConf();
TraceUtil.initTracer(conf);
try { try {
// If 'local', don't start a region server here. Defer to // If 'local', don't start a region server here. Defer to
// LocalHBaseCluster. It manages 'local' clusters. // LocalHBaseCluster. It manages 'local' clusters.
View File
@ -18,8 +18,6 @@
*/ */
package org.apache.hadoop.hbase.regionserver; package org.apache.hadoop.hbase.regionserver;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.context.Scope;
import java.io.IOException; import java.io.IOException;
import java.lang.Thread.UncaughtExceptionHandler; import java.lang.Thread.UncaughtExceptionHandler;
import java.util.ArrayList; import java.util.ArrayList;
@ -38,6 +36,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.LongAdder; import java.util.concurrent.atomic.LongAdder;
import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.DroppedSnapshotException; import org.apache.hadoop.hbase.DroppedSnapshotException;
import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HConstants;
@ -50,12 +49,12 @@ import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.util.StringUtils.TraditionalBinaryPrefix; import org.apache.hadoop.util.StringUtils.TraditionalBinaryPrefix;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.htrace.core.TraceScope;
import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
/** /**
* Thread that flushes cache on request * Thread that flushes cache on request
* *
@ -713,12 +712,10 @@ public class MemStoreFlusher implements FlushRequester {
* amount of memstore consumption. * amount of memstore consumption.
*/ */
public void reclaimMemStoreMemory() { public void reclaimMemStoreMemory() {
Span span = try (TraceScope scope = TraceUtil.createTrace("MemStoreFluser.reclaimMemStoreMemory")) {
TraceUtil.getGlobalTracer().spanBuilder("MemStoreFluser.reclaimMemStoreMemory").startSpan();
try (Scope scope = span.makeCurrent()) {
FlushType flushType = isAboveHighWaterMark(); FlushType flushType = isAboveHighWaterMark();
if (flushType != FlushType.NORMAL) { if (flushType != FlushType.NORMAL) {
span.addEvent("Force Flush. We're above high water mark."); TraceUtil.addTimelineAnnotation("Force Flush. We're above high water mark.");
long start = EnvironmentEdgeManager.currentTime(); long start = EnvironmentEdgeManager.currentTime();
long nextLogTimeMs = start; long nextLogTimeMs = start;
synchronized (this.blockSignal) { synchronized (this.blockSignal) {
@ -787,7 +784,6 @@ public class MemStoreFlusher implements FlushRequester {
if (flushType != FlushType.NORMAL) { if (flushType != FlushType.NORMAL) {
wakeupFlushThread(); wakeupFlushThread();
} }
span.end();
} }
} }
} }
View File
@ -25,8 +25,6 @@ import static org.apache.hbase.thirdparty.com.google.common.base.Preconditions.c
import static org.apache.hbase.thirdparty.com.google.common.base.Preconditions.checkNotNull; import static org.apache.hbase.thirdparty.com.google.common.base.Preconditions.checkNotNull;
import com.lmax.disruptor.RingBuffer; import com.lmax.disruptor.RingBuffer;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.context.Scope;
import java.io.FileNotFoundException; import java.io.FileNotFoundException;
import java.io.IOException; import java.io.IOException;
import java.io.InterruptedIOException; import java.io.InterruptedIOException;
@ -85,6 +83,7 @@ import org.apache.hadoop.hbase.wal.WALProvider.WriterBase;
import org.apache.hadoop.hbase.wal.WALSplitter; import org.apache.hadoop.hbase.wal.WALSplitter;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.StringUtils;
import org.apache.htrace.core.TraceScope;
import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -785,12 +784,9 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
* @throws IOException if there is a problem flushing or closing the underlying FS * @throws IOException if there is a problem flushing or closing the underlying FS
*/ */
Path replaceWriter(Path oldPath, Path newPath, W nextWriter) throws IOException { Path replaceWriter(Path oldPath, Path newPath, W nextWriter) throws IOException {
Span span = TraceUtil.getGlobalTracer().spanBuilder("FSHFile.replaceWriter").startSpan(); try (TraceScope scope = TraceUtil.createTrace("FSHFile.replaceWriter")) {
try (Scope scope = span.makeCurrent()) {
doReplaceWriter(oldPath, newPath, nextWriter); doReplaceWriter(oldPath, newPath, nextWriter);
return newPath; return newPath;
} finally {
span.end();
} }
} }
@ -838,8 +834,7 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
LOG.debug("WAL closed. Skipping rolling of writer"); LOG.debug("WAL closed. Skipping rolling of writer");
return regionsToFlush; return regionsToFlush;
} }
Span span = TraceUtil.getGlobalTracer().spanBuilder("FSHLog.rollWriter").startSpan(); try (TraceScope scope = TraceUtil.createTrace("FSHLog.rollWriter")) {
try (Scope scope = span.makeCurrent()) {
Path oldPath = getOldPath(); Path oldPath = getOldPath();
Path newPath = getNewPath(); Path newPath = getNewPath();
// Any exception from here on is catastrophic, non-recoverable so we currently abort. // Any exception from here on is catastrophic, non-recoverable so we currently abort.
@ -866,8 +861,6 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
throw new IOException( throw new IOException(
"Underlying FileSystem can't meet stream requirements. See RS log " + "for details.", "Underlying FileSystem can't meet stream requirements. See RS log " + "for details.",
exception); exception);
} finally {
span.end();
} }
return regionsToFlush; return regionsToFlush;
} finally { } finally {
@ -1059,7 +1052,7 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
.append(TimeUnit.NANOSECONDS.toMillis(timeInNanos)) .append(TimeUnit.NANOSECONDS.toMillis(timeInNanos))
.append(" ms, current pipeline: ") .append(" ms, current pipeline: ")
.append(Arrays.toString(getPipeline())).toString(); .append(Arrays.toString(getPipeline())).toString();
Span.current().addEvent(msg); TraceUtil.addTimelineAnnotation(msg);
LOG.info(msg); LOG.info(msg);
// A single sync took too long. // A single sync took too long.
// Elsewhere in checkSlowSync, called from checkLogRoll, we will look at cumulative // Elsewhere in checkSlowSync, called from checkLogRoll, we will look at cumulative
@ -1095,14 +1088,12 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
long txid = txidHolder.longValue(); long txid = txidHolder.longValue();
ServerCall<?> rpcCall = RpcServer.getCurrentCall().filter(c -> c instanceof ServerCall) ServerCall<?> rpcCall = RpcServer.getCurrentCall().filter(c -> c instanceof ServerCall)
.filter(c -> c.getCellScanner() != null).map(c -> (ServerCall) c).orElse(null); .filter(c -> c.getCellScanner() != null).map(c -> (ServerCall) c).orElse(null);
Span span = TraceUtil.getGlobalTracer().spanBuilder(implClassName + ".append").startSpan(); try (TraceScope scope = TraceUtil.createTrace(implClassName + ".append")) {
try (Scope scope = span.makeCurrent()) {
FSWALEntry entry = new FSWALEntry(txid, key, edits, hri, inMemstore, rpcCall); FSWALEntry entry = new FSWALEntry(txid, key, edits, hri, inMemstore, rpcCall);
entry.stampRegionSequenceId(we); entry.stampRegionSequenceId(we);
ringBuffer.get(txid).load(entry); ringBuffer.get(txid).load(entry);
} finally { } finally {
ringBuffer.publish(txid); ringBuffer.publish(txid);
span.end();
} }
return txid; return txid;
} }
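One detail worth noting in the append hunk above: the WAL entry is loaded into the ring buffer slot inside the trace scope, but ringBuffer.publish(txid) sits in the finally block, because a claimed Disruptor sequence must be published even if populating it fails, or consumers stall on the gap. A generic LMAX Disruptor sketch of that claim/populate/publish contract, using a StringBuilder event instead of the WAL's FSWALEntry types.

import com.lmax.disruptor.RingBuffer;

public class RingBufferPublishSketch {
  // Claim a slot, populate it, and always publish: if publish were skipped after
  // a failed populate, consumers would block forever on the unpublished sequence.
  static void append(RingBuffer<StringBuilder> ringBuffer, String payload) {
    long sequence = ringBuffer.next();   // claim the next slot
    try {
      StringBuilder slot = ringBuffer.get(sequence);
      slot.setLength(0);
      slot.append(payload);              // populate it (may throw)
    } finally {
      ringBuffer.publish(sequence);      // always make the claimed slot visible
    }
  }
}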
View File
@ -20,11 +20,10 @@ package org.apache.hadoop.hbase.regionserver.wal;
import static org.apache.hadoop.hbase.regionserver.wal.WALActionsListener.RollRequestReason.ERROR; import static org.apache.hadoop.hbase.regionserver.wal.WALActionsListener.RollRequestReason.ERROR;
import static org.apache.hadoop.hbase.regionserver.wal.WALActionsListener.RollRequestReason.SIZE; import static org.apache.hadoop.hbase.regionserver.wal.WALActionsListener.RollRequestReason.SIZE;
import static org.apache.hadoop.hbase.util.FutureUtils.addListener; import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
import com.lmax.disruptor.RingBuffer; import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.Sequence; import com.lmax.disruptor.Sequence;
import com.lmax.disruptor.Sequencer; import com.lmax.disruptor.Sequencer;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.context.Scope;
import java.io.IOException; import java.io.IOException;
import java.lang.reflect.Field; import java.lang.reflect.Field;
import java.util.ArrayDeque; import java.util.ArrayDeque;
@ -45,6 +44,7 @@ import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Supplier; import java.util.function.Supplier;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
@ -58,11 +58,12 @@ import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALKeyImpl; import org.apache.hadoop.hbase.wal.WALKeyImpl;
import org.apache.hadoop.hbase.wal.WALProvider.AsyncWriter; import org.apache.hadoop.hbase.wal.WALProvider.AsyncWriter;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.htrace.core.TraceScope;
import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.hbase.thirdparty.io.netty.channel.Channel; import org.apache.hbase.thirdparty.io.netty.channel.Channel;
import org.apache.hbase.thirdparty.io.netty.channel.EventLoop; import org.apache.hbase.thirdparty.io.netty.channel.EventLoop;
import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup; import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup;
@ -400,7 +401,7 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
} }
private void addTimeAnnotation(SyncFuture future, String annotation) { private void addTimeAnnotation(SyncFuture future, String annotation) {
Span.current().addEvent(annotation); TraceUtil.addTimelineAnnotation(annotation);
// TODO handle htrace API change, see HBASE-18895 // TODO handle htrace API change, see HBASE-18895
// future.setSpan(scope.getSpan()); // future.setSpan(scope.getSpan());
} }
@ -623,8 +624,7 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
@Override @Override
public void sync(boolean forceSync) throws IOException { public void sync(boolean forceSync) throws IOException {
Span span = TraceUtil.getGlobalTracer().spanBuilder("AsyncFSWAL.sync").startSpan(); try (TraceScope scope = TraceUtil.createTrace("AsyncFSWAL.sync")) {
try (Scope scope = span.makeCurrent()) {
long txid = waitingConsumePayloads.next(); long txid = waitingConsumePayloads.next();
SyncFuture future; SyncFuture future;
try { try {
@ -638,8 +638,6 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
consumeExecutor.execute(consumer); consumeExecutor.execute(consumer);
} }
blockOnSync(future); blockOnSync(future);
} finally {
span.end();
} }
} }
@ -648,8 +646,7 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
if (highestSyncedTxid.get() >= txid) { if (highestSyncedTxid.get() >= txid) {
return; return;
} }
Span span = TraceUtil.getGlobalTracer().spanBuilder("AsyncFSWAL.sync").startSpan(); try (TraceScope scope = TraceUtil.createTrace("AsyncFSWAL.sync")) {
try (Scope scope = span.makeCurrent()) {
// here we do not use ring buffer sequence as txid // here we do not use ring buffer sequence as txid
long sequence = waitingConsumePayloads.next(); long sequence = waitingConsumePayloads.next();
SyncFuture future; SyncFuture future;
@ -664,8 +661,6 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
consumeExecutor.execute(consumer); consumeExecutor.execute(consumer);
} }
blockOnSync(future); blockOnSync(future);
} finally {
span.end();
} }
} }
View File
@ -29,8 +29,6 @@ import com.lmax.disruptor.LifecycleAware;
import com.lmax.disruptor.TimeoutException; import com.lmax.disruptor.TimeoutException;
import com.lmax.disruptor.dsl.Disruptor; import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType; import com.lmax.disruptor.dsl.ProducerType;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.context.Scope;
import java.io.IOException; import java.io.IOException;
import java.io.OutputStream; import java.io.OutputStream;
import java.util.Arrays; import java.util.Arrays;
@ -61,6 +59,7 @@ import org.apache.hadoop.hbase.wal.WALProvider.Writer;
import org.apache.hadoop.hdfs.DFSOutputStream; import org.apache.hadoop.hdfs.DFSOutputStream;
import org.apache.hadoop.hdfs.client.HdfsDataOutputStream; import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.htrace.core.TraceScope;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger; import org.slf4j.Logger;
@ -365,7 +364,7 @@ public class FSHLog extends AbstractFSWAL<Writer> {
// use assert to make sure no change breaks the logic that // use assert to make sure no change breaks the logic that
// sequence and zigzagLatch will be set together // sequence and zigzagLatch will be set together
assert sequence > 0L : "Failed to get sequence from ring buffer"; assert sequence > 0L : "Failed to get sequence from ring buffer";
Span.current().addEvent("awaiting safepoint"); TraceUtil.addTimelineAnnotation("awaiting safepoint");
syncFuture = zigzagLatch.waitSafePoint(publishSyncOnRingBuffer(sequence, false)); syncFuture = zigzagLatch.waitSafePoint(publishSyncOnRingBuffer(sequence, false));
} }
} catch (FailedSyncBeforeLogCloseException e) { } catch (FailedSyncBeforeLogCloseException e) {
@ -437,11 +436,10 @@ public class FSHLog extends AbstractFSWAL<Writer> {
} }
private void closeWriter(Writer writer, Path path, boolean syncCloseCall) throws IOException { private void closeWriter(Writer writer, Path path, boolean syncCloseCall) throws IOException {
Span span = Span.current();
try { try {
span.addEvent("closing writer"); TraceUtil.addTimelineAnnotation("closing writer");
writer.close(); writer.close();
span.addEvent("writer closed"); TraceUtil.addTimelineAnnotation("writer closed");
} catch (IOException ioe) { } catch (IOException ioe) {
int errors = closeErrorCount.incrementAndGet(); int errors = closeErrorCount.incrementAndGet();
boolean hasUnflushedEntries = isUnflushedEntries(); boolean hasUnflushedEntries = isUnflushedEntries();
@ -651,10 +649,10 @@ public class FSHLog extends AbstractFSWAL<Writer> {
long start = System.nanoTime(); long start = System.nanoTime();
Throwable lastException = null; Throwable lastException = null;
try { try {
Span.current().addEvent("syncing writer"); TraceUtil.addTimelineAnnotation("syncing writer");
long unSyncedFlushSeq = highestUnsyncedTxid; long unSyncedFlushSeq = highestUnsyncedTxid;
writer.sync(sf.isForceSync()); writer.sync(sf.isForceSync());
Span.current().addEvent("writer synced"); TraceUtil.addTimelineAnnotation("writer synced");
if (unSyncedFlushSeq > currentSequence) { if (unSyncedFlushSeq > currentSequence) {
currentSequence = unSyncedFlushSeq; currentSequence = unSyncedFlushSeq;
} }
@ -793,7 +791,7 @@ public class FSHLog extends AbstractFSWAL<Writer> {
} }
// Sync all known transactions // Sync all known transactions
private void publishSyncThenBlockOnCompletion(Scope scope, boolean forceSync) throws IOException { private void publishSyncThenBlockOnCompletion(TraceScope scope, boolean forceSync) throws IOException {
SyncFuture syncFuture = publishSyncOnRingBuffer(forceSync); SyncFuture syncFuture = publishSyncOnRingBuffer(forceSync);
blockOnSync(syncFuture); blockOnSync(syncFuture);
} }
@ -825,8 +823,7 @@ public class FSHLog extends AbstractFSWAL<Writer> {
@Override @Override
public void sync(boolean forceSync) throws IOException { public void sync(boolean forceSync) throws IOException {
Span span = TraceUtil.getGlobalTracer().spanBuilder("FSHLog.sync").startSpan(); try (TraceScope scope = TraceUtil.createTrace("FSHLog.sync")) {
try (Scope scope = span.makeCurrent()) {
publishSyncThenBlockOnCompletion(scope, forceSync); publishSyncThenBlockOnCompletion(scope, forceSync);
} }
} }
@ -842,8 +839,7 @@ public class FSHLog extends AbstractFSWAL<Writer> {
// Already sync'd. // Already sync'd.
return; return;
} }
Span span = TraceUtil.getGlobalTracer().spanBuilder("FSHLog.sync").startSpan(); try (TraceScope scope = TraceUtil.createTrace("FSHLog.sync")) {
try (Scope scope = span.makeCurrent()) {
publishSyncThenBlockOnCompletion(scope, forceSync); publishSyncThenBlockOnCompletion(scope, forceSync);
} }
} }
View File
@ -20,6 +20,7 @@ package org.apache.hadoop.hbase;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail; import static org.junit.Assert.fail;
import edu.umd.cs.findbugs.annotations.Nullable; import edu.umd.cs.findbugs.annotations.Nullable;
import java.io.File; import java.io.File;
import java.io.IOException; import java.io.IOException;
@ -50,8 +51,8 @@ import java.util.Set;
import java.util.TreeSet; import java.util.TreeSet;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BooleanSupplier;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import java.util.function.BooleanSupplier;
import org.apache.commons.io.FileUtils; import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.RandomStringUtils; import org.apache.commons.lang3.RandomStringUtils;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
@ -118,10 +119,12 @@ import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
import org.apache.hadoop.hbase.security.HBaseKerberosUtils; import org.apache.hadoop.hbase.security.HBaseKerberosUtils;
import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.visibility.VisibilityLabelsCache; import org.apache.hadoop.hbase.security.visibility.VisibilityLabelsCache;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.trace.TraceUtil;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils; import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSTableDescriptors; import org.apache.hadoop.hbase.util.FSTableDescriptors;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.JVMClusterUtil; import org.apache.hadoop.hbase.util.JVMClusterUtil;
import org.apache.hadoop.hbase.util.JVMClusterUtil.MasterThread; import org.apache.hadoop.hbase.util.JVMClusterUtil.MasterThread;
@ -144,12 +147,11 @@ import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MiniMRCluster; import org.apache.hadoop.mapred.MiniMRCluster;
import org.apache.hadoop.mapred.TaskLog; import org.apache.hadoop.mapred.TaskLog;
import org.apache.hadoop.minikdc.MiniKdc; import org.apache.hadoop.minikdc.MiniKdc;
import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.ZooKeeper.States; import org.apache.zookeeper.ZooKeeper.States;
import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
/** /**
* Facility for testing HBase. Replacement for * Facility for testing HBase. Replacement for
@ -661,6 +663,8 @@ public class HBaseTestingUtility extends HBaseZKTestingUtility {
Log4jUtils.setLogLevel(org.apache.hadoop.metrics2.impl.MetricsSystemImpl.class.getName(), Log4jUtils.setLogLevel(org.apache.hadoop.metrics2.impl.MetricsSystemImpl.class.getName(),
"ERROR"); "ERROR");
TraceUtil.initTracer(conf);
this.dfsCluster = new MiniDFSCluster(0, this.conf, servers, true, true, this.dfsCluster = new MiniDFSCluster(0, this.conf, servers, true, true,
true, null, racks, hosts, null); true, null, racks, hosts, null);
@ -1168,6 +1172,7 @@ public class HBaseTestingUtility extends HBaseZKTestingUtility {
Log4jUtils.setLogLevel(org.apache.hadoop.hbase.ScheduledChore.class.getName(), "INFO"); Log4jUtils.setLogLevel(org.apache.hadoop.hbase.ScheduledChore.class.getName(), "INFO");
Configuration c = new Configuration(this.conf); Configuration c = new Configuration(this.conf);
TraceUtil.initTracer(c);
this.hbaseCluster = new MiniHBaseCluster(c, option.getNumMasters(), this.hbaseCluster = new MiniHBaseCluster(c, option.getNumMasters(),
option.getNumAlwaysStandByMasters(), option.getNumRegionServers(), option.getRsPorts(), option.getNumAlwaysStandByMasters(), option.getNumRegionServers(), option.getRsPorts(),
option.getMasterClass(), option.getRsClass()); option.getMasterClass(), option.getRsClass());
View File
@ -165,8 +165,8 @@ public class TestExecutorService {
private final AtomicBoolean lock; private final AtomicBoolean lock;
private AtomicInteger counter; private AtomicInteger counter;
public TestEventHandler(Server server, EventType eventType, AtomicBoolean lock, public TestEventHandler(Server server, EventType eventType,
AtomicInteger counter) { AtomicBoolean lock, AtomicInteger counter) {
super(server, eventType); super(server, eventType);
this.lock = lock; this.lock = lock;
this.counter = counter; this.counter = counter;
View File
@ -0,0 +1,134 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.trace;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.StartMiniClusterOption;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.MiscTests;
import org.apache.htrace.core.POJOSpanReceiver;
import org.apache.htrace.core.Sampler;
import org.apache.htrace.core.Span;
import org.apache.htrace.core.TraceScope;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
@Ignore // We don't support htrace in hbase-2.0.0 and this flakey is a little flakey.
@Category({MiscTests.class, MediumTests.class})
public class TestHTraceHooks {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestHTraceHooks.class);
private static final byte[] FAMILY_BYTES = "family".getBytes();
private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
private static POJOSpanReceiver rcvr;
@Rule
public TestName name = new TestName();
@BeforeClass
public static void before() throws Exception {
StartMiniClusterOption option = StartMiniClusterOption.builder()
.numMasters(2).numRegionServers(3).numDataNodes(3).build();
TEST_UTIL.startMiniCluster(option);
rcvr = new POJOSpanReceiver(new HBaseHTraceConfiguration(TEST_UTIL.getConfiguration()));
TraceUtil.addReceiver(rcvr);
TraceUtil.addSampler(new Sampler() {
@Override
public boolean next() {
return true;
}
});
}
@AfterClass
public static void after() throws Exception {
TEST_UTIL.shutdownMiniCluster();
TraceUtil.removeReceiver(rcvr);
rcvr = null;
}
@Test
public void testTraceCreateTable() throws Exception {
Table table;
Span createTableSpan;
try (TraceScope scope = TraceUtil.createTrace("creating table")) {
createTableSpan = scope.getSpan();
table = TEST_UTIL.createTable(TableName.valueOf(name.getMethodName()), FAMILY_BYTES);
}
// Some table creation is async. Need to make sure that everything is full in before
// checking to see if the spans are there.
TEST_UTIL.waitFor(10000, new Waiter.Predicate<Exception>() {
@Override public boolean evaluate() throws Exception {
return (rcvr == null) ? true : rcvr.getSpans().size() >= 5;
}
});
Collection<Span> spans = Sets.newHashSet(rcvr.getSpans());
List<Span> roots = new LinkedList<>();
TraceTree traceTree = new TraceTree(spans);
roots.addAll(traceTree.getSpansByParent().find(createTableSpan.getSpanId()));
// Roots was made 3 in hbase2. It used to be 1. We changed it back to 1 on upgrade to
// htrace-4.2 just to get the test to pass (traces are not wholesome in hbase2; TODO).
assertEquals(1, roots.size());
assertEquals("creating table", createTableSpan.getDescription());
if (spans != null) {
assertTrue(spans.size() > 5);
}
Put put = new Put("row".getBytes());
put.addColumn(FAMILY_BYTES, "col".getBytes(), "value".getBytes());
Span putSpan;
try (TraceScope scope = TraceUtil.createTrace("doing put")) {
putSpan = scope.getSpan();
table.put(put);
}
spans = rcvr.getSpans();
traceTree = new TraceTree(spans);
roots.clear();
roots.addAll(traceTree.getSpansByParent().find(putSpan.getSpanId()));
assertEquals(1, roots.size());
}
}
View File
@ -0,0 +1,148 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.trace;
import org.apache.htrace.core.Span;
import org.apache.htrace.core.SpanId;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.TreeSet;
/**
* Used to create the graph formed by spans.
*/
public class TraceTree {
public static class SpansByParent {
private static Comparator<Span> COMPARATOR =
new Comparator<Span>() {
@Override
public int compare(Span a, Span b) {
return a.getSpanId().compareTo(b.getSpanId());
}
};
private final TreeSet<Span> treeSet;
private final HashMap<SpanId, LinkedList<Span>> parentToSpans;
SpansByParent(Collection<Span> spans) {
TreeSet<Span> treeSet = new TreeSet<Span>(COMPARATOR);
parentToSpans = new HashMap<SpanId, LinkedList<Span>>();
for (Span span : spans) {
treeSet.add(span);
for (SpanId parent : span.getParents()) {
LinkedList<Span> list = parentToSpans.get(parent);
if (list == null) {
list = new LinkedList<Span>();
parentToSpans.put(parent, list);
}
list.add(span);
}
if (span.getParents().length == 0) {
LinkedList<Span> list = parentToSpans.get(SpanId.INVALID);
if (list == null) {
list = new LinkedList<Span>();
parentToSpans.put(SpanId.INVALID, list);
}
list.add(span);
}
}
this.treeSet = treeSet;
}
public List<Span> find(SpanId parentId) {
LinkedList<Span> spans = parentToSpans.get(parentId);
if (spans == null) {
return new LinkedList<Span>();
}
return spans;
}
public Iterator<Span> iterator() {
return Collections.unmodifiableSortedSet(treeSet).iterator();
}
}
public static class SpansByProcessId {
private static Comparator<Span> COMPARATOR =
new Comparator<Span>() {
@Override
public int compare(Span a, Span b) {
return a.getSpanId().compareTo(b.getSpanId());
}
};
private final TreeSet<Span> treeSet;
SpansByProcessId(Collection<Span> spans) {
TreeSet<Span> treeSet = new TreeSet<Span>(COMPARATOR);
for (Span span : spans) {
treeSet.add(span);
}
this.treeSet = treeSet;
}
public Iterator<Span> iterator() {
return Collections.unmodifiableSortedSet(treeSet).iterator();
}
}
private final SpansByParent spansByParent;
private final SpansByProcessId spansByProcessId;
/**
* Create a new TraceTree
*
* @param spans The collection of spans to use to create this TraceTree. Should
* have at least one root span.
*/
public TraceTree(Collection<Span> spans) {
if (spans == null) {
spans = Collections.emptySet();
}
this.spansByParent = new SpansByParent(spans);
this.spansByProcessId = new SpansByProcessId(spans);
}
public SpansByParent getSpansByParent() {
return spansByParent;
}
public SpansByProcessId getSpansByProcessId() {
return spansByProcessId;
}
@Override
public String toString() {
StringBuilder bld = new StringBuilder();
String prefix = "";
for (Iterator<Span> iter = spansByParent.iterator(); iter.hasNext();) {
Span span = iter.next();
bld.append(prefix).append(span.toString());
prefix = "\n";
}
return bld.toString();
}
}
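TraceTree above only exposes SpansByParent.find() and iterators, so reconstructing the graph means recursing from the roots. A small sketch of such a walk over spans collected by a receiver (for example the POJOSpanReceiver used in TestHTraceHooks earlier); it uses only the TraceTree methods defined above plus Span.getDescription()/getSpanId(), and assumes the span graph is acyclic. The class and method names are illustrative.

import java.util.Collection;
import org.apache.hadoop.hbase.trace.TraceTree;
import org.apache.htrace.core.Span;
import org.apache.htrace.core.SpanId;

public class TraceTreeWalkSketch {
  // Print the span graph: parentless spans are filed under SpanId.INVALID by the
  // TraceTree constructor above, so that id keys the roots of the walk.
  public static void print(Collection<Span> spans) {
    TraceTree tree = new TraceTree(spans);
    printChildren(tree, SpanId.INVALID, "");
  }

  private static void printChildren(TraceTree tree, SpanId parent, String indent) {
    for (Span child : tree.getSpansByParent().find(parent)) {
      System.out.println(indent + child.getDescription());
      printChildren(tree, child.getSpanId(), indent + "  ");
    }
  }
}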
View File
@ -25,8 +25,6 @@ import com.codahale.metrics.Histogram;
import com.codahale.metrics.Meter; import com.codahale.metrics.Meter;
import com.codahale.metrics.MetricFilter; import com.codahale.metrics.MetricFilter;
import com.codahale.metrics.MetricRegistry; import com.codahale.metrics.MetricRegistry;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.context.Scope;
import java.io.IOException; import java.io.IOException;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
@ -60,6 +58,8 @@ import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
import org.apache.hadoop.hbase.regionserver.wal.SecureProtobufLogReader; import org.apache.hadoop.hbase.regionserver.wal.SecureProtobufLogReader;
import org.apache.hadoop.hbase.regionserver.wal.SecureProtobufLogWriter; import org.apache.hadoop.hbase.regionserver.wal.SecureProtobufLogWriter;
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener; import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
import org.apache.hadoop.hbase.trace.HBaseHTraceConfiguration;
import org.apache.hadoop.hbase.trace.SpanReceiverHost;
import org.apache.hadoop.hbase.trace.TraceUtil; import org.apache.hadoop.hbase.trace.TraceUtil;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils; import org.apache.hadoop.hbase.util.CommonFSUtils;
@ -68,6 +68,10 @@ import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.WALProvider.Writer; import org.apache.hadoop.hbase.wal.WALProvider.Writer;
import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner; import org.apache.hadoop.util.ToolRunner;
import org.apache.htrace.core.ProbabilitySampler;
import org.apache.htrace.core.Sampler;
import org.apache.htrace.core.TraceScope;
import org.apache.htrace.core.Tracer;
import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -125,10 +129,12 @@ public final class WALPerformanceEvaluation extends Configured implements Tool {
private final boolean noSync; private final boolean noSync;
private final HRegion region; private final HRegion region;
private final int syncInterval; private final int syncInterval;
private final Sampler loopSampler;
private final NavigableMap<byte[], Integer> scopes; private final NavigableMap<byte[], Integer> scopes;
WALPutBenchmark(final HRegion region, final TableDescriptor htd, WALPutBenchmark(final HRegion region, final TableDescriptor htd,
final long numIterations, final boolean noSync, final int syncInterval) { final long numIterations, final boolean noSync, final int syncInterval,
final double traceFreq) {
this.numIterations = numIterations; this.numIterations = numIterations;
this.noSync = noSync; this.noSync = noSync;
this.syncInterval = syncInterval; this.syncInterval = syncInterval;
@ -138,6 +144,24 @@ public final class WALPerformanceEvaluation extends Configured implements Tool {
for(byte[] fam : htd.getColumnFamilyNames()) { for(byte[] fam : htd.getColumnFamilyNames()) {
scopes.put(fam, 0); scopes.put(fam, 0);
} }
String spanReceivers = getConf().get("hbase.trace.spanreceiver.classes");
if (spanReceivers == null || spanReceivers.isEmpty()) {
loopSampler = Sampler.NEVER;
} else {
if (traceFreq <= 0.0) {
LOG.warn("Tracing enabled but traceFreq=0.");
loopSampler = Sampler.NEVER;
} else if (traceFreq >= 1.0) {
loopSampler = Sampler.ALWAYS;
if (numIterations > 1000) {
LOG.warn("Full tracing of all iterations will produce a lot of data. Be sure your"
+ " SpanReceiver can keep up.");
}
} else {
getConf().setDouble("hbase.sampler.fraction", traceFreq);
loopSampler = new ProbabilitySampler(new HBaseHTraceConfiguration(getConf()));
}
}
} }
@Override @Override
@ -146,14 +170,13 @@ public final class WALPerformanceEvaluation extends Configured implements Tool {
byte[] value = new byte[valueSize]; byte[] value = new byte[valueSize];
Random rand = new Random(Thread.currentThread().getId()); Random rand = new Random(Thread.currentThread().getId());
WAL wal = region.getWAL(); WAL wal = region.getWAL();
Span threadSpan = TraceUtil.getGlobalTracer()
.spanBuilder("WALPerfEval." + Thread.currentThread().getName()).startSpan(); try (TraceScope threadScope = TraceUtil.createTrace("WALPerfEval." + Thread.currentThread().getName())) {
try (Scope threadScope = threadSpan.makeCurrent()) {
int lastSync = 0; int lastSync = 0;
TraceUtil.addSampler(loopSampler);
for (int i = 0; i < numIterations; ++i) { for (int i = 0; i < numIterations; ++i) {
assert Span.current() == threadSpan : "Span leak detected."; assert Tracer.getCurrentSpan() == threadScope.getSpan() : "Span leak detected.";
Span loopSpan = TraceUtil.getGlobalTracer().spanBuilder("runLoopIter" + i).startSpan(); try (TraceScope loopScope = TraceUtil.createTrace("runLoopIter" + i)) {
try (Scope loopScope = loopSpan.makeCurrent()) {
long now = System.nanoTime(); long now = System.nanoTime();
Put put = setupPut(rand, key, value, numFamilies); Put put = setupPut(rand, key, value, numFamilies);
WALEdit walEdit = new WALEdit(); WALEdit walEdit = new WALEdit();
@ -169,14 +192,10 @@ public final class WALPerformanceEvaluation extends Configured implements Tool {
} }
} }
latencyHistogram.update(System.nanoTime() - now); latencyHistogram.update(System.nanoTime() - now);
} finally {
loopSpan.end();
} }
} }
} catch (Exception e) { } catch (Exception e) {
LOG.error(getClass().getSimpleName() + " Thread failed", e); LOG.error(getClass().getSimpleName() + " Thread failed", e);
} finally {
threadSpan.end();
} }
} }
} }
@ -197,6 +216,9 @@ public final class WALPerformanceEvaluation extends Configured implements Tool {
boolean compress = false; boolean compress = false;
String cipher = null; String cipher = null;
int numRegions = 1; int numRegions = 1;
String spanReceivers = getConf().get("hbase.trace.spanreceiver.classes");
boolean trace = spanReceivers != null && !spanReceivers.isEmpty();
double traceFreq = 1.0;
// Process command line args // Process command line args
for (int i = 0; i < args.length; i++) { for (int i = 0; i < args.length; i++) {
String cmd = args[i]; String cmd = args[i];
@ -236,8 +258,7 @@ public final class WALPerformanceEvaluation extends Configured implements Tool {
} else if (cmd.equals("-regions")) { } else if (cmd.equals("-regions")) {
numRegions = Integer.parseInt(args[++i]); numRegions = Integer.parseInt(args[++i]);
} else if (cmd.equals("-traceFreq")) { } else if (cmd.equals("-traceFreq")) {
// keep it here for compatible traceFreq = Double.parseDouble(args[++i]);
System.err.println("-traceFreq is not supported any more");
} else if (cmd.equals("-h")) { } else if (cmd.equals("-h")) {
printUsageAndExit(); printUsageAndExit();
} else if (cmd.equals("--help")) { } else if (cmd.equals("--help")) {
@ -286,8 +307,13 @@ public final class WALPerformanceEvaluation extends Configured implements Tool {
CommonFSUtils.setFsDefault(getConf(), CommonFSUtils.getRootDir(getConf())); CommonFSUtils.setFsDefault(getConf(), CommonFSUtils.getRootDir(getConf()));
FileSystem fs = FileSystem.get(getConf()); FileSystem fs = FileSystem.get(getConf());
LOG.info("FileSystem={}, rootDir={}", fs, rootRegionDir); LOG.info("FileSystem={}, rootDir={}", fs, rootRegionDir);
Span span = TraceUtil.getGlobalTracer().spanBuilder("WALPerfEval").startSpan();
try (Scope scope = span.makeCurrent()){ SpanReceiverHost receiverHost = trace ? SpanReceiverHost.getInstance(getConf()) : null;
final Sampler sampler = trace ? Sampler.ALWAYS : Sampler.NEVER;
TraceUtil.addSampler(sampler);
TraceScope scope = TraceUtil.createTrace("WALPerfEval");
try {
rootRegionDir = rootRegionDir.makeQualified(fs.getUri(), fs.getWorkingDirectory()); rootRegionDir = rootRegionDir.makeQualified(fs.getUri(), fs.getWorkingDirectory());
cleanRegionRootDir(fs, rootRegionDir); cleanRegionRootDir(fs, rootRegionDir);
CommonFSUtils.setRootDir(getConf(), rootRegionDir); CommonFSUtils.setRootDir(getConf(), rootRegionDir);
@ -304,8 +330,8 @@ public final class WALPerformanceEvaluation extends Configured implements Tool {
// a table per desired region means we can avoid carving up the key space // a table per desired region means we can avoid carving up the key space
final TableDescriptor htd = createHTableDescriptor(i, numFamilies); final TableDescriptor htd = createHTableDescriptor(i, numFamilies);
regions[i] = openRegion(fs, rootRegionDir, htd, wals, roll, roller); regions[i] = openRegion(fs, rootRegionDir, htd, wals, roll, roller);
benchmarks[i] = benchmarks[i] = TraceUtil.wrap(new WALPutBenchmark(regions[i], htd, numIterations, noSync,
new WALPutBenchmark(regions[i], htd, numIterations, noSync, syncInterval); syncInterval, traceFreq), "");
} }
ConsoleReporter reporter = ConsoleReporter.forRegistry(metrics). ConsoleReporter reporter = ConsoleReporter.forRegistry(metrics).
outputTo(System.out).convertRatesTo(TimeUnit.SECONDS).filter(MetricFilter.ALL).build(); outputTo(System.out).convertRatesTo(TimeUnit.SECONDS).filter(MetricFilter.ALL).build();
@ -354,14 +380,19 @@ public final class WALPerformanceEvaluation extends Configured implements Tool {
if (cleanup) cleanRegionRootDir(fs, rootRegionDir); if (cleanup) cleanRegionRootDir(fs, rootRegionDir);
} }
} finally { } finally {
span.end();
// We may be called inside a test that wants to keep on using the fs. // We may be called inside a test that wants to keep on using the fs.
if (!noclosefs) { if (!noclosefs) {
fs.close(); fs.close();
} }
if (scope != null) {
scope.close();
}
if (receiverHost != null) {
receiverHost.closeReceivers();
}
} }
return 0; return(0);
} }
private static TableDescriptor createHTableDescriptor(final int regionNum, private static TableDescriptor createHTableDescriptor(final int regionNum,
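The -traceFreq handling restored above maps a frequency argument onto an HTrace Sampler: at or below zero means never sample, at or above one means always, and anything in between becomes a ProbabilitySampler driven by hbase.sampler.fraction. The same mapping pulled out as a standalone helper, purely as a restatement of the benchmark logic above; samplerFor is not a method that exists in the codebase.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.trace.HBaseHTraceConfiguration;
import org.apache.htrace.core.ProbabilitySampler;
import org.apache.htrace.core.Sampler;

public class TraceFreqSamplerSketch {
  // traceFreq <= 0 disables sampling, >= 1 samples everything, anything in
  // between becomes a ProbabilitySampler keyed off hbase.sampler.fraction.
  // (The benchmark also forces NEVER when no span receivers are configured.)
  static Sampler samplerFor(Configuration conf, double traceFreq) {
    if (traceFreq <= 0.0) {
      return Sampler.NEVER;
    } else if (traceFreq >= 1.0) {
      return Sampler.ALWAYS;
    } else {
      conf.setDouble("hbase.sampler.fraction", traceFreq);
      return new ProbabilitySampler(new HBaseHTraceConfiguration(conf));
    }
  }
}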
View File
@ -79,7 +79,6 @@
<exclude>log4j:*</exclude> <exclude>log4j:*</exclude>
<exclude>commons-logging:*</exclude> <exclude>commons-logging:*</exclude>
<exclude>org.javassist:*</exclude> <exclude>org.javassist:*</exclude>
<exclude>io.opentelemetry:*</exclude>
</excludes> </excludes>
</artifactSet> </artifactSet>
</configuration> </configuration>
View File
@ -236,7 +236,6 @@
<exclude>log4j:*</exclude> <exclude>log4j:*</exclude>
<exclude>commons-logging:*</exclude> <exclude>commons-logging:*</exclude>
<exclude>org.javassist:*</exclude> <exclude>org.javassist:*</exclude>
<exclude>io.opentelemetry:*</exclude>
</excludes> </excludes>
</artifactSet> </artifactSet>
</configuration> </configuration>
View File
@ -159,7 +159,6 @@
<exclude>log4j:*</exclude> <exclude>log4j:*</exclude>
<exclude>commons-logging:*</exclude> <exclude>commons-logging:*</exclude>
<exclude>org.javassist:*</exclude> <exclude>org.javassist:*</exclude>
<exclude>io.opentelemetry:*</exclude>
</excludes> </excludes>
</artifactSet> </artifactSet>
<relocations> <relocations>
View File
@ -17,17 +17,16 @@
# limitations under the License. # limitations under the License.
# #
# Disable tracing for now as HTrace does not work any more java_import org.apache.hadoop.hbase.trace.SpanReceiverHost
# java_import org.apache.hadoop.hbase.trace.SpanReceiverHost
module Shell module Shell
module Commands module Commands
class Trace < Command class Trace < Command
# @@conf = org.apache.htrace.core.HTraceConfiguration.fromKeyValuePairs( @@conf = org.apache.htrace.core.HTraceConfiguration.fromKeyValuePairs(
# 'sampler.classes', 'org.apache.htrace.core.AlwaysSampler' 'sampler.classes', 'org.apache.htrace.core.AlwaysSampler'
# ) )
# @@tracer = org.apache.htrace.core.Tracer::Builder.new('HBaseShell').conf(@@conf).build() @@tracer = org.apache.htrace.core.Tracer::Builder.new('HBaseShell').conf(@@conf).build()
# @@tracescope = nil @@tracescope = nil
def help def help
<<-EOF <<-EOF
@ -58,23 +57,23 @@ EOF
end end
def trace(startstop, spanname) def trace(startstop, spanname)
# @@receiver ||= SpanReceiverHost.getInstance(@shell.hbase.configuration) @@receiver ||= SpanReceiverHost.getInstance(@shell.hbase.configuration)
# if startstop == 'start' if startstop == 'start'
# unless tracing? unless tracing?
# @@tracescope = @@tracer.newScope(spanname) @@tracescope = @@tracer.newScope(spanname)
# end end
# elsif startstop == 'stop' elsif startstop == 'stop'
# if tracing? if tracing?
# @@tracescope.close @@tracescope.close
# @@tracescope = nil @@tracescope = nil
# end end
# end end
# tracing? tracing?
end end
# def tracing? def tracing?
# @@tracescope != nil @@tracescope != nil
# end end
end end
end end
end end
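Note: the shell hunk above re-enables the trace command by driving htrace-core4 directly from JRuby. For reference, the equivalent htrace-core4 calls in Java look roughly like the sketch below; the span name "shell-span" is an illustrative value, the rest mirrors the Ruby lines one for one.

import org.apache.htrace.core.HTraceConfiguration;
import org.apache.htrace.core.TraceScope;
import org.apache.htrace.core.Tracer;

// Illustrative sketch of what the shell's trace command does under the hood.
HTraceConfiguration conf = HTraceConfiguration.fromKeyValuePairs(
    "sampler.classes", "org.apache.htrace.core.AlwaysSampler");   // sample every span
Tracer tracer = new Tracer.Builder("HBaseShell").conf(conf).build();
TraceScope scope = tracer.newScope("shell-span");                  // trace 'start', 'shell-span'
// ... run shell commands inside the scope ...
scope.close();                                                      // trace 'stop'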

View File

@ -148,10 +148,6 @@
<groupId>org.apache.zookeeper</groupId> <groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId> <artifactId>zookeeper</artifactId>
</dependency> </dependency>
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-api</artifactId>
</dependency>
<!-- Test dependencies --> <!-- Test dependencies -->
<dependency> <dependency>
<groupId>junit</groupId> <groupId>junit</groupId>

View File

@ -18,18 +18,18 @@
*/ */
package org.apache.hadoop.hbase.zookeeper; package org.apache.hadoop.hbase.zookeeper;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.context.Scope;
import java.io.IOException; import java.io.IOException;
import java.lang.management.ManagementFactory; import java.lang.management.ManagementFactory;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.LinkedList; import java.util.LinkedList;
import java.util.List; import java.util.List;
import org.apache.hadoop.hbase.trace.TraceUtil; import org.apache.hadoop.hbase.trace.TraceUtil;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.RetryCounter; import org.apache.hadoop.hbase.util.RetryCounter;
import org.apache.hadoop.hbase.util.RetryCounterFactory; import org.apache.hadoop.hbase.util.RetryCounterFactory;
import org.apache.htrace.core.TraceScope;
import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.AsyncCallback; import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.CreateMode;
@ -164,8 +164,7 @@ public class RecoverableZooKeeper {
* exist. * exist.
*/ */
public void delete(String path, int version) throws InterruptedException, KeeperException { public void delete(String path, int version) throws InterruptedException, KeeperException {
Span span = TraceUtil.getGlobalTracer().spanBuilder("RecoverableZookeeper.delete").startSpan(); try (TraceScope scope = TraceUtil.createTrace("RecoverableZookeeper.delete")) {
try (Scope scope = span.makeCurrent()) {
RetryCounter retryCounter = retryCounterFactory.create(); RetryCounter retryCounter = retryCounterFactory.create();
boolean isRetry = false; // False for first attempt, true for all retries. boolean isRetry = false; // False for first attempt, true for all retries.
while (true) { while (true) {
@ -197,8 +196,6 @@ public class RecoverableZooKeeper {
retryCounter.sleepUntilNextRetry(); retryCounter.sleepUntilNextRetry();
isRetry = true; isRetry = true;
} }
} finally {
span.end();
} }
} }
@ -207,8 +204,7 @@ public class RecoverableZooKeeper {
* @return A Stat instance * @return A Stat instance
*/ */
public Stat exists(String path, Watcher watcher) throws KeeperException, InterruptedException { public Stat exists(String path, Watcher watcher) throws KeeperException, InterruptedException {
Span span = TraceUtil.getGlobalTracer().spanBuilder("RecoverableZookeeper.exists").startSpan(); try (TraceScope scope = TraceUtil.createTrace("RecoverableZookeeper.exists")) {
try (Scope scope = span.makeCurrent()) {
RetryCounter retryCounter = retryCounterFactory.create(); RetryCounter retryCounter = retryCounterFactory.create();
while (true) { while (true) {
try { try {
@ -229,8 +225,6 @@ public class RecoverableZooKeeper {
} }
retryCounter.sleepUntilNextRetry(); retryCounter.sleepUntilNextRetry();
} }
} finally {
span.end();
} }
} }
@ -239,9 +233,7 @@ public class RecoverableZooKeeper {
* @return A Stat instance * @return A Stat instance
*/ */
public Stat exists(String path, boolean watch) throws KeeperException, InterruptedException { public Stat exists(String path, boolean watch) throws KeeperException, InterruptedException {
Span span = try (TraceScope scope = TraceUtil.createTrace("RecoverableZookeeper.exists")) {
TraceUtil.getGlobalTracer().spanBuilder("RecoverableZookeeper.exists").startSpan();
try (Scope scope = span.makeCurrent()) {
RetryCounter retryCounter = retryCounterFactory.create(); RetryCounter retryCounter = retryCounterFactory.create();
while (true) { while (true) {
try { try {
@ -263,8 +255,6 @@ public class RecoverableZooKeeper {
} }
retryCounter.sleepUntilNextRetry(); retryCounter.sleepUntilNextRetry();
} }
} finally {
span.end();
} }
} }
@ -283,9 +273,7 @@ public class RecoverableZooKeeper {
*/ */
public List<String> getChildren(String path, Watcher watcher) public List<String> getChildren(String path, Watcher watcher)
throws KeeperException, InterruptedException { throws KeeperException, InterruptedException {
Span span = try (TraceScope scope = TraceUtil.createTrace("RecoverableZookeeper.getChildren")) {
TraceUtil.getGlobalTracer().spanBuilder("RecoverableZookeeper.getChildren").startSpan();
try (Scope scope = span.makeCurrent()) {
RetryCounter retryCounter = retryCounterFactory.create(); RetryCounter retryCounter = retryCounterFactory.create();
while (true) { while (true) {
try { try {
@ -306,8 +294,6 @@ public class RecoverableZooKeeper {
} }
retryCounter.sleepUntilNextRetry(); retryCounter.sleepUntilNextRetry();
} }
} finally {
span.end();
} }
} }
@ -317,9 +303,7 @@ public class RecoverableZooKeeper {
*/ */
public List<String> getChildren(String path, boolean watch) public List<String> getChildren(String path, boolean watch)
throws KeeperException, InterruptedException { throws KeeperException, InterruptedException {
Span span = try (TraceScope scope = TraceUtil.createTrace("RecoverableZookeeper.getChildren")) {
TraceUtil.getGlobalTracer().spanBuilder("RecoverableZookeeper.getChildren").startSpan();
try (Scope scope = span.makeCurrent()) {
RetryCounter retryCounter = retryCounterFactory.create(); RetryCounter retryCounter = retryCounterFactory.create();
while (true) { while (true) {
try { try {
@ -341,8 +325,6 @@ public class RecoverableZooKeeper {
} }
retryCounter.sleepUntilNextRetry(); retryCounter.sleepUntilNextRetry();
} }
} finally {
span.end();
} }
} }
@ -352,8 +334,7 @@ public class RecoverableZooKeeper {
*/ */
public byte[] getData(String path, Watcher watcher, Stat stat) public byte[] getData(String path, Watcher watcher, Stat stat)
throws KeeperException, InterruptedException { throws KeeperException, InterruptedException {
Span span = TraceUtil.getGlobalTracer().spanBuilder("RecoverableZookeeper.getData").startSpan(); try (TraceScope scope = TraceUtil.createTrace("RecoverableZookeeper.getData")) {
try (Scope scope = span.makeCurrent()) {
RetryCounter retryCounter = retryCounterFactory.create(); RetryCounter retryCounter = retryCounterFactory.create();
while (true) { while (true) {
try { try {
@ -374,8 +355,6 @@ public class RecoverableZooKeeper {
} }
retryCounter.sleepUntilNextRetry(); retryCounter.sleepUntilNextRetry();
} }
} finally {
span.end();
} }
} }
@ -385,9 +364,7 @@ public class RecoverableZooKeeper {
*/ */
public byte[] getData(String path, boolean watch, Stat stat) public byte[] getData(String path, boolean watch, Stat stat)
throws KeeperException, InterruptedException { throws KeeperException, InterruptedException {
Span span = try (TraceScope scope = TraceUtil.createTrace("RecoverableZookeeper.getData")) {
TraceUtil.getGlobalTracer().spanBuilder("RecoverableZookeeper.getData").startSpan();
try (Scope scope = span.makeCurrent()) {
RetryCounter retryCounter = retryCounterFactory.create(); RetryCounter retryCounter = retryCounterFactory.create();
while (true) { while (true) {
try { try {
@ -409,8 +386,6 @@ public class RecoverableZooKeeper {
} }
retryCounter.sleepUntilNextRetry(); retryCounter.sleepUntilNextRetry();
} }
} finally {
span.end();
} }
} }
@ -422,8 +397,7 @@ public class RecoverableZooKeeper {
*/ */
public Stat setData(String path, byte[] data, int version) public Stat setData(String path, byte[] data, int version)
throws KeeperException, InterruptedException { throws KeeperException, InterruptedException {
Span span = TraceUtil.getGlobalTracer().spanBuilder("RecoverableZookeeper.setData").startSpan(); try (TraceScope scope = TraceUtil.createTrace("RecoverableZookeeper.setData")) {
try (Scope scope = span.makeCurrent()) {
RetryCounter retryCounter = retryCounterFactory.create(); RetryCounter retryCounter = retryCounterFactory.create();
byte[] newData = ZKMetadata.appendMetaData(id, data); byte[] newData = ZKMetadata.appendMetaData(id, data);
boolean isRetry = false; boolean isRetry = false;
@ -463,8 +437,6 @@ public class RecoverableZooKeeper {
retryCounter.sleepUntilNextRetry(); retryCounter.sleepUntilNextRetry();
isRetry = true; isRetry = true;
} }
} finally {
span.end();
} }
} }
@ -472,9 +444,9 @@ public class RecoverableZooKeeper {
* getAcl is an idempotent operation. Retry before throwing exception * getAcl is an idempotent operation. Retry before throwing exception
* @return list of ACLs * @return list of ACLs
*/ */
public List<ACL> getAcl(String path, Stat stat) throws KeeperException, InterruptedException { public List<ACL> getAcl(String path, Stat stat)
Span span = TraceUtil.getGlobalTracer().spanBuilder("RecoverableZookeeper.getAcl").startSpan(); throws KeeperException, InterruptedException {
try (Scope scope = span.makeCurrent()) { try (TraceScope scope = TraceUtil.createTrace("RecoverableZookeeper.getAcl")) {
RetryCounter retryCounter = retryCounterFactory.create(); RetryCounter retryCounter = retryCounterFactory.create();
while (true) { while (true) {
try { try {
@ -495,8 +467,6 @@ public class RecoverableZooKeeper {
} }
retryCounter.sleepUntilNextRetry(); retryCounter.sleepUntilNextRetry();
} }
} finally {
span.end();
} }
} }
@ -506,8 +476,7 @@ public class RecoverableZooKeeper {
*/ */
public Stat setAcl(String path, List<ACL> acls, int version) public Stat setAcl(String path, List<ACL> acls, int version)
throws KeeperException, InterruptedException { throws KeeperException, InterruptedException {
Span span = TraceUtil.getGlobalTracer().spanBuilder("RecoverableZookeeper.setAcl").startSpan(); try (TraceScope scope = TraceUtil.createTrace("RecoverableZookeeper.setAcl")) {
try (Scope scope = span.makeCurrent()) {
RetryCounter retryCounter = retryCounterFactory.create(); RetryCounter retryCounter = retryCounterFactory.create();
while (true) { while (true) {
try { try {
@ -527,8 +496,6 @@ public class RecoverableZooKeeper {
} }
retryCounter.sleepUntilNextRetry(); retryCounter.sleepUntilNextRetry();
} }
} finally {
span.end();
} }
} }
@ -547,10 +514,10 @@ public class RecoverableZooKeeper {
* *
* @return Path * @return Path
*/ */
public String create(String path, byte[] data, List<ACL> acl, CreateMode createMode) public String create(String path, byte[] data, List<ACL> acl,
CreateMode createMode)
throws KeeperException, InterruptedException { throws KeeperException, InterruptedException {
Span span = TraceUtil.getGlobalTracer().spanBuilder("RecoverableZookeeper.create").startSpan(); try (TraceScope scope = TraceUtil.createTrace("RecoverableZookeeper.create")) {
try (Scope scope = span.makeCurrent()) {
byte[] newData = ZKMetadata.appendMetaData(id, data); byte[] newData = ZKMetadata.appendMetaData(id, data);
switch (createMode) { switch (createMode) {
case EPHEMERAL: case EPHEMERAL:
@ -565,8 +532,6 @@ public class RecoverableZooKeeper {
throw new IllegalArgumentException("Unrecognized CreateMode: " + throw new IllegalArgumentException("Unrecognized CreateMode: " +
createMode); createMode);
} }
} finally {
span.end();
} }
} }
@ -682,9 +647,9 @@ public class RecoverableZooKeeper {
/** /**
* Run multiple operations in a transactional manner. Retry before throwing exception * Run multiple operations in a transactional manner. Retry before throwing exception
*/ */
public List<OpResult> multi(Iterable<Op> ops) throws KeeperException, InterruptedException { public List<OpResult> multi(Iterable<Op> ops)
Span span = TraceUtil.getGlobalTracer().spanBuilder("RecoverableZookeeper.multi").startSpan(); throws KeeperException, InterruptedException {
try (Scope scope = span.makeCurrent()) { try (TraceScope scope = TraceUtil.createTrace("RecoverableZookeeper.multi")) {
RetryCounter retryCounter = retryCounterFactory.create(); RetryCounter retryCounter = retryCounterFactory.create();
Iterable<Op> multiOps = prepareZKMulti(ops); Iterable<Op> multiOps = prepareZKMulti(ops);
while (true) { while (true) {
@ -706,8 +671,6 @@ public class RecoverableZooKeeper {
} }
retryCounter.sleepUntilNextRetry(); retryCounter.sleepUntilNextRetry();
} }
} finally {
span.end();
} }
} }
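Note: every RecoverableZooKeeper hunk above applies the same mechanical change: the OpenTelemetry span plus finally-block end() is replaced by HTrace's AutoCloseable TraceScope from TraceUtil.createTrace. A condensed before/after of the two patterns, with the retry loop elided, is shown below; the imports involved are io.opentelemetry.api.trace.Span and io.opentelemetry.context.Scope on one side, org.apache.htrace.core.TraceScope on the other, exactly as in the hunks.

// Before the revert (OpenTelemetry, as removed here):
Span span = TraceUtil.getGlobalTracer().spanBuilder("RecoverableZookeeper.delete").startSpan();
try (Scope scope = span.makeCurrent()) {
  // ... retry loop around zk.delete(path, version) ...
} finally {
  span.end();
}

// After the revert (HTrace, as restored here):
try (TraceScope scope = TraceUtil.createTrace("RecoverableZookeeper.delete")) {
  // ... retry loop around zk.delete(path, version) ...
}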

pom.xml
View File

@ -1025,25 +1025,6 @@
</rules> </rules>
</configuration> </configuration>
</execution> </execution>
<execution>
<id>banned-htrace</id>
<goals>
<goal>enforce</goal>
</goals>
<configuration>
<rules>
<bannedDependencies>
<excludes>
<exclude>org.apache.htrace:**</exclude>
</excludes>
<message>
Use OpenTelemetry instead
</message>
<searchTransitive>false</searchTransitive>
</bannedDependencies>
</rules>
</configuration>
</execution>
<execution> <execution>
<id>check-aggregate-license</id> <id>check-aggregate-license</id>
<!-- must check after LICENSE is built at 'generate-resources' --> <!-- must check after LICENSE is built at 'generate-resources' -->
@ -1153,10 +1134,9 @@
<restrictImports implementation="de.skuzzle.enforcer.restrictimports.rule.RestrictImports"> <restrictImports implementation="de.skuzzle.enforcer.restrictimports.rule.RestrictImports">
<includeTestCode>true</includeTestCode> <includeTestCode>true</includeTestCode>
<commentLineBufferSize>512</commentLineBufferSize> <commentLineBufferSize>512</commentLineBufferSize>
<reason>Do not use htrace</reason> <reason>Do not use htrace v3</reason>
<bannedImports> <bannedImports>
<bannedImport>org.htrace.**</bannedImport> <bannedImport>org.htrace.**</bannedImport>
<bannedImport>org.apache.htrace.**</bannedImport>
</bannedImports> </bannedImports>
</restrictImports> </restrictImports>
<restrictImports implementation="de.skuzzle.enforcer.restrictimports.rule.RestrictImports"> <restrictImports implementation="de.skuzzle.enforcer.restrictimports.rule.RestrictImports">
@ -1482,7 +1462,7 @@
<jruby.version>9.2.13.0</jruby.version> <jruby.version>9.2.13.0</jruby.version>
<junit.version>4.13</junit.version> <junit.version>4.13</junit.version>
<hamcrest.version>1.3</hamcrest.version> <hamcrest.version>1.3</hamcrest.version>
<opentelemetry.version>0.12.0</opentelemetry.version> <htrace.version>4.2.0-incubating</htrace.version>
<log4j.version>1.2.17</log4j.version> <log4j.version>1.2.17</log4j.version>
<mockito-core.version>2.28.2</mockito-core.version> <mockito-core.version>2.28.2</mockito-core.version>
<!--Internally we use a different version of protobuf. See hbase-protocol-shaded--> <!--Internally we use a different version of protobuf. See hbase-protocol-shaded-->
@ -2174,9 +2154,9 @@
<scope>test</scope> <scope>test</scope>
</dependency> </dependency>
<dependency> <dependency>
<groupId>io.opentelemetry</groupId> <groupId>org.apache.htrace</groupId>
<artifactId>opentelemetry-api</artifactId> <artifactId>htrace-core4</artifactId>
<version>${opentelemetry.version}</version> <version>${htrace.version}</version>
</dependency> </dependency>
<dependency> <dependency>
<groupId>com.lmax</groupId> <groupId>com.lmax</groupId>