From ac7622c39822ef6be9c81c6762eda13a5f2a4eef Mon Sep 17 00:00:00 2001 From: Nick Dimiduk Date: Mon, 13 Jun 2022 10:33:49 +0200 Subject: [PATCH] HBASE-26366 Provide meaningful parent spans to ZK interactions Signed-off-by: Andrew Purtell --- .../trace/hamcrest/SpanDataMatchers.java | 22 +- .../org/apache/hadoop/hbase/ChoreService.java | 6 +- .../apache/hadoop/hbase/trace/TraceUtil.java | 31 ++ .../hadoop/hbase/MetaRegionLocationCache.java | 67 ++-- .../apache/hadoop/hbase/master/HMaster.java | 101 +++--- .../hbase/master/HMasterCommandLine.java | 15 +- .../hbase/master/RegionServerTracker.java | 40 ++- .../hbase/regionserver/HRegionServer.java | 324 ++++++++++-------- .../HRegionServerCommandLine.java | 11 +- .../hbase/TestServerInternalsTracing.java | 297 ++++++++++++++++ .../hbase/zookeeper/MasterAddressTracker.java | 13 +- .../hbase/zookeeper/RecoverableZooKeeper.java | 94 +++-- .../hadoop/hbase/zookeeper/ZKWatcher.java | 68 ++-- 13 files changed, 771 insertions(+), 318 deletions(-) create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/TestServerInternalsTracing.java diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/trace/hamcrest/SpanDataMatchers.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/trace/hamcrest/SpanDataMatchers.java index 6d0468c32ed..d021f4d3aaf 100644 --- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/trace/hamcrest/SpanDataMatchers.java +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/trace/hamcrest/SpanDataMatchers.java @@ -26,7 +26,6 @@ import io.opentelemetry.api.trace.SpanKind; import io.opentelemetry.api.trace.StatusCode; import io.opentelemetry.sdk.trace.data.EventData; import io.opentelemetry.sdk.trace.data.SpanData; -import io.opentelemetry.sdk.trace.data.StatusData; import io.opentelemetry.semconv.trace.attributes.SemanticAttributes; import java.time.Duration; import java.util.Objects; @@ -144,23 +143,20 @@ public final class SpanDataMatchers { }; } - public static Matcher hasStatusWithCode(StatusCode statusCode) { - final Matcher matcher = is(equalTo(statusCode)); - return new TypeSafeMatcher() { + public static Matcher hasStatusWithCode(Matcher matcher) { + return new FeatureMatcher(matcher, "SpanData with StatusCode that", + "statusWithCode") { @Override - protected boolean matchesSafely(SpanData item) { - final StatusData statusData = item.getStatus(); - return statusData != null && statusData.getStatusCode() != null - && matcher.matches(statusData.getStatusCode()); - } - - @Override - public void describeTo(Description description) { - description.appendText("SpanData with StatusCode that ").appendDescriptionOf(matcher); + protected StatusCode featureValueOf(SpanData actual) { + return actual.getStatus().getStatusCode(); } }; } + public static Matcher hasStatusWithCode(StatusCode statusCode) { + return hasStatusWithCode(is(equalTo(statusCode))); + } + public static Matcher hasTraceId(String traceId) { return hasTraceId(is(equalTo(traceId))); } diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/ChoreService.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/ChoreService.java index 939d75fd729..1bba8d49120 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/ChoreService.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/ChoreService.java @@ -26,6 +26,7 @@ import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.ThreadFactory; import java.util.concurrent.atomic.AtomicInteger; +import org.apache.hadoop.hbase.trace.TraceUtil; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -163,8 +164,9 @@ public class ChoreService { chore.getChoreService().cancelChore(chore); } chore.setChoreService(this); - ScheduledFuture future = scheduler.scheduleAtFixedRate(chore, chore.getInitialDelay(), - chore.getPeriod(), chore.getTimeUnit()); + ScheduledFuture future = + scheduler.scheduleAtFixedRate(TraceUtil.tracedRunnable(chore, chore.getName()), + chore.getInitialDelay(), chore.getPeriod(), chore.getTimeUnit()); scheduledChores.put(chore, future); return true; } catch (Exception e) { diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/trace/TraceUtil.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/trace/TraceUtil.java index 7dc24a54ab6..5b1fb86a351 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/trace/TraceUtil.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/trace/TraceUtil.java @@ -135,6 +135,31 @@ public final class TraceUtil { }); } + /** + * Wrap the provided {@code runnable} in a {@link Runnable} that is traced. + */ + public static Runnable tracedRunnable(final Runnable runnable, final String spanName) { + return tracedRunnable(runnable, () -> createSpan(spanName)); + } + + /** + * Wrap the provided {@code runnable} in a {@link Runnable} that is traced. + */ + public static Runnable tracedRunnable(final Runnable runnable, + final Supplier spanSupplier) { + // N.B. This method name follows the convention of this class, i.e., tracedFuture, rather than + // the convention of the OpenTelemetry classes, i.e., Context#wrap. + return () -> { + final Span span = spanSupplier.get(); + try (final Scope ignored = span.makeCurrent()) { + runnable.run(); + span.setStatus(StatusCode.OK); + } finally { + span.end(); + } + }; + } + /** * A {@link Runnable} that may also throw. * @param the type of {@link Throwable} that can be produced. @@ -144,11 +169,17 @@ public final class TraceUtil { void run() throws T; } + /** + * Trace the execution of {@code runnable}. + */ public static void trace(final ThrowingRunnable runnable, final String spanName) throws T { trace(runnable, () -> createSpan(spanName)); } + /** + * Trace the execution of {@code runnable}. + */ public static void trace(final ThrowingRunnable runnable, final Supplier spanSupplier) throws T { Span span = spanSupplier.get(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/MetaRegionLocationCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/MetaRegionLocationCache.java index 22edcbfdfa9..c1ebb6fb9b4 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/MetaRegionLocationCache.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/MetaRegionLocationCache.java @@ -24,6 +24,7 @@ import java.util.concurrent.ConcurrentNavigableMap; import java.util.concurrent.ThreadFactory; import org.apache.hadoop.hbase.exceptions.DeserializationException; import org.apache.hadoop.hbase.master.RegionState; +import org.apache.hadoop.hbase.trace.TraceUtil; import org.apache.hadoop.hbase.types.CopyOnWriteArrayMap; import org.apache.hadoop.hbase.util.RetryCounter; import org.apache.hadoop.hbase.util.RetryCounterFactory; @@ -101,41 +102,43 @@ public class MetaRegionLocationCache extends ZKListener { * @param retryCounter controls the number of retries and sleep between retries. */ private void loadMetaLocationsFromZk(RetryCounter retryCounter, ZNodeOpType opType) { - List znodes = null; - while (retryCounter.shouldRetry()) { - try { - znodes = watcher.getMetaReplicaNodesAndWatchChildren(); - break; - } catch (KeeperException ke) { - LOG.debug("Error populating initial meta locations", ke); - if (!retryCounter.shouldRetry()) { - // Retries exhausted and watchers not set. This is not a desirable state since the cache - // could remain stale forever. Propagate the exception. - watcher.abort("Error populating meta locations", ke); - return; - } + TraceUtil.trace(() -> { + List znodes = null; + while (retryCounter.shouldRetry()) { try { - retryCounter.sleepUntilNextRetry(); - } catch (InterruptedException ie) { - LOG.error("Interrupted while loading meta locations from ZK", ie); - Thread.currentThread().interrupt(); - return; + znodes = watcher.getMetaReplicaNodesAndWatchChildren(); + break; + } catch (KeeperException ke) { + LOG.debug("Error populating initial meta locations", ke); + if (!retryCounter.shouldRetry()) { + // Retries exhausted and watchers not set. This is not a desirable state since the cache + // could remain stale forever. Propagate the exception. + watcher.abort("Error populating meta locations", ke); + return; + } + try { + retryCounter.sleepUntilNextRetry(); + } catch (InterruptedException ie) { + LOG.error("Interrupted while loading meta locations from ZK", ie); + Thread.currentThread().interrupt(); + return; + } } } - } - if (znodes == null || znodes.isEmpty()) { - // No meta znodes exist at this point but we registered a watcher on the base znode to listen - // for updates. They will be handled via nodeChildrenChanged(). - return; - } - if (znodes.size() == cachedMetaLocations.size()) { - // No new meta znodes got added. - return; - } - for (String znode : znodes) { - String path = ZNodePaths.joinZNode(watcher.getZNodePaths().baseZNode, znode); - updateMetaLocation(path, opType); - } + if (znodes == null || znodes.isEmpty()) { + // No meta znodes exist at this point but we registered a watcher on the base znode to + // listen for updates. They will be handled via nodeChildrenChanged(). + return; + } + if (znodes.size() == cachedMetaLocations.size()) { + // No new meta znodes got added. + return; + } + for (String znode : znodes) { + String path = ZNodePaths.joinZNode(watcher.getZNodePaths().baseZNode, znode); + updateMetaLocation(path, opType); + } + }, "MetaRegionLocationCache.loadMetaLocationsFromZk"); } /** diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java index dae8fdad0e2..e793fe844fd 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java @@ -26,6 +26,9 @@ import static org.apache.hadoop.hbase.util.DNS.MASTER_HOSTNAME_KEY; import com.google.errorprone.annotations.RestrictedApi; import com.google.protobuf.Descriptors; import com.google.protobuf.Service; +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.api.trace.StatusCode; +import io.opentelemetry.context.Scope; import java.io.IOException; import java.io.InterruptedIOException; import java.lang.reflect.Constructor; @@ -213,6 +216,7 @@ import org.apache.hadoop.hbase.replication.regionserver.ReplicationStatus; import org.apache.hadoop.hbase.security.AccessDeniedException; import org.apache.hadoop.hbase.security.SecurityConstants; import org.apache.hadoop.hbase.security.UserProvider; +import org.apache.hadoop.hbase.trace.TraceUtil; import org.apache.hadoop.hbase.util.Addressing; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; @@ -452,7 +456,8 @@ public class HMaster extends HRegionServer implements MasterServices { */ public HMaster(final Configuration conf) throws IOException { super(conf); - try { + final Span span = TraceUtil.createSpan("HMaster.cxtor"); + try (Scope ignored = span.makeCurrent()) { if (conf.getBoolean(MAINTENANCE_MODE, false)) { LOG.info("Detected {}=true via configuration.", MAINTENANCE_MODE); maintenanceMode = true; @@ -513,11 +518,15 @@ public class HMaster extends HRegionServer implements MasterServices { cachedClusterId = new CachedClusterId(this, conf); this.regionServerTracker = new RegionServerTracker(zooKeeper, this); + span.setStatus(StatusCode.OK); } catch (Throwable t) { // Make sure we log the exception. HMaster is often started via reflection and the // cause of failed startup is lost. + TraceUtil.setError(span, t); LOG.error("Failed construction of Master", t); throw t; + } finally { + span.end(); } } @@ -540,7 +549,7 @@ public class HMaster extends HRegionServer implements MasterServices { @Override public void run() { try { - Threads.setDaemonThreadRunning(new Thread(() -> { + Threads.setDaemonThreadRunning(new Thread(() -> TraceUtil.trace(() -> { try { int infoPort = putUpJettyServer(); startActiveMasterManager(infoPort); @@ -553,23 +562,29 @@ public class HMaster extends HRegionServer implements MasterServices { abort(error, t); } } - }), getName() + ":becomeActiveMaster"); + }, "HMaster.becomeActiveMaster")), getName() + ":becomeActiveMaster"); // Fall in here even if we have been aborted. Need to run the shutdown services and // the super run call will do this for us. super.run(); } finally { - if (this.clusterSchemaService != null) { - // If on way out, then we are no longer active master. - this.clusterSchemaService.stopAsync(); - try { - this.clusterSchemaService - .awaitTerminated(getConfiguration().getInt(HBASE_MASTER_WAIT_ON_SERVICE_IN_SECONDS, - DEFAULT_HBASE_MASTER_WAIT_ON_SERVICE_IN_SECONDS), TimeUnit.SECONDS); - } catch (TimeoutException te) { - LOG.warn("Failed shutdown of clusterSchemaService", te); + final Span span = TraceUtil.createSpan("HMaster exiting main loop"); + try (Scope ignored = span.makeCurrent()) { + if (this.clusterSchemaService != null) { + // If on way out, then we are no longer active master. + this.clusterSchemaService.stopAsync(); + try { + this.clusterSchemaService + .awaitTerminated(getConfiguration().getInt(HBASE_MASTER_WAIT_ON_SERVICE_IN_SECONDS, + DEFAULT_HBASE_MASTER_WAIT_ON_SERVICE_IN_SECONDS), TimeUnit.SECONDS); + } catch (TimeoutException te) { + LOG.warn("Failed shutdown of clusterSchemaService", te); + } } + this.activeMaster = false; + span.setStatus(StatusCode.OK); + } finally { + span.end(); } - this.activeMaster = false; } } @@ -3094,36 +3109,38 @@ public class HMaster extends HRegionServer implements MasterServices { * Shutdown the cluster. Master runs a coordinated stop of all RegionServers and then itself. */ public void shutdown() throws IOException { - if (cpHost != null) { - cpHost.preShutdown(); - } - - // Tell the servermanager cluster shutdown has been called. This makes it so when Master is - // last running server, it'll stop itself. Next, we broadcast the cluster shutdown by setting - // the cluster status as down. RegionServers will notice this change in state and will start - // shutting themselves down. When last has exited, Master can go down. - if (this.serverManager != null) { - this.serverManager.shutdownCluster(); - } - if (this.clusterStatusTracker != null) { - try { - this.clusterStatusTracker.setClusterDown(); - } catch (KeeperException e) { - LOG.error("ZooKeeper exception trying to set cluster as down in ZK", e); + TraceUtil.trace(() -> { + if (cpHost != null) { + cpHost.preShutdown(); } - } - // Stop the procedure executor. Will stop any ongoing assign, unassign, server crash etc., - // processing so we can go down. - if (this.procedureExecutor != null) { - this.procedureExecutor.stop(); - } - // Shutdown our cluster connection. This will kill any hosted RPCs that might be going on; - // this is what we want especially if the Master is in startup phase doing call outs to - // hbase:meta, etc. when cluster is down. Without ths connection close, we'd have to wait on - // the rpc to timeout. - if (this.clusterConnection != null) { - this.clusterConnection.close(); - } + + // Tell the servermanager cluster shutdown has been called. This makes it so when Master is + // last running server, it'll stop itself. Next, we broadcast the cluster shutdown by setting + // the cluster status as down. RegionServers will notice this change in state and will start + // shutting themselves down. When last has exited, Master can go down. + if (this.serverManager != null) { + this.serverManager.shutdownCluster(); + } + if (this.clusterStatusTracker != null) { + try { + this.clusterStatusTracker.setClusterDown(); + } catch (KeeperException e) { + LOG.error("ZooKeeper exception trying to set cluster as down in ZK", e); + } + } + // Stop the procedure executor. Will stop any ongoing assign, unassign, server crash etc., + // processing so we can go down. + if (this.procedureExecutor != null) { + this.procedureExecutor.stop(); + } + // Shutdown our cluster connection. This will kill any hosted RPCs that might be going on; + // this is what we want especially if the Master is in startup phase doing call outs to + // hbase:meta, etc. when cluster is down. Without ths connection close, we'd have to wait on + // the rpc to timeout. + if (this.clusterConnection != null) { + this.clusterConnection.close(); + } + }, "HMaster.shutdown"); } public void stopMaster() throws IOException { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMasterCommandLine.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMasterCommandLine.java index efded4841d7..8ac3a1ebba9 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMasterCommandLine.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMasterCommandLine.java @@ -17,6 +17,9 @@ */ package org.apache.hadoop.hbase.master; +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.api.trace.StatusCode; +import io.opentelemetry.context.Scope; import java.io.File; import java.io.IOException; import java.util.List; @@ -30,13 +33,13 @@ import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.regionserver.HRegionServer; +import org.apache.hadoop.hbase.trace.TraceUtil; import org.apache.hadoop.hbase.util.JVMClusterUtil; import org.apache.hadoop.hbase.util.ServerCommandLine; import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster; import org.apache.hadoop.hbase.zookeeper.ZKAuthentication; import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; import org.apache.yetus.audience.InterfaceAudience; -import org.apache.zookeeper.KeeperException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -161,7 +164,8 @@ public class HMasterCommandLine extends ServerCommandLine { private int startMaster() { Configuration conf = getConf(); - try { + final Span span = TraceUtil.createSpan("HMasterCommandLine.startMaster"); + try (Scope ignored = span.makeCurrent()) { // If 'local', defer to LocalHBaseCluster instance. Starts master // and regionserver both in the one JVM. if (LocalHBaseCluster.isLocal(conf)) { @@ -250,9 +254,13 @@ public class HMasterCommandLine extends ServerCommandLine { master.join(); if (master.isAborted()) throw new RuntimeException("HMaster Aborted"); } + span.setStatus(StatusCode.OK); } catch (Throwable t) { + TraceUtil.setError(span, t); LOG.error("Master exiting", t); return 1; + } finally { + span.end(); } return 0; } @@ -310,8 +318,7 @@ public class HMasterCommandLine extends ServerCommandLine { public static class LocalHMaster extends HMaster { private MiniZooKeeperCluster zkcluster = null; - public LocalHMaster(Configuration conf) - throws IOException, KeeperException, InterruptedException { + public LocalHMaster(Configuration conf) throws IOException { super(conf); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionServerTracker.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionServerTracker.java index 283da5fc0c4..5ecf6a2f6e6 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionServerTracker.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionServerTracker.java @@ -17,6 +17,9 @@ */ package org.apache.hadoop.hbase.master; +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.api.trace.StatusCode; +import io.opentelemetry.context.Scope; import java.io.IOException; import java.io.InterruptedIOException; import java.util.Collections; @@ -29,6 +32,7 @@ import org.apache.hadoop.hbase.ServerMetrics; import org.apache.hadoop.hbase.ServerMetricsBuilder; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.client.VersionInfoUtil; +import org.apache.hadoop.hbase.trace.TraceUtil; import org.apache.hadoop.hbase.zookeeper.ZKListener; import org.apache.hadoop.hbase.zookeeper.ZKUtil; import org.apache.hadoop.hbase.zookeeper.ZKWatcher; @@ -184,22 +188,28 @@ public class RegionServerTracker extends ZKListener { private synchronized void refresh() { List names; - try { - names = ZKUtil.listChildrenAndWatchForNewChildren(watcher, watcher.getZNodePaths().rsZNode); - } catch (KeeperException e) { - // here we need to abort as we failed to set watcher on the rs node which means that we can - // not track the node deleted event any more. - server.abort("Unexpected zk exception getting RS nodes", e); - return; + final Span span = TraceUtil.createSpan("RegionServerTracker.refresh"); + try (final Scope ignored = span.makeCurrent()) { + try { + names = ZKUtil.listChildrenAndWatchForNewChildren(watcher, watcher.getZNodePaths().rsZNode); + } catch (KeeperException e) { + // here we need to abort as we failed to set watcher on the rs node which means that we can + // not track the node deleted event any more. + server.abort("Unexpected zk exception getting RS nodes", e); + return; + } + Set newServers = CollectionUtils.isEmpty(names) + ? Collections.emptySet() + : names.stream().map(ServerName::parseServerName) + .collect(Collectors.collectingAndThen(Collectors.toSet(), Collections::unmodifiableSet)); + if (active) { + processAsActiveMaster(newServers); + } + this.regionServers = newServers; + span.setStatus(StatusCode.OK); + } finally { + span.end(); } - Set newServers = CollectionUtils.isEmpty(names) - ? Collections.emptySet() - : names.stream().map(ServerName::parseServerName) - .collect(Collectors.collectingAndThen(Collectors.toSet(), Collections::unmodifiableSet)); - if (active) { - processAsActiveMaster(newServers); - } - this.regionServers = newServers; } @Override diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index 97095a8abe4..b5a9aa48e09 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -23,6 +23,9 @@ import static org.apache.hadoop.hbase.HConstants.HBASE_SPLIT_WAL_COORDINATED_BY_ import static org.apache.hadoop.hbase.HConstants.HBASE_SPLIT_WAL_MAX_SPLITTER; import static org.apache.hadoop.hbase.util.DNS.UNSAFE_RS_HOSTNAME_KEY; +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.api.trace.StatusCode; +import io.opentelemetry.context.Scope; import java.io.IOException; import java.io.PrintWriter; import java.lang.management.MemoryType; @@ -161,6 +164,7 @@ import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.security.UserProvider; import org.apache.hadoop.hbase.security.access.AccessChecker; import org.apache.hadoop.hbase.security.access.ZKPermissionWatcher; +import org.apache.hadoop.hbase.trace.TraceUtil; import org.apache.hadoop.hbase.unsafe.HBasePlatformDependent; import org.apache.hadoop.hbase.util.Addressing; import org.apache.hadoop.hbase.util.Bytes; @@ -589,7 +593,8 @@ public class HRegionServer extends Thread */ public HRegionServer(final Configuration conf) throws IOException { super("RegionServer"); // thread name - try { + final Span span = TraceUtil.createSpan("HRegionServer.cxtor"); + try (Scope ignored = span.makeCurrent()) { this.startcode = EnvironmentEdgeManager.currentTime(); this.conf = conf; this.dataFsOk = true; @@ -701,11 +706,15 @@ public class HRegionServer extends Thread this.choreService = new ChoreService(getName(), true); this.executorService = new ExecutorService(getName()); putUpWebUI(); + span.setStatus(StatusCode.OK); } catch (Throwable t) { // Make sure we log the exception. HRegionServer is often started via reflection and the // cause of failed startup is lost. + TraceUtil.setError(span, t); LOG.error("Failed construction RegionServer", t); throw t; + } finally { + span.end(); } } @@ -920,18 +929,23 @@ public class HRegionServer extends Thread * In here we just put up the RpcServer, setup Connection, and ZooKeeper. */ private void preRegistrationInitialization() { - try { + final Span span = TraceUtil.createSpan("HRegionServer.preRegistrationInitialization"); + try (Scope ignored = span.makeCurrent()) { initializeZooKeeper(); setupClusterConnection(); // Setup RPC client for master communication this.rpcClient = RpcClientFactory.createClient(conf, clusterId, new InetSocketAddress(this.rpcServices.isa.getAddress(), 0), clusterConnection.getConnectionMetrics()); + span.setStatus(StatusCode.OK); } catch (Throwable t) { // Call stop if error or process will stick around for ever since server // puts up non-daemon threads. + TraceUtil.setError(span, t); this.rpcServices.stop(); abort("Initialization of RS failed. Hence aborting RS.", t); + } finally { + span.end(); } } @@ -1042,35 +1056,39 @@ public class HRegionServer extends Thread // start up all Services. Use RetryCounter to get backoff in case Master is struggling to // come up. LOG.debug("About to register with Master."); - RetryCounterFactory rcf = - new RetryCounterFactory(Integer.MAX_VALUE, this.sleeper.getPeriod(), 1000 * 60 * 5); - RetryCounter rc = rcf.create(); - while (keepLooping()) { - RegionServerStartupResponse w = reportForDuty(); - if (w == null) { - long sleepTime = rc.getBackoffTimeAndIncrementAttempts(); - LOG.warn("reportForDuty failed; sleeping {} ms and then retrying.", sleepTime); - this.sleeper.sleep(sleepTime); - } else { - handleReportForDutyResponse(w); - break; + TraceUtil.trace(() -> { + RetryCounterFactory rcf = + new RetryCounterFactory(Integer.MAX_VALUE, this.sleeper.getPeriod(), 1000 * 60 * 5); + RetryCounter rc = rcf.create(); + while (keepLooping()) { + RegionServerStartupResponse w = reportForDuty(); + if (w == null) { + long sleepTime = rc.getBackoffTimeAndIncrementAttempts(); + LOG.warn("reportForDuty failed; sleeping {} ms and then retrying.", sleepTime); + this.sleeper.sleep(sleepTime); + } else { + handleReportForDutyResponse(w); + break; + } } - } + }, "HRegionServer.registerWithMaster"); } if (!isStopped() && isHealthy()) { - // start the snapshot handler and other procedure handlers, - // since the server is ready to run - if (this.rspmHost != null) { - this.rspmHost.start(); - } - // Start the Quota Manager - if (this.rsQuotaManager != null) { - rsQuotaManager.start(getRpcServer().getScheduler()); - } - if (this.rsSpaceQuotaManager != null) { - this.rsSpaceQuotaManager.start(); - } + TraceUtil.trace(() -> { + // start the snapshot handler and other procedure handlers, + // since the server is ready to run + if (this.rspmHost != null) { + this.rspmHost.start(); + } + // Start the Quota Manager + if (this.rsQuotaManager != null) { + rsQuotaManager.start(getRpcServer().getScheduler()); + } + if (this.rsSpaceQuotaManager != null) { + this.rsSpaceQuotaManager.start(); + } + }, "HRegionServer.startup"); } // We registered with the Master. Go into run mode. @@ -1121,138 +1139,144 @@ public class HRegionServer extends Thread } } - if (this.leaseManager != null) { - this.leaseManager.closeAfterLeasesExpire(); - } - if (this.splitLogWorker != null) { - splitLogWorker.stop(); - } - if (this.infoServer != null) { - LOG.info("Stopping infoServer"); - try { - this.infoServer.stop(); - } catch (Exception e) { - LOG.error("Failed to stop infoServer", e); + final Span span = TraceUtil.createSpan("HRegionServer exiting main loop"); + try (Scope ignored = span.makeCurrent()) { + if (this.leaseManager != null) { + this.leaseManager.closeAfterLeasesExpire(); } - } - // Send cache a shutdown. - if (blockCache != null) { - blockCache.shutdown(); - } - if (mobFileCache != null) { - mobFileCache.shutdown(); - } - - // Send interrupts to wake up threads if sleeping so they notice shutdown. - // TODO: Should we check they are alive? If OOME could have exited already - if (this.hMemManager != null) { - this.hMemManager.stop(); - } - if (this.cacheFlusher != null) { - this.cacheFlusher.interruptIfNecessary(); - } - if (this.compactSplitThread != null) { - this.compactSplitThread.interruptIfNecessary(); - } - - // Stop the snapshot and other procedure handlers, forcefully killing all running tasks - if (rspmHost != null) { - rspmHost.stop(this.abortRequested.get() || this.killed); - } - - if (this.killed) { - // Just skip out w/o closing regions. Used when testing. - } else if (abortRequested.get()) { - if (this.dataFsOk) { - closeUserRegions(abortRequested.get()); // Don't leave any open file handles + if (this.splitLogWorker != null) { + splitLogWorker.stop(); } - LOG.info("aborting server " + this.serverName); - } else { - closeUserRegions(abortRequested.get()); - LOG.info("stopping server " + this.serverName); - } - - if (this.clusterConnection != null && !clusterConnection.isClosed()) { - try { - this.clusterConnection.close(); - } catch (IOException e) { - // Although the {@link Closeable} interface throws an {@link - // IOException}, in reality, the implementation would never do that. - LOG.warn("Attempt to close server's short circuit ClusterConnection failed.", e); - } - } - - // Closing the compactSplit thread before closing meta regions - if (!this.killed && containsMetaTableRegions()) { - if (!abortRequested.get() || this.dataFsOk) { - if (this.compactSplitThread != null) { - this.compactSplitThread.join(); - this.compactSplitThread = null; + if (this.infoServer != null) { + LOG.info("Stopping infoServer"); + try { + this.infoServer.stop(); + } catch (Exception e) { + LOG.error("Failed to stop infoServer", e); } - closeMetaTableRegions(abortRequested.get()); } - } + // Send cache a shutdown. + if (blockCache != null) { + blockCache.shutdown(); + } + if (mobFileCache != null) { + mobFileCache.shutdown(); + } - if (!this.killed && this.dataFsOk) { - waitOnAllRegionsToClose(abortRequested.get()); - LOG.info("stopping server " + this.serverName + "; all regions closed."); - } + // Send interrupts to wake up threads if sleeping so they notice shutdown. + // TODO: Should we check they are alive? If OOME could have exited already + if (this.hMemManager != null) { + this.hMemManager.stop(); + } + if (this.cacheFlusher != null) { + this.cacheFlusher.interruptIfNecessary(); + } + if (this.compactSplitThread != null) { + this.compactSplitThread.interruptIfNecessary(); + } - // Stop the quota manager - if (rsQuotaManager != null) { - rsQuotaManager.stop(); - } - if (rsSpaceQuotaManager != null) { - rsSpaceQuotaManager.stop(); - rsSpaceQuotaManager = null; - } + // Stop the snapshot and other procedure handlers, forcefully killing all running tasks + if (rspmHost != null) { + rspmHost.stop(this.abortRequested.get() || this.killed); + } - // flag may be changed when closing regions throws exception. - if (this.dataFsOk) { - shutdownWAL(!abortRequested.get()); - } + if (this.killed) { + // Just skip out w/o closing regions. Used when testing. + } else if (abortRequested.get()) { + if (this.dataFsOk) { + closeUserRegions(abortRequested.get()); // Don't leave any open file handles + } + LOG.info("aborting server " + this.serverName); + } else { + closeUserRegions(abortRequested.get()); + LOG.info("stopping server " + this.serverName); + } - // Make sure the proxy is down. - if (this.rssStub != null) { - this.rssStub = null; - } - if (this.lockStub != null) { - this.lockStub = null; - } - if (this.rpcClient != null) { - this.rpcClient.close(); - } - if (this.leaseManager != null) { - this.leaseManager.close(); - } - if (this.pauseMonitor != null) { - this.pauseMonitor.stop(); - } + if (this.clusterConnection != null && !clusterConnection.isClosed()) { + try { + this.clusterConnection.close(); + } catch (IOException e) { + // Although the {@link Closeable} interface throws an {@link + // IOException}, in reality, the implementation would never do that. + LOG.warn("Attempt to close server's short circuit ClusterConnection failed.", e); + } + } - if (!killed) { - stopServiceThreads(); - } + // Closing the compactSplit thread before closing meta regions + if (!this.killed && containsMetaTableRegions()) { + if (!abortRequested.get() || this.dataFsOk) { + if (this.compactSplitThread != null) { + this.compactSplitThread.join(); + this.compactSplitThread = null; + } + closeMetaTableRegions(abortRequested.get()); + } + } - if (this.rpcServices != null) { - this.rpcServices.stop(); - } + if (!this.killed && this.dataFsOk) { + waitOnAllRegionsToClose(abortRequested.get()); + LOG.info("stopping server " + this.serverName + "; all regions closed."); + } - try { - deleteMyEphemeralNode(); - } catch (KeeperException.NoNodeException nn) { - // pass - } catch (KeeperException e) { - LOG.warn("Failed deleting my ephemeral node", e); - } - // We may have failed to delete the znode at the previous step, but - // we delete the file anyway: a second attempt to delete the znode is likely to fail again. - ZNodeClearer.deleteMyEphemeralNodeOnDisk(); + // Stop the quota manager + if (rsQuotaManager != null) { + rsQuotaManager.stop(); + } + if (rsSpaceQuotaManager != null) { + rsSpaceQuotaManager.stop(); + rsSpaceQuotaManager = null; + } - if (this.zooKeeper != null) { - this.zooKeeper.close(); + // flag may be changed when closing regions throws exception. + if (this.dataFsOk) { + shutdownWAL(!abortRequested.get()); + } + + // Make sure the proxy is down. + if (this.rssStub != null) { + this.rssStub = null; + } + if (this.lockStub != null) { + this.lockStub = null; + } + if (this.rpcClient != null) { + this.rpcClient.close(); + } + if (this.leaseManager != null) { + this.leaseManager.close(); + } + if (this.pauseMonitor != null) { + this.pauseMonitor.stop(); + } + + if (!killed) { + stopServiceThreads(); + } + + if (this.rpcServices != null) { + this.rpcServices.stop(); + } + + try { + deleteMyEphemeralNode(); + } catch (KeeperException.NoNodeException nn) { + // pass + } catch (KeeperException e) { + LOG.warn("Failed deleting my ephemeral node", e); + } + // We may have failed to delete the znode at the previous step, but + // we delete the file anyway: a second attempt to delete the znode is likely to fail again. + ZNodeClearer.deleteMyEphemeralNodeOnDisk(); + + if (this.zooKeeper != null) { + this.zooKeeper.close(); + } + this.shutDown = true; + LOG.info("Exiting; stopping=" + this.serverName + "; zookeeper connection closed."); + span.setStatus(StatusCode.OK); + } finally { + span.end(); } - this.shutDown = true; - LOG.info("Exiting; stopping=" + this.serverName + "; zookeeper connection closed."); } private boolean containsMetaTableRegions() { @@ -1293,23 +1317,29 @@ public class HRegionServer extends Thread return; } ClusterStatusProtos.ServerLoad sl = buildServerLoad(reportStartTime, reportEndTime); - try { + final Span span = TraceUtil.createSpan("HRegionServer.tryRegionServerReport"); + try (Scope ignored = span.makeCurrent()) { RegionServerReportRequest.Builder request = RegionServerReportRequest.newBuilder(); request.setServer(ProtobufUtil.toServerName(this.serverName)); request.setLoad(sl); rss.regionServerReport(null, request.build()); + span.setStatus(StatusCode.OK); } catch (ServiceException se) { IOException ioe = ProtobufUtil.getRemoteException(se); if (ioe instanceof YouAreDeadException) { // This will be caught and handled as a fatal error in run() + TraceUtil.setError(span, ioe); throw ioe; } if (rssStub == rss) { rssStub = null; } + TraceUtil.setError(span, se); // Couldn't connect to the master, get location from zk and reconnect // Method blocks until new master is found or we are stopped createRegionServerStatusStub(true); + } finally { + span.end(); } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServerCommandLine.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServerCommandLine.java index 3b21171c28e..db74380d2ed 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServerCommandLine.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServerCommandLine.java @@ -17,9 +17,13 @@ */ package org.apache.hadoop.hbase.regionserver; +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.api.trace.StatusCode; +import io.opentelemetry.context.Scope; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.LocalHBaseCluster; +import org.apache.hadoop.hbase.trace.TraceUtil; import org.apache.hadoop.hbase.util.ServerCommandLine; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; @@ -47,7 +51,8 @@ public class HRegionServerCommandLine extends ServerCommandLine { private int start() throws Exception { Configuration conf = getConf(); - try { + final Span span = TraceUtil.createSpan("HRegionServerCommandLine.start"); + try (Scope ignored = span.makeCurrent()) { // If 'local', don't start a region server here. Defer to // LocalHBaseCluster. It manages 'local' clusters. if (LocalHBaseCluster.isLocal(conf)) { @@ -62,9 +67,13 @@ public class HRegionServerCommandLine extends ServerCommandLine { throw new RuntimeException("HRegionServer Aborted"); } } + span.setStatus(StatusCode.OK); } catch (Throwable t) { + TraceUtil.setError(span, t); LOG.error("Region server exiting", t); return 1; + } finally { + span.end(); } return 0; } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestServerInternalsTracing.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestServerInternalsTracing.java new file mode 100644 index 00000000000..ff010141f81 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestServerInternalsTracing.java @@ -0,0 +1,297 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase; + +import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasName; +import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasParentSpanId; +import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasStatusWithCode; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.allOf; +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.endsWith; +import static org.hamcrest.Matchers.hasItem; +import static org.hamcrest.Matchers.isOneOf; +import static org.hamcrest.Matchers.startsWith; + +import io.opentelemetry.api.trace.StatusCode; +import io.opentelemetry.sdk.trace.data.SpanData; +import java.util.List; +import java.util.function.Supplier; +import org.apache.hadoop.hbase.client.trace.StringTraceRenderer; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.testclassification.RegionServerTests; +import org.apache.hadoop.hbase.trace.OpenTelemetryClassRule; +import org.hamcrest.Matcher; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.ExternalResource; +import org.junit.rules.RuleChain; +import org.junit.rules.TestRule; +import org.junit.runners.model.Statement; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Test that sundry operations internal to the region server are traced as expected. + */ +@Category({ MediumTests.class, RegionServerTests.class }) +public class TestServerInternalsTracing { + private static final Logger LOG = LoggerFactory.getLogger(TestServerInternalsTracing.class); + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestServerInternalsTracing.class); + + private static final String NO_PARENT_ID = "0000000000000000"; + private static List spans; + + /** + * Wait for the underlying cluster to come up -- defined by meta being available. + */ + private static class Setup extends ExternalResource { + private final Supplier testingUtilSupplier; + + public Setup(final Supplier testingUtilSupplier) { + this.testingUtilSupplier = testingUtilSupplier; + } + + @Override + protected void before() throws Throwable { + final HBaseTestingUtility testingUtil = testingUtilSupplier.get(); + testingUtil.waitTableAvailable(TableName.META_TABLE_NAME); + } + } + + private static class Noop extends Statement { + @Override + public void evaluate() throws Throwable { + } + } + + @ClassRule + public static TestRule classRule = (base, description) -> new Statement() { + @Override + public void evaluate() throws Throwable { + // setup and tear down the cluster, collecting all the spans produced in the process. + final OpenTelemetryClassRule otelClassRule = OpenTelemetryClassRule.create(); + final MiniClusterRule miniClusterRule = MiniClusterRule.newBuilder().build(); + final Setup setup = new Setup(miniClusterRule::getTestingUtility); + final TestRule clusterRule = + RuleChain.outerRule(otelClassRule).around(miniClusterRule).around(setup); + clusterRule.apply(new Noop(), description).evaluate(); + spans = otelClassRule.getSpans(); + if (LOG.isDebugEnabled()) { + StringTraceRenderer renderer = new StringTraceRenderer(spans); + renderer.render(LOG::debug); + } + base.evaluate(); + } + }; + + @Test + public void testHMasterConstructor() { + final Matcher masterConstructorMatcher = allOf(hasName("HMaster.cxtor"), + hasParentSpanId(NO_PARENT_ID), hasStatusWithCode(isOneOf(StatusCode.OK, StatusCode.ERROR))); + assertThat("there should be a span from the HMaster constructor.", spans, + hasItem(masterConstructorMatcher)); + final SpanData masterConstructorSpan = spans.stream().filter(masterConstructorMatcher::matches) + .findAny().orElseThrow(AssertionError::new); + assertThat("the HMaster constructor span should show zookeeper interaction.", spans, hasItem( + allOf(hasName(startsWith("RecoverableZookeeper.")), hasParentSpanId(masterConstructorSpan)))); + } + + @Test + public void testHMasterBecomeActiveMaster() { + final Matcher masterBecomeActiveMasterMatcher = + allOf(hasName("HMaster.becomeActiveMaster"), hasParentSpanId(NO_PARENT_ID), + hasStatusWithCode(isOneOf(StatusCode.OK, StatusCode.ERROR))); + assertThat("there should be a span from the HMaster.becomeActiveMaster.", spans, + hasItem(masterBecomeActiveMasterMatcher)); + final SpanData masterBecomeActiveMasterSpan = spans.stream() + .filter(masterBecomeActiveMasterMatcher::matches).findAny().orElseThrow(AssertionError::new); + assertThat("the HMaster.becomeActiveMaster span should show zookeeper interaction.", spans, + hasItem(allOf(hasName(startsWith("RecoverableZookeeper.")), + hasParentSpanId(masterBecomeActiveMasterSpan)))); + assertThat("the HMaster.becomeActiveMaster span should show Region interaction.", spans, + hasItem( + allOf(hasName(startsWith("Region.")), hasParentSpanId(masterBecomeActiveMasterSpan)))); + assertThat("the HMaster.becomeActiveMaster span should show RegionScanner interaction.", spans, + hasItem(allOf(hasName(startsWith("RegionScanner.")), + hasParentSpanId(masterBecomeActiveMasterSpan)))); + assertThat("the HMaster.becomeActiveMaster span should show hbase:meta interaction.", spans, + hasItem(allOf(hasName(containsString("hbase:meta")), + hasParentSpanId(masterBecomeActiveMasterSpan)))); + assertThat("the HMaster.becomeActiveMaster span should show WAL interaction.", spans, + hasItem(allOf(hasName(startsWith("WAL.")), hasParentSpanId(masterBecomeActiveMasterSpan)))); + } + + @Test + public void testZKWatcherHMaster() { + final Matcher mZKWatcherMatcher = allOf(hasName(startsWith("ZKWatcher-master")), + hasParentSpanId(NO_PARENT_ID), hasStatusWithCode(isOneOf(StatusCode.OK, StatusCode.ERROR))); + assertThat("there should be a span from the ZKWatcher running in the HMaster.", spans, + hasItem(mZKWatcherMatcher)); + final SpanData mZKWatcherSpan = + spans.stream().filter(mZKWatcherMatcher::matches).findAny().orElseThrow(AssertionError::new); + assertThat("the ZKWatcher running in the HMaster span should invoke processEvent.", spans, + hasItem(allOf(hasName(containsString("processEvent")), hasParentSpanId(mZKWatcherSpan)))); + } + + @Test + public void testHMasterShutdown() { + final Matcher masterShutdownMatcher = allOf(hasName("HMaster.shutdown"), + hasParentSpanId(NO_PARENT_ID), hasStatusWithCode(isOneOf(StatusCode.OK, StatusCode.ERROR))); + assertThat("there should be a span from the HMaster.shutdown.", spans, + hasItem(masterShutdownMatcher)); + final SpanData masterShutdownSpan = spans.stream().filter(masterShutdownMatcher::matches) + .findAny().orElseThrow(AssertionError::new); + assertThat("the HMaster.shutdown span should show zookeeper interaction.", spans, hasItem( + allOf(hasName(startsWith("RecoverableZookeeper.")), hasParentSpanId(masterShutdownSpan)))); + assertThat( + "the HMaster.shutdown span should show ShortCircuitingClusterConnection interaction.", spans, + hasItem(allOf(hasName(startsWith("ShortCircuitingClusterConnection.")), + hasParentSpanId(masterShutdownSpan)))); + } + + @Test + public void testHMasterExitingMainLoop() { + final Matcher masterExitingMainLoopMatcher = + allOf(hasName("HMaster exiting main loop"), hasParentSpanId(NO_PARENT_ID), + hasStatusWithCode(isOneOf(StatusCode.OK, StatusCode.ERROR))); + assertThat("there should be a span from the HMaster exiting main loop.", spans, + hasItem(masterExitingMainLoopMatcher)); + final SpanData masterExitingMainLoopSpan = spans.stream() + .filter(masterExitingMainLoopMatcher::matches).findAny().orElseThrow(AssertionError::new); + assertThat("the HMaster exiting main loop span should show HTable interaction.", spans, + hasItem(allOf(hasName(startsWith("HTable.")), hasParentSpanId(masterExitingMainLoopSpan)))); + } + + @Test + public void testTryRegionServerReport() { + final Matcher tryRegionServerReportMatcher = + allOf(hasName("HRegionServer.tryRegionServerReport"), hasParentSpanId(NO_PARENT_ID), + hasStatusWithCode(isOneOf(StatusCode.OK, StatusCode.ERROR))); + assertThat("there should be a span for the region server sending a report.", spans, + hasItem(tryRegionServerReportMatcher)); + final SpanData tryRegionServerReportSpan = spans.stream() + .filter(tryRegionServerReportMatcher::matches).findAny().orElseThrow(AssertionError::new); + assertThat( + "the region server report span should have an invocation of the RegionServerReport RPC.", + spans, hasItem(allOf(hasName(endsWith("RegionServerStatusService/RegionServerReport")), + hasParentSpanId(tryRegionServerReportSpan)))); + } + + @Test + public void testHRegionServerStartup() { + final Matcher regionServerStartupMatcher = allOf(hasName("HRegionServer.startup"), + hasParentSpanId(NO_PARENT_ID), hasStatusWithCode(isOneOf(StatusCode.OK, StatusCode.ERROR))); + assertThat("there should be a span from the HRegionServer startup procedure.", spans, + hasItem(regionServerStartupMatcher)); + final SpanData regionServerStartupSpan = spans.stream() + .filter(regionServerStartupMatcher::matches).findAny().orElseThrow(AssertionError::new); + assertThat("the HRegionServer startup procedure span should show zookeeper interaction.", spans, + hasItem(allOf(hasName(startsWith("RecoverableZookeeper.")), + hasParentSpanId(regionServerStartupSpan)))); + } + + @Test + public void testHRegionServerConstructor() { + final Matcher rsConstructorMatcher = allOf(hasName("HRegionServer.cxtor"), + hasParentSpanId(NO_PARENT_ID), hasStatusWithCode(isOneOf(StatusCode.OK, StatusCode.ERROR))); + assertThat("there should be a span from the HRegionServer constructor.", spans, + hasItem(rsConstructorMatcher)); + final SpanData rsConstructorSpan = spans.stream().filter(rsConstructorMatcher::matches) + .findAny().orElseThrow(AssertionError::new); + assertThat("the HRegionServer constructor span should show zookeeper interaction.", spans, + hasItem( + allOf(hasName(startsWith("RecoverableZookeeper.")), hasParentSpanId(rsConstructorSpan)))); + assertThat("the HRegionServer constructor span should invoke the MasterAddressTracker.", spans, + hasItem( + allOf(hasName(startsWith("MasterAddressTracker.")), hasParentSpanId(rsConstructorSpan)))); + } + + @Test + public void testHRegionServerPreRegistrationInitialization() { + final Matcher rsPreRegistrationInitializationMatcher = + allOf(hasName("HRegionServer.preRegistrationInitialization"), hasParentSpanId(NO_PARENT_ID), + hasStatusWithCode(isOneOf(StatusCode.OK, StatusCode.ERROR))); + assertThat("there should be a span from the HRegionServer preRegistrationInitialization.", + spans, hasItem(rsPreRegistrationInitializationMatcher)); + final SpanData rsPreRegistrationInitializationSpan = + spans.stream().filter(rsPreRegistrationInitializationMatcher::matches).findAny() + .orElseThrow(AssertionError::new); + assertThat( + "the HRegionServer preRegistrationInitialization span should show zookeeper interaction.", + spans, hasItem(allOf(hasName(startsWith("RecoverableZookeeper.")), + hasParentSpanId(rsPreRegistrationInitializationSpan)))); + } + + @Test + public void testHRegionServerRegisterWithMaster() { + final Matcher rsRegisterWithMasterMatcher = + allOf(hasName("HRegionServer.registerWithMaster"), hasParentSpanId(NO_PARENT_ID), + hasStatusWithCode(isOneOf(StatusCode.OK, StatusCode.ERROR))); + assertThat("there should be a span from the HRegionServer registerWithMaster.", spans, + hasItem(rsRegisterWithMasterMatcher)); + final SpanData rsRegisterWithMasterSpan = spans.stream() + .filter(rsRegisterWithMasterMatcher::matches).findAny().orElseThrow(AssertionError::new); + assertThat("the HRegionServer registerWithMaster span should show zookeeper interaction.", + spans, hasItem(allOf(hasName(startsWith("RecoverableZookeeper.")), + hasParentSpanId(rsRegisterWithMasterSpan)))); + assertThat( + "the HRegionServer registerWithMaster span should have an invocation of the" + + " RegionServerStartup RPC.", + spans, hasItem(allOf(hasName(endsWith("RegionServerStatusService/RegionServerStartup")), + hasParentSpanId(rsRegisterWithMasterSpan)))); + } + + @Test + public void testZKWatcherRegionServer() { + final Matcher rsZKWatcherMatcher = + allOf(hasName(startsWith("ZKWatcher-regionserver")), hasParentSpanId(NO_PARENT_ID), + hasStatusWithCode(isOneOf(StatusCode.OK, StatusCode.ERROR))); + assertThat("there should be a span from the ZKWatcher running in the HRegionServer.", spans, + hasItem(rsZKWatcherMatcher)); + final SpanData rsZKWatcherSpan = + spans.stream().filter(rsZKWatcherMatcher::matches).findAny().orElseThrow(AssertionError::new); + assertThat("the ZKWatcher running in the HRegionServer span should invoke processEvent.", spans, + hasItem(allOf(hasName(containsString("processEvent")), hasParentSpanId(rsZKWatcherSpan)))); + } + + @Test + public void testHRegionServerExitingMainLoop() { + final Matcher rsExitingMainLoopMatcher = + allOf(hasName("HRegionServer exiting main loop"), hasParentSpanId(NO_PARENT_ID), + hasStatusWithCode(isOneOf(StatusCode.OK, StatusCode.ERROR))); + assertThat("there should be a span from the HRegionServer exiting main loop.", spans, + hasItem(rsExitingMainLoopMatcher)); + final SpanData rsExitingMainLoopSpan = spans.stream().filter(rsExitingMainLoopMatcher::matches) + .findAny().orElseThrow(AssertionError::new); + assertThat("the HRegionServer exiting main loop span should show zookeeper interaction.", spans, + hasItem(allOf(hasName(startsWith("RecoverableZookeeper.")), + hasParentSpanId(rsExitingMainLoopSpan)))); + assertThat( + "the HRegionServer exiting main loop span should show " + + "ShortCircuitingClusterConnection interaction.", + spans, hasItem(allOf(hasName(startsWith("ShortCircuitingClusterConnection.")), + hasParentSpanId(rsExitingMainLoopSpan)))); + assertThat("the HRegionServer exiting main loop span should invoke CloseMetaHandler.", spans, + hasItem(allOf(hasName("CloseMetaHandler"), hasParentSpanId(rsExitingMainLoopSpan)))); + } +} diff --git a/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/MasterAddressTracker.java b/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/MasterAddressTracker.java index 649178f7a01..840ee2d215f 100644 --- a/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/MasterAddressTracker.java +++ b/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/MasterAddressTracker.java @@ -27,6 +27,7 @@ import org.apache.hadoop.hbase.Abortable; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.exceptions.DeserializationException; +import org.apache.hadoop.hbase.trace.TraceUtil; import org.apache.yetus.audience.InterfaceAudience; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.data.Stat; @@ -71,11 +72,13 @@ public class MasterAddressTracker extends ZKNodeTracker { } private void loadBackupMasters() { - try { - backupMasters = Collections.unmodifiableList(getBackupMastersAndRenewWatch(watcher)); - } catch (InterruptedIOException e) { - abortable.abort("Unexpected exception handling nodeChildrenChanged event", e); - } + TraceUtil.trace(() -> { + try { + backupMasters = Collections.unmodifiableList(getBackupMastersAndRenewWatch(watcher)); + } catch (InterruptedIOException e) { + abortable.abort("Unexpected exception handling nodeChildrenChanged event", e); + } + }, "MasterAddressTracker.loadBackupMasters"); } @Override diff --git a/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/RecoverableZooKeeper.java b/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/RecoverableZooKeeper.java index 017679e08bd..622a4f49939 100644 --- a/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/RecoverableZooKeeper.java +++ b/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/RecoverableZooKeeper.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hbase.zookeeper; import io.opentelemetry.api.trace.Span; +import io.opentelemetry.api.trace.StatusCode; import io.opentelemetry.context.Scope; import java.io.IOException; import java.lang.management.ManagementFactory; @@ -201,14 +202,15 @@ public class RecoverableZooKeeper { * throw NoNodeException if the path does not exist. */ public void delete(String path, int version) throws InterruptedException, KeeperException { - Span span = TraceUtil.getGlobalTracer().spanBuilder("RecoverableZookeeper.delete").startSpan(); - try (Scope scope = span.makeCurrent()) { + final Span span = TraceUtil.createSpan("RecoverableZookeeper.delete"); + try (Scope ignored = span.makeCurrent()) { RetryCounter retryCounter = retryCounterFactory.create(); boolean isRetry = false; // False for first attempt, true for all retries. while (true) { try { long startTime = EnvironmentEdgeManager.currentTime(); checkZk().delete(path, version); + span.setStatus(StatusCode.OK); return; } catch (KeeperException e) { switch (e.code()) { @@ -216,18 +218,22 @@ public class RecoverableZooKeeper { if (isRetry) { LOG.debug( "Node " + path + " already deleted. Assuming a " + "previous attempt succeeded."); + span.setStatus(StatusCode.OK); return; } LOG.debug("Node {} already deleted, retry={}", path, isRetry); + TraceUtil.setError(span, e); throw e; case CONNECTIONLOSS: case OPERATIONTIMEOUT: case REQUESTTIMEOUT: + TraceUtil.setError(span, e); retryOrThrow(retryCounter, e, "delete"); break; default: + TraceUtil.setError(span, e); throw e; } } @@ -244,23 +250,26 @@ public class RecoverableZooKeeper { * @return A Stat instance */ public Stat exists(String path, Watcher watcher) throws KeeperException, InterruptedException { - Span span = TraceUtil.getGlobalTracer().spanBuilder("RecoverableZookeeper.exists").startSpan(); - try (Scope scope = span.makeCurrent()) { + final Span span = TraceUtil.createSpan("RecoverableZookeeper.exists"); + try (Scope ignored = span.makeCurrent()) { RetryCounter retryCounter = retryCounterFactory.create(); while (true) { try { long startTime = EnvironmentEdgeManager.currentTime(); Stat nodeStat = checkZk().exists(path, watcher); + span.setStatus(StatusCode.OK); return nodeStat; } catch (KeeperException e) { switch (e.code()) { case CONNECTIONLOSS: case OPERATIONTIMEOUT: case REQUESTTIMEOUT: + TraceUtil.setError(span, e); retryOrThrow(retryCounter, e, "exists"); break; default: + TraceUtil.setError(span, e); throw e; } } @@ -276,24 +285,28 @@ public class RecoverableZooKeeper { * @return A Stat instance */ public Stat exists(String path, boolean watch) throws KeeperException, InterruptedException { - Span span = TraceUtil.getGlobalTracer().spanBuilder("RecoverableZookeeper.exists").startSpan(); - try (Scope scope = span.makeCurrent()) { + Span span = TraceUtil.createSpan("RecoverableZookeeper.exists"); + try (Scope ignored = span.makeCurrent()) { RetryCounter retryCounter = retryCounterFactory.create(); while (true) { try { long startTime = EnvironmentEdgeManager.currentTime(); Stat nodeStat = checkZk().exists(path, watch); + span.setStatus(StatusCode.OK); return nodeStat; } catch (KeeperException e) { switch (e.code()) { case CONNECTIONLOSS: + TraceUtil.setError(span, e); retryOrThrow(retryCounter, e, "exists"); break; case OPERATIONTIMEOUT: + TraceUtil.setError(span, e); retryOrThrow(retryCounter, e, "exists"); break; default: + TraceUtil.setError(span, e); throw e; } } @@ -319,24 +332,26 @@ public class RecoverableZooKeeper { */ public List getChildren(String path, Watcher watcher) throws KeeperException, InterruptedException { - Span span = - TraceUtil.getGlobalTracer().spanBuilder("RecoverableZookeeper.getChildren").startSpan(); - try (Scope scope = span.makeCurrent()) { + final Span span = TraceUtil.createSpan("RecoverableZookeeper.getChildren"); + try (Scope ignored = span.makeCurrent()) { RetryCounter retryCounter = retryCounterFactory.create(); while (true) { try { long startTime = EnvironmentEdgeManager.currentTime(); List children = checkZk().getChildren(path, watcher); + span.setStatus(StatusCode.OK); return children; } catch (KeeperException e) { switch (e.code()) { case CONNECTIONLOSS: case OPERATIONTIMEOUT: case REQUESTTIMEOUT: + TraceUtil.setError(span, e); retryOrThrow(retryCounter, e, "getChildren"); break; default: + TraceUtil.setError(span, e); throw e; } } @@ -353,25 +368,28 @@ public class RecoverableZooKeeper { */ public List getChildren(String path, boolean watch) throws KeeperException, InterruptedException { - Span span = - TraceUtil.getGlobalTracer().spanBuilder("RecoverableZookeeper.getChildren").startSpan(); - try (Scope scope = span.makeCurrent()) { + Span span = TraceUtil.createSpan("RecoverableZookeeper.getChildren"); + try (Scope ignored = span.makeCurrent()) { RetryCounter retryCounter = retryCounterFactory.create(); while (true) { try { long startTime = EnvironmentEdgeManager.currentTime(); List children = checkZk().getChildren(path, watch); + span.setStatus(StatusCode.OK); return children; } catch (KeeperException e) { switch (e.code()) { case CONNECTIONLOSS: + TraceUtil.setError(span, e); retryOrThrow(retryCounter, e, "getChildren"); break; case OPERATIONTIMEOUT: + TraceUtil.setError(span, e); retryOrThrow(retryCounter, e, "getChildren"); break; default: + TraceUtil.setError(span, e); throw e; } } @@ -387,23 +405,26 @@ public class RecoverableZooKeeper { */ public byte[] getData(String path, Watcher watcher, Stat stat) throws KeeperException, InterruptedException { - Span span = TraceUtil.getGlobalTracer().spanBuilder("RecoverableZookeeper.getData").startSpan(); - try (Scope scope = span.makeCurrent()) { + final Span span = TraceUtil.createSpan("RecoverableZookeeper.getData"); + try (Scope ignored = span.makeCurrent()) { RetryCounter retryCounter = retryCounterFactory.create(); while (true) { try { long startTime = EnvironmentEdgeManager.currentTime(); byte[] revData = checkZk().getData(path, watcher, stat); + span.setStatus(StatusCode.OK); return ZKMetadata.removeMetaData(revData); } catch (KeeperException e) { switch (e.code()) { case CONNECTIONLOSS: case OPERATIONTIMEOUT: case REQUESTTIMEOUT: + TraceUtil.setError(span, e); retryOrThrow(retryCounter, e, "getData"); break; default: + TraceUtil.setError(span, e); throw e; } } @@ -426,17 +447,21 @@ public class RecoverableZooKeeper { try { long startTime = EnvironmentEdgeManager.currentTime(); byte[] revData = checkZk().getData(path, watch, stat); + span.setStatus(StatusCode.OK); return ZKMetadata.removeMetaData(revData); } catch (KeeperException e) { switch (e.code()) { case CONNECTIONLOSS: + TraceUtil.setError(span, e); retryOrThrow(retryCounter, e, "getData"); break; case OPERATIONTIMEOUT: + TraceUtil.setError(span, e); retryOrThrow(retryCounter, e, "getData"); break; default: + TraceUtil.setError(span, e); throw e; } } @@ -455,8 +480,8 @@ public class RecoverableZooKeeper { */ public Stat setData(String path, byte[] data, int version) throws KeeperException, InterruptedException { - Span span = TraceUtil.getGlobalTracer().spanBuilder("RecoverableZookeeper.setData").startSpan(); - try (Scope scope = span.makeCurrent()) { + final Span span = TraceUtil.createSpan("RecoverableZookeeper.setData"); + try (Scope ignored = span.makeCurrent()) { RetryCounter retryCounter = retryCounterFactory.create(); byte[] newData = ZKMetadata.appendMetaData(id, data); boolean isRetry = false; @@ -465,12 +490,14 @@ public class RecoverableZooKeeper { try { startTime = EnvironmentEdgeManager.currentTime(); Stat nodeStat = checkZk().setData(path, newData, version); + span.setStatus(StatusCode.OK); return nodeStat; } catch (KeeperException e) { switch (e.code()) { case CONNECTIONLOSS: case OPERATIONTIMEOUT: case REQUESTTIMEOUT: + TraceUtil.setError(span, e); retryOrThrow(retryCounter, e, "setData"); break; case BADVERSION: @@ -481,15 +508,18 @@ public class RecoverableZooKeeper { byte[] revData = checkZk().getData(path, false, stat); if (Bytes.compareTo(revData, newData) == 0) { // the bad version is caused by previous successful setData + span.setStatus(StatusCode.OK); return stat; } } catch (KeeperException keeperException) { // the ZK is not reliable at this moment. just throwing exception + TraceUtil.setError(span, keeperException); throw keeperException; } } // throw other exceptions and verified bad version exceptions default: + TraceUtil.setError(span, e); throw e; } } @@ -506,23 +536,26 @@ public class RecoverableZooKeeper { * @return list of ACLs */ public List getAcl(String path, Stat stat) throws KeeperException, InterruptedException { - Span span = TraceUtil.getGlobalTracer().spanBuilder("RecoverableZookeeper.getAcl").startSpan(); - try (Scope scope = span.makeCurrent()) { + final Span span = TraceUtil.createSpan("RecoverableZookeeper.getAcl"); + try (Scope ignored = span.makeCurrent()) { RetryCounter retryCounter = retryCounterFactory.create(); while (true) { try { long startTime = EnvironmentEdgeManager.currentTime(); List nodeACL = checkZk().getACL(path, stat); + span.setStatus(StatusCode.OK); return nodeACL; } catch (KeeperException e) { switch (e.code()) { case CONNECTIONLOSS: case OPERATIONTIMEOUT: case REQUESTTIMEOUT: + TraceUtil.setError(span, e); retryOrThrow(retryCounter, e, "getAcl"); break; default: + TraceUtil.setError(span, e); throw e; } } @@ -539,22 +572,25 @@ public class RecoverableZooKeeper { */ public Stat setAcl(String path, List acls, int version) throws KeeperException, InterruptedException { - Span span = TraceUtil.getGlobalTracer().spanBuilder("RecoverableZookeeper.setAcl").startSpan(); - try (Scope scope = span.makeCurrent()) { + final Span span = TraceUtil.createSpan("RecoverableZookeeper.setAcl"); + try (Scope ignored = span.makeCurrent()) { RetryCounter retryCounter = retryCounterFactory.create(); while (true) { try { long startTime = EnvironmentEdgeManager.currentTime(); Stat nodeStat = checkZk().setACL(path, acls, version); + span.setStatus(StatusCode.OK); return nodeStat; } catch (KeeperException e) { switch (e.code()) { case CONNECTIONLOSS: case OPERATIONTIMEOUT: + TraceUtil.setError(span, e); retryOrThrow(retryCounter, e, "setAcl"); break; default: + TraceUtil.setError(span, e); throw e; } } @@ -578,20 +614,25 @@ public class RecoverableZooKeeper { */ public String create(String path, byte[] data, List acl, CreateMode createMode) throws KeeperException, InterruptedException { - Span span = TraceUtil.getGlobalTracer().spanBuilder("RecoverableZookeeper.create").startSpan(); - try (Scope scope = span.makeCurrent()) { + final Span span = TraceUtil.createSpan("RecoverableZookeeper.create"); + try (Scope ignored = span.makeCurrent()) { byte[] newData = ZKMetadata.appendMetaData(id, data); switch (createMode) { case EPHEMERAL: case PERSISTENT: + span.setStatus(StatusCode.OK); return createNonSequential(path, newData, acl, createMode); case EPHEMERAL_SEQUENTIAL: case PERSISTENT_SEQUENTIAL: + span.setStatus(StatusCode.OK); return createSequential(path, newData, acl, createMode); default: - throw new IllegalArgumentException("Unrecognized CreateMode: " + createMode); + final IllegalArgumentException e = + new IllegalArgumentException("Unrecognized CreateMode: " + createMode); + TraceUtil.setError(span, e); + throw e; } } finally { span.end(); @@ -709,24 +750,27 @@ public class RecoverableZooKeeper { * Run multiple operations in a transactional manner. Retry before throwing exception */ public List multi(Iterable ops) throws KeeperException, InterruptedException { - Span span = TraceUtil.getGlobalTracer().spanBuilder("RecoverableZookeeper.multi").startSpan(); - try (Scope scope = span.makeCurrent()) { + final Span span = TraceUtil.createSpan("RecoverableZookeeper.multi"); + try (Scope ignored = span.makeCurrent()) { RetryCounter retryCounter = retryCounterFactory.create(); Iterable multiOps = prepareZKMulti(ops); while (true) { try { long startTime = EnvironmentEdgeManager.currentTime(); List opResults = checkZk().multi(multiOps); + span.setStatus(StatusCode.OK); return opResults; } catch (KeeperException e) { switch (e.code()) { case CONNECTIONLOSS: case OPERATIONTIMEOUT: case REQUESTTIMEOUT: + TraceUtil.setError(span, e); retryOrThrow(retryCounter, e, "multi"); break; default: + TraceUtil.setError(span, e); throw e; } } diff --git a/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKWatcher.java b/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKWatcher.java index b80dd699cca..feaa62fd77b 100644 --- a/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKWatcher.java +++ b/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKWatcher.java @@ -34,6 +34,7 @@ import org.apache.hadoop.hbase.AuthUtil; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.ZooKeeperConnectionException; import org.apache.hadoop.hbase.security.Superusers; +import org.apache.hadoop.hbase.trace.TraceUtil; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.security.UserGroupInformation; @@ -552,45 +553,47 @@ public class ZKWatcher implements Watcher, Abortable, Closeable { } private void processEvent(WatchedEvent event) { - switch (event.getType()) { - // If event type is NONE, this is a connection status change - case None: { - connectionEvent(event); - break; - } - - // Otherwise pass along to the listeners - case NodeCreated: { - for (ZKListener listener : listeners) { - listener.nodeCreated(event.getPath()); + TraceUtil.trace(() -> { + switch (event.getType()) { + // If event type is NONE, this is a connection status change + case None: { + connectionEvent(event); + break; } - break; - } - case NodeDeleted: { - for (ZKListener listener : listeners) { - listener.nodeDeleted(event.getPath()); + // Otherwise pass along to the listeners + case NodeCreated: { + for (ZKListener listener : listeners) { + listener.nodeCreated(event.getPath()); + } + break; } - break; - } - case NodeDataChanged: { - for (ZKListener listener : listeners) { - listener.nodeDataChanged(event.getPath()); + case NodeDeleted: { + for (ZKListener listener : listeners) { + listener.nodeDeleted(event.getPath()); + } + break; } - break; - } - case NodeChildrenChanged: { - for (ZKListener listener : listeners) { - listener.nodeChildrenChanged(event.getPath()); + case NodeDataChanged: { + for (ZKListener listener : listeners) { + listener.nodeDataChanged(event.getPath()); + } + break; } - break; + + case NodeChildrenChanged: { + for (ZKListener listener : listeners) { + listener.nodeChildrenChanged(event.getPath()); + } + break; + } + default: + LOG.error("Invalid event of type {} received for path {}. Ignoring.", event.getState(), + event.getPath()); } - default: - LOG.error("Invalid event of type {} received for path {}. Ignoring.", event.getState(), - event.getPath()); - } + }, "ZKWatcher.processEvent: " + event.getType() + " " + event.getPath()); } /** @@ -602,7 +605,8 @@ public class ZKWatcher implements Watcher, Abortable, Closeable { public void process(WatchedEvent event) { LOG.debug(prefix("Received ZooKeeper Event, " + "type=" + event.getType() + ", " + "state=" + event.getState() + ", " + "path=" + event.getPath())); - zkEventProcessor.submit(() -> processEvent(event)); + final String spanName = ZKWatcher.class.getSimpleName() + "-" + identifier; + zkEventProcessor.submit(TraceUtil.tracedRunnable(() -> processEvent(event), spanName)); } // Connection management