HBASE-26366 Provide meaningful parent spans to ZK interactions
Signed-off-by: Andrew Purtell <apurtell@apache.org>
Parent: 9741f1a5c1
Commit: ac7622c398
@@ -26,7 +26,6 @@ import io.opentelemetry.api.trace.SpanKind;
import io.opentelemetry.api.trace.StatusCode;
import io.opentelemetry.sdk.trace.data.EventData;
import io.opentelemetry.sdk.trace.data.SpanData;
import io.opentelemetry.sdk.trace.data.StatusData;
import io.opentelemetry.semconv.trace.attributes.SemanticAttributes;
import java.time.Duration;
import java.util.Objects;

@@ -144,23 +143,20 @@ public final class SpanDataMatchers {
};
}

public static Matcher<SpanData> hasStatusWithCode(StatusCode statusCode) {
final Matcher<StatusCode> matcher = is(equalTo(statusCode));
return new TypeSafeMatcher<SpanData>() {
public static Matcher<SpanData> hasStatusWithCode(Matcher<StatusCode> matcher) {
return new FeatureMatcher<SpanData, StatusCode>(matcher, "SpanData with StatusCode that",
"statusWithCode") {
@Override
protected boolean matchesSafely(SpanData item) {
final StatusData statusData = item.getStatus();
return statusData != null && statusData.getStatusCode() != null
&& matcher.matches(statusData.getStatusCode());
}

@Override
public void describeTo(Description description) {
description.appendText("SpanData with StatusCode that ").appendDescriptionOf(matcher);
protected StatusCode featureValueOf(SpanData actual) {
return actual.getStatus().getStatusCode();
}
};
}

public static Matcher<SpanData> hasStatusWithCode(StatusCode statusCode) {
return hasStatusWithCode(is(equalTo(statusCode)));
}

public static Matcher<SpanData> hasTraceId(String traceId) {
return hasTraceId(is(equalTo(traceId)));
}
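For context, the reworked matcher composes with other Hamcrest matchers. A minimal sketch of test usage, where spans is an assumed List<SpanData> captured by a test rule (as in the test class added later in this commit):

// Sketch only: asserts that some captured span is named "HMaster.shutdown" and
// finished with either OK or ERROR status, via the new Matcher-accepting overload.
assertThat(spans, hasItem(allOf(
  hasName("HMaster.shutdown"),
  hasStatusWithCode(isOneOf(StatusCode.OK, StatusCode.ERROR)))));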
@@ -26,6 +26,7 @@ import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.hbase.trace.TraceUtil;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -163,8 +164,9 @@ public class ChoreService {
chore.getChoreService().cancelChore(chore);
}
chore.setChoreService(this);
ScheduledFuture<?> future = scheduler.scheduleAtFixedRate(chore, chore.getInitialDelay(),
chore.getPeriod(), chore.getTimeUnit());
ScheduledFuture<?> future =
scheduler.scheduleAtFixedRate(TraceUtil.tracedRunnable(chore, chore.getName()),
chore.getInitialDelay(), chore.getPeriod(), chore.getTimeUnit());
scheduledChores.put(chore, future);
return true;
} catch (Exception e) {
@@ -135,6 +135,31 @@ public final class TraceUtil {
});
}

/**
* Wrap the provided {@code runnable} in a {@link Runnable} that is traced.
*/
public static Runnable tracedRunnable(final Runnable runnable, final String spanName) {
return tracedRunnable(runnable, () -> createSpan(spanName));
}

/**
* Wrap the provided {@code runnable} in a {@link Runnable} that is traced.
*/
public static Runnable tracedRunnable(final Runnable runnable,
final Supplier<Span> spanSupplier) {
// N.B. This method name follows the convention of this class, i.e., tracedFuture, rather than
// the convention of the OpenTelemetry classes, i.e., Context#wrap.
return () -> {
final Span span = spanSupplier.get();
try (final Scope ignored = span.makeCurrent()) {
runnable.run();
span.setStatus(StatusCode.OK);
} finally {
span.end();
}
};
}

/**
* A {@link Runnable} that may also throw.
* @param <T> the type of {@link Throwable} that can be produced.

@@ -144,11 +169,17 @@ public final class TraceUtil {
void run() throws T;
}

/**
* Trace the execution of {@code runnable}.
*/
public static <T extends Throwable> void trace(final ThrowingRunnable<T> runnable,
final String spanName) throws T {
trace(runnable, () -> createSpan(spanName));
}

/**
* Trace the execution of {@code runnable}.
*/
public static <T extends Throwable> void trace(final ThrowingRunnable<T> runnable,
final Supplier<Span> spanSupplier) throws T {
Span span = spanSupplier.get();
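A minimal caller-side sketch of the new helper; the executor and task below are illustrative placeholders, not part of the patch. Only the TraceUtil.tracedRunnable call mirrors the change: each execution of the wrapped task is recorded as its own span under the supplied name.

// Hypothetical usage, assuming java.util.concurrent imports are available.
ScheduledExecutorService pool = Executors.newSingleThreadScheduledExecutor();
Runnable work = () -> System.out.println("periodic work");
pool.scheduleAtFixedRate(
  TraceUtil.tracedRunnable(work, "ExampleChore"), // each run becomes one span
  0, 10, TimeUnit.SECONDS);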
@@ -24,6 +24,7 @@ import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ThreadFactory;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.master.RegionState;
import org.apache.hadoop.hbase.trace.TraceUtil;
import org.apache.hadoop.hbase.types.CopyOnWriteArrayMap;
import org.apache.hadoop.hbase.util.RetryCounter;
import org.apache.hadoop.hbase.util.RetryCounterFactory;

@@ -101,41 +102,43 @@ public class MetaRegionLocationCache extends ZKListener {
* @param retryCounter controls the number of retries and sleep between retries.
*/
private void loadMetaLocationsFromZk(RetryCounter retryCounter, ZNodeOpType opType) {
List<String> znodes = null;
while (retryCounter.shouldRetry()) {
try {
znodes = watcher.getMetaReplicaNodesAndWatchChildren();
break;
} catch (KeeperException ke) {
LOG.debug("Error populating initial meta locations", ke);
if (!retryCounter.shouldRetry()) {
// Retries exhausted and watchers not set. This is not a desirable state since the cache
// could remain stale forever. Propagate the exception.
watcher.abort("Error populating meta locations", ke);
return;
}
TraceUtil.trace(() -> {
List<String> znodes = null;
while (retryCounter.shouldRetry()) {
try {
retryCounter.sleepUntilNextRetry();
} catch (InterruptedException ie) {
LOG.error("Interrupted while loading meta locations from ZK", ie);
Thread.currentThread().interrupt();
return;
znodes = watcher.getMetaReplicaNodesAndWatchChildren();
break;
} catch (KeeperException ke) {
LOG.debug("Error populating initial meta locations", ke);
if (!retryCounter.shouldRetry()) {
// Retries exhausted and watchers not set. This is not a desirable state since the cache
// could remain stale forever. Propagate the exception.
watcher.abort("Error populating meta locations", ke);
return;
}
try {
retryCounter.sleepUntilNextRetry();
} catch (InterruptedException ie) {
LOG.error("Interrupted while loading meta locations from ZK", ie);
Thread.currentThread().interrupt();
return;
}
}
}
}
if (znodes == null || znodes.isEmpty()) {
// No meta znodes exist at this point but we registered a watcher on the base znode to listen
// for updates. They will be handled via nodeChildrenChanged().
return;
}
if (znodes.size() == cachedMetaLocations.size()) {
// No new meta znodes got added.
return;
}
for (String znode : znodes) {
String path = ZNodePaths.joinZNode(watcher.getZNodePaths().baseZNode, znode);
updateMetaLocation(path, opType);
}
if (znodes == null || znodes.isEmpty()) {
// No meta znodes exist at this point but we registered a watcher on the base znode to
// listen for updates. They will be handled via nodeChildrenChanged().
return;
}
if (znodes.size() == cachedMetaLocations.size()) {
// No new meta znodes got added.
return;
}
for (String znode : znodes) {
String path = ZNodePaths.joinZNode(watcher.getZNodePaths().baseZNode, znode);
updateMetaLocation(path, opType);
}
}, "MetaRegionLocationCache.loadMetaLocationsFromZk");
}

/**
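The reason these wrappers give the ZooKeeper spans meaningful parents: making a span current via a Scope causes any span started underneath it, such as the RecoverableZookeeper.* spans, to pick it up as parent by default. A minimal sketch with the plain OpenTelemetry API, assuming a tracer name of "example":

// Sketch only; the tracer name and the child span here are illustrative.
Tracer tracer = GlobalOpenTelemetry.getTracer("example");
Span parent = tracer.spanBuilder("MetaRegionLocationCache.loadMetaLocationsFromZk").startSpan();
try (Scope ignored = parent.makeCurrent()) {
  // A span started here without an explicit parent uses the current span as its parent,
  // which is how the ZK client spans end up nested under the wrapper span.
  Span child = tracer.spanBuilder("RecoverableZookeeper.getChildren").startSpan();
  child.end();
} finally {
  parent.end();
}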
@@ -26,6 +26,9 @@ import static org.apache.hadoop.hbase.util.DNS.MASTER_HOSTNAME_KEY;
import com.google.errorprone.annotations.RestrictedApi;
import com.google.protobuf.Descriptors;
import com.google.protobuf.Service;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.StatusCode;
import io.opentelemetry.context.Scope;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.lang.reflect.Constructor;

@@ -213,6 +216,7 @@ import org.apache.hadoop.hbase.replication.regionserver.ReplicationStatus;
import org.apache.hadoop.hbase.security.AccessDeniedException;
import org.apache.hadoop.hbase.security.SecurityConstants;
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.trace.TraceUtil;
import org.apache.hadoop.hbase.util.Addressing;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;

@@ -452,7 +456,8 @@ public class HMaster extends HRegionServer implements MasterServices {
*/
public HMaster(final Configuration conf) throws IOException {
super(conf);
try {
final Span span = TraceUtil.createSpan("HMaster.cxtor");
try (Scope ignored = span.makeCurrent()) {
if (conf.getBoolean(MAINTENANCE_MODE, false)) {
LOG.info("Detected {}=true via configuration.", MAINTENANCE_MODE);
maintenanceMode = true;

@@ -513,11 +518,15 @@ public class HMaster extends HRegionServer implements MasterServices {
cachedClusterId = new CachedClusterId(this, conf);

this.regionServerTracker = new RegionServerTracker(zooKeeper, this);
span.setStatus(StatusCode.OK);
} catch (Throwable t) {
// Make sure we log the exception. HMaster is often started via reflection and the
// cause of failed startup is lost.
TraceUtil.setError(span, t);
LOG.error("Failed construction of Master", t);
throw t;
} finally {
span.end();
}
}

@@ -540,7 +549,7 @@ public class HMaster extends HRegionServer implements MasterServices {
@Override
public void run() {
try {
Threads.setDaemonThreadRunning(new Thread(() -> {
Threads.setDaemonThreadRunning(new Thread(() -> TraceUtil.trace(() -> {
try {
int infoPort = putUpJettyServer();
startActiveMasterManager(infoPort);

@@ -553,23 +562,29 @@ public class HMaster extends HRegionServer implements MasterServices {
abort(error, t);
}
}
}), getName() + ":becomeActiveMaster");
}, "HMaster.becomeActiveMaster")), getName() + ":becomeActiveMaster");
// Fall in here even if we have been aborted. Need to run the shutdown services and
// the super run call will do this for us.
super.run();
} finally {
if (this.clusterSchemaService != null) {
// If on way out, then we are no longer active master.
this.clusterSchemaService.stopAsync();
try {
this.clusterSchemaService
.awaitTerminated(getConfiguration().getInt(HBASE_MASTER_WAIT_ON_SERVICE_IN_SECONDS,
DEFAULT_HBASE_MASTER_WAIT_ON_SERVICE_IN_SECONDS), TimeUnit.SECONDS);
} catch (TimeoutException te) {
LOG.warn("Failed shutdown of clusterSchemaService", te);
final Span span = TraceUtil.createSpan("HMaster exiting main loop");
try (Scope ignored = span.makeCurrent()) {
if (this.clusterSchemaService != null) {
// If on way out, then we are no longer active master.
this.clusterSchemaService.stopAsync();
try {
this.clusterSchemaService
.awaitTerminated(getConfiguration().getInt(HBASE_MASTER_WAIT_ON_SERVICE_IN_SECONDS,
DEFAULT_HBASE_MASTER_WAIT_ON_SERVICE_IN_SECONDS), TimeUnit.SECONDS);
} catch (TimeoutException te) {
LOG.warn("Failed shutdown of clusterSchemaService", te);
}
}
this.activeMaster = false;
span.setStatus(StatusCode.OK);
} finally {
span.end();
}
this.activeMaster = false;
}
}

@@ -3094,36 +3109,38 @@ public class HMaster extends HRegionServer implements MasterServices {
* Shutdown the cluster. Master runs a coordinated stop of all RegionServers and then itself.
*/
public void shutdown() throws IOException {
if (cpHost != null) {
cpHost.preShutdown();
}

// Tell the servermanager cluster shutdown has been called. This makes it so when Master is
// last running server, it'll stop itself. Next, we broadcast the cluster shutdown by setting
// the cluster status as down. RegionServers will notice this change in state and will start
// shutting themselves down. When last has exited, Master can go down.
if (this.serverManager != null) {
this.serverManager.shutdownCluster();
}
if (this.clusterStatusTracker != null) {
try {
this.clusterStatusTracker.setClusterDown();
} catch (KeeperException e) {
LOG.error("ZooKeeper exception trying to set cluster as down in ZK", e);
TraceUtil.trace(() -> {
if (cpHost != null) {
cpHost.preShutdown();
}
}
// Stop the procedure executor. Will stop any ongoing assign, unassign, server crash etc.,
// processing so we can go down.
if (this.procedureExecutor != null) {
this.procedureExecutor.stop();
}
// Shutdown our cluster connection. This will kill any hosted RPCs that might be going on;
// this is what we want especially if the Master is in startup phase doing call outs to
// hbase:meta, etc. when cluster is down. Without ths connection close, we'd have to wait on
// the rpc to timeout.
if (this.clusterConnection != null) {
this.clusterConnection.close();
}

// Tell the servermanager cluster shutdown has been called. This makes it so when Master is
// last running server, it'll stop itself. Next, we broadcast the cluster shutdown by setting
// the cluster status as down. RegionServers will notice this change in state and will start
// shutting themselves down. When last has exited, Master can go down.
if (this.serverManager != null) {
this.serverManager.shutdownCluster();
}
if (this.clusterStatusTracker != null) {
try {
this.clusterStatusTracker.setClusterDown();
} catch (KeeperException e) {
LOG.error("ZooKeeper exception trying to set cluster as down in ZK", e);
}
}
// Stop the procedure executor. Will stop any ongoing assign, unassign, server crash etc.,
// processing so we can go down.
if (this.procedureExecutor != null) {
this.procedureExecutor.stop();
}
// Shutdown our cluster connection. This will kill any hosted RPCs that might be going on;
// this is what we want especially if the Master is in startup phase doing call outs to
// hbase:meta, etc. when cluster is down. Without ths connection close, we'd have to wait on
// the rpc to timeout.
if (this.clusterConnection != null) {
this.clusterConnection.close();
}
}, "HMaster.shutdown");
}

public void stopMaster() throws IOException {
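Note that TraceUtil.trace accepts a ThrowingRunnable, so a checked exception thrown inside the traced block (here the IOException from closing the cluster connection) still propagates to shutdown()'s caller. A simplified sketch, where doShutdownWork() is a hypothetical placeholder:

// Sketch only; doShutdownWork() stands in for the body wrapped by the patch.
public void shutdown() throws IOException {
  TraceUtil.trace(() -> {
    doShutdownWork(); // the span is ended in trace()'s finally block even if this throws
  }, "HMaster.shutdown");
}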
@@ -17,6 +17,9 @@
*/
package org.apache.hadoop.hbase.master;

import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.StatusCode;
import io.opentelemetry.context.Scope;
import java.io.File;
import java.io.IOException;
import java.util.List;

@@ -30,13 +33,13 @@ import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.trace.TraceUtil;
import org.apache.hadoop.hbase.util.JVMClusterUtil;
import org.apache.hadoop.hbase.util.ServerCommandLine;
import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
import org.apache.hadoop.hbase.zookeeper.ZKAuthentication;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -161,7 +164,8 @@ public class HMasterCommandLine extends ServerCommandLine {

private int startMaster() {
Configuration conf = getConf();
try {
final Span span = TraceUtil.createSpan("HMasterCommandLine.startMaster");
try (Scope ignored = span.makeCurrent()) {
// If 'local', defer to LocalHBaseCluster instance. Starts master
// and regionserver both in the one JVM.
if (LocalHBaseCluster.isLocal(conf)) {

@@ -250,9 +254,13 @@ public class HMasterCommandLine extends ServerCommandLine {
master.join();
if (master.isAborted()) throw new RuntimeException("HMaster Aborted");
}
span.setStatus(StatusCode.OK);
} catch (Throwable t) {
TraceUtil.setError(span, t);
LOG.error("Master exiting", t);
return 1;
} finally {
span.end();
}
return 0;
}

@@ -310,8 +318,7 @@ public class HMasterCommandLine extends ServerCommandLine {
public static class LocalHMaster extends HMaster {
private MiniZooKeeperCluster zkcluster = null;

public LocalHMaster(Configuration conf)
throws IOException, KeeperException, InterruptedException {
public LocalHMaster(Configuration conf) throws IOException {
super(conf);
}

@@ -17,6 +17,9 @@
*/
package org.apache.hadoop.hbase.master;

import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.StatusCode;
import io.opentelemetry.context.Scope;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.Collections;

@@ -29,6 +32,7 @@ import org.apache.hadoop.hbase.ServerMetrics;
import org.apache.hadoop.hbase.ServerMetricsBuilder;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.VersionInfoUtil;
import org.apache.hadoop.hbase.trace.TraceUtil;
import org.apache.hadoop.hbase.zookeeper.ZKListener;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;

@@ -184,22 +188,28 @@ public class RegionServerTracker extends ZKListener {

private synchronized void refresh() {
List<String> names;
try {
names = ZKUtil.listChildrenAndWatchForNewChildren(watcher, watcher.getZNodePaths().rsZNode);
} catch (KeeperException e) {
// here we need to abort as we failed to set watcher on the rs node which means that we can
// not track the node deleted event any more.
server.abort("Unexpected zk exception getting RS nodes", e);
return;
final Span span = TraceUtil.createSpan("RegionServerTracker.refresh");
try (final Scope ignored = span.makeCurrent()) {
try {
names = ZKUtil.listChildrenAndWatchForNewChildren(watcher, watcher.getZNodePaths().rsZNode);
} catch (KeeperException e) {
// here we need to abort as we failed to set watcher on the rs node which means that we can
// not track the node deleted event any more.
server.abort("Unexpected zk exception getting RS nodes", e);
return;
}
Set<ServerName> newServers = CollectionUtils.isEmpty(names)
? Collections.emptySet()
: names.stream().map(ServerName::parseServerName)
.collect(Collectors.collectingAndThen(Collectors.toSet(), Collections::unmodifiableSet));
if (active) {
processAsActiveMaster(newServers);
}
this.regionServers = newServers;
span.setStatus(StatusCode.OK);
} finally {
span.end();
}
Set<ServerName> newServers = CollectionUtils.isEmpty(names)
? Collections.emptySet()
: names.stream().map(ServerName::parseServerName)
.collect(Collectors.collectingAndThen(Collectors.toSet(), Collections::unmodifiableSet));
if (active) {
processAsActiveMaster(newServers);
}
this.regionServers = newServers;
}

@Override
@@ -23,6 +23,9 @@ import static org.apache.hadoop.hbase.HConstants.HBASE_SPLIT_WAL_COORDINATED_BY_
import static org.apache.hadoop.hbase.HConstants.HBASE_SPLIT_WAL_MAX_SPLITTER;
import static org.apache.hadoop.hbase.util.DNS.UNSAFE_RS_HOSTNAME_KEY;

import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.StatusCode;
import io.opentelemetry.context.Scope;
import java.io.IOException;
import java.io.PrintWriter;
import java.lang.management.MemoryType;

@@ -161,6 +164,7 @@ import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.security.access.AccessChecker;
import org.apache.hadoop.hbase.security.access.ZKPermissionWatcher;
import org.apache.hadoop.hbase.trace.TraceUtil;
import org.apache.hadoop.hbase.unsafe.HBasePlatformDependent;
import org.apache.hadoop.hbase.util.Addressing;
import org.apache.hadoop.hbase.util.Bytes;

@@ -589,7 +593,8 @@ public class HRegionServer extends Thread
*/
public HRegionServer(final Configuration conf) throws IOException {
super("RegionServer"); // thread name
try {
final Span span = TraceUtil.createSpan("HRegionServer.cxtor");
try (Scope ignored = span.makeCurrent()) {
this.startcode = EnvironmentEdgeManager.currentTime();
this.conf = conf;
this.dataFsOk = true;

@@ -701,11 +706,15 @@ public class HRegionServer extends Thread
this.choreService = new ChoreService(getName(), true);
this.executorService = new ExecutorService(getName());
putUpWebUI();
span.setStatus(StatusCode.OK);
} catch (Throwable t) {
// Make sure we log the exception. HRegionServer is often started via reflection and the
// cause of failed startup is lost.
TraceUtil.setError(span, t);
LOG.error("Failed construction RegionServer", t);
throw t;
} finally {
span.end();
}
}

@@ -920,18 +929,23 @@ public class HRegionServer extends Thread
* In here we just put up the RpcServer, setup Connection, and ZooKeeper.
*/
private void preRegistrationInitialization() {
try {
final Span span = TraceUtil.createSpan("HRegionServer.preRegistrationInitialization");
try (Scope ignored = span.makeCurrent()) {
initializeZooKeeper();
setupClusterConnection();
// Setup RPC client for master communication
this.rpcClient = RpcClientFactory.createClient(conf, clusterId,
new InetSocketAddress(this.rpcServices.isa.getAddress(), 0),
clusterConnection.getConnectionMetrics());
span.setStatus(StatusCode.OK);
} catch (Throwable t) {
// Call stop if error or process will stick around for ever since server
// puts up non-daemon threads.
TraceUtil.setError(span, t);
this.rpcServices.stop();
abort("Initialization of RS failed. Hence aborting RS.", t);
} finally {
span.end();
}
}

@@ -1042,35 +1056,39 @@ public class HRegionServer extends Thread
// start up all Services. Use RetryCounter to get backoff in case Master is struggling to
// come up.
LOG.debug("About to register with Master.");
RetryCounterFactory rcf =
new RetryCounterFactory(Integer.MAX_VALUE, this.sleeper.getPeriod(), 1000 * 60 * 5);
RetryCounter rc = rcf.create();
while (keepLooping()) {
RegionServerStartupResponse w = reportForDuty();
if (w == null) {
long sleepTime = rc.getBackoffTimeAndIncrementAttempts();
LOG.warn("reportForDuty failed; sleeping {} ms and then retrying.", sleepTime);
this.sleeper.sleep(sleepTime);
} else {
handleReportForDutyResponse(w);
break;
TraceUtil.trace(() -> {
RetryCounterFactory rcf =
new RetryCounterFactory(Integer.MAX_VALUE, this.sleeper.getPeriod(), 1000 * 60 * 5);
RetryCounter rc = rcf.create();
while (keepLooping()) {
RegionServerStartupResponse w = reportForDuty();
if (w == null) {
long sleepTime = rc.getBackoffTimeAndIncrementAttempts();
LOG.warn("reportForDuty failed; sleeping {} ms and then retrying.", sleepTime);
this.sleeper.sleep(sleepTime);
} else {
handleReportForDutyResponse(w);
break;
}
}
}
}, "HRegionServer.registerWithMaster");
}

if (!isStopped() && isHealthy()) {
// start the snapshot handler and other procedure handlers,
// since the server is ready to run
if (this.rspmHost != null) {
this.rspmHost.start();
}
// Start the Quota Manager
if (this.rsQuotaManager != null) {
rsQuotaManager.start(getRpcServer().getScheduler());
}
if (this.rsSpaceQuotaManager != null) {
this.rsSpaceQuotaManager.start();
}
TraceUtil.trace(() -> {
// start the snapshot handler and other procedure handlers,
// since the server is ready to run
if (this.rspmHost != null) {
this.rspmHost.start();
}
// Start the Quota Manager
if (this.rsQuotaManager != null) {
rsQuotaManager.start(getRpcServer().getScheduler());
}
if (this.rsSpaceQuotaManager != null) {
this.rsSpaceQuotaManager.start();
}
}, "HRegionServer.startup");
}

// We registered with the Master. Go into run mode.

@@ -1121,138 +1139,144 @@ public class HRegionServer extends Thread
}
}

if (this.leaseManager != null) {
this.leaseManager.closeAfterLeasesExpire();
}
if (this.splitLogWorker != null) {
splitLogWorker.stop();
}
if (this.infoServer != null) {
LOG.info("Stopping infoServer");
try {
this.infoServer.stop();
} catch (Exception e) {
LOG.error("Failed to stop infoServer", e);
final Span span = TraceUtil.createSpan("HRegionServer exiting main loop");
try (Scope ignored = span.makeCurrent()) {
if (this.leaseManager != null) {
this.leaseManager.closeAfterLeasesExpire();
}
}
// Send cache a shutdown.
if (blockCache != null) {
blockCache.shutdown();
}
if (mobFileCache != null) {
mobFileCache.shutdown();
}

// Send interrupts to wake up threads if sleeping so they notice shutdown.
// TODO: Should we check they are alive? If OOME could have exited already
if (this.hMemManager != null) {
this.hMemManager.stop();
}
if (this.cacheFlusher != null) {
this.cacheFlusher.interruptIfNecessary();
}
if (this.compactSplitThread != null) {
this.compactSplitThread.interruptIfNecessary();
}

// Stop the snapshot and other procedure handlers, forcefully killing all running tasks
if (rspmHost != null) {
rspmHost.stop(this.abortRequested.get() || this.killed);
}

if (this.killed) {
// Just skip out w/o closing regions. Used when testing.
} else if (abortRequested.get()) {
if (this.dataFsOk) {
closeUserRegions(abortRequested.get()); // Don't leave any open file handles
if (this.splitLogWorker != null) {
splitLogWorker.stop();
}
LOG.info("aborting server " + this.serverName);
} else {
closeUserRegions(abortRequested.get());
LOG.info("stopping server " + this.serverName);
}

if (this.clusterConnection != null && !clusterConnection.isClosed()) {
try {
this.clusterConnection.close();
} catch (IOException e) {
// Although the {@link Closeable} interface throws an {@link
// IOException}, in reality, the implementation would never do that.
LOG.warn("Attempt to close server's short circuit ClusterConnection failed.", e);
}
}

// Closing the compactSplit thread before closing meta regions
if (!this.killed && containsMetaTableRegions()) {
if (!abortRequested.get() || this.dataFsOk) {
if (this.compactSplitThread != null) {
this.compactSplitThread.join();
this.compactSplitThread = null;
if (this.infoServer != null) {
LOG.info("Stopping infoServer");
try {
this.infoServer.stop();
} catch (Exception e) {
LOG.error("Failed to stop infoServer", e);
}
closeMetaTableRegions(abortRequested.get());
}
}
// Send cache a shutdown.
if (blockCache != null) {
blockCache.shutdown();
}
if (mobFileCache != null) {
mobFileCache.shutdown();
}

if (!this.killed && this.dataFsOk) {
waitOnAllRegionsToClose(abortRequested.get());
LOG.info("stopping server " + this.serverName + "; all regions closed.");
}
// Send interrupts to wake up threads if sleeping so they notice shutdown.
// TODO: Should we check they are alive? If OOME could have exited already
if (this.hMemManager != null) {
this.hMemManager.stop();
}
if (this.cacheFlusher != null) {
this.cacheFlusher.interruptIfNecessary();
}
if (this.compactSplitThread != null) {
this.compactSplitThread.interruptIfNecessary();
}

// Stop the quota manager
if (rsQuotaManager != null) {
rsQuotaManager.stop();
}
if (rsSpaceQuotaManager != null) {
rsSpaceQuotaManager.stop();
rsSpaceQuotaManager = null;
}
// Stop the snapshot and other procedure handlers, forcefully killing all running tasks
if (rspmHost != null) {
rspmHost.stop(this.abortRequested.get() || this.killed);
}

// flag may be changed when closing regions throws exception.
if (this.dataFsOk) {
shutdownWAL(!abortRequested.get());
}
if (this.killed) {
// Just skip out w/o closing regions. Used when testing.
} else if (abortRequested.get()) {
if (this.dataFsOk) {
closeUserRegions(abortRequested.get()); // Don't leave any open file handles
}
LOG.info("aborting server " + this.serverName);
} else {
closeUserRegions(abortRequested.get());
LOG.info("stopping server " + this.serverName);
}

// Make sure the proxy is down.
if (this.rssStub != null) {
this.rssStub = null;
}
if (this.lockStub != null) {
this.lockStub = null;
}
if (this.rpcClient != null) {
this.rpcClient.close();
}
if (this.leaseManager != null) {
this.leaseManager.close();
}
if (this.pauseMonitor != null) {
this.pauseMonitor.stop();
}
if (this.clusterConnection != null && !clusterConnection.isClosed()) {
try {
this.clusterConnection.close();
} catch (IOException e) {
// Although the {@link Closeable} interface throws an {@link
// IOException}, in reality, the implementation would never do that.
LOG.warn("Attempt to close server's short circuit ClusterConnection failed.", e);
}
}

if (!killed) {
stopServiceThreads();
}
// Closing the compactSplit thread before closing meta regions
if (!this.killed && containsMetaTableRegions()) {
if (!abortRequested.get() || this.dataFsOk) {
if (this.compactSplitThread != null) {
this.compactSplitThread.join();
this.compactSplitThread = null;
}
closeMetaTableRegions(abortRequested.get());
}
}

if (this.rpcServices != null) {
this.rpcServices.stop();
}
if (!this.killed && this.dataFsOk) {
waitOnAllRegionsToClose(abortRequested.get());
LOG.info("stopping server " + this.serverName + "; all regions closed.");
}

try {
deleteMyEphemeralNode();
} catch (KeeperException.NoNodeException nn) {
// pass
} catch (KeeperException e) {
LOG.warn("Failed deleting my ephemeral node", e);
}
// We may have failed to delete the znode at the previous step, but
// we delete the file anyway: a second attempt to delete the znode is likely to fail again.
ZNodeClearer.deleteMyEphemeralNodeOnDisk();
// Stop the quota manager
if (rsQuotaManager != null) {
rsQuotaManager.stop();
}
if (rsSpaceQuotaManager != null) {
rsSpaceQuotaManager.stop();
rsSpaceQuotaManager = null;
}

if (this.zooKeeper != null) {
this.zooKeeper.close();
// flag may be changed when closing regions throws exception.
if (this.dataFsOk) {
shutdownWAL(!abortRequested.get());
}

// Make sure the proxy is down.
if (this.rssStub != null) {
this.rssStub = null;
}
if (this.lockStub != null) {
this.lockStub = null;
}
if (this.rpcClient != null) {
this.rpcClient.close();
}
if (this.leaseManager != null) {
this.leaseManager.close();
}
if (this.pauseMonitor != null) {
this.pauseMonitor.stop();
}

if (!killed) {
stopServiceThreads();
}

if (this.rpcServices != null) {
this.rpcServices.stop();
}

try {
deleteMyEphemeralNode();
} catch (KeeperException.NoNodeException nn) {
// pass
} catch (KeeperException e) {
LOG.warn("Failed deleting my ephemeral node", e);
}
// We may have failed to delete the znode at the previous step, but
// we delete the file anyway: a second attempt to delete the znode is likely to fail again.
ZNodeClearer.deleteMyEphemeralNodeOnDisk();

if (this.zooKeeper != null) {
this.zooKeeper.close();
}
this.shutDown = true;
LOG.info("Exiting; stopping=" + this.serverName + "; zookeeper connection closed.");
span.setStatus(StatusCode.OK);
} finally {
span.end();
}
this.shutDown = true;
LOG.info("Exiting; stopping=" + this.serverName + "; zookeeper connection closed.");
}

private boolean containsMetaTableRegions() {
@@ -1293,23 +1317,29 @@ public class HRegionServer extends Thread
return;
}
ClusterStatusProtos.ServerLoad sl = buildServerLoad(reportStartTime, reportEndTime);
try {
final Span span = TraceUtil.createSpan("HRegionServer.tryRegionServerReport");
try (Scope ignored = span.makeCurrent()) {
RegionServerReportRequest.Builder request = RegionServerReportRequest.newBuilder();
request.setServer(ProtobufUtil.toServerName(this.serverName));
request.setLoad(sl);
rss.regionServerReport(null, request.build());
span.setStatus(StatusCode.OK);
} catch (ServiceException se) {
IOException ioe = ProtobufUtil.getRemoteException(se);
if (ioe instanceof YouAreDeadException) {
// This will be caught and handled as a fatal error in run()
TraceUtil.setError(span, ioe);
throw ioe;
}
if (rssStub == rss) {
rssStub = null;
}
TraceUtil.setError(span, se);
// Couldn't connect to the master, get location from zk and reconnect
// Method blocks until new master is found or we are stopped
createRegionServerStatusStub(true);
} finally {
span.end();
}
}

@@ -17,9 +17,13 @@
*/
package org.apache.hadoop.hbase.regionserver;

import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.StatusCode;
import io.opentelemetry.context.Scope;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.LocalHBaseCluster;
import org.apache.hadoop.hbase.trace.TraceUtil;
import org.apache.hadoop.hbase.util.ServerCommandLine;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;

@@ -47,7 +51,8 @@ public class HRegionServerCommandLine extends ServerCommandLine {

private int start() throws Exception {
Configuration conf = getConf();
try {
final Span span = TraceUtil.createSpan("HRegionServerCommandLine.start");
try (Scope ignored = span.makeCurrent()) {
// If 'local', don't start a region server here. Defer to
// LocalHBaseCluster. It manages 'local' clusters.
if (LocalHBaseCluster.isLocal(conf)) {

@@ -62,9 +67,13 @@ public class HRegionServerCommandLine extends ServerCommandLine {
throw new RuntimeException("HRegionServer Aborted");
}
}
span.setStatus(StatusCode.OK);
} catch (Throwable t) {
TraceUtil.setError(span, t);
LOG.error("Region server exiting", t);
return 1;
} finally {
span.end();
}
return 0;
}
@@ -0,0 +1,297 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase;

import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasName;
import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasParentSpanId;
import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasStatusWithCode;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.endsWith;
import static org.hamcrest.Matchers.hasItem;
import static org.hamcrest.Matchers.isOneOf;
import static org.hamcrest.Matchers.startsWith;

import io.opentelemetry.api.trace.StatusCode;
import io.opentelemetry.sdk.trace.data.SpanData;
import java.util.List;
import java.util.function.Supplier;
import org.apache.hadoop.hbase.client.trace.StringTraceRenderer;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.trace.OpenTelemetryClassRule;
import org.hamcrest.Matcher;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.ExternalResource;
import org.junit.rules.RuleChain;
import org.junit.rules.TestRule;
import org.junit.runners.model.Statement;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Test that sundry operations internal to the region server are traced as expected.
*/
@Category({ MediumTests.class, RegionServerTests.class })
public class TestServerInternalsTracing {
private static final Logger LOG = LoggerFactory.getLogger(TestServerInternalsTracing.class);

@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestServerInternalsTracing.class);

private static final String NO_PARENT_ID = "0000000000000000";
private static List<SpanData> spans;

/**
* Wait for the underlying cluster to come up -- defined by meta being available.
*/
private static class Setup extends ExternalResource {
private final Supplier<HBaseTestingUtility> testingUtilSupplier;

public Setup(final Supplier<HBaseTestingUtility> testingUtilSupplier) {
this.testingUtilSupplier = testingUtilSupplier;
}

@Override
protected void before() throws Throwable {
final HBaseTestingUtility testingUtil = testingUtilSupplier.get();
testingUtil.waitTableAvailable(TableName.META_TABLE_NAME);
}
}

private static class Noop extends Statement {
@Override
public void evaluate() throws Throwable {
}
}

@ClassRule
public static TestRule classRule = (base, description) -> new Statement() {
@Override
public void evaluate() throws Throwable {
// setup and tear down the cluster, collecting all the spans produced in the process.
final OpenTelemetryClassRule otelClassRule = OpenTelemetryClassRule.create();
final MiniClusterRule miniClusterRule = MiniClusterRule.newBuilder().build();
final Setup setup = new Setup(miniClusterRule::getTestingUtility);
final TestRule clusterRule =
RuleChain.outerRule(otelClassRule).around(miniClusterRule).around(setup);
clusterRule.apply(new Noop(), description).evaluate();
spans = otelClassRule.getSpans();
if (LOG.isDebugEnabled()) {
StringTraceRenderer renderer = new StringTraceRenderer(spans);
renderer.render(LOG::debug);
}
base.evaluate();
}
};

@Test
public void testHMasterConstructor() {
final Matcher<SpanData> masterConstructorMatcher = allOf(hasName("HMaster.cxtor"),
hasParentSpanId(NO_PARENT_ID), hasStatusWithCode(isOneOf(StatusCode.OK, StatusCode.ERROR)));
assertThat("there should be a span from the HMaster constructor.", spans,
hasItem(masterConstructorMatcher));
final SpanData masterConstructorSpan = spans.stream().filter(masterConstructorMatcher::matches)
.findAny().orElseThrow(AssertionError::new);
assertThat("the HMaster constructor span should show zookeeper interaction.", spans, hasItem(
allOf(hasName(startsWith("RecoverableZookeeper.")), hasParentSpanId(masterConstructorSpan))));
}

@Test
public void testHMasterBecomeActiveMaster() {
final Matcher<SpanData> masterBecomeActiveMasterMatcher =
allOf(hasName("HMaster.becomeActiveMaster"), hasParentSpanId(NO_PARENT_ID),
hasStatusWithCode(isOneOf(StatusCode.OK, StatusCode.ERROR)));
assertThat("there should be a span from the HMaster.becomeActiveMaster.", spans,
hasItem(masterBecomeActiveMasterMatcher));
final SpanData masterBecomeActiveMasterSpan = spans.stream()
.filter(masterBecomeActiveMasterMatcher::matches).findAny().orElseThrow(AssertionError::new);
assertThat("the HMaster.becomeActiveMaster span should show zookeeper interaction.", spans,
hasItem(allOf(hasName(startsWith("RecoverableZookeeper.")),
hasParentSpanId(masterBecomeActiveMasterSpan))));
assertThat("the HMaster.becomeActiveMaster span should show Region interaction.", spans,
hasItem(
allOf(hasName(startsWith("Region.")), hasParentSpanId(masterBecomeActiveMasterSpan))));
assertThat("the HMaster.becomeActiveMaster span should show RegionScanner interaction.", spans,
hasItem(allOf(hasName(startsWith("RegionScanner.")),
hasParentSpanId(masterBecomeActiveMasterSpan))));
assertThat("the HMaster.becomeActiveMaster span should show hbase:meta interaction.", spans,
hasItem(allOf(hasName(containsString("hbase:meta")),
hasParentSpanId(masterBecomeActiveMasterSpan))));
assertThat("the HMaster.becomeActiveMaster span should show WAL interaction.", spans,
hasItem(allOf(hasName(startsWith("WAL.")), hasParentSpanId(masterBecomeActiveMasterSpan))));
}

@Test
public void testZKWatcherHMaster() {
final Matcher<SpanData> mZKWatcherMatcher = allOf(hasName(startsWith("ZKWatcher-master")),
hasParentSpanId(NO_PARENT_ID), hasStatusWithCode(isOneOf(StatusCode.OK, StatusCode.ERROR)));
assertThat("there should be a span from the ZKWatcher running in the HMaster.", spans,
hasItem(mZKWatcherMatcher));
final SpanData mZKWatcherSpan =
spans.stream().filter(mZKWatcherMatcher::matches).findAny().orElseThrow(AssertionError::new);
assertThat("the ZKWatcher running in the HMaster span should invoke processEvent.", spans,
hasItem(allOf(hasName(containsString("processEvent")), hasParentSpanId(mZKWatcherSpan))));
}

@Test
public void testHMasterShutdown() {
final Matcher<SpanData> masterShutdownMatcher = allOf(hasName("HMaster.shutdown"),
hasParentSpanId(NO_PARENT_ID), hasStatusWithCode(isOneOf(StatusCode.OK, StatusCode.ERROR)));
assertThat("there should be a span from the HMaster.shutdown.", spans,
hasItem(masterShutdownMatcher));
final SpanData masterShutdownSpan = spans.stream().filter(masterShutdownMatcher::matches)
.findAny().orElseThrow(AssertionError::new);
assertThat("the HMaster.shutdown span should show zookeeper interaction.", spans, hasItem(
allOf(hasName(startsWith("RecoverableZookeeper.")), hasParentSpanId(masterShutdownSpan))));
assertThat(
"the HMaster.shutdown span should show ShortCircuitingClusterConnection interaction.", spans,
hasItem(allOf(hasName(startsWith("ShortCircuitingClusterConnection.")),
hasParentSpanId(masterShutdownSpan))));
}

@Test
public void testHMasterExitingMainLoop() {
final Matcher<SpanData> masterExitingMainLoopMatcher =
allOf(hasName("HMaster exiting main loop"), hasParentSpanId(NO_PARENT_ID),
hasStatusWithCode(isOneOf(StatusCode.OK, StatusCode.ERROR)));
assertThat("there should be a span from the HMaster exiting main loop.", spans,
hasItem(masterExitingMainLoopMatcher));
final SpanData masterExitingMainLoopSpan = spans.stream()
.filter(masterExitingMainLoopMatcher::matches).findAny().orElseThrow(AssertionError::new);
assertThat("the HMaster exiting main loop span should show HTable interaction.", spans,
hasItem(allOf(hasName(startsWith("HTable.")), hasParentSpanId(masterExitingMainLoopSpan))));
}

@Test
public void testTryRegionServerReport() {
final Matcher<SpanData> tryRegionServerReportMatcher =
allOf(hasName("HRegionServer.tryRegionServerReport"), hasParentSpanId(NO_PARENT_ID),
hasStatusWithCode(isOneOf(StatusCode.OK, StatusCode.ERROR)));
assertThat("there should be a span for the region server sending a report.", spans,
hasItem(tryRegionServerReportMatcher));
final SpanData tryRegionServerReportSpan = spans.stream()
.filter(tryRegionServerReportMatcher::matches).findAny().orElseThrow(AssertionError::new);
assertThat(
"the region server report span should have an invocation of the RegionServerReport RPC.",
spans, hasItem(allOf(hasName(endsWith("RegionServerStatusService/RegionServerReport")),
hasParentSpanId(tryRegionServerReportSpan))));
}

@Test
public void testHRegionServerStartup() {
final Matcher<SpanData> regionServerStartupMatcher = allOf(hasName("HRegionServer.startup"),
hasParentSpanId(NO_PARENT_ID), hasStatusWithCode(isOneOf(StatusCode.OK, StatusCode.ERROR)));
assertThat("there should be a span from the HRegionServer startup procedure.", spans,
hasItem(regionServerStartupMatcher));
final SpanData regionServerStartupSpan = spans.stream()
.filter(regionServerStartupMatcher::matches).findAny().orElseThrow(AssertionError::new);
assertThat("the HRegionServer startup procedure span should show zookeeper interaction.", spans,
hasItem(allOf(hasName(startsWith("RecoverableZookeeper.")),
hasParentSpanId(regionServerStartupSpan))));
}

@Test
public void testHRegionServerConstructor() {
final Matcher<SpanData> rsConstructorMatcher = allOf(hasName("HRegionServer.cxtor"),
hasParentSpanId(NO_PARENT_ID), hasStatusWithCode(isOneOf(StatusCode.OK, StatusCode.ERROR)));
assertThat("there should be a span from the HRegionServer constructor.", spans,
hasItem(rsConstructorMatcher));
final SpanData rsConstructorSpan = spans.stream().filter(rsConstructorMatcher::matches)
.findAny().orElseThrow(AssertionError::new);
assertThat("the HRegionServer constructor span should show zookeeper interaction.", spans,
hasItem(
allOf(hasName(startsWith("RecoverableZookeeper.")), hasParentSpanId(rsConstructorSpan))));
assertThat("the HRegionServer constructor span should invoke the MasterAddressTracker.", spans,
hasItem(
allOf(hasName(startsWith("MasterAddressTracker.")), hasParentSpanId(rsConstructorSpan))));
}

@Test
public void testHRegionServerPreRegistrationInitialization() {
final Matcher<SpanData> rsPreRegistrationInitializationMatcher =
allOf(hasName("HRegionServer.preRegistrationInitialization"), hasParentSpanId(NO_PARENT_ID),
hasStatusWithCode(isOneOf(StatusCode.OK, StatusCode.ERROR)));
assertThat("there should be a span from the HRegionServer preRegistrationInitialization.",
spans, hasItem(rsPreRegistrationInitializationMatcher));
final SpanData rsPreRegistrationInitializationSpan =
spans.stream().filter(rsPreRegistrationInitializationMatcher::matches).findAny()
.orElseThrow(AssertionError::new);
assertThat(
"the HRegionServer preRegistrationInitialization span should show zookeeper interaction.",
spans, hasItem(allOf(hasName(startsWith("RecoverableZookeeper.")),
hasParentSpanId(rsPreRegistrationInitializationSpan))));
}

@Test
public void testHRegionServerRegisterWithMaster() {
final Matcher<SpanData> rsRegisterWithMasterMatcher =
allOf(hasName("HRegionServer.registerWithMaster"), hasParentSpanId(NO_PARENT_ID),
hasStatusWithCode(isOneOf(StatusCode.OK, StatusCode.ERROR)));
assertThat("there should be a span from the HRegionServer registerWithMaster.", spans,
hasItem(rsRegisterWithMasterMatcher));
final SpanData rsRegisterWithMasterSpan = spans.stream()
.filter(rsRegisterWithMasterMatcher::matches).findAny().orElseThrow(AssertionError::new);
assertThat("the HRegionServer registerWithMaster span should show zookeeper interaction.",
spans, hasItem(allOf(hasName(startsWith("RecoverableZookeeper.")),
hasParentSpanId(rsRegisterWithMasterSpan))));
assertThat(
"the HRegionServer registerWithMaster span should have an invocation of the"
+ " RegionServerStartup RPC.",
spans, hasItem(allOf(hasName(endsWith("RegionServerStatusService/RegionServerStartup")),
hasParentSpanId(rsRegisterWithMasterSpan))));
}

@Test
public void testZKWatcherRegionServer() {
final Matcher<SpanData> rsZKWatcherMatcher =
allOf(hasName(startsWith("ZKWatcher-regionserver")), hasParentSpanId(NO_PARENT_ID),
hasStatusWithCode(isOneOf(StatusCode.OK, StatusCode.ERROR)));
assertThat("there should be a span from the ZKWatcher running in the HRegionServer.", spans,
hasItem(rsZKWatcherMatcher));
final SpanData rsZKWatcherSpan =
spans.stream().filter(rsZKWatcherMatcher::matches).findAny().orElseThrow(AssertionError::new);
assertThat("the ZKWatcher running in the HRegionServer span should invoke processEvent.", spans,
hasItem(allOf(hasName(containsString("processEvent")), hasParentSpanId(rsZKWatcherSpan))));
}

@Test
public void testHRegionServerExitingMainLoop() {
final Matcher<SpanData> rsExitingMainLoopMatcher =
allOf(hasName("HRegionServer exiting main loop"), hasParentSpanId(NO_PARENT_ID),
hasStatusWithCode(isOneOf(StatusCode.OK, StatusCode.ERROR)));
assertThat("there should be a span from the HRegionServer exiting main loop.", spans,
hasItem(rsExitingMainLoopMatcher));
final SpanData rsExitingMainLoopSpan = spans.stream().filter(rsExitingMainLoopMatcher::matches)
.findAny().orElseThrow(AssertionError::new);
assertThat("the HRegionServer exiting main loop span should show zookeeper interaction.", spans,
hasItem(allOf(hasName(startsWith("RecoverableZookeeper.")),
hasParentSpanId(rsExitingMainLoopSpan))));
assertThat(
"the HRegionServer exiting main loop span should show "
+ "ShortCircuitingClusterConnection interaction.",
spans, hasItem(allOf(hasName(startsWith("ShortCircuitingClusterConnection.")),
hasParentSpanId(rsExitingMainLoopSpan))));
assertThat("the HRegionServer exiting main loop span should invoke CloseMetaHandler.", spans,
hasItem(allOf(hasName("CloseMetaHandler"), hasParentSpanId(rsExitingMainLoopSpan))));
}
}
@@ -27,6 +27,7 @@ import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.trace.TraceUtil;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.data.Stat;

@@ -71,11 +72,13 @@ public class MasterAddressTracker extends ZKNodeTracker {
}

private void loadBackupMasters() {
try {
backupMasters = Collections.unmodifiableList(getBackupMastersAndRenewWatch(watcher));
} catch (InterruptedIOException e) {
abortable.abort("Unexpected exception handling nodeChildrenChanged event", e);
}
TraceUtil.trace(() -> {
try {
backupMasters = Collections.unmodifiableList(getBackupMastersAndRenewWatch(watcher));
} catch (InterruptedIOException e) {
abortable.abort("Unexpected exception handling nodeChildrenChanged event", e);
}
}, "MasterAddressTracker.loadBackupMasters");
}

@Override
@ -18,6 +18,7 @@
|
|||
package org.apache.hadoop.hbase.zookeeper;
|
||||
|
||||
import io.opentelemetry.api.trace.Span;
|
||||
import io.opentelemetry.api.trace.StatusCode;
|
||||
import io.opentelemetry.context.Scope;
|
||||
import java.io.IOException;
|
||||
import java.lang.management.ManagementFactory;
|
||||
|
@@ -201,14 +202,15 @@ public class RecoverableZooKeeper {
* throw NoNodeException if the path does not exist.
*/
public void delete(String path, int version) throws InterruptedException, KeeperException {
Span span = TraceUtil.getGlobalTracer().spanBuilder("RecoverableZookeeper.delete").startSpan();
try (Scope scope = span.makeCurrent()) {
final Span span = TraceUtil.createSpan("RecoverableZookeeper.delete");
try (Scope ignored = span.makeCurrent()) {
RetryCounter retryCounter = retryCounterFactory.create();
boolean isRetry = false; // False for first attempt, true for all retries.
while (true) {
try {
long startTime = EnvironmentEdgeManager.currentTime();
checkZk().delete(path, version);
span.setStatus(StatusCode.OK);
return;
} catch (KeeperException e) {
switch (e.code()) {
@@ -216,18 +218,22 @@ public class RecoverableZooKeeper {
if (isRetry) {
LOG.debug(
"Node " + path + " already deleted. Assuming a " + "previous attempt succeeded.");
span.setStatus(StatusCode.OK);
return;
}
LOG.debug("Node {} already deleted, retry={}", path, isRetry);
TraceUtil.setError(span, e);
throw e;

case CONNECTIONLOSS:
case OPERATIONTIMEOUT:
case REQUESTTIMEOUT:
TraceUtil.setError(span, e);
retryOrThrow(retryCounter, e, "delete");
break;

default:
TraceUtil.setError(span, e);
throw e;
}
}
@@ -244,23 +250,26 @@ public class RecoverableZooKeeper {
* @return A Stat instance
*/
public Stat exists(String path, Watcher watcher) throws KeeperException, InterruptedException {
Span span = TraceUtil.getGlobalTracer().spanBuilder("RecoverableZookeeper.exists").startSpan();
try (Scope scope = span.makeCurrent()) {
final Span span = TraceUtil.createSpan("RecoverableZookeeper.exists");
try (Scope ignored = span.makeCurrent()) {
RetryCounter retryCounter = retryCounterFactory.create();
while (true) {
try {
long startTime = EnvironmentEdgeManager.currentTime();
Stat nodeStat = checkZk().exists(path, watcher);
span.setStatus(StatusCode.OK);
return nodeStat;
} catch (KeeperException e) {
switch (e.code()) {
case CONNECTIONLOSS:
case OPERATIONTIMEOUT:
case REQUESTTIMEOUT:
TraceUtil.setError(span, e);
retryOrThrow(retryCounter, e, "exists");
break;

default:
TraceUtil.setError(span, e);
throw e;
}
}
@@ -276,24 +285,28 @@ public class RecoverableZooKeeper {
* @return A Stat instance
*/
public Stat exists(String path, boolean watch) throws KeeperException, InterruptedException {
Span span = TraceUtil.getGlobalTracer().spanBuilder("RecoverableZookeeper.exists").startSpan();
try (Scope scope = span.makeCurrent()) {
Span span = TraceUtil.createSpan("RecoverableZookeeper.exists");
try (Scope ignored = span.makeCurrent()) {
RetryCounter retryCounter = retryCounterFactory.create();
while (true) {
try {
long startTime = EnvironmentEdgeManager.currentTime();
Stat nodeStat = checkZk().exists(path, watch);
span.setStatus(StatusCode.OK);
return nodeStat;
} catch (KeeperException e) {
switch (e.code()) {
case CONNECTIONLOSS:
TraceUtil.setError(span, e);
retryOrThrow(retryCounter, e, "exists");
break;
case OPERATIONTIMEOUT:
TraceUtil.setError(span, e);
retryOrThrow(retryCounter, e, "exists");
break;

default:
TraceUtil.setError(span, e);
throw e;
}
}
@@ -319,24 +332,26 @@ public class RecoverableZooKeeper {
*/
public List<String> getChildren(String path, Watcher watcher)
throws KeeperException, InterruptedException {
Span span =
TraceUtil.getGlobalTracer().spanBuilder("RecoverableZookeeper.getChildren").startSpan();
try (Scope scope = span.makeCurrent()) {
final Span span = TraceUtil.createSpan("RecoverableZookeeper.getChildren");
try (Scope ignored = span.makeCurrent()) {
RetryCounter retryCounter = retryCounterFactory.create();
while (true) {
try {
long startTime = EnvironmentEdgeManager.currentTime();
List<String> children = checkZk().getChildren(path, watcher);
span.setStatus(StatusCode.OK);
return children;
} catch (KeeperException e) {
switch (e.code()) {
case CONNECTIONLOSS:
case OPERATIONTIMEOUT:
case REQUESTTIMEOUT:
TraceUtil.setError(span, e);
retryOrThrow(retryCounter, e, "getChildren");
break;

default:
TraceUtil.setError(span, e);
throw e;
}
}
@@ -353,25 +368,28 @@ public class RecoverableZooKeeper {
*/
public List<String> getChildren(String path, boolean watch)
throws KeeperException, InterruptedException {
Span span =
TraceUtil.getGlobalTracer().spanBuilder("RecoverableZookeeper.getChildren").startSpan();
try (Scope scope = span.makeCurrent()) {
Span span = TraceUtil.createSpan("RecoverableZookeeper.getChildren");
try (Scope ignored = span.makeCurrent()) {
RetryCounter retryCounter = retryCounterFactory.create();
while (true) {
try {
long startTime = EnvironmentEdgeManager.currentTime();
List<String> children = checkZk().getChildren(path, watch);
span.setStatus(StatusCode.OK);
return children;
} catch (KeeperException e) {
switch (e.code()) {
case CONNECTIONLOSS:
TraceUtil.setError(span, e);
retryOrThrow(retryCounter, e, "getChildren");
break;
case OPERATIONTIMEOUT:
TraceUtil.setError(span, e);
retryOrThrow(retryCounter, e, "getChildren");
break;

default:
TraceUtil.setError(span, e);
throw e;
}
}
@@ -387,23 +405,26 @@ public class RecoverableZooKeeper {
*/
public byte[] getData(String path, Watcher watcher, Stat stat)
throws KeeperException, InterruptedException {
Span span = TraceUtil.getGlobalTracer().spanBuilder("RecoverableZookeeper.getData").startSpan();
try (Scope scope = span.makeCurrent()) {
final Span span = TraceUtil.createSpan("RecoverableZookeeper.getData");
try (Scope ignored = span.makeCurrent()) {
RetryCounter retryCounter = retryCounterFactory.create();
while (true) {
try {
long startTime = EnvironmentEdgeManager.currentTime();
byte[] revData = checkZk().getData(path, watcher, stat);
span.setStatus(StatusCode.OK);
return ZKMetadata.removeMetaData(revData);
} catch (KeeperException e) {
switch (e.code()) {
case CONNECTIONLOSS:
case OPERATIONTIMEOUT:
case REQUESTTIMEOUT:
TraceUtil.setError(span, e);
retryOrThrow(retryCounter, e, "getData");
break;

default:
TraceUtil.setError(span, e);
throw e;
}
}
@@ -426,17 +447,21 @@ public class RecoverableZooKeeper {
try {
long startTime = EnvironmentEdgeManager.currentTime();
byte[] revData = checkZk().getData(path, watch, stat);
span.setStatus(StatusCode.OK);
return ZKMetadata.removeMetaData(revData);
} catch (KeeperException e) {
switch (e.code()) {
case CONNECTIONLOSS:
TraceUtil.setError(span, e);
retryOrThrow(retryCounter, e, "getData");
break;
case OPERATIONTIMEOUT:
TraceUtil.setError(span, e);
retryOrThrow(retryCounter, e, "getData");
break;

default:
TraceUtil.setError(span, e);
throw e;
}
}
@@ -455,8 +480,8 @@ public class RecoverableZooKeeper {
*/
public Stat setData(String path, byte[] data, int version)
throws KeeperException, InterruptedException {
Span span = TraceUtil.getGlobalTracer().spanBuilder("RecoverableZookeeper.setData").startSpan();
try (Scope scope = span.makeCurrent()) {
final Span span = TraceUtil.createSpan("RecoverableZookeeper.setData");
try (Scope ignored = span.makeCurrent()) {
RetryCounter retryCounter = retryCounterFactory.create();
byte[] newData = ZKMetadata.appendMetaData(id, data);
boolean isRetry = false;
@@ -465,12 +490,14 @@ public class RecoverableZooKeeper {
try {
startTime = EnvironmentEdgeManager.currentTime();
Stat nodeStat = checkZk().setData(path, newData, version);
span.setStatus(StatusCode.OK);
return nodeStat;
} catch (KeeperException e) {
switch (e.code()) {
case CONNECTIONLOSS:
case OPERATIONTIMEOUT:
case REQUESTTIMEOUT:
TraceUtil.setError(span, e);
retryOrThrow(retryCounter, e, "setData");
break;
case BADVERSION:
@@ -481,15 +508,18 @@ public class RecoverableZooKeeper {
byte[] revData = checkZk().getData(path, false, stat);
if (Bytes.compareTo(revData, newData) == 0) {
// the bad version is caused by previous successful setData
span.setStatus(StatusCode.OK);
return stat;
}
} catch (KeeperException keeperException) {
// the ZK is not reliable at this moment. just throwing exception
TraceUtil.setError(span, keeperException);
throw keeperException;
}
}
// throw other exceptions and verified bad version exceptions
default:
TraceUtil.setError(span, e);
throw e;
}
}
@@ -506,23 +536,26 @@ public class RecoverableZooKeeper {
* @return list of ACLs
*/
public List<ACL> getAcl(String path, Stat stat) throws KeeperException, InterruptedException {
Span span = TraceUtil.getGlobalTracer().spanBuilder("RecoverableZookeeper.getAcl").startSpan();
try (Scope scope = span.makeCurrent()) {
final Span span = TraceUtil.createSpan("RecoverableZookeeper.getAcl");
try (Scope ignored = span.makeCurrent()) {
RetryCounter retryCounter = retryCounterFactory.create();
while (true) {
try {
long startTime = EnvironmentEdgeManager.currentTime();
List<ACL> nodeACL = checkZk().getACL(path, stat);
span.setStatus(StatusCode.OK);
return nodeACL;
} catch (KeeperException e) {
switch (e.code()) {
case CONNECTIONLOSS:
case OPERATIONTIMEOUT:
case REQUESTTIMEOUT:
TraceUtil.setError(span, e);
retryOrThrow(retryCounter, e, "getAcl");
break;

default:
TraceUtil.setError(span, e);
throw e;
}
}
@@ -539,22 +572,25 @@ public class RecoverableZooKeeper {
*/
public Stat setAcl(String path, List<ACL> acls, int version)
throws KeeperException, InterruptedException {
Span span = TraceUtil.getGlobalTracer().spanBuilder("RecoverableZookeeper.setAcl").startSpan();
try (Scope scope = span.makeCurrent()) {
final Span span = TraceUtil.createSpan("RecoverableZookeeper.setAcl");
try (Scope ignored = span.makeCurrent()) {
RetryCounter retryCounter = retryCounterFactory.create();
while (true) {
try {
long startTime = EnvironmentEdgeManager.currentTime();
Stat nodeStat = checkZk().setACL(path, acls, version);
span.setStatus(StatusCode.OK);
return nodeStat;
} catch (KeeperException e) {
switch (e.code()) {
case CONNECTIONLOSS:
case OPERATIONTIMEOUT:
TraceUtil.setError(span, e);
retryOrThrow(retryCounter, e, "setAcl");
break;

default:
TraceUtil.setError(span, e);
throw e;
}
}
@@ -578,20 +614,25 @@ public class RecoverableZooKeeper {
*/
public String create(String path, byte[] data, List<ACL> acl, CreateMode createMode)
throws KeeperException, InterruptedException {
Span span = TraceUtil.getGlobalTracer().spanBuilder("RecoverableZookeeper.create").startSpan();
try (Scope scope = span.makeCurrent()) {
final Span span = TraceUtil.createSpan("RecoverableZookeeper.create");
try (Scope ignored = span.makeCurrent()) {
byte[] newData = ZKMetadata.appendMetaData(id, data);
switch (createMode) {
case EPHEMERAL:
case PERSISTENT:
span.setStatus(StatusCode.OK);
return createNonSequential(path, newData, acl, createMode);

case EPHEMERAL_SEQUENTIAL:
case PERSISTENT_SEQUENTIAL:
span.setStatus(StatusCode.OK);
return createSequential(path, newData, acl, createMode);

default:
throw new IllegalArgumentException("Unrecognized CreateMode: " + createMode);
final IllegalArgumentException e =
new IllegalArgumentException("Unrecognized CreateMode: " + createMode);
TraceUtil.setError(span, e);
throw e;
}
} finally {
span.end();
@@ -709,24 +750,27 @@ public class RecoverableZooKeeper {
* Run multiple operations in a transactional manner. Retry before throwing exception
*/
public List<OpResult> multi(Iterable<Op> ops) throws KeeperException, InterruptedException {
Span span = TraceUtil.getGlobalTracer().spanBuilder("RecoverableZookeeper.multi").startSpan();
try (Scope scope = span.makeCurrent()) {
final Span span = TraceUtil.createSpan("RecoverableZookeeper.multi");
try (Scope ignored = span.makeCurrent()) {
RetryCounter retryCounter = retryCounterFactory.create();
Iterable<Op> multiOps = prepareZKMulti(ops);
while (true) {
try {
long startTime = EnvironmentEdgeManager.currentTime();
List<OpResult> opResults = checkZk().multi(multiOps);
span.setStatus(StatusCode.OK);
return opResults;
} catch (KeeperException e) {
switch (e.code()) {
case CONNECTIONLOSS:
case OPERATIONTIMEOUT:
case REQUESTTIMEOUT:
TraceUtil.setError(span, e);
retryOrThrow(retryCounter, e, "multi");
break;

default:
TraceUtil.setError(span, e);
throw e;
}
}
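Note: every RecoverableZooKeeper method above follows the same shape: create a span named after the ZK operation, keep it current for the duration of the retry loop, mark it OK on success, and record the KeeperException via TraceUtil.setError before retrying or rethrowing. The skeleton below condenses that shape for illustration; callWithRetries and isRetriable are names invented for this sketch, not HBase APIs, and real code would also back off between attempts.

import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.StatusCode;
import io.opentelemetry.context.Scope;
import java.util.concurrent.Callable;
import org.apache.zookeeper.KeeperException;

final class ZkRetrySketch {

  /** Runs a ZooKeeper call under a span, retrying on transient error codes. */
  static <T> T callWithRetries(String operation, int maxRetries, Callable<T> call)
    throws Exception {
    Span span = GlobalOpenTelemetry.getTracer("sketch").spanBuilder(operation).startSpan();
    try (Scope ignored = span.makeCurrent()) {
      int attempts = 0;
      while (true) {
        try {
          T result = call.call();
          span.setStatus(StatusCode.OK);
          return result;
        } catch (KeeperException e) {
          // Record every failure on the span, then decide whether to retry or rethrow.
          span.setStatus(StatusCode.ERROR);
          span.recordException(e);
          if (!isRetriable(e) || ++attempts > maxRetries) {
            throw e;
          }
          // A real implementation would sleep or back off here before the next attempt.
        }
      }
    } finally {
      span.end();
    }
  }

  /** Transient codes worth another attempt; everything else is rethrown immediately. */
  private static boolean isRetriable(KeeperException e) {
    switch (e.code()) {
      case CONNECTIONLOSS:
      case OPERATIONTIMEOUT:
      case REQUESTTIMEOUT:
        return true;
      default:
        return false;
    }
  }

  private ZkRetrySketch() {
  }
}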
@@ -34,6 +34,7 @@ import org.apache.hadoop.hbase.AuthUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.security.Superusers;
import org.apache.hadoop.hbase.trace.TraceUtil;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.security.UserGroupInformation;
@@ -552,45 +553,47 @@ public class ZKWatcher implements Watcher, Abortable, Closeable {
}

private void processEvent(WatchedEvent event) {
switch (event.getType()) {
// If event type is NONE, this is a connection status change
case None: {
connectionEvent(event);
break;
}

// Otherwise pass along to the listeners
case NodeCreated: {
for (ZKListener listener : listeners) {
listener.nodeCreated(event.getPath());
TraceUtil.trace(() -> {
switch (event.getType()) {
// If event type is NONE, this is a connection status change
case None: {
connectionEvent(event);
break;
}
break;
}

case NodeDeleted: {
for (ZKListener listener : listeners) {
listener.nodeDeleted(event.getPath());
// Otherwise pass along to the listeners
case NodeCreated: {
for (ZKListener listener : listeners) {
listener.nodeCreated(event.getPath());
}
break;
}
break;
}

case NodeDataChanged: {
for (ZKListener listener : listeners) {
listener.nodeDataChanged(event.getPath());
case NodeDeleted: {
for (ZKListener listener : listeners) {
listener.nodeDeleted(event.getPath());
}
break;
}
break;
}

case NodeChildrenChanged: {
for (ZKListener listener : listeners) {
listener.nodeChildrenChanged(event.getPath());
case NodeDataChanged: {
for (ZKListener listener : listeners) {
listener.nodeDataChanged(event.getPath());
}
break;
}
break;

case NodeChildrenChanged: {
for (ZKListener listener : listeners) {
listener.nodeChildrenChanged(event.getPath());
}
break;
}
default:
LOG.error("Invalid event of type {} received for path {}. Ignoring.", event.getState(),
event.getPath());
}
default:
LOG.error("Invalid event of type {} received for path {}. Ignoring.", event.getState(),
event.getPath());
}
}, "ZKWatcher.processEvent: " + event.getType() + " " + event.getPath());
}

/**
@@ -602,7 +605,8 @@ public class ZKWatcher implements Watcher, Abortable, Closeable {
public void process(WatchedEvent event) {
LOG.debug(prefix("Received ZooKeeper Event, " + "type=" + event.getType() + ", " + "state="
+ event.getState() + ", " + "path=" + event.getPath()));
zkEventProcessor.submit(() -> processEvent(event));
final String spanName = ZKWatcher.class.getSimpleName() + "-" + identifier;
zkEventProcessor.submit(TraceUtil.tracedRunnable(() -> processEvent(event), spanName));
}

// Connection management
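Note: process(WatchedEvent) only queues work on zkEventProcessor, so the span around processEvent has to be opened on the executor thread that eventually runs the task; wrapping the task with TraceUtil.tracedRunnable accomplishes that. The sketch below shows the same deferred-span idea with a plain single-thread executor; the traced(...) helper here is illustrative, not the HBase implementation.

import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.StatusCode;
import io.opentelemetry.context.Scope;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public final class DeferredSpanSketch {

  /** Wraps a task so the span starts and ends on the worker thread that runs it. */
  static Runnable traced(Runnable task, String spanName) {
    return () -> {
      Span span = GlobalOpenTelemetry.getTracer("sketch").spanBuilder(spanName).startSpan();
      try (Scope ignored = span.makeCurrent()) {
        task.run();
        span.setStatus(StatusCode.OK);
      } catch (RuntimeException e) {
        span.setStatus(StatusCode.ERROR);
        span.recordException(e);
        throw e;
      } finally {
        span.end();
      }
    };
  }

  public static void main(String[] args) {
    ExecutorService executor = Executors.newSingleThreadExecutor();
    // The span begins when the task executes, not when it is submitted, so queue time is
    // not attributed to the event-handling span in this sketch.
    executor.submit(traced(() -> System.out.println("handling ZooKeeper event"),
      "sketch.processEvent"));
    executor.shutdown();
  }

  private DeferredSpanSketch() {
  }
}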