HBASE-26124 Backport HBASE-25373 "Remove HTrace completely in code base and try to make use of OpenTelemetry" to branch-2 (#3529)
1/17 commits of HBASE-22120

Signed-off-by: Peter Somogyi <psomogyi@apache.org>
parent 008ffd2130
commit 665305cc3b
@@ -19,11 +19,9 @@ package org.apache.hadoop.hbase.io.asyncfs;

import java.io.File;
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseCommonTestingUtility;
import org.apache.hadoop.hbase.trace.TraceUtil;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -104,7 +102,6 @@ public abstract class AsyncFSTestBase {

org.apache.log4j.Logger.getLogger(org.apache.hadoop.metrics2.impl.MetricsSystemImpl.class)
.setLevel(org.apache.log4j.Level.ERROR);

TraceUtil.initTracer(conf);
CLUSTER = new MiniDFSCluster.Builder(conf).numDataNodes(servers).build();
CLUSTER.waitClusterUp();
}

@@ -145,8 +145,8 @@

<artifactId>zookeeper</artifactId>
</dependency>
<dependency>
<groupId>org.apache.htrace</groupId>
<artifactId>htrace-core4</artifactId>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-api</artifactId>
</dependency>
<dependency>
<groupId>org.jruby.jcodings</groupId>

@@ -46,10 +46,8 @@ import org.apache.hadoop.hbase.TableName;

import org.apache.hadoop.hbase.client.backoff.ServerStatistics;
import org.apache.hadoop.hbase.client.coprocessor.Batch;
import org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil;
import org.apache.hadoop.hbase.trace.TraceUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.htrace.core.Tracer;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -572,13 +570,9 @@ class AsyncRequestFutureImpl<CResult> implements AsyncRequestFuture {

asyncProcess.incTaskCounters(multiAction.getRegions(), server);
SingleServerRequestRunnable runnable = createSingleServerRequest(
multiAction, numAttempt, server, callsInProgress);
Tracer tracer = Tracer.curThreadTracer();

if (tracer == null) {
return Collections.singletonList(runnable);
} else {
return Collections.singletonList(tracer.wrap(runnable, "AsyncProcess.sendMultiAction"));
}
// remove trace for runnable because HBASE-25373 and OpenTelemetry do not cover TraceRunnable
return Collections.singletonList(runnable);
}

// group the actions by the amount of delay

@@ -598,12 +592,10 @@ class AsyncRequestFutureImpl<CResult> implements AsyncRequestFuture {

List<Runnable> toReturn = new ArrayList<>(actions.size());
for (DelayingRunner runner : actions.values()) {
asyncProcess.incTaskCounters(runner.getActions().getRegions(), server);
String traceText = "AsyncProcess.sendMultiAction";
Runnable runnable = createSingleServerRequest(runner.getActions(), numAttempt, server, callsInProgress);
// use a delay runner only if we need to sleep for some time
if (runner.getSleepTime() > 0) {
runner.setRunner(runnable);
traceText = "AsyncProcess.clientBackoff.sendMultiAction";
runnable = runner;
if (asyncProcess.connection.getConnectionMetrics() != null) {
asyncProcess.connection.getConnectionMetrics()

@@ -614,7 +606,7 @@ class AsyncRequestFutureImpl<CResult> implements AsyncRequestFuture {

asyncProcess.connection.getConnectionMetrics().incrNormalRunners();
}
}
runnable = TraceUtil.wrap(runnable, traceText);
// remove trace for runnable because HBASE-25373 and OpenTelemetry do not cover TraceRunnable
toReturn.add(runnable);

}
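Note on the two hunks above: the HTrace Tracer.wrap()/TraceUtil.wrap() calls are dropped rather than ported, as the in-line comment says, because this change does not carry over a TraceRunnable equivalent. If the submitting thread's trace context ever needs to follow a runnable onto an executor, the OpenTelemetry context API can do it; a minimal, self-contained sketch (not part of this commit):

    import io.opentelemetry.context.Context;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    public class WrapRunnableSketch {
      public static void main(String[] args) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        Runnable work = () -> System.out.println("runs with the submitter's trace context");
        // Context.current().wrap(...) captures the submitting thread's context (including
        // its current span, if any) and re-activates it inside the worker thread.
        pool.submit(Context.current().wrap(work));
        pool.shutdown();
      }
    }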
@@ -26,7 +26,6 @@ import java.util.concurrent.RunnableFuture;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import org.apache.hadoop.hbase.trace.TraceUtil;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -168,7 +167,8 @@ public class ResultBoundedCompletionService<V> {

public void submit(RetryingCallable<V> task, int callTimeout, int id) {
QueueingFuture<V> newFuture = new QueueingFuture<>(task, callTimeout, id);
executor.execute(TraceUtil.wrap(newFuture, "ResultBoundedCompletionService.submit"));
// remove trace for runnable because HBASE-25373 and OpenTelemetry do not cover TraceRunnable
executor.execute(newFuture);
tasks[id] = newFuture;
}

@@ -24,6 +24,9 @@ import static org.apache.hadoop.hbase.ipc.IPCUtil.isFatalConnectionException;

import static org.apache.hadoop.hbase.ipc.IPCUtil.setCancelled;
import static org.apache.hadoop.hbase.ipc.IPCUtil.write;

import io.opentelemetry.api.trace.Span;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.DataInputStream;

@@ -62,7 +65,6 @@ import org.apache.hadoop.ipc.RemoteException;

import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.StringUtils;
import org.apache.htrace.core.TraceScope;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -593,9 +595,12 @@ class BlockingRpcConnection extends RpcConnection implements Runnable {

}

private void tracedWriteRequest(Call call) throws IOException {
try (TraceScope ignored = TraceUtil.createTrace("RpcClientImpl.tracedWriteRequest",
call.span)) {
Span span = TraceUtil.getGlobalTracer().spanBuilder("RpcClientImpl.tracedWriteRequest")
.setParent(Context.current().with(call.span)).startSpan();
try (Scope scope = span.makeCurrent()) {
writeRequest(call);
} finally {
span.end();
}
}
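The tracedWriteRequest() hunk above shows the migration pattern this commit applies throughout: an HTrace TraceScope opened in try-with-resources becomes an explicitly started OpenTelemetry span, whose Scope only manages the thread-local current span and whose end() is a separate step in finally. A condensed sketch of the pattern (tracer and parent are stand-ins for TraceUtil.getGlobalTracer() and a previously captured span):

    import io.opentelemetry.api.trace.Span;
    import io.opentelemetry.api.trace.Tracer;
    import io.opentelemetry.context.Context;
    import io.opentelemetry.context.Scope;

    final class SpanPatternSketch {
      static void runTraced(Tracer tracer, Span parent, Runnable body) {
        Span span = tracer.spanBuilder("example.operation")
            // setParent is only needed when the parent span is not already current on this thread
            .setParent(Context.current().with(parent))
            .startSpan();
        try (Scope ignored = span.makeCurrent()) {
          body.run();
        } finally {
          span.end(); // closing the Scope does not end the span
        }
      }
    }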
@@ -17,6 +17,7 @@

*/
package org.apache.hadoop.hbase.ipc;

import io.opentelemetry.api.trace.Span;
import java.io.IOException;
import java.util.Optional;
import org.apache.commons.lang3.builder.ToStringBuilder;

@@ -24,13 +25,13 @@ import org.apache.commons.lang3.builder.ToStringStyle;

import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.client.MetricsConnection;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.htrace.core.Span;
import org.apache.htrace.core.Tracer;
import org.apache.yetus.audience.InterfaceAudience;

import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors;
import org.apache.hbase.thirdparty.com.google.protobuf.Message;
import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
import org.apache.hbase.thirdparty.io.netty.util.Timeout;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;

/** A call waiting for a value. */

@@ -73,7 +74,7 @@ class Call {

this.timeout = timeout;
this.priority = priority;
this.callback = callback;
this.span = Tracer.getCurrentSpan();
this.span = Span.current();
}

/**
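Unlike HTrace's Tracer.getCurrentSpan(), Span.current() never returns null: with no active trace it yields a no-op span with an invalid span context, so the captured call.span needs no null checks. A small illustration (an assumption for this page, not part of the diff):

    import io.opentelemetry.api.trace.Span;

    public class CurrentSpanSketch {
      public static void main(String[] args) {
        Span current = Span.current();
        // With no tracing configured this prints "false": the object is safe to pass
        // around and end(), but it carries an invalid (all-zero) trace id.
        System.out.println(current.getSpanContext().isValid());
      }
    }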
@@ -192,8 +192,8 @@

</dependency>
<!-- tracing Dependencies -->
<dependency>
<groupId>org.apache.htrace</groupId>
<artifactId>htrace-core4</artifactId>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-api</artifactId>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
@@ -1,80 +0,0 @@

/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.hbase.trace;

import org.apache.hadoop.conf.Configuration;
import org.apache.htrace.core.HTraceConfiguration;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@InterfaceAudience.Private
public class HBaseHTraceConfiguration extends HTraceConfiguration {
private static final Logger LOG = LoggerFactory.getLogger(HBaseHTraceConfiguration.class);

public static final String KEY_PREFIX = "hbase.htrace.";

private Configuration conf;

private void handleDeprecation(String key) {
String oldKey = "hbase." + key;
String newKey = KEY_PREFIX + key;
String oldValue = conf.get(oldKey);
if (oldValue != null) {
LOG.warn("Warning: using deprecated configuration key " + oldKey +
". Please use " + newKey + " instead.");
String newValue = conf.get(newKey);
if (newValue == null) {
conf.set(newKey, oldValue);
} else {
LOG.warn("Conflicting values for " + newKey + " and " + oldKey +
". Using " + newValue);
}
}
}

public HBaseHTraceConfiguration(Configuration conf) {
this.conf = conf;
handleDeprecation("local-file-span-receiver.path");
handleDeprecation("local-file-span-receiver.capacity");
handleDeprecation("sampler.frequency");
handleDeprecation("sampler.fraction");
handleDeprecation("zipkin.collector-hostname");
handleDeprecation("zipkin.collector-port");
handleDeprecation("zipkin.num-threads");
handleDeprecation("zipkin.traced-service-hostname");
handleDeprecation("zipkin.traced-service-port");
}

@Override
public String get(String key) {
return conf.get(KEY_PREFIX + key);
}

@Override
public String get(String key, String defaultValue) {
return conf.get(KEY_PREFIX + key,defaultValue);

}

@Override
public boolean getBoolean(String key, boolean defaultValue) {
return conf.getBoolean(KEY_PREFIX + key, defaultValue);
}
}

@@ -1,120 +0,0 @@

/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.trace;

import java.io.IOException;
import java.util.Collection;
import java.util.HashSet;

import org.apache.hadoop.conf.Configuration;
import org.apache.htrace.core.SpanReceiver;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* This class provides functions for reading the names of SpanReceivers from
* hbase-site.xml, adding those SpanReceivers to the Tracer, and closing those
* SpanReceivers when appropriate.
*/
@InterfaceAudience.Private
public class SpanReceiverHost {
public static final String SPAN_RECEIVERS_CONF_KEY = "hbase.trace.spanreceiver.classes";
private static final Logger LOG = LoggerFactory.getLogger(SpanReceiverHost.class);
private Collection<SpanReceiver> receivers;
private Configuration conf;
private boolean closed = false;

private enum SingletonHolder {
INSTANCE;
final transient Object lock = new Object();
transient SpanReceiverHost host = null;
}

public static SpanReceiverHost getInstance(Configuration conf) {
synchronized (SingletonHolder.INSTANCE.lock) {
if (SingletonHolder.INSTANCE.host != null) {
return SingletonHolder.INSTANCE.host;
}

SpanReceiverHost host = new SpanReceiverHost(conf);
host.loadSpanReceivers();
SingletonHolder.INSTANCE.host = host;
return SingletonHolder.INSTANCE.host;
}

}

public static Configuration getConfiguration(){
synchronized (SingletonHolder.INSTANCE.lock) {
if (SingletonHolder.INSTANCE.host == null || SingletonHolder.INSTANCE.host.conf == null) {
return null;
}

return SingletonHolder.INSTANCE.host.conf;
}
}

SpanReceiverHost(Configuration conf) {
receivers = new HashSet<>();
this.conf = conf;
}

/**
* Reads the names of classes specified in the {@code hbase.trace.spanreceiver.classes} property
* and instantiates and registers them with the Tracer.
*/
public void loadSpanReceivers() {
String[] receiverNames = conf.getStrings(SPAN_RECEIVERS_CONF_KEY);
if (receiverNames == null || receiverNames.length == 0) {
return;
}

SpanReceiver.Builder builder = new SpanReceiver.Builder(new HBaseHTraceConfiguration(conf));
for (String className : receiverNames) {
className = className.trim();

SpanReceiver receiver = builder.className(className).build();
if (receiver != null) {
receivers.add(receiver);
LOG.info("SpanReceiver {} was loaded successfully.", className);
}
}
for (SpanReceiver rcvr : receivers) {
TraceUtil.addReceiver(rcvr);
}
}

/**
* Calls close() on all SpanReceivers created by this SpanReceiverHost.
*/
public synchronized void closeReceivers() {
if (closed) {
return;
}

closed = true;
for (SpanReceiver rcvr : receivers) {
try {
rcvr.close();
} catch (IOException e) {
LOG.warn("Unable to close SpanReceiver correctly: " + e.getMessage(), e);
}
}
}
}

@@ -17,112 +17,19 @@

*/
package org.apache.hadoop.hbase.trace;

import org.apache.hadoop.conf.Configuration;
import org.apache.htrace.core.HTraceConfiguration;
import org.apache.htrace.core.Sampler;
import org.apache.htrace.core.Span;
import org.apache.htrace.core.SpanReceiver;
import org.apache.htrace.core.TraceScope;
import org.apache.htrace.core.Tracer;
import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.api.trace.Tracer;
import org.apache.yetus.audience.InterfaceAudience;

/**
* This wrapper class provides functions for accessing htrace 4+ functionality in a simplified way.
*/
@InterfaceAudience.Private
public final class TraceUtil {
private static HTraceConfiguration conf;
private static Tracer tracer;

private static final String INSTRUMENTATION_NAME = "io.opentelemetry.contrib.hbase";

private TraceUtil() {
}

public static void initTracer(Configuration c) {
if (c != null) {
conf = new HBaseHTraceConfiguration(c);
}

if (tracer == null && conf != null) {
tracer = new Tracer.Builder("Tracer").conf(conf).build();
}
}

/**
* Wrapper method to create new TraceScope with the given description
* @return TraceScope or null when not tracing
*/
public static TraceScope createTrace(String description) {
return (tracer == null) ? null : tracer.newScope(description);
}

/**
* Wrapper method to create new child TraceScope with the given description
* and parent scope's spanId
* @param span parent span
* @return TraceScope or null when not tracing
*/
public static TraceScope createTrace(String description, Span span) {
if (span == null) {
return createTrace(description);
}

return (tracer == null) ? null : tracer.newScope(description, span.getSpanId());
}

/**
* Wrapper method to add new sampler to the default tracer
* @return true if added, false if it was already added
*/
public static boolean addSampler(Sampler sampler) {
if (sampler == null) {
return false;
}

return (tracer == null) ? false : tracer.addSampler(sampler);
}

/**
* Wrapper method to add key-value pair to TraceInfo of actual span
*/
public static void addKVAnnotation(String key, String value){
Span span = Tracer.getCurrentSpan();
if (span != null) {
span.addKVAnnotation(key, value);
}
}

/**
* Wrapper method to add receiver to actual tracerpool
* @return true if successfull, false if it was already added
*/
public static boolean addReceiver(SpanReceiver rcvr) {
return (tracer == null) ? false : tracer.getTracerPool().addReceiver(rcvr);
}

/**
* Wrapper method to remove receiver from actual tracerpool
* @return true if removed, false if doesn't exist
*/
public static boolean removeReceiver(SpanReceiver rcvr) {
return (tracer == null) ? false : tracer.getTracerPool().removeReceiver(rcvr);
}

/**
* Wrapper method to add timeline annotiation to current span with given message
*/
public static void addTimelineAnnotation(String msg) {
Span span = Tracer.getCurrentSpan();
if (span != null) {
span.addTimelineAnnotation(msg);
}
}

/**
* Wrap runnable with current tracer and description
* @param runnable to wrap
* @return wrapped runnable or original runnable when not tracing
*/
public static Runnable wrap(Runnable runnable, String description) {
return (tracer == null) ? runnable : tracer.wrap(runnable, description);
public static Tracer getGlobalTracer() {
return OpenTelemetry.getGlobalTracer(INSTRUMENTATION_NAME);
}
}
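After this rewrite TraceUtil no longer owns configuration, samplers, or receivers; it only hands out the globally registered tracer, and every call site starts and ends its own span. A usage sketch of the new helper, mirroring the call sites elsewhere in this commit:

    import io.opentelemetry.api.trace.Span;
    import io.opentelemetry.context.Scope;
    import org.apache.hadoop.hbase.trace.TraceUtil;

    public class TraceUtilUsageSketch {
      public static void doWork() {
        Span span = TraceUtil.getGlobalTracer().spanBuilder("doWork").startSpan();
        try (Scope ignored = span.makeCurrent()) {
          // traced work; spans started on this thread will pick up 'span' as their parent
        } finally {
          span.end();
        }
      }
    }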
@@ -109,10 +109,6 @@

<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>
<dependency>
<groupId>org.apache.htrace</groupId>
<artifactId>htrace-core4</artifactId>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>

@@ -19,6 +19,8 @@

package org.apache.hadoop.hbase.io.hfile;

import io.opentelemetry.api.trace.Span;
import io.opentelemetry.context.Scope;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;

@@ -27,13 +29,11 @@ import java.util.Iterator;

import java.util.List;
import java.util.NoSuchElementException;
import java.util.concurrent.ExecutionException;

import net.spy.memcached.CachedData;
import net.spy.memcached.ConnectionFactoryBuilder;
import net.spy.memcached.FailureMode;
import net.spy.memcached.MemcachedClient;
import net.spy.memcached.transcoders.Transcoder;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.io.ByteBuffAllocator;

@@ -41,7 +41,6 @@ import org.apache.hadoop.hbase.nio.ByteBuff;

import org.apache.hadoop.hbase.nio.SingleByteBuff;
import org.apache.hadoop.hbase.trace.TraceUtil;
import org.apache.hadoop.hbase.util.Addressing;
import org.apache.htrace.core.TraceScope;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -129,12 +128,12 @@ public class MemcachedBlockCache implements BlockCache {

}

@Override
public Cacheable getBlock(BlockCacheKey cacheKey, boolean caching,
boolean repeat, boolean updateCacheMetrics) {
public Cacheable getBlock(BlockCacheKey cacheKey, boolean caching, boolean repeat,
boolean updateCacheMetrics) {
// Assume that nothing is the block cache
HFileBlock result = null;

try (TraceScope traceScope = TraceUtil.createTrace("MemcachedBlockCache.getBlock")) {
Span span = TraceUtil.getGlobalTracer().spanBuilder("MemcachedBlockCache.getBlock").startSpan();
try (Scope traceScope = span.makeCurrent()) {
result = client.get(cacheKey.toString(), tc);
} catch (Exception e) {
// Catch a pretty broad set of exceptions to limit any changes in the memecache client

@@ -146,6 +145,7 @@ public class MemcachedBlockCache implements BlockCache {

}
result = null;
} finally {
span.end();
// Update stats if this request doesn't have it turned off 100% of the time
if (updateCacheMetrics) {
if (result == null) {
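The catch block above still only resets the result; if the failure should also show up on the trace, an OpenTelemetry span can record the exception directly. That is not done in this commit; a possible follow-up, sketched:

    import io.opentelemetry.api.trace.Span;
    import io.opentelemetry.api.trace.StatusCode;

    final class SpanErrorSketch {
      static void markFailed(Span span, Exception e) {
        span.recordException(e);          // attaches the exception as a span event
        span.setStatus(StatusCode.ERROR); // marks the span itself as failed
      }
    }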
@@ -247,8 +247,8 @@

<artifactId>commons-lang3</artifactId>
</dependency>
<dependency>
<groupId>org.apache.htrace</groupId>
<artifactId>htrace-core4</artifactId>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-api</artifactId>
</dependency>
<!-- Hadoop needs Netty 3.x at test scope for the minicluster -->
<dependency>

@@ -78,7 +78,6 @@ public class IntegrationTestTableMapReduceUtil implements Configurable, Tool {

assertTrue(tmpjars.contains("netty"));
assertTrue(tmpjars.contains("protobuf"));
assertTrue(tmpjars.contains("guava"));
assertTrue(tmpjars.contains("htrace"));
}

@Override

@@ -21,6 +21,8 @@ package org.apache.hadoop.hbase.mttr;

import static org.junit.Assert.assertEquals;
import static org.junit.Assume.assumeFalse;

import io.opentelemetry.api.trace.Span;
import io.opentelemetry.context.Scope;
import java.io.IOException;
import java.util.ArrayList;
import java.util.concurrent.Callable;

@@ -64,9 +66,6 @@ import org.apache.hadoop.hbase.testclassification.IntegrationTests;

import org.apache.hadoop.hbase.trace.TraceUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.LoadTestTool;
import org.apache.htrace.core.AlwaysSampler;
import org.apache.htrace.core.Span;
import org.apache.htrace.core.TraceScope;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;

@@ -376,12 +375,9 @@ public class IntegrationTestMTTR {

* @param span Span. To be kept if the time taken was over 1 second
*/
public void addResult(long time, Span span) {
if (span == null) {
return;
}
stats.addValue(TimeUnit.MILLISECONDS.convert(time, TimeUnit.NANOSECONDS));
if (TimeUnit.SECONDS.convert(time, TimeUnit.NANOSECONDS) >= 1) {
traces.add(span.getTracerId());
traces.add(span.getSpanContext().getTraceIdAsHexString());
}
}

@@ -421,15 +417,11 @@

final int maxIterations = 10;
int numAfterDone = 0;
int resetCount = 0;
TraceUtil.addSampler(AlwaysSampler.INSTANCE);
// Keep trying until the rs is back up and we've gotten a put through
while (numAfterDone < maxIterations) {
long start = System.nanoTime();
Span span = null;
try (TraceScope scope = TraceUtil.createTrace(getSpanName())) {
if (scope != null) {
span = scope.getSpan();
}
Span span = TraceUtil.getGlobalTracer().spanBuilder(getSpanName()).startSpan();
try (Scope scope = span.makeCurrent()) {
boolean actionResult = doAction();
if (actionResult && future.isDone()) {
numAfterDone++;

@@ -460,7 +452,6 @@

throw e;
} catch (RetriesExhaustedException e){
throw e;

// Everything else is potentially recoverable on the application side. For instance, a CM
// action kills the RS that hosted a scanner the client was using. Continued use of that
// scanner should be terminated, but a new scanner can be created and the read attempted

@@ -475,6 +466,8 @@

LOG.info("Too many unexpected Exceptions. Aborting.", e);
throw e;
}
} finally {
span.end();
}
result.addResult(System.nanoTime() - start, span);
}
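addResult() now keeps the OpenTelemetry trace id (a hex string) instead of HTrace's tracer id, so slow iterations can be looked up in whatever backend the configured SDK exports to. The same accessor works wherever a span is in hand; a tiny sketch matching the API used above:

    import io.opentelemetry.api.trace.Span;

    final class TraceIdSketch {
      static String traceIdOf(Span span) {
        // On a no-op span this returns the all-zero trace id.
        return span.getSpanContext().getTraceIdAsHexString();
      }
    }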
@@ -18,10 +18,19 @@

package org.apache.hadoop.hbase.trace;

import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.context.Scope;
import java.io.IOException;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.IntegrationTestingUtility;
import org.apache.hadoop.hbase.testclassification.IntegrationTests;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.BufferedMutator;

@@ -31,26 +40,21 @@ import org.apache.hadoop.hbase.client.Result;

import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.testclassification.IntegrationTests;
import org.apache.hadoop.hbase.util.AbstractHBaseTool;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.util.ToolRunner;
import org.apache.htrace.core.Sampler;
import org.apache.htrace.core.TraceScope;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine;

import java.io.IOException;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

@Category(IntegrationTests.class)
public class IntegrationTestSendTraceRequests extends AbstractHBaseTool {

private static final Logger LOG =
LoggerFactory.getLogger(IntegrationTestSendTraceRequests.class);
public static final String TABLE_ARG = "t";
public static final String CF_ARG = "f";

@@ -61,7 +65,6 @@ public class IntegrationTestSendTraceRequests extends AbstractHBaseTool {

private IntegrationTestingUtility util;
private Random random = new Random();
private Admin admin;
private SpanReceiverHost receiverHost;

public static void main(String[] args) throws Exception {
Configuration configuration = HBaseConfiguration.create();

@@ -95,7 +98,6 @@ public class IntegrationTestSendTraceRequests extends AbstractHBaseTool {

public void internalDoWork() throws Exception {
util = createUtil();
admin = util.getAdmin();
setupReceiver();

deleteTable();
createTable();

@@ -108,51 +110,53 @@

service.shutdown();
service.awaitTermination(100, TimeUnit.SECONDS);
Thread.sleep(90000);
receiverHost.closeReceivers();
util.restoreCluster();
util = null;
}

@SuppressWarnings("FutureReturnValueIgnored")
private void doScans(ExecutorService service, final LinkedBlockingQueue<Long> rks) {
for (int i = 0; i < 100; i++) {
Runnable runnable = new Runnable() {
private final LinkedBlockingQueue<Long> rowKeyQueue = rks;

for (int i = 0; i < 100; i++) {
Runnable runnable = new Runnable() {
private final LinkedBlockingQueue<Long> rowKeyQueue = rks;
@Override
public void run() {
ResultScanner rs = null;
TraceUtil.addSampler(Sampler.ALWAYS);
try (TraceScope scope = TraceUtil.createTrace("Scan")){
Table ht = util.getConnection().getTable(tableName);
Scan s = new Scan();
s.setStartRow(Bytes.toBytes(rowKeyQueue.take()));
s.setBatch(7);
rs = ht.getScanner(s);
// Something to keep the jvm from removing the loop.
long accum = 0;
@Override public void run() {
ResultScanner rs = null;
Span span = TraceUtil.getGlobalTracer().spanBuilder("Scan").startSpan();
try (Scope scope = span.makeCurrent()) {
Table ht = util.getConnection().getTable(tableName);
Scan s = new Scan();
s.withStartRow(Bytes.toBytes(rowKeyQueue.take()));
s.setBatch(7);
rs = ht.getScanner(s);
// Something to keep the jvm from removing the loop.
long accum = 0;

for(int x = 0; x < 1000; x++) {
Result r = rs.next();
accum |= Bytes.toLong(r.getRow());
}

TraceUtil.addTimelineAnnotation("Accum result = " + accum);

ht.close();
ht = null;
} catch (IOException e) {
e.printStackTrace();
TraceUtil.addKVAnnotation("exception", e.getClass().getSimpleName());
} catch (Exception e) {
} finally {
if (rs != null) rs.close();
for (int x = 0; x < 1000; x++) {
Result r = rs.next();
accum |= Bytes.toLong(r.getRow());
}

}
};
service.submit(runnable);
}
span.addEvent("Accum result = " + accum);

ht.close();
ht = null;
} catch (IOException e) {
LOG.warn("Exception occurred while scanning table", e);
span.addEvent("exception",
Attributes.of(AttributeKey.stringKey("exception"), e.getClass().getSimpleName()));
} catch (Exception e) {
LOG.warn("Exception occurred while scanning table", e);
} finally {
span.end();
if (rs != null) {
rs.close();
}
}
}
};
service.submit(runnable);
}
}

private void doGets(ExecutorService service, final LinkedBlockingQueue<Long> rowKeys)

@@ -173,9 +177,9 @@

}

long accum = 0;
TraceUtil.addSampler(Sampler.ALWAYS);
for (int x = 0; x < 5; x++) {
try (TraceScope scope = TraceUtil.createTrace("gets")) {
Span span = TraceUtil.getGlobalTracer().spanBuilder("gets").startSpan();
try (Scope scope = span.makeCurrent()) {
long rk = rowKeyQueue.take();
Result r1 = ht.get(new Get(Bytes.toBytes(rk)));
if (r1 != null) {

@@ -185,10 +189,12 @@

if (r2 != null) {
accum |= Bytes.toLong(r2.getRow());
}
TraceUtil.addTimelineAnnotation("Accum = " + accum);
span.addEvent("Accum = " + accum);

} catch (IOException|InterruptedException ie) {
// IGNORED
} finally {
span.end();
}
}

@@ -199,18 +205,22 @@

}

private void createTable() throws IOException {
TraceUtil.addSampler(Sampler.ALWAYS);
try (TraceScope scope = TraceUtil.createTrace("createTable")) {
Span span = TraceUtil.getGlobalTracer().spanBuilder("createTable").startSpan();
try (Scope scope = span.makeCurrent()) {
util.createTable(tableName, familyName);
} finally {
span.end();
}
}

private void deleteTable() throws IOException {
TraceUtil.addSampler(Sampler.ALWAYS);
try (TraceScope scope = TraceUtil.createTrace("deleteTable")) {
Span span = TraceUtil.getGlobalTracer().spanBuilder("deleteTable").startSpan();
try (Scope scope = span.makeCurrent()) {
if (admin.tableExists(tableName)) {
util.deleteTable(tableName);
}
} finally {
span.end();
}
}

@@ -218,9 +228,9 @@

LinkedBlockingQueue<Long> rowKeys = new LinkedBlockingQueue<>(25000);
BufferedMutator ht = util.getConnection().getBufferedMutator(this.tableName);
byte[] value = new byte[300];
TraceUtil.addSampler(Sampler.ALWAYS);
for (int x = 0; x < 5000; x++) {
try (TraceScope traceScope = TraceUtil.createTrace("insertData")) {
Span span = TraceUtil.getGlobalTracer().spanBuilder("insertData").startSpan();
try (Scope scope = span.makeCurrent()) {
for (int i = 0; i < 5; i++) {
long rk = random.nextLong();
rowKeys.add(rk);

@@ -234,6 +244,8 @@

if ((x % 1000) == 0) {
admin.flush(tableName);
}
} finally {
span.end();
}
}
admin.flush(tableName);

@@ -255,11 +267,4 @@

}
return this.util;
}

private void setupReceiver() {
Configuration conf = new Configuration(util.getConfiguration());
conf.setBoolean("hbase.zipkin.is-in-client-mode", true);

this.receiverHost = SpanReceiverHost.getInstance(conf);
}
}
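In this file both of HTrace's annotation styles collapse onto span events: addTimelineAnnotation() becomes a plain event and addKVAnnotation() becomes an event with attributes, as in the exception handler above. A standalone sketch of the two forms:

    import io.opentelemetry.api.common.AttributeKey;
    import io.opentelemetry.api.common.Attributes;
    import io.opentelemetry.api.trace.Span;

    final class SpanEventSketch {
      static void annotate(Span span, long accum, Exception e) {
        span.addEvent("Accum result = " + accum); // timeline-style event
        span.addEvent("exception",
            Attributes.of(AttributeKey.stringKey("exception"), e.getClass().getSimpleName()));
      }
    }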
@@ -155,8 +155,8 @@

<artifactId>slf4j-api</artifactId>
</dependency>
<dependency>
<groupId>org.apache.htrace</groupId>
<artifactId>htrace-core4</artifactId>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-api</artifactId>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>

@@ -831,7 +831,6 @@ public class TableMapReduceUtil {

org.apache.hbase.thirdparty.io.netty.channel.Channel.class, // hbase-shaded-netty
org.apache.zookeeper.ZooKeeper.class, // zookeeper
com.google.protobuf.Message.class, // protobuf
org.apache.htrace.core.Tracer.class, // htrace
com.codahale.metrics.MetricRegistry.class, // metrics-core
org.apache.commons.lang3.ArrayUtils.class); // commons-lang
}

@@ -20,6 +20,8 @@ package org.apache.hadoop.hbase;

import com.codahale.metrics.Histogram;
import com.codahale.metrics.UniformReservoir;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.context.Scope;
import java.io.IOException;
import java.io.PrintStream;
import java.lang.reflect.Constructor;

@@ -84,8 +86,6 @@ import org.apache.hadoop.hbase.io.hfile.RandomDistribution;

import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.regionserver.CompactingMemStore;
import org.apache.hadoop.hbase.trace.HBaseHTraceConfiguration;
import org.apache.hadoop.hbase.trace.SpanReceiverHost;
import org.apache.hadoop.hbase.trace.TraceUtil;
import org.apache.hadoop.hbase.util.ByteArrayHashKey;
import org.apache.hadoop.hbase.util.Bytes;

@@ -104,9 +104,6 @@ import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

import org.apache.hadoop.mapreduce.lib.reduce.LongSumReducer;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.htrace.core.ProbabilitySampler;
import org.apache.htrace.core.Sampler;
import org.apache.htrace.core.TraceScope;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -697,6 +694,10 @@ public class PerformanceEvaluation extends Configured implements Tool {

int totalRows = DEFAULT_ROWS_PER_GB;
int measureAfter = 0;
float sampleRate = 1.0f;
/**
* @deprecated Useless after switching to OpenTelemetry
*/
@Deprecated
double traceRate = 0.0;
String tableName = TABLE_NAME;
boolean flushCommits = true;

@@ -1147,8 +1148,6 @@

protected final TestOptions opts;

private final Status status;
private final Sampler traceSampler;
private final SpanReceiverHost receiverHost;

private String testName;
private Histogram latencyHistogram;

@@ -1170,18 +1169,9 @@

*/
TestBase(final Configuration conf, final TestOptions options, final Status status) {
this.conf = conf;
this.receiverHost = this.conf == null? null: SpanReceiverHost.getInstance(conf);
this.opts = options;
this.status = status;
this.testName = this.getClass().getSimpleName();
if (options.traceRate >= 1.0) {
this.traceSampler = Sampler.ALWAYS;
} else if (options.traceRate > 0.0) {
conf.setDouble("hbase.sampler.fraction", options.traceRate);
this.traceSampler = new ProbabilitySampler(new HBaseHTraceConfiguration(conf));
} else {
this.traceSampler = Sampler.NEVER;
}
everyN = (int) (opts.totalRows / (opts.totalRows * opts.sampleRate));
if (options.isValueZipf()) {
this.zipf = new RandomDistribution.Zipf(this.rand, 1, options.getValueSize(), 1.2);

@@ -1351,7 +1341,6 @@

YammerHistogramUtils.getHistogramReport(bytesInRemoteResultsHistogram));
}
}
receiverHost.closeReceivers();
}

abstract void onTakedown() throws IOException;

@@ -1388,7 +1377,6 @@

void testTimed() throws IOException, InterruptedException {
int startRow = getStartRow();
int lastRow = getLastRow();
TraceUtil.addSampler(traceSampler);
// Report on completion of 1/10th of total.
for (int ii = 0; ii < opts.cycles; ii++) {
if (opts.cycles > 1) LOG.info("Cycle=" + ii + " of " + opts.cycles);

@@ -1396,8 +1384,11 @@

if (i % everyN != 0) continue;
long startTime = System.nanoTime();
boolean requestSent = false;
try (TraceScope scope = TraceUtil.createTrace("test row");){
Span span = TraceUtil.getGlobalTracer().spanBuilder("test row").startSpan();
try (Scope scope = span.makeCurrent()){
requestSent = testRow(i, startTime);
} finally {
span.end();
}
if ( (i - startRow) > opts.measureAfter) {
// If multiget or multiput is enabled, say set to 10, testRow() returns immediately
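traceRate is deprecated above because sampling is no longer decided in code; with OpenTelemetry it belongs to the SDK that backs the global tracer (this commit only depends on opentelemetry-api). As rough orientation only, a 10% ratio sampler might be wired up like this with a recent opentelemetry-sdk; the exact builder API depends on the SDK version actually deployed:

    import io.opentelemetry.sdk.OpenTelemetrySdk;
    import io.opentelemetry.sdk.trace.SdkTracerProvider;
    import io.opentelemetry.sdk.trace.samplers.Sampler;

    final class SamplerConfigSketch {
      static void install() {
        SdkTracerProvider tracerProvider = SdkTracerProvider.builder()
            .setSampler(Sampler.traceIdRatioBased(0.1)) // roughly the old traceRate = 0.1
            .build();
        // Register as the global instance that api-side tracer lookups resolve against.
        OpenTelemetrySdk.builder().setTracerProvider(tracerProvider).buildAndRegisterGlobal();
      }
    }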
@@ -201,10 +201,6 @@

<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.htrace</groupId>
<artifactId>htrace-core4</artifactId>
</dependency>
</dependencies>
<profiles>
<!-- Skip the tests in this module -->

@@ -427,8 +427,8 @@

</dependency>
<!-- tracing Dependencies -->
<dependency>
<groupId>org.apache.htrace</groupId>
<artifactId>htrace-core4</artifactId>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-api</artifactId>
</dependency>
<dependency>
<groupId>com.lmax</groupId>

@@ -18,14 +18,14 @@

*/
package org.apache.hadoop.hbase.executor;

import io.opentelemetry.api.trace.Span;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.trace.TraceUtil;
import org.apache.htrace.core.Span;
import org.apache.htrace.core.TraceScope;
import org.apache.htrace.core.Tracer;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -75,7 +75,7 @@ public abstract class EventHandler implements Runnable, Comparable<EventHandler>

* Default base class constructor.
*/
public EventHandler(Server server, EventType eventType) {
this.parent = Tracer.getCurrentSpan();
this.parent = Span.current();
this.server = server;
this.eventType = eventType;
seqid = seqids.incrementAndGet();

@@ -100,10 +100,14 @@ public abstract class EventHandler implements Runnable, Comparable<EventHandler>

@Override
public void run() {
try (TraceScope scope = TraceUtil.createTrace(this.getClass().getSimpleName(), parent)) {
Span span = TraceUtil.getGlobalTracer().spanBuilder(getClass().getSimpleName())
.setParent(Context.current().with(parent)).startSpan();
try (Scope scope = span.makeCurrent()) {
process();
} catch(Throwable t) {
} catch (Throwable t) {
handleException(t);
} finally {
span.end();
}
}
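EventHandler captures the submitting thread's span in its constructor and re-attaches it as the parent when run() later executes on an executor thread, because the thread-local current span does not cross threads by itself. A condensed sketch of that hand-off (tracer stands in for TraceUtil.getGlobalTracer()):

    import io.opentelemetry.api.trace.Span;
    import io.opentelemetry.api.trace.Tracer;
    import io.opentelemetry.context.Context;
    import io.opentelemetry.context.Scope;

    final class HandOffSketch implements Runnable {
      private final Tracer tracer;
      private final Span parent = Span.current(); // captured on the submitting thread

      HandOffSketch(Tracer tracer) {
        this.tracer = tracer;
      }

      @Override
      public void run() { // typically executes on a pool thread
        Span span = tracer.spanBuilder(getClass().getSimpleName())
            .setParent(Context.current().with(parent)).startSpan();
        try (Scope ignored = span.makeCurrent()) {
          // process the event
        } finally {
          span.end();
        }
      }
    }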
@@ -17,6 +17,8 @@

*/
package org.apache.hadoop.hbase.io.hfile;

import io.opentelemetry.api.trace.Span;
import io.opentelemetry.context.Scope;
import java.io.DataInput;
import java.io.IOException;
import java.nio.ByteBuffer;

@@ -48,7 +50,6 @@ import org.apache.hadoop.hbase.util.Bytes;

import org.apache.hadoop.hbase.util.IdLock;
import org.apache.hadoop.hbase.util.ObjectIntPair;
import org.apache.hadoop.io.WritableUtils;
import org.apache.htrace.core.TraceScope;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -1287,7 +1288,8 @@ public abstract class HFileReaderImpl implements HFile.Reader, Configurable {

boolean useLock = false;
IdLock.Entry lockEntry = null;
try (TraceScope traceScope = TraceUtil.createTrace("HFileReaderImpl.readBlock")) {
Span span = TraceUtil.getGlobalTracer().spanBuilder("HFileReaderImpl.readBlock").startSpan();
try (Scope traceScope = span.makeCurrent()) {
while (true) {
// Check cache for block. If found return.
if (cacheConf.shouldReadBlockFromCache(expectedBlockType)) {

@@ -1302,7 +1304,7 @@ public abstract class HFileReaderImpl implements HFile.Reader, Configurable {

if (LOG.isTraceEnabled()) {
LOG.trace("From Cache " + cachedBlock);
}
TraceUtil.addTimelineAnnotation("blockCacheHit");
span.addEvent("blockCacheHit");
assert cachedBlock.isUnpacked() : "Packed block leak.";
if (cachedBlock.getBlockType().isData()) {
if (updateCacheMetrics) {

@@ -1332,7 +1334,7 @@ public abstract class HFileReaderImpl implements HFile.Reader, Configurable {

// Carry on, please load.
}

TraceUtil.addTimelineAnnotation("blockCacheMiss");
span.addEvent("blockCacheMiss");
// Load block from filesystem.
HFileBlock hfileBlock = fsBlockReader.readBlockData(dataBlockOffset, onDiskBlockSize, pread,
!isCompaction, shouldUseHeap(expectedBlockType));

@@ -1362,6 +1364,7 @@ public abstract class HFileReaderImpl implements HFile.Reader, Configurable {

if (lockEntry != null) {
offsetLock.releaseLockEntry(lockEntry);
}
span.end();
}
}

@@ -17,23 +17,24 @@

*/
package org.apache.hadoop.hbase.ipc;

import io.opentelemetry.api.trace.Span;
import io.opentelemetry.context.Scope;
import java.net.InetSocketAddress;
import java.nio.channels.ClosedChannelException;
import java.util.Optional;

import org.apache.hadoop.hbase.CallDroppedException;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.trace.TraceUtil;
import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.trace.TraceUtil;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.util.StringUtils;
import org.apache.htrace.core.TraceScope;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;

import org.apache.hbase.thirdparty.com.google.protobuf.Message;

/**

@@ -94,6 +95,14 @@ public class CallRunner {

this.rpcServer = null;
}

private String getServiceName() {
return call.getService() != null ? call.getService().getDescriptorForType().getName() : "";
}

private String getMethodName() {
return call.getMethod() != null ? call.getMethod().getName() : "";
}

public void run() {
try {
if (call.disconnectSince() >= 0) {

@@ -118,18 +127,16 @@

String error = null;
Pair<Message, CellScanner> resultPair = null;
RpcServer.CurCall.set(call);
TraceScope traceScope = null;
try {
String serviceName = getServiceName();
String methodName = getMethodName();
String traceString = serviceName + "." + methodName;
Span span = TraceUtil.getGlobalTracer().spanBuilder(traceString).startSpan();
try (Scope traceScope = span.makeCurrent()) {
if (!this.rpcServer.isStarted()) {
InetSocketAddress address = rpcServer.getListenerAddress();
throw new ServerNotRunningYetException("Server " +
(address != null ? address : "(channel closed)") + " is not running yet");
}
String serviceName =
call.getService() != null ? call.getService().getDescriptorForType().getName() : "";
String methodName = (call.getMethod() != null) ? call.getMethod().getName() : "";
String traceString = serviceName + "." + methodName;
traceScope = TraceUtil.createTrace(traceString);
// make the call
resultPair = this.rpcServer.call(call, this.status);
} catch (TimeoutIOException e){

@@ -151,14 +158,12 @@

throw (Error)e;
}
} finally {
if (traceScope != null) {
traceScope.close();
}
RpcServer.CurCall.set(null);
if (resultPair != null) {
this.rpcServer.addCallSize(call.getSize() * -1);
sucessful = true;
}
span.end();
}
this.status.markComplete("To send response");
// return back the RPC request read BB we can do here. It is done by now.
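The server-side RPC span is now named from the service and method extracted by the helpers added above, which keeps client and server spans for the same RPC under one readable name. The convention, with hypothetical example values:

    final class RpcSpanNameSketch {
      static String of(String serviceName, String methodName) {
        // e.g. "ClientService.Get" (values here are illustrative)
        return serviceName + "." + methodName;
      }
    }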
@ -201,7 +201,6 @@ import org.apache.hadoop.hbase.replication.regionserver.ReplicationStatus;
|
|||
import org.apache.hadoop.hbase.security.AccessDeniedException;
|
||||
import org.apache.hadoop.hbase.security.SecurityConstants;
|
||||
import org.apache.hadoop.hbase.security.UserProvider;
|
||||
import org.apache.hadoop.hbase.trace.TraceUtil;
|
||||
import org.apache.hadoop.hbase.util.Addressing;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
||||
|
@ -423,7 +422,6 @@ public class HMaster extends HRegionServer implements MasterServices {
|
|||
*/
|
||||
public HMaster(final Configuration conf) throws IOException {
|
||||
super(conf);
|
||||
TraceUtil.initTracer(conf);
|
||||
try {
|
||||
if (conf.getBoolean(MAINTENANCE_MODE, false)) {
|
||||
LOG.info("Detected {}=true via configuration.", MAINTENANCE_MODE);
|
||||
|
|
|
@ -21,15 +21,12 @@ package org.apache.hadoop.hbase.master;
|
|||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.LocalHBaseCluster;
|
||||
import org.apache.hadoop.hbase.MasterNotRunningException;
|
||||
import org.apache.hadoop.hbase.ZNodeClearer;
|
||||
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
|
||||
import org.apache.hadoop.hbase.trace.TraceUtil;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
import org.apache.hadoop.hbase.client.Admin;
|
||||
import org.apache.hadoop.hbase.client.Connection;
|
||||
import org.apache.hadoop.hbase.client.ConnectionFactory;
|
||||
|
@ -39,6 +36,7 @@ import org.apache.hadoop.hbase.util.ServerCommandLine;
|
|||
import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
|
||||
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
import org.apache.zookeeper.KeeperException;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
@ -167,8 +165,6 @@ public class HMasterCommandLine extends ServerCommandLine {
|
|||
|
||||
private int startMaster() {
|
||||
Configuration conf = getConf();
|
||||
TraceUtil.initTracer(conf);
|
||||
|
||||
try {
|
||||
// If 'local', defer to LocalHBaseCluster instance. Starts master
|
||||
// and regionserver both in the one JVM.
|
||||
|
|
|
@ -21,6 +21,8 @@ import static org.apache.hadoop.hbase.HConstants.REPLICATION_SCOPE_LOCAL;
|
|||
import static org.apache.hadoop.hbase.regionserver.HStoreFile.MAJOR_COMPACTION_KEY;
|
||||
import static org.apache.hadoop.hbase.util.ConcurrentMapUtils.computeIfAbsent;
|
||||
import edu.umd.cs.findbugs.annotations.Nullable;
|
||||
import io.opentelemetry.api.trace.Span;
|
||||
import io.opentelemetry.context.Scope;
|
||||
import java.io.EOFException;
|
||||
import java.io.FileNotFoundException;
|
||||
import java.io.IOException;
|
||||
|
@ -191,7 +193,6 @@ import org.apache.hadoop.hbase.wal.WALKeyImpl;
|
|||
import org.apache.hadoop.hbase.wal.WALSplitUtil;
|
||||
import org.apache.hadoop.hbase.wal.WALSplitUtil.MutationReplay;
|
||||
import org.apache.hadoop.util.StringUtils;
|
||||
import org.apache.htrace.core.TraceScope;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
@ -6573,8 +6574,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
|||
RowLockImpl result = null;
|
||||
|
||||
boolean success = false;
|
||||
try (TraceScope scope = TraceUtil.createTrace("HRegion.getRowLock")) {
|
||||
TraceUtil.addTimelineAnnotation("Getting a " + (readLock?"readLock":"writeLock"));
|
||||
Span span = TraceUtil.getGlobalTracer().spanBuilder("HRegion.getRowLock").startSpan();
|
||||
try (Scope scope = span.makeCurrent()) {
|
||||
span.addEvent("Getting a " + (readLock ? "readLock" : "writeLock"));
|
||||
// Keep trying until we have a lock or error out.
|
||||
// TODO: do we need to add a time component here?
|
||||
while (result == null) {
|
||||
|
@ -6611,7 +6613,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
|||
}
|
||||
|
||||
if (timeout <= 0 || !result.getLock().tryLock(timeout, TimeUnit.MILLISECONDS)) {
|
||||
TraceUtil.addTimelineAnnotation("Failed to get row lock");
|
||||
span.addEvent("Failed to get row lock");
|
||||
String message = "Timed out waiting for lock for row: " + rowKey + " in region "
|
||||
+ getRegionInfo().getEncodedName();
|
||||
if (reachDeadlineFirst) {
|
||||
|
@ -6629,7 +6631,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
|||
LOG.debug("Thread interrupted waiting for lock on row: {}, in region {}", rowKey,
|
||||
getRegionInfo().getRegionNameAsString());
|
||||
}
|
||||
TraceUtil.addTimelineAnnotation("Interrupted exception getting row lock");
|
||||
span.addEvent("Interrupted exception getting row lock");
|
||||
throw throwOnInterrupt(ie);
|
||||
} catch (Error error) {
|
||||
// The maximum lock count for read lock is 64K (hardcoded), when this maximum count
|
||||
|
@ -6638,13 +6640,14 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
|||
LOG.warn("Error to get row lock for {}, in region {}, cause: {}", Bytes.toStringBinary(row),
|
||||
getRegionInfo().getRegionNameAsString(), error);
|
||||
IOException ioe = new IOException(error);
|
||||
TraceUtil.addTimelineAnnotation("Error getting row lock");
|
||||
span.addEvent("Error getting row lock");
|
||||
throw ioe;
|
||||
} finally {
|
||||
// Clean up the counts just in case this was the thing keeping the context alive.
|
||||
if (!success && rowLockContext != null) {
|
||||
rowLockContext.cleanUp();
|
||||
}
|
||||
span.end();
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -159,8 +159,6 @@ import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.security.access.AccessChecker;
import org.apache.hadoop.hbase.security.access.ZKPermissionWatcher;
import org.apache.hadoop.hbase.trace.SpanReceiverHost;
import org.apache.hadoop.hbase.trace.TraceUtil;
import org.apache.hadoop.hbase.util.Addressing;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;

@ -398,7 +396,6 @@ public class HRegionServer extends Thread implements
private MetricsRegionServer metricsRegionServer;
MetricsRegionServerWrapperImpl metricsRegionServerImpl;
private SpanReceiverHost spanReceiverHost;

/**
* ChoreService used to schedule tasks that we want to run periodically

@ -595,7 +592,6 @@ public class HRegionServer extends Thread implements
*/
public HRegionServer(final Configuration conf) throws IOException {
super("RegionServer"); // thread name
TraceUtil.initTracer(conf);
try {
this.startcode = EnvironmentEdgeManager.currentTime();
this.conf = conf;

@ -667,7 +663,6 @@ public class HRegionServer extends Thread implements
(t, e) -> abort("Uncaught exception in executorService thread " + t.getName(), e);

initializeFileSystem();
spanReceiverHost = SpanReceiverHost.getInstance(getConfiguration());

this.configurationManager = new ConfigurationManager();
setupWindows(getConfiguration(), getConfigurationManager());

@ -2714,10 +2709,6 @@ public class HRegionServer extends Thread implements
if (this.cacheFlusher != null) {
this.cacheFlusher.join();
}

if (this.spanReceiverHost != null) {
this.spanReceiverHost.closeReceivers();
}
if (this.walRoller != null) {
this.walRoller.close();
}
@ -18,14 +18,13 @@
*/
package org.apache.hadoop.hbase.regionserver;

import org.apache.hadoop.hbase.trace.TraceUtil;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.LocalHBaseCluster;
import org.apache.hadoop.hbase.util.ServerCommandLine;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Class responsible for parsing the command line and starting the

@ -51,7 +50,6 @@ public class HRegionServerCommandLine extends ServerCommandLine {
private int start() throws Exception {
Configuration conf = getConf();
TraceUtil.initTracer(conf);
try {
// If 'local', don't start a region server here. Defer to
// LocalHBaseCluster. It manages 'local' clusters.
@ -18,6 +18,8 @@
*/
package org.apache.hadoop.hbase.regionserver;

import io.opentelemetry.api.trace.Span;
import io.opentelemetry.context.Scope;
import java.io.IOException;
import java.lang.Thread.UncaughtExceptionHandler;
import java.util.ArrayList;

@ -36,7 +38,6 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.LongAdder;
import java.util.concurrent.locks.ReentrantReadWriteLock;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.DroppedSnapshotException;
import org.apache.hadoop.hbase.HConstants;

@ -49,12 +50,12 @@ import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.util.StringUtils.TraditionalBinaryPrefix;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.htrace.core.TraceScope;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;

/**
* Thread that flushes cache on request
*

@ -712,10 +713,12 @@ public class MemStoreFlusher implements FlushRequester {
* amount of memstore consumption.
*/
public void reclaimMemStoreMemory() {
try (TraceScope scope = TraceUtil.createTrace("MemStoreFluser.reclaimMemStoreMemory")) {
Span span =
TraceUtil.getGlobalTracer().spanBuilder("MemStoreFluser.reclaimMemStoreMemory").startSpan();
try (Scope scope = span.makeCurrent()) {
FlushType flushType = isAboveHighWaterMark();
if (flushType != FlushType.NORMAL) {
TraceUtil.addTimelineAnnotation("Force Flush. We're above high water mark.");
span.addEvent("Force Flush. We're above high water mark.");
long start = EnvironmentEdgeManager.currentTime();
long nextLogTimeMs = start;
synchronized (this.blockSignal) {

@ -784,6 +787,7 @@ public class MemStoreFlusher implements FlushRequester {
if (flushType != FlushType.NORMAL) {
wakeupFlushThread();
}
span.end();
}
}
}
@ -25,6 +25,8 @@ import static org.apache.hbase.thirdparty.com.google.common.base.Preconditions.c
|
|||
import static org.apache.hbase.thirdparty.com.google.common.base.Preconditions.checkNotNull;
|
||||
|
||||
import com.lmax.disruptor.RingBuffer;
|
||||
import io.opentelemetry.api.trace.Span;
|
||||
import io.opentelemetry.context.Scope;
|
||||
import java.io.FileNotFoundException;
|
||||
import java.io.IOException;
|
||||
import java.io.InterruptedIOException;
|
||||
|
@ -83,7 +85,6 @@ import org.apache.hadoop.hbase.wal.WALProvider.WriterBase;
|
|||
import org.apache.hadoop.hbase.wal.WALSplitter;
|
||||
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
||||
import org.apache.hadoop.util.StringUtils;
|
||||
import org.apache.htrace.core.TraceScope;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
@ -784,9 +785,12 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
|
|||
* @throws IOException if there is a problem flushing or closing the underlying FS
|
||||
*/
|
||||
Path replaceWriter(Path oldPath, Path newPath, W nextWriter) throws IOException {
|
||||
try (TraceScope scope = TraceUtil.createTrace("FSHFile.replaceWriter")) {
|
||||
Span span = TraceUtil.getGlobalTracer().spanBuilder("FSHFile.replaceWriter").startSpan();
|
||||
try (Scope scope = span.makeCurrent()) {
|
||||
doReplaceWriter(oldPath, newPath, nextWriter);
|
||||
return newPath;
|
||||
} finally {
|
||||
span.end();
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -834,7 +838,8 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
|
|||
LOG.debug("WAL closed. Skipping rolling of writer");
|
||||
return regionsToFlush;
|
||||
}
|
||||
try (TraceScope scope = TraceUtil.createTrace("FSHLog.rollWriter")) {
|
||||
Span span = TraceUtil.getGlobalTracer().spanBuilder("FSHLog.rollWriter").startSpan();
|
||||
try (Scope scope = span.makeCurrent()) {
|
||||
Path oldPath = getOldPath();
|
||||
Path newPath = getNewPath();
|
||||
// Any exception from here on is catastrophic, non-recoverable so we currently abort.
|
||||
|
@ -861,6 +866,8 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
|
|||
throw new IOException(
|
||||
"Underlying FileSystem can't meet stream requirements. See RS log " + "for details.",
|
||||
exception);
|
||||
} finally {
|
||||
span.end();
|
||||
}
|
||||
return regionsToFlush;
|
||||
} finally {
|
||||
|
@ -1052,7 +1059,7 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
|
|||
.append(TimeUnit.NANOSECONDS.toMillis(timeInNanos))
|
||||
.append(" ms, current pipeline: ")
|
||||
.append(Arrays.toString(getPipeline())).toString();
|
||||
TraceUtil.addTimelineAnnotation(msg);
|
||||
Span.current().addEvent(msg);
|
||||
LOG.info(msg);
|
||||
// A single sync took too long.
|
||||
// Elsewhere in checkSlowSync, called from checkLogRoll, we will look at cumulative
|
||||
|
@ -1088,12 +1095,14 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
|
|||
long txid = txidHolder.longValue();
|
||||
ServerCall<?> rpcCall = RpcServer.getCurrentCall().filter(c -> c instanceof ServerCall)
|
||||
.filter(c -> c.getCellScanner() != null).map(c -> (ServerCall) c).orElse(null);
|
||||
try (TraceScope scope = TraceUtil.createTrace(implClassName + ".append")) {
|
||||
Span span = TraceUtil.getGlobalTracer().spanBuilder(implClassName + ".append").startSpan();
|
||||
try (Scope scope = span.makeCurrent()) {
|
||||
FSWALEntry entry = new FSWALEntry(txid, key, edits, hri, inMemstore, rpcCall);
|
||||
entry.stampRegionSequenceId(we);
|
||||
ringBuffer.get(txid).load(entry);
|
||||
} finally {
|
||||
ringBuffer.publish(txid);
|
||||
span.end();
|
||||
}
|
||||
return txid;
|
||||
}
|
||||
|
|
|
@ -20,10 +20,11 @@ package org.apache.hadoop.hbase.regionserver.wal;
|
|||
import static org.apache.hadoop.hbase.regionserver.wal.WALActionsListener.RollRequestReason.ERROR;
|
||||
import static org.apache.hadoop.hbase.regionserver.wal.WALActionsListener.RollRequestReason.SIZE;
|
||||
import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
|
||||
|
||||
import com.lmax.disruptor.RingBuffer;
|
||||
import com.lmax.disruptor.Sequence;
|
||||
import com.lmax.disruptor.Sequencer;
|
||||
import io.opentelemetry.api.trace.Span;
|
||||
import io.opentelemetry.context.Scope;
|
||||
import java.io.IOException;
|
||||
import java.lang.reflect.Field;
|
||||
import java.util.ArrayDeque;
|
||||
|
@ -44,7 +45,6 @@ import java.util.concurrent.locks.Condition;
|
|||
import java.util.concurrent.locks.Lock;
|
||||
import java.util.concurrent.locks.ReentrantLock;
|
||||
import java.util.function.Supplier;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
|
@ -58,12 +58,11 @@ import org.apache.hadoop.hbase.wal.WALEdit;
|
|||
import org.apache.hadoop.hbase.wal.WALKeyImpl;
|
||||
import org.apache.hadoop.hbase.wal.WALProvider.AsyncWriter;
|
||||
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
||||
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
|
||||
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
|
||||
import org.apache.htrace.core.TraceScope;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
|
||||
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
|
||||
import org.apache.hbase.thirdparty.io.netty.channel.Channel;
|
||||
import org.apache.hbase.thirdparty.io.netty.channel.EventLoop;
|
||||
import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup;
|
||||
|
@ -401,7 +400,7 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
|
|||
}
|
||||
|
||||
private void addTimeAnnotation(SyncFuture future, String annotation) {
|
||||
TraceUtil.addTimelineAnnotation(annotation);
|
||||
Span.current().addEvent(annotation);
|
||||
// TODO handle htrace API change, see HBASE-18895
|
||||
// future.setSpan(scope.getSpan());
|
||||
}
|
||||
|
@ -624,7 +623,8 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
|
|||
|
||||
@Override
|
||||
public void sync(boolean forceSync) throws IOException {
|
||||
try (TraceScope scope = TraceUtil.createTrace("AsyncFSWAL.sync")) {
|
||||
Span span = TraceUtil.getGlobalTracer().spanBuilder("AsyncFSWAL.sync").startSpan();
|
||||
try (Scope scope = span.makeCurrent()) {
|
||||
long txid = waitingConsumePayloads.next();
|
||||
SyncFuture future;
|
||||
try {
|
||||
|
@ -638,6 +638,8 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
|
|||
consumeExecutor.execute(consumer);
|
||||
}
|
||||
blockOnSync(future);
|
||||
} finally {
|
||||
span.end();
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -646,7 +648,8 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
|
|||
if (highestSyncedTxid.get() >= txid) {
|
||||
return;
|
||||
}
|
||||
try (TraceScope scope = TraceUtil.createTrace("AsyncFSWAL.sync")) {
|
||||
Span span = TraceUtil.getGlobalTracer().spanBuilder("AsyncFSWAL.sync").startSpan();
|
||||
try (Scope scope = span.makeCurrent()) {
|
||||
// here we do not use ring buffer sequence as txid
|
||||
long sequence = waitingConsumePayloads.next();
|
||||
SyncFuture future;
|
||||
|
@ -661,6 +664,8 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
|
|||
consumeExecutor.execute(consumer);
|
||||
}
|
||||
blockOnSync(future);
|
||||
} finally {
|
||||
span.end();
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -29,6 +29,8 @@ import com.lmax.disruptor.LifecycleAware;
|
|||
import com.lmax.disruptor.TimeoutException;
|
||||
import com.lmax.disruptor.dsl.Disruptor;
|
||||
import com.lmax.disruptor.dsl.ProducerType;
|
||||
import io.opentelemetry.api.trace.Span;
|
||||
import io.opentelemetry.context.Scope;
|
||||
import java.io.IOException;
|
||||
import java.io.OutputStream;
|
||||
import java.util.Arrays;
|
||||
|
@ -59,7 +61,6 @@ import org.apache.hadoop.hbase.wal.WALProvider.Writer;
|
|||
import org.apache.hadoop.hdfs.DFSOutputStream;
|
||||
import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
|
||||
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
||||
import org.apache.htrace.core.TraceScope;
|
||||
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
import org.slf4j.Logger;
|
||||
|
@ -364,7 +365,7 @@ public class FSHLog extends AbstractFSWAL<Writer> {
|
|||
// use assert to make sure no change breaks the logic that
|
||||
// sequence and zigzagLatch will be set together
|
||||
assert sequence > 0L : "Failed to get sequence from ring buffer";
|
||||
TraceUtil.addTimelineAnnotation("awaiting safepoint");
|
||||
Span.current().addEvent("awaiting safepoint");
|
||||
syncFuture = zigzagLatch.waitSafePoint(publishSyncOnRingBuffer(sequence, false));
|
||||
}
|
||||
} catch (FailedSyncBeforeLogCloseException e) {
|
||||
|
@ -436,10 +437,11 @@ public class FSHLog extends AbstractFSWAL<Writer> {
|
|||
}
|
||||
|
||||
private void closeWriter(Writer writer, Path path, boolean syncCloseCall) throws IOException {
|
||||
Span span = Span.current();
|
||||
try {
|
||||
TraceUtil.addTimelineAnnotation("closing writer");
|
||||
span.addEvent("closing writer");
|
||||
writer.close();
|
||||
TraceUtil.addTimelineAnnotation("writer closed");
|
||||
span.addEvent("writer closed");
|
||||
} catch (IOException ioe) {
|
||||
int errors = closeErrorCount.incrementAndGet();
|
||||
boolean hasUnflushedEntries = isUnflushedEntries();
|
||||
|
@ -649,10 +651,10 @@ public class FSHLog extends AbstractFSWAL<Writer> {
|
|||
long start = System.nanoTime();
|
||||
Throwable lastException = null;
|
||||
try {
|
||||
TraceUtil.addTimelineAnnotation("syncing writer");
|
||||
Span.current().addEvent("syncing writer");
|
||||
long unSyncedFlushSeq = highestUnsyncedTxid;
|
||||
writer.sync(sf.isForceSync());
|
||||
TraceUtil.addTimelineAnnotation("writer synced");
|
||||
Span.current().addEvent("writer synced");
|
||||
if (unSyncedFlushSeq > currentSequence) {
|
||||
currentSequence = unSyncedFlushSeq;
|
||||
}
|
||||
|
@ -791,7 +793,7 @@ public class FSHLog extends AbstractFSWAL<Writer> {
|
|||
}
|
||||
|
||||
// Sync all known transactions
|
||||
private void publishSyncThenBlockOnCompletion(TraceScope scope, boolean forceSync) throws IOException {
|
||||
private void publishSyncThenBlockOnCompletion(Scope scope, boolean forceSync) throws IOException {
|
||||
SyncFuture syncFuture = publishSyncOnRingBuffer(forceSync);
|
||||
blockOnSync(syncFuture);
|
||||
}
|
||||
|
@ -823,7 +825,8 @@ public class FSHLog extends AbstractFSWAL<Writer> {
|
|||
|
||||
@Override
|
||||
public void sync(boolean forceSync) throws IOException {
|
||||
try (TraceScope scope = TraceUtil.createTrace("FSHLog.sync")) {
|
||||
Span span = TraceUtil.getGlobalTracer().spanBuilder("FSHLog.sync").startSpan();
|
||||
try (Scope scope = span.makeCurrent()) {
|
||||
publishSyncThenBlockOnCompletion(scope, forceSync);
|
||||
}
|
||||
}
|
||||
|
@ -839,7 +842,8 @@ public class FSHLog extends AbstractFSWAL<Writer> {
|
|||
// Already sync'd.
|
||||
return;
|
||||
}
|
||||
try (TraceScope scope = TraceUtil.createTrace("FSHLog.sync")) {
|
||||
Span span = TraceUtil.getGlobalTracer().spanBuilder("FSHLog.sync").startSpan();
|
||||
try (Scope scope = span.makeCurrent()) {
|
||||
publishSyncThenBlockOnCompletion(scope, forceSync);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -20,7 +20,6 @@ package org.apache.hadoop.hbase;
|
|||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.junit.Assert.fail;
|
||||
|
||||
import edu.umd.cs.findbugs.annotations.Nullable;
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
|
@ -51,8 +50,8 @@ import java.util.Set;
|
|||
import java.util.TreeSet;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.function.BooleanSupplier;
|
||||
import java.util.stream.Collectors;
|
||||
import org.apache.commons.io.FileUtils;
|
||||
import org.apache.commons.lang3.RandomStringUtils;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
|
@ -119,12 +118,10 @@ import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
|
|||
import org.apache.hadoop.hbase.security.HBaseKerberosUtils;
|
||||
import org.apache.hadoop.hbase.security.User;
|
||||
import org.apache.hadoop.hbase.security.visibility.VisibilityLabelsCache;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
|
||||
import org.apache.hadoop.hbase.trace.TraceUtil;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.CommonFSUtils;
|
||||
import org.apache.hadoop.hbase.util.FSTableDescriptors;
|
||||
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
||||
import org.apache.hadoop.hbase.util.FSTableDescriptors;
|
||||
import org.apache.hadoop.hbase.util.FSUtils;
|
||||
import org.apache.hadoop.hbase.util.JVMClusterUtil;
|
||||
import org.apache.hadoop.hbase.util.JVMClusterUtil.MasterThread;
|
||||
|
@ -147,11 +144,12 @@ import org.apache.hadoop.mapred.JobConf;
|
|||
import org.apache.hadoop.mapred.MiniMRCluster;
|
||||
import org.apache.hadoop.mapred.TaskLog;
|
||||
import org.apache.hadoop.minikdc.MiniKdc;
|
||||
import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
import org.apache.zookeeper.WatchedEvent;
|
||||
import org.apache.zookeeper.ZooKeeper;
|
||||
import org.apache.zookeeper.ZooKeeper.States;
|
||||
import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
|
||||
|
||||
/**
|
||||
* Facility for testing HBase. Replacement for
|
||||
|
@ -663,8 +661,6 @@ public class HBaseTestingUtility extends HBaseZKTestingUtility {
|
|||
Log4jUtils.setLogLevel(org.apache.hadoop.metrics2.impl.MetricsSystemImpl.class.getName(),
|
||||
"ERROR");
|
||||
|
||||
TraceUtil.initTracer(conf);
|
||||
|
||||
this.dfsCluster = new MiniDFSCluster(0, this.conf, servers, true, true,
|
||||
true, null, racks, hosts, null);
|
||||
|
||||
|
@ -1172,7 +1168,6 @@ public class HBaseTestingUtility extends HBaseZKTestingUtility {
|
|||
Log4jUtils.setLogLevel(org.apache.hadoop.hbase.ScheduledChore.class.getName(), "INFO");
|
||||
|
||||
Configuration c = new Configuration(this.conf);
|
||||
TraceUtil.initTracer(c);
|
||||
this.hbaseCluster = new MiniHBaseCluster(c, option.getNumMasters(),
|
||||
option.getNumAlwaysStandByMasters(), option.getNumRegionServers(), option.getRsPorts(),
|
||||
option.getMasterClass(), option.getRsClass());
|
||||
|
|
|
@ -165,8 +165,8 @@ public class TestExecutorService {
private final AtomicBoolean lock;
private AtomicInteger counter;

public TestEventHandler(Server server, EventType eventType,
AtomicBoolean lock, AtomicInteger counter) {
public TestEventHandler(Server server, EventType eventType, AtomicBoolean lock,
AtomicInteger counter) {
super(server, eventType);
this.lock = lock;
this.counter = counter;
@ -1,134 +0,0 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.trace;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
import java.util.Collection;
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||
import org.apache.hadoop.hbase.StartMiniClusterOption;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.Waiter;
|
||||
import org.apache.hadoop.hbase.client.Put;
|
||||
import org.apache.hadoop.hbase.client.Table;
|
||||
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||
import org.apache.hadoop.hbase.testclassification.MiscTests;
|
||||
import org.apache.htrace.core.POJOSpanReceiver;
|
||||
import org.apache.htrace.core.Sampler;
|
||||
import org.apache.htrace.core.Span;
|
||||
import org.apache.htrace.core.TraceScope;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.ClassRule;
|
||||
import org.junit.Ignore;
|
||||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
import org.junit.experimental.categories.Category;
|
||||
import org.junit.rules.TestName;
|
||||
|
||||
import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
|
||||
|
||||
@Ignore // We don't support htrace in hbase-2.0.0 and this flakey is a little flakey.
|
||||
@Category({MiscTests.class, MediumTests.class})
|
||||
public class TestHTraceHooks {
|
||||
|
||||
@ClassRule
|
||||
public static final HBaseClassTestRule CLASS_RULE =
|
||||
HBaseClassTestRule.forClass(TestHTraceHooks.class);
|
||||
|
||||
private static final byte[] FAMILY_BYTES = "family".getBytes();
|
||||
private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
|
||||
private static POJOSpanReceiver rcvr;
|
||||
|
||||
@Rule
|
||||
public TestName name = new TestName();
|
||||
|
||||
@BeforeClass
|
||||
public static void before() throws Exception {
|
||||
StartMiniClusterOption option = StartMiniClusterOption.builder()
|
||||
.numMasters(2).numRegionServers(3).numDataNodes(3).build();
|
||||
TEST_UTIL.startMiniCluster(option);
|
||||
rcvr = new POJOSpanReceiver(new HBaseHTraceConfiguration(TEST_UTIL.getConfiguration()));
|
||||
TraceUtil.addReceiver(rcvr);
|
||||
TraceUtil.addSampler(new Sampler() {
|
||||
@Override
|
||||
public boolean next() {
|
||||
return true;
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void after() throws Exception {
|
||||
TEST_UTIL.shutdownMiniCluster();
|
||||
TraceUtil.removeReceiver(rcvr);
|
||||
rcvr = null;
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testTraceCreateTable() throws Exception {
|
||||
Table table;
|
||||
Span createTableSpan;
|
||||
try (TraceScope scope = TraceUtil.createTrace("creating table")) {
|
||||
createTableSpan = scope.getSpan();
|
||||
table = TEST_UTIL.createTable(TableName.valueOf(name.getMethodName()), FAMILY_BYTES);
|
||||
}
|
||||
|
||||
// Some table creation is async. Need to make sure that everything is full in before
|
||||
// checking to see if the spans are there.
|
||||
TEST_UTIL.waitFor(10000, new Waiter.Predicate<Exception>() {
|
||||
@Override public boolean evaluate() throws Exception {
|
||||
return (rcvr == null) ? true : rcvr.getSpans().size() >= 5;
|
||||
}
|
||||
});
|
||||
|
||||
Collection<Span> spans = Sets.newHashSet(rcvr.getSpans());
|
||||
List<Span> roots = new LinkedList<>();
|
||||
TraceTree traceTree = new TraceTree(spans);
|
||||
roots.addAll(traceTree.getSpansByParent().find(createTableSpan.getSpanId()));
|
||||
|
||||
// Roots was made 3 in hbase2. It used to be 1. We changed it back to 1 on upgrade to
|
||||
// htrace-4.2 just to get the test to pass (traces are not wholesome in hbase2; TODO).
|
||||
assertEquals(1, roots.size());
|
||||
assertEquals("creating table", createTableSpan.getDescription());
|
||||
|
||||
if (spans != null) {
|
||||
assertTrue(spans.size() > 5);
|
||||
}
|
||||
|
||||
Put put = new Put("row".getBytes());
|
||||
put.addColumn(FAMILY_BYTES, "col".getBytes(), "value".getBytes());
|
||||
|
||||
Span putSpan;
|
||||
|
||||
try (TraceScope scope = TraceUtil.createTrace("doing put")) {
|
||||
putSpan = scope.getSpan();
|
||||
table.put(put);
|
||||
}
|
||||
|
||||
spans = rcvr.getSpans();
|
||||
traceTree = new TraceTree(spans);
|
||||
roots.clear();
|
||||
roots.addAll(traceTree.getSpansByParent().find(putSpan.getSpanId()));
|
||||
assertEquals(1, roots.size());
|
||||
}
|
||||
}
|
|
@ -1,148 +0,0 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.trace;
|
||||
|
||||
import org.apache.htrace.core.Span;
|
||||
import org.apache.htrace.core.SpanId;
|
||||
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.Comparator;
|
||||
import java.util.HashMap;
|
||||
import java.util.Iterator;
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
import java.util.TreeSet;
|
||||
|
||||
/**
|
||||
* Used to create the graph formed by spans.
|
||||
*/
|
||||
public class TraceTree {
|
||||
|
||||
public static class SpansByParent {
|
||||
private static Comparator<Span> COMPARATOR =
|
||||
new Comparator<Span>() {
|
||||
@Override
|
||||
public int compare(Span a, Span b) {
|
||||
return a.getSpanId().compareTo(b.getSpanId());
|
||||
}
|
||||
};
|
||||
|
||||
private final TreeSet<Span> treeSet;
|
||||
|
||||
private final HashMap<SpanId, LinkedList<Span>> parentToSpans;
|
||||
|
||||
SpansByParent(Collection<Span> spans) {
|
||||
TreeSet<Span> treeSet = new TreeSet<Span>(COMPARATOR);
|
||||
parentToSpans = new HashMap<SpanId, LinkedList<Span>>();
|
||||
for (Span span : spans) {
|
||||
treeSet.add(span);
|
||||
for (SpanId parent : span.getParents()) {
|
||||
LinkedList<Span> list = parentToSpans.get(parent);
|
||||
if (list == null) {
|
||||
list = new LinkedList<Span>();
|
||||
parentToSpans.put(parent, list);
|
||||
}
|
||||
list.add(span);
|
||||
}
|
||||
if (span.getParents().length == 0) {
|
||||
LinkedList<Span> list = parentToSpans.get(SpanId.INVALID);
|
||||
if (list == null) {
|
||||
list = new LinkedList<Span>();
|
||||
parentToSpans.put(SpanId.INVALID, list);
|
||||
}
|
||||
list.add(span);
|
||||
}
|
||||
}
|
||||
this.treeSet = treeSet;
|
||||
}
|
||||
|
||||
public List<Span> find(SpanId parentId) {
|
||||
LinkedList<Span> spans = parentToSpans.get(parentId);
|
||||
if (spans == null) {
|
||||
return new LinkedList<Span>();
|
||||
}
|
||||
return spans;
|
||||
}
|
||||
|
||||
public Iterator<Span> iterator() {
|
||||
return Collections.unmodifiableSortedSet(treeSet).iterator();
|
||||
}
|
||||
}
|
||||
|
||||
public static class SpansByProcessId {
|
||||
private static Comparator<Span> COMPARATOR =
|
||||
new Comparator<Span>() {
|
||||
@Override
|
||||
public int compare(Span a, Span b) {
|
||||
return a.getSpanId().compareTo(b.getSpanId());
|
||||
}
|
||||
};
|
||||
|
||||
private final TreeSet<Span> treeSet;
|
||||
|
||||
SpansByProcessId(Collection<Span> spans) {
|
||||
TreeSet<Span> treeSet = new TreeSet<Span>(COMPARATOR);
|
||||
for (Span span : spans) {
|
||||
treeSet.add(span);
|
||||
}
|
||||
this.treeSet = treeSet;
|
||||
}
|
||||
|
||||
public Iterator<Span> iterator() {
|
||||
return Collections.unmodifiableSortedSet(treeSet).iterator();
|
||||
}
|
||||
}
|
||||
|
||||
private final SpansByParent spansByParent;
|
||||
private final SpansByProcessId spansByProcessId;
|
||||
|
||||
/**
|
||||
* Create a new TraceTree
|
||||
*
|
||||
* @param spans The collection of spans to use to create this TraceTree. Should
|
||||
* have at least one root span.
|
||||
*/
|
||||
public TraceTree(Collection<Span> spans) {
|
||||
if (spans == null) {
|
||||
spans = Collections.emptySet();
|
||||
}
|
||||
this.spansByParent = new SpansByParent(spans);
|
||||
this.spansByProcessId = new SpansByProcessId(spans);
|
||||
}
|
||||
|
||||
public SpansByParent getSpansByParent() {
|
||||
return spansByParent;
|
||||
}
|
||||
|
||||
public SpansByProcessId getSpansByProcessId() {
|
||||
return spansByProcessId;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
StringBuilder bld = new StringBuilder();
|
||||
String prefix = "";
|
||||
for (Iterator<Span> iter = spansByParent.iterator(); iter.hasNext();) {
|
||||
Span span = iter.next();
|
||||
bld.append(prefix).append(span.toString());
|
||||
prefix = "\n";
|
||||
}
|
||||
return bld.toString();
|
||||
}
|
||||
}
|
|
@ -25,6 +25,8 @@ import com.codahale.metrics.Histogram;
|
|||
import com.codahale.metrics.Meter;
|
||||
import com.codahale.metrics.MetricFilter;
|
||||
import com.codahale.metrics.MetricRegistry;
|
||||
import io.opentelemetry.api.trace.Span;
|
||||
import io.opentelemetry.context.Scope;
|
||||
import java.io.IOException;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
|
@ -58,8 +60,6 @@ import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
|
|||
import org.apache.hadoop.hbase.regionserver.wal.SecureProtobufLogReader;
|
||||
import org.apache.hadoop.hbase.regionserver.wal.SecureProtobufLogWriter;
|
||||
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
|
||||
import org.apache.hadoop.hbase.trace.HBaseHTraceConfiguration;
|
||||
import org.apache.hadoop.hbase.trace.SpanReceiverHost;
|
||||
import org.apache.hadoop.hbase.trace.TraceUtil;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.CommonFSUtils;
|
||||
|
@ -68,10 +68,6 @@ import org.apache.hadoop.hbase.util.Threads;
|
|||
import org.apache.hadoop.hbase.wal.WALProvider.Writer;
|
||||
import org.apache.hadoop.util.Tool;
|
||||
import org.apache.hadoop.util.ToolRunner;
|
||||
import org.apache.htrace.core.ProbabilitySampler;
|
||||
import org.apache.htrace.core.Sampler;
|
||||
import org.apache.htrace.core.TraceScope;
|
||||
import org.apache.htrace.core.Tracer;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
@ -129,12 +125,10 @@ public final class WALPerformanceEvaluation extends Configured implements Tool {
|
|||
private final boolean noSync;
|
||||
private final HRegion region;
|
||||
private final int syncInterval;
|
||||
private final Sampler loopSampler;
|
||||
private final NavigableMap<byte[], Integer> scopes;
|
||||
|
||||
WALPutBenchmark(final HRegion region, final TableDescriptor htd,
|
||||
final long numIterations, final boolean noSync, final int syncInterval,
|
||||
final double traceFreq) {
|
||||
final long numIterations, final boolean noSync, final int syncInterval) {
|
||||
this.numIterations = numIterations;
|
||||
this.noSync = noSync;
|
||||
this.syncInterval = syncInterval;
|
||||
|
@ -144,24 +138,6 @@ public final class WALPerformanceEvaluation extends Configured implements Tool {
|
|||
for(byte[] fam : htd.getColumnFamilyNames()) {
|
||||
scopes.put(fam, 0);
|
||||
}
|
||||
String spanReceivers = getConf().get("hbase.trace.spanreceiver.classes");
|
||||
if (spanReceivers == null || spanReceivers.isEmpty()) {
|
||||
loopSampler = Sampler.NEVER;
|
||||
} else {
|
||||
if (traceFreq <= 0.0) {
|
||||
LOG.warn("Tracing enabled but traceFreq=0.");
|
||||
loopSampler = Sampler.NEVER;
|
||||
} else if (traceFreq >= 1.0) {
|
||||
loopSampler = Sampler.ALWAYS;
|
||||
if (numIterations > 1000) {
|
||||
LOG.warn("Full tracing of all iterations will produce a lot of data. Be sure your"
|
||||
+ " SpanReceiver can keep up.");
|
||||
}
|
||||
} else {
|
||||
getConf().setDouble("hbase.sampler.fraction", traceFreq);
|
||||
loopSampler = new ProbabilitySampler(new HBaseHTraceConfiguration(getConf()));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -170,13 +146,14 @@ public final class WALPerformanceEvaluation extends Configured implements Tool {
|
|||
byte[] value = new byte[valueSize];
|
||||
Random rand = new Random(Thread.currentThread().getId());
|
||||
WAL wal = region.getWAL();
|
||||
|
||||
try (TraceScope threadScope = TraceUtil.createTrace("WALPerfEval." + Thread.currentThread().getName())) {
|
||||
Span threadSpan = TraceUtil.getGlobalTracer()
|
||||
.spanBuilder("WALPerfEval." + Thread.currentThread().getName()).startSpan();
|
||||
try (Scope threadScope = threadSpan.makeCurrent()) {
|
||||
int lastSync = 0;
|
||||
TraceUtil.addSampler(loopSampler);
|
||||
for (int i = 0; i < numIterations; ++i) {
|
||||
assert Tracer.getCurrentSpan() == threadScope.getSpan() : "Span leak detected.";
|
||||
try (TraceScope loopScope = TraceUtil.createTrace("runLoopIter" + i)) {
|
||||
assert Span.current() == threadSpan : "Span leak detected.";
|
||||
Span loopSpan = TraceUtil.getGlobalTracer().spanBuilder("runLoopIter" + i).startSpan();
|
||||
try (Scope loopScope = loopSpan.makeCurrent()) {
|
||||
long now = System.nanoTime();
|
||||
Put put = setupPut(rand, key, value, numFamilies);
|
||||
WALEdit walEdit = new WALEdit();
|
||||
|
@ -192,10 +169,14 @@ public final class WALPerformanceEvaluation extends Configured implements Tool {
|
|||
}
|
||||
}
|
||||
latencyHistogram.update(System.nanoTime() - now);
|
||||
} finally {
|
||||
loopSpan.end();
|
||||
}
|
||||
}
|
||||
} catch (Exception e) {
|
||||
LOG.error(getClass().getSimpleName() + " Thread failed", e);
|
||||
} finally {
|
||||
threadSpan.end();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -216,9 +197,6 @@ public final class WALPerformanceEvaluation extends Configured implements Tool {
|
|||
boolean compress = false;
|
||||
String cipher = null;
|
||||
int numRegions = 1;
|
||||
String spanReceivers = getConf().get("hbase.trace.spanreceiver.classes");
|
||||
boolean trace = spanReceivers != null && !spanReceivers.isEmpty();
|
||||
double traceFreq = 1.0;
|
||||
// Process command line args
|
||||
for (int i = 0; i < args.length; i++) {
|
||||
String cmd = args[i];
|
||||
|
@ -258,7 +236,8 @@ public final class WALPerformanceEvaluation extends Configured implements Tool {
|
|||
} else if (cmd.equals("-regions")) {
|
||||
numRegions = Integer.parseInt(args[++i]);
|
||||
} else if (cmd.equals("-traceFreq")) {
|
||||
traceFreq = Double.parseDouble(args[++i]);
|
||||
// keep it here for compatible
|
||||
System.err.println("-traceFreq is not supported any more");
|
||||
} else if (cmd.equals("-h")) {
|
||||
printUsageAndExit();
|
||||
} else if (cmd.equals("--help")) {
|
||||
|
@ -307,13 +286,8 @@ public final class WALPerformanceEvaluation extends Configured implements Tool {
|
|||
CommonFSUtils.setFsDefault(getConf(), CommonFSUtils.getRootDir(getConf()));
|
||||
FileSystem fs = FileSystem.get(getConf());
|
||||
LOG.info("FileSystem={}, rootDir={}", fs, rootRegionDir);
|
||||
|
||||
SpanReceiverHost receiverHost = trace ? SpanReceiverHost.getInstance(getConf()) : null;
|
||||
final Sampler sampler = trace ? Sampler.ALWAYS : Sampler.NEVER;
|
||||
TraceUtil.addSampler(sampler);
|
||||
TraceScope scope = TraceUtil.createTrace("WALPerfEval");
|
||||
|
||||
try {
|
||||
Span span = TraceUtil.getGlobalTracer().spanBuilder("WALPerfEval").startSpan();
|
||||
try (Scope scope = span.makeCurrent()){
|
||||
rootRegionDir = rootRegionDir.makeQualified(fs.getUri(), fs.getWorkingDirectory());
|
||||
cleanRegionRootDir(fs, rootRegionDir);
|
||||
CommonFSUtils.setRootDir(getConf(), rootRegionDir);
|
||||
|
@ -330,8 +304,8 @@ public final class WALPerformanceEvaluation extends Configured implements Tool {
|
|||
// a table per desired region means we can avoid carving up the key space
|
||||
final TableDescriptor htd = createHTableDescriptor(i, numFamilies);
|
||||
regions[i] = openRegion(fs, rootRegionDir, htd, wals, roll, roller);
|
||||
benchmarks[i] = TraceUtil.wrap(new WALPutBenchmark(regions[i], htd, numIterations, noSync,
|
||||
syncInterval, traceFreq), "");
|
||||
benchmarks[i] =
|
||||
new WALPutBenchmark(regions[i], htd, numIterations, noSync, syncInterval);
|
||||
}
|
||||
ConsoleReporter reporter = ConsoleReporter.forRegistry(metrics).
|
||||
outputTo(System.out).convertRatesTo(TimeUnit.SECONDS).filter(MetricFilter.ALL).build();
|
||||
|
@ -380,19 +354,14 @@ public final class WALPerformanceEvaluation extends Configured implements Tool {
|
|||
if (cleanup) cleanRegionRootDir(fs, rootRegionDir);
|
||||
}
|
||||
} finally {
|
||||
span.end();
|
||||
// We may be called inside a test that wants to keep on using the fs.
|
||||
if (!noclosefs) {
|
||||
fs.close();
|
||||
}
|
||||
if (scope != null) {
|
||||
scope.close();
|
||||
}
|
||||
if (receiverHost != null) {
|
||||
receiverHost.closeReceivers();
|
||||
}
|
||||
}
|
||||
|
||||
return(0);
|
||||
return 0;
|
||||
}
|
||||
|
||||
private static TableDescriptor createHTableDescriptor(final int regionNum,
|
||||
|
|
|
@ -79,6 +79,7 @@
<exclude>log4j:*</exclude>
<exclude>commons-logging:*</exclude>
<exclude>org.javassist:*</exclude>
<exclude>io.opentelemetry:*</exclude>
</excludes>
</artifactSet>
</configuration>

@ -236,6 +236,7 @@
<exclude>log4j:*</exclude>
<exclude>commons-logging:*</exclude>
<exclude>org.javassist:*</exclude>
<exclude>io.opentelemetry:*</exclude>
</excludes>
</artifactSet>
</configuration>

@ -159,6 +159,7 @@
<exclude>log4j:*</exclude>
<exclude>commons-logging:*</exclude>
<exclude>org.javassist:*</exclude>
<exclude>io.opentelemetry:*</exclude>
</excludes>
</artifactSet>
<relocations>
@ -17,16 +17,17 @@
# limitations under the License.
#

java_import org.apache.hadoop.hbase.trace.SpanReceiverHost
# Disable tracing for now as HTrace does not work any more
# java_import org.apache.hadoop.hbase.trace.SpanReceiverHost

module Shell
module Commands
class Trace < Command
@@conf = org.apache.htrace.core.HTraceConfiguration.fromKeyValuePairs(
'sampler.classes', 'org.apache.htrace.core.AlwaysSampler'
)
@@tracer = org.apache.htrace.core.Tracer::Builder.new('HBaseShell').conf(@@conf).build()
@@tracescope = nil
# @@conf = org.apache.htrace.core.HTraceConfiguration.fromKeyValuePairs(
# 'sampler.classes', 'org.apache.htrace.core.AlwaysSampler'
# )
# @@tracer = org.apache.htrace.core.Tracer::Builder.new('HBaseShell').conf(@@conf).build()
# @@tracescope = nil

def help
<<-EOF

@ -57,23 +58,23 @@ EOF
end

def trace(startstop, spanname)
@@receiver ||= SpanReceiverHost.getInstance(@shell.hbase.configuration)
if startstop == 'start'
unless tracing?
@@tracescope = @@tracer.newScope(spanname)
end
elsif startstop == 'stop'
if tracing?
@@tracescope.close
@@tracescope = nil
end
end
tracing?
# @@receiver ||= SpanReceiverHost.getInstance(@shell.hbase.configuration)
# if startstop == 'start'
# unless tracing?
# @@tracescope = @@tracer.newScope(spanname)
# end
# elsif startstop == 'stop'
# if tracing?
# @@tracescope.close
# @@tracescope = nil
# end
# end
# tracing?
end

def tracing?
@@tracescope != nil
end
# def tracing?
# @@tracescope != nil
# end
end
end
end
@ -148,6 +148,10 @@
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
</dependency>
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-api</artifactId>
</dependency>
<!-- Test dependencies -->
<dependency>
<groupId>junit</groupId>
@ -18,18 +18,18 @@
|
|||
*/
|
||||
package org.apache.hadoop.hbase.zookeeper;
|
||||
|
||||
import io.opentelemetry.api.trace.Span;
|
||||
import io.opentelemetry.context.Scope;
|
||||
import java.io.IOException;
|
||||
import java.lang.management.ManagementFactory;
|
||||
import java.util.ArrayList;
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
|
||||
import org.apache.hadoop.hbase.trace.TraceUtil;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
||||
import org.apache.hadoop.hbase.util.RetryCounter;
|
||||
import org.apache.hadoop.hbase.util.RetryCounterFactory;
|
||||
import org.apache.htrace.core.TraceScope;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
import org.apache.zookeeper.AsyncCallback;
|
||||
import org.apache.zookeeper.CreateMode;
|
||||
|
@ -164,7 +164,8 @@ public class RecoverableZooKeeper {
|
|||
* exist.
|
||||
*/
|
||||
public void delete(String path, int version) throws InterruptedException, KeeperException {
|
||||
try (TraceScope scope = TraceUtil.createTrace("RecoverableZookeeper.delete")) {
|
||||
Span span = TraceUtil.getGlobalTracer().spanBuilder("RecoverableZookeeper.delete").startSpan();
|
||||
try (Scope scope = span.makeCurrent()) {
|
||||
RetryCounter retryCounter = retryCounterFactory.create();
|
||||
boolean isRetry = false; // False for first attempt, true for all retries.
|
||||
while (true) {
|
||||
|
@ -196,6 +197,8 @@ public class RecoverableZooKeeper {
|
|||
retryCounter.sleepUntilNextRetry();
|
||||
isRetry = true;
|
||||
}
|
||||
} finally {
|
||||
span.end();
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -204,7 +207,8 @@ public class RecoverableZooKeeper {
|
|||
* @return A Stat instance
|
||||
*/
|
||||
public Stat exists(String path, Watcher watcher) throws KeeperException, InterruptedException {
|
||||
try (TraceScope scope = TraceUtil.createTrace("RecoverableZookeeper.exists")) {
|
||||
Span span = TraceUtil.getGlobalTracer().spanBuilder("RecoverableZookeeper.exists").startSpan();
|
||||
try (Scope scope = span.makeCurrent()) {
|
||||
RetryCounter retryCounter = retryCounterFactory.create();
|
||||
while (true) {
|
||||
try {
|
||||
|
@ -225,6 +229,8 @@ public class RecoverableZooKeeper {
|
|||
}
|
||||
retryCounter.sleepUntilNextRetry();
|
||||
}
|
||||
} finally {
|
||||
span.end();
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -233,7 +239,9 @@ public class RecoverableZooKeeper {
|
|||
* @return A Stat instance
|
||||
*/
|
||||
public Stat exists(String path, boolean watch) throws KeeperException, InterruptedException {
|
||||
try (TraceScope scope = TraceUtil.createTrace("RecoverableZookeeper.exists")) {
|
||||
Span span =
|
||||
TraceUtil.getGlobalTracer().spanBuilder("RecoverableZookeeper.exists").startSpan();
|
||||
try (Scope scope = span.makeCurrent()) {
|
||||
RetryCounter retryCounter = retryCounterFactory.create();
|
||||
while (true) {
|
||||
try {
|
||||
|
@ -255,6 +263,8 @@ public class RecoverableZooKeeper {
|
|||
}
|
||||
retryCounter.sleepUntilNextRetry();
|
||||
}
|
||||
} finally {
|
||||
span.end();
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -273,7 +283,9 @@ public class RecoverableZooKeeper {
|
|||
*/
|
||||
public List<String> getChildren(String path, Watcher watcher)
|
||||
throws KeeperException, InterruptedException {
|
||||
try (TraceScope scope = TraceUtil.createTrace("RecoverableZookeeper.getChildren")) {
|
||||
Span span =
|
||||
TraceUtil.getGlobalTracer().spanBuilder("RecoverableZookeeper.getChildren").startSpan();
|
||||
try (Scope scope = span.makeCurrent()) {
|
||||
RetryCounter retryCounter = retryCounterFactory.create();
|
||||
while (true) {
|
||||
try {
|
||||
|
@ -294,6 +306,8 @@ public class RecoverableZooKeeper {
|
|||
}
|
||||
retryCounter.sleepUntilNextRetry();
|
||||
}
|
||||
} finally {
|
||||
span.end();
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -303,7 +317,9 @@ public class RecoverableZooKeeper {
|
|||
*/
|
||||
public List<String> getChildren(String path, boolean watch)
|
||||
throws KeeperException, InterruptedException {
|
||||
try (TraceScope scope = TraceUtil.createTrace("RecoverableZookeeper.getChildren")) {
|
||||
Span span =
|
||||
TraceUtil.getGlobalTracer().spanBuilder("RecoverableZookeeper.getChildren").startSpan();
|
||||
try (Scope scope = span.makeCurrent()) {
|
||||
RetryCounter retryCounter = retryCounterFactory.create();
|
||||
while (true) {
|
||||
try {
|
||||
|
@ -325,6 +341,8 @@ public class RecoverableZooKeeper {
|
|||
}
|
||||
retryCounter.sleepUntilNextRetry();
|
||||
}
|
||||
} finally {
|
||||
span.end();
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -334,7 +352,8 @@ public class RecoverableZooKeeper {
|
|||
*/
|
||||
public byte[] getData(String path, Watcher watcher, Stat stat)
|
||||
throws KeeperException, InterruptedException {
|
||||
try (TraceScope scope = TraceUtil.createTrace("RecoverableZookeeper.getData")) {
|
||||
Span span = TraceUtil.getGlobalTracer().spanBuilder("RecoverableZookeeper.getData").startSpan();
|
||||
try (Scope scope = span.makeCurrent()) {
|
||||
RetryCounter retryCounter = retryCounterFactory.create();
|
||||
while (true) {
|
||||
try {
|
||||
|
@ -355,6 +374,8 @@ public class RecoverableZooKeeper {
|
|||
}
|
||||
retryCounter.sleepUntilNextRetry();
|
||||
}
|
||||
} finally {
|
||||
span.end();
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -364,7 +385,9 @@ public class RecoverableZooKeeper {
|
|||
*/
|
||||
public byte[] getData(String path, boolean watch, Stat stat)
|
||||
throws KeeperException, InterruptedException {
|
||||
try (TraceScope scope = TraceUtil.createTrace("RecoverableZookeeper.getData")) {
|
||||
Span span =
|
||||
TraceUtil.getGlobalTracer().spanBuilder("RecoverableZookeeper.getData").startSpan();
|
||||
try (Scope scope = span.makeCurrent()) {
|
||||
RetryCounter retryCounter = retryCounterFactory.create();
|
||||
while (true) {
|
||||
try {
|
||||
|
@ -386,6 +409,8 @@ public class RecoverableZooKeeper {
|
|||
}
|
||||
retryCounter.sleepUntilNextRetry();
|
||||
}
|
||||
} finally {
|
||||
span.end();
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -397,7 +422,8 @@ public class RecoverableZooKeeper {
|
|||
*/
|
||||
public Stat setData(String path, byte[] data, int version)
|
||||
throws KeeperException, InterruptedException {
|
||||
try (TraceScope scope = TraceUtil.createTrace("RecoverableZookeeper.setData")) {
|
||||
Span span = TraceUtil.getGlobalTracer().spanBuilder("RecoverableZookeeper.setData").startSpan();
|
||||
try (Scope scope = span.makeCurrent()) {
|
||||
RetryCounter retryCounter = retryCounterFactory.create();
|
||||
byte[] newData = ZKMetadata.appendMetaData(id, data);
|
||||
boolean isRetry = false;
|
||||
|
@ -437,6 +463,8 @@ public class RecoverableZooKeeper {
|
|||
retryCounter.sleepUntilNextRetry();
|
||||
isRetry = true;
|
||||
}
|
||||
} finally {
|
||||
span.end();
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -444,9 +472,9 @@ public class RecoverableZooKeeper {
|
|||
* getAcl is an idempotent operation. Retry before throwing exception
|
||||
* @return list of ACLs
|
||||
*/
|
||||
public List<ACL> getAcl(String path, Stat stat)
|
||||
throws KeeperException, InterruptedException {
|
||||
try (TraceScope scope = TraceUtil.createTrace("RecoverableZookeeper.getAcl")) {
|
||||
public List<ACL> getAcl(String path, Stat stat) throws KeeperException, InterruptedException {
|
||||
Span span = TraceUtil.getGlobalTracer().spanBuilder("RecoverableZookeeper.getAcl").startSpan();
|
||||
try (Scope scope = span.makeCurrent()) {
|
||||
RetryCounter retryCounter = retryCounterFactory.create();
|
||||
while (true) {
|
||||
try {
|
||||
|
@ -467,6 +495,8 @@ public class RecoverableZooKeeper {
|
|||
}
|
||||
retryCounter.sleepUntilNextRetry();
|
||||
}
|
||||
} finally {
|
||||
span.end();
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -476,7 +506,8 @@ public class RecoverableZooKeeper {
|
|||
*/
|
||||
public Stat setAcl(String path, List<ACL> acls, int version)
|
||||
throws KeeperException, InterruptedException {
|
||||
try (TraceScope scope = TraceUtil.createTrace("RecoverableZookeeper.setAcl")) {
|
||||
Span span = TraceUtil.getGlobalTracer().spanBuilder("RecoverableZookeeper.setAcl").startSpan();
|
||||
try (Scope scope = span.makeCurrent()) {
|
||||
RetryCounter retryCounter = retryCounterFactory.create();
|
||||
while (true) {
|
||||
try {
|
||||
|
@ -496,6 +527,8 @@ public class RecoverableZooKeeper {
|
|||
}
|
||||
retryCounter.sleepUntilNextRetry();
|
||||
}
|
||||
} finally {
|
||||
span.end();
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -514,10 +547,10 @@ public class RecoverableZooKeeper {
|
|||
*
|
||||
* @return Path
|
||||
*/
|
||||
public String create(String path, byte[] data, List<ACL> acl,
|
||||
CreateMode createMode)
|
||||
public String create(String path, byte[] data, List<ACL> acl, CreateMode createMode)
|
||||
throws KeeperException, InterruptedException {
|
||||
try (TraceScope scope = TraceUtil.createTrace("RecoverableZookeeper.create")) {
|
||||
Span span = TraceUtil.getGlobalTracer().spanBuilder("RecoverableZookeeper.create").startSpan();
|
||||
try (Scope scope = span.makeCurrent()) {
|
||||
byte[] newData = ZKMetadata.appendMetaData(id, data);
|
||||
switch (createMode) {
|
||||
case EPHEMERAL:
|
||||
|
@ -532,6 +565,8 @@ public class RecoverableZooKeeper {
|
|||
throw new IllegalArgumentException("Unrecognized CreateMode: " +
|
||||
createMode);
|
||||
}
|
||||
} finally {
|
||||
span.end();
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -647,9 +682,9 @@ public class RecoverableZooKeeper {
|
|||
/**
|
||||
* Run multiple operations in a transactional manner. Retry before throwing exception
|
||||
*/
|
||||
public List<OpResult> multi(Iterable<Op> ops)
|
||||
throws KeeperException, InterruptedException {
|
||||
try (TraceScope scope = TraceUtil.createTrace("RecoverableZookeeper.multi")) {
|
||||
public List<OpResult> multi(Iterable<Op> ops) throws KeeperException, InterruptedException {
|
||||
Span span = TraceUtil.getGlobalTracer().spanBuilder("RecoverableZookeeper.multi").startSpan();
|
||||
try (Scope scope = span.makeCurrent()) {
|
||||
RetryCounter retryCounter = retryCounterFactory.create();
|
||||
Iterable<Op> multiOps = prepareZKMulti(ops);
|
||||
while (true) {
|
||||
|
@ -671,6 +706,8 @@ public class RecoverableZooKeeper {
|
|||
}
|
||||
retryCounter.sleepUntilNextRetry();
|
||||
}
|
||||
} finally {
|
||||
span.end();
|
||||
}
|
||||
}
|
||||
|
||||
|
|
pom.xml
@ -1026,6 +1026,25 @@
</rules>
</configuration>
</execution>
<execution>
<id>banned-htrace</id>
<goals>
<goal>enforce</goal>
</goals>
<configuration>
<rules>
<bannedDependencies>
<excludes>
<exclude>org.apache.htrace:**</exclude>
</excludes>
<message>
Use OpenTelemetry instead
</message>
<searchTransitive>false</searchTransitive>
</bannedDependencies>
</rules>
</configuration>
</execution>
<execution>
<id>check-aggregate-license</id>
<!-- must check after LICENSE is built at 'generate-resources' -->

@ -1135,9 +1154,10 @@
<restrictImports implementation="de.skuzzle.enforcer.restrictimports.rule.RestrictImports">
<includeTestCode>true</includeTestCode>
<commentLineBufferSize>512</commentLineBufferSize>
<reason>Do not use htrace v3</reason>
<reason>Do not use htrace</reason>
<bannedImports>
<bannedImport>org.htrace.**</bannedImport>
<bannedImport>org.apache.htrace.**</bannedImport>
</bannedImports>
</restrictImports>
<restrictImports implementation="de.skuzzle.enforcer.restrictimports.rule.RestrictImports">

@ -1463,7 +1483,7 @@
<jruby.version>9.2.13.0</jruby.version>
<junit.version>4.13</junit.version>
<hamcrest.version>1.3</hamcrest.version>
<htrace.version>4.2.0-incubating</htrace.version>
<opentelemetry.version>0.12.0</opentelemetry.version>
<log4j.version>1.2.17</log4j.version>
<mockito-core.version>2.28.2</mockito-core.version>
<!--Internally we use a different version of protobuf. See hbase-protocol-shaded-->

@ -2155,9 +2175,9 @@
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.htrace</groupId>
<artifactId>htrace-core4</artifactId>
<version>${htrace.version}</version>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-api</artifactId>
<version>${opentelemetry.version}</version>
</dependency>
<dependency>
<groupId>com.lmax</groupId>
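The pom changes above swap the direct htrace-core4 dependency for opentelemetry-api 0.12.0 and add enforcer rules banning any htrace dependency or import. For reference, a helper like the TraceUtil.getGlobalTracer() called throughout the hunks above can be backed by the new API roughly as sketched below — a hedged approximation, not the actual TraceUtil from this backport; the instrumentation name is an assumption.

import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.api.trace.Tracer;

public final class TraceUtilSketch {
  private TraceUtilSketch() {
  }

  // With only opentelemetry-api on the classpath this returns a no-op tracer;
  // registering an SDK/exporter later turns the same calls into real spans.
  public static Tracer getGlobalTracer() {
    return GlobalOpenTelemetry.getTracer("org.apache.hbase");
  }
}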