From 3ff51487b7cbf26f95ef414f3b5d505a8620804f Mon Sep 17 00:00:00 2001 From: Gian Merlino Date: Wed, 12 Jul 2023 09:34:28 -0700 Subject: [PATCH] Add ZooKeeper connection state alerts and metrics. (#14333) * Add ZooKeeper connection state alerts and metrics. - New metric "zk/connected" is an indicator showing 1 when connected, 0 when disconnected. - New metric "zk/disconnected/time" measures time spent disconnected. - New alert when Curator connection state enters LOST or SUSPENDED. * Use right GuardedBy. * Test fixes, coverage. * Adjustment. * Fix tests. * Fix ITs. * Improved injection. * Adjust metric name, add tests. --- docs/operations/metrics.md | 9 ++ .../kafka/supervisor/KafkaSupervisorTest.java | 2 +- .../supervisor/KinesisSupervisorTest.java | 2 +- .../druid/testsEx/config/Initializer.java | 10 +- .../java/util/emitter/EmittingLogger.java | 20 +-- .../util/emitter/service/AlertBuilder.java | 24 ++- .../emitter/service/AlertBuilderTest.java | 54 +++++++ .../testing/junit/LoggerCaptureRule.java | 45 +++++- .../apache/druid/curator/CuratorModule.java | 81 +++++++--- .../curator/DruidConnectionStateListener.java | 103 ++++++++++++ .../druid/curator/CuratorModuleTest.java | 15 +- .../DruidConnectionStateListenerTest.java | 150 ++++++++++++++++++ .../ExceptionCapturingServiceEmitter.java | 12 +- 13 files changed, 464 insertions(+), 63 deletions(-) create mode 100644 processing/src/test/java/org/apache/druid/java/util/emitter/service/AlertBuilderTest.java create mode 100644 server/src/main/java/org/apache/druid/curator/DruidConnectionStateListener.java create mode 100644 server/src/test/java/org/apache/druid/curator/DruidConnectionStateListenerTest.java diff --git a/docs/operations/metrics.md b/docs/operations/metrics.md index ecadf53c4cd..61400a5379c 100644 --- a/docs/operations/metrics.md +++ b/docs/operations/metrics.md @@ -369,6 +369,15 @@ For more information, see [Enabling Metrics](../configuration/index.md#enabling- |`jvm/gc/count`|Garbage collection count|`gcName` (cms/g1/parallel/etc.), `gcGen` (old/young)|Varies| |`jvm/gc/cpu`|Count of CPU time in Nanoseconds spent on garbage collection. Note: `jvm/gc/cpu` represents the total time over multiple GC cycles; divide by `jvm/gc/count` to get the mean GC time per cycle.|`gcName`, `gcGen`|Sum of `jvm/gc/cpu` should be within 10-30% of sum of `jvm/cpu/total`, depending on the GC algorithm used (reported by [`JvmCpuMonitor`](../configuration/index.md#enabling-metrics)). | +### ZooKeeper + +These metrics are available unless `druid.zk.service.enabled = false`. + +|Metric|Description|Dimensions|Normal Value| +|------|-----------|----------|------------| +|`zk/connected`|Indicator of connection status. `1` for connected, `0` for disconnected. Emitted once per monitor period.|None|1| +|`zk/reconnect/time`|Amount of time, in milliseconds, that a server was disconnected from ZooKeeper before reconnecting. Emitted on reconnection. Not emitted if connection to ZooKeeper is permanently lost, because in this case, there is no reconnection.|None|Not present| + ### EventReceiverFirehose The following metric is only available if the `EventReceiverFirehoseMonitor` module is included. diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java index 97297c085e2..47875e106f3 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java @@ -3440,7 +3440,7 @@ public class KafkaSupervisorTest extends EasyMockSupport "Cannot find taskGroup [0] among all activelyReadingTaskGroups [{}]", serviceEmitter.getExceptionMessage() ); - Assert.assertEquals(ISE.class, serviceEmitter.getExceptionClass()); + Assert.assertEquals(ISE.class.getName(), serviceEmitter.getExceptionClass()); } @Test diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java index 19caa96ead9..37691b1a7e5 100644 --- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java +++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java @@ -3461,7 +3461,7 @@ public class KinesisSupervisorTest extends EasyMockSupport "Cannot find taskGroup [0] among all activelyReadingTaskGroups [{}]", serviceEmitter.getExceptionMessage() ); - Assert.assertEquals(ISE.class, serviceEmitter.getExceptionClass()); + Assert.assertEquals(ISE.class.getName(), serviceEmitter.getExceptionClass()); } @Test diff --git a/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/config/Initializer.java b/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/config/Initializer.java index 80d1dec0e66..7a2eae93e16 100644 --- a/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/config/Initializer.java +++ b/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/config/Initializer.java @@ -168,7 +168,11 @@ public class Initializer JsonConfigProvider.bind(binder, MetadataStorageTablesConfig.PROPERTY_BASE, MetadataStorageTablesConfig.class); // Build from properties provided in the config - JsonConfigProvider.bind(binder, MetadataStorageConnectorConfig.PROPERTY_BASE, MetadataStorageConnectorConfig.class); + JsonConfigProvider.bind( + binder, + MetadataStorageConnectorConfig.PROPERTY_BASE, + MetadataStorageConnectorConfig.class + ); } @Provides @@ -327,7 +331,7 @@ public class Initializer *

* The builder registers {@code DruidNodeDiscoveryProvider} by default: add any * test-specific instances as needed. - */ + */ public Builder eagerInstance(Class theClass) { this.eagerCreation.add(theClass); @@ -343,7 +347,7 @@ public class Initializer return this; } - public Builder modules(Module...modules) + public Builder modules(Module... modules) { return modules(Arrays.asList(modules)); } diff --git a/processing/src/main/java/org/apache/druid/java/util/emitter/EmittingLogger.java b/processing/src/main/java/org/apache/druid/java/util/emitter/EmittingLogger.java index 7ad5d1091de..6531c6bd3a8 100644 --- a/processing/src/main/java/org/apache/druid/java/util/emitter/EmittingLogger.java +++ b/processing/src/main/java/org/apache/druid/java/util/emitter/EmittingLogger.java @@ -29,17 +29,11 @@ import org.apache.druid.java.util.emitter.service.AlertBuilder; import org.apache.druid.java.util.emitter.service.ServiceEmitter; import javax.annotation.Nullable; -import java.io.PrintWriter; -import java.io.StringWriter; /** */ public class EmittingLogger extends Logger { - public static final String EXCEPTION_TYPE_KEY = "exceptionType"; - public static final String EXCEPTION_MESSAGE_KEY = "exceptionMessage"; - public static final String EXCEPTION_STACK_TRACE_KEY = "exceptionStackTrace"; - private static volatile ServiceEmitter emitter = null; private final String className; @@ -93,19 +87,8 @@ public class EmittingLogger extends Logger throw e; } - final AlertBuilder retVal = new EmittingAlertBuilder(t, StringUtils.format(message, objects), emitter) + return new EmittingAlertBuilder(t, StringUtils.format(message, objects), emitter) .addData("class", className); - - if (t != null) { - final StringWriter trace = new StringWriter(); - final PrintWriter pw = new PrintWriter(trace); - t.printStackTrace(pw); - retVal.addData("exceptionType", t.getClass()); - retVal.addData("exceptionMessage", t.getMessage()); - retVal.addData("exceptionStackTrace", trace.toString()); - } - - return retVal; } public class EmittingAlertBuilder extends AlertBuilder @@ -118,6 +101,7 @@ public class EmittingLogger extends Logger { super(description, emitter); this.t = t; + addThrowable(t); } @Override diff --git a/processing/src/main/java/org/apache/druid/java/util/emitter/service/AlertBuilder.java b/processing/src/main/java/org/apache/druid/java/util/emitter/service/AlertBuilder.java index 9938e7cb7ae..352272d67bc 100644 --- a/processing/src/main/java/org/apache/druid/java/util/emitter/service/AlertBuilder.java +++ b/processing/src/main/java/org/apache/druid/java/util/emitter/service/AlertBuilder.java @@ -24,12 +24,20 @@ import com.google.common.collect.Maps; import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.StringUtils; +import javax.annotation.Nullable; +import java.io.PrintWriter; +import java.io.StringWriter; import java.util.Map; /** -*/ + * + */ public class AlertBuilder extends ServiceEventBuilder { + public static final String EXCEPTION_TYPE_KEY = "exceptionType"; + public static final String EXCEPTION_MESSAGE_KEY = "exceptionMessage"; + public static final String EXCEPTION_STACK_TRACE_KEY = "exceptionStackTrace"; + protected final Map dataMap = Maps.newLinkedHashMap(); protected final String description; protected final ServiceEmitter emitter; @@ -67,6 +75,20 @@ public class AlertBuilder extends ServiceEventBuilder return this; } + public AlertBuilder addThrowable(@Nullable final Throwable t) + { + if (t != null) { + final StringWriter trace = new StringWriter(); + final PrintWriter pw = new PrintWriter(trace); + t.printStackTrace(pw); + addData(EXCEPTION_TYPE_KEY, t.getClass().getName()); + addData(EXCEPTION_MESSAGE_KEY, t.getMessage()); + addData(EXCEPTION_STACK_TRACE_KEY, trace.toString()); + } + + return this; + } + public AlertBuilder severity(AlertEvent.Severity severity) { this.severity = severity; diff --git a/processing/src/test/java/org/apache/druid/java/util/emitter/service/AlertBuilderTest.java b/processing/src/test/java/org/apache/druid/java/util/emitter/service/AlertBuilderTest.java new file mode 100644 index 00000000000..275dc8be703 --- /dev/null +++ b/processing/src/test/java/org/apache/druid/java/util/emitter/service/AlertBuilderTest.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.java.util.emitter.service; + +import com.google.common.collect.ImmutableMap; +import org.apache.druid.java.util.emitter.core.EventMap; +import org.junit.Assert; +import org.junit.Test; + +import java.util.Map; + +public class AlertBuilderTest +{ + @Test + public void testAlertBuilder() + { + final AlertEvent alertEvent = + AlertBuilder.create("alert[%s]", "oops") + .addData(ImmutableMap.of("foo", "bar")) + .addData(ImmutableMap.of("baz", "qux")) + .addThrowable(new RuntimeException("an exception!")) + .build("druid/test", "example.com"); + + final EventMap alertMap = alertEvent.toMap(); + + Assert.assertEquals("alerts", alertMap.get("feed")); + Assert.assertEquals("alert[oops]", alertMap.get("description")); + Assert.assertEquals("druid/test", alertMap.get("service")); + Assert.assertEquals("example.com", alertMap.get("host")); + + final Map dataMap = (Map) alertMap.get("data"); + Assert.assertEquals("java.lang.RuntimeException", dataMap.get("exceptionType")); + Assert.assertEquals("an exception!", dataMap.get("exceptionMessage")); + Assert.assertEquals("bar", dataMap.get("foo")); + Assert.assertEquals("qux", dataMap.get("baz")); + } +} diff --git a/processing/src/test/java/org/apache/druid/testing/junit/LoggerCaptureRule.java b/processing/src/test/java/org/apache/druid/testing/junit/LoggerCaptureRule.java index ae7a1dfde05..70c566b3f0e 100644 --- a/processing/src/test/java/org/apache/druid/testing/junit/LoggerCaptureRule.java +++ b/processing/src/test/java/org/apache/druid/testing/junit/LoggerCaptureRule.java @@ -19,6 +19,8 @@ package org.apache.druid.testing.junit; +import com.google.common.collect.ImmutableList; +import com.google.errorprone.annotations.concurrent.GuardedBy; import org.apache.logging.log4j.Level; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.core.LogEvent; @@ -28,8 +30,8 @@ import org.apache.logging.log4j.core.config.Configuration; import org.apache.logging.log4j.core.config.LoggerConfig; import org.junit.rules.ExternalResource; +import java.util.ArrayList; import java.util.List; -import java.util.concurrent.CopyOnWriteArrayList; /** * JUnit rule to capture a class's logger output to an in-memory buffer to allow verification of log messages in tests. @@ -73,6 +75,14 @@ public class LoggerCaptureRule extends ExternalResource inMemoryAppender.clearLogEvents(); } + /** + * Wait for the captured + */ + public void awaitLogEvents() throws InterruptedException + { + inMemoryAppender.awaitLogEvents(); + } + private static class InMemoryAppender extends AbstractAppender { static final String NAME = InMemoryAppender.class.getName(); @@ -80,32 +90,51 @@ public class LoggerCaptureRule extends ExternalResource private final String targetLoggerName; // logEvents has concurrent iteration and modification in CuratorModuleTest::exitsJvmWhenMaxRetriesExceeded(), needs to be thread safe - private final CopyOnWriteArrayList logEvents; + @GuardedBy("logEvents") + private final List logEvents; InMemoryAppender(Class targetClass) { super(NAME, null, null); targetLoggerName = targetClass.getName(); - logEvents = new CopyOnWriteArrayList<>(); + logEvents = new ArrayList<>(); } @Override public void append(LogEvent logEvent) { - if (logEvent.getLoggerName().equals(targetLoggerName)) { - logEvents.add(logEvent); + synchronized (logEvents) { + if (logEvent.getLoggerName().equals(targetLoggerName)) { + logEvents.add(logEvent); + logEvents.notifyAll(); + } } } List getLogEvents() { - return logEvents; + synchronized (logEvents) { + return ImmutableList.copyOf(logEvents); + } } void clearLogEvents() { - logEvents.clear(); + synchronized (logEvents) { + logEvents.clear(); + } + } + + /** + * Wait for "logEvents" to be nonempty. If it is already nonempty, return immediately. + */ + void awaitLogEvents() throws InterruptedException + { + synchronized (logEvents) { + while (logEvents.isEmpty()) { + logEvents.wait(); + } + } } } } - diff --git a/server/src/main/java/org/apache/druid/curator/CuratorModule.java b/server/src/main/java/org/apache/druid/curator/CuratorModule.java index 201c96bfdff..a7e76af474a 100644 --- a/server/src/main/java/org/apache/druid/curator/CuratorModule.java +++ b/server/src/main/java/org/apache/druid/curator/CuratorModule.java @@ -36,6 +36,9 @@ import org.apache.druid.guice.LazySingleton; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.lifecycle.Lifecycle; import org.apache.druid.java.util.common.logger.Logger; +import org.apache.druid.java.util.emitter.service.AlertBuilder; +import org.apache.druid.java.util.emitter.service.ServiceEmitter; +import org.apache.druid.server.metrics.MetricsModule; import org.apache.zookeeper.ZooDefs; import org.apache.zookeeper.data.ACL; @@ -58,7 +61,6 @@ public class CuratorModule implements Module } /** - * * @param haltOnFailedStart set to true if the JVM needs to be halted within 30 seconds of failed initialization * due to unhandled curator exceptions. */ @@ -72,6 +74,7 @@ public class CuratorModule implements Module { JsonConfigProvider.bind(binder, CuratorConfig.CONFIG_PREFIX, ZkEnablementConfig.class); JsonConfigProvider.bind(binder, CuratorConfig.CONFIG_PREFIX, CuratorConfig.class); + MetricsModule.register(binder, DruidConnectionStateListener.class); } /** @@ -88,7 +91,8 @@ public class CuratorModule implements Module ); } - RetryPolicy retryPolicy = new BoundedExponentialBackoffRetry(BASE_SLEEP_TIME_MS, MAX_SLEEP_TIME_MS, config.getMaxZkRetries()); + final RetryPolicy retryPolicy = + new BoundedExponentialBackoffRetry(BASE_SLEEP_TIME_MS, MAX_SLEEP_TIME_MS, config.getMaxZkRetries()); return builder .ensembleProvider(new FixedEnsembleProvider(config.getZkHosts())) @@ -105,7 +109,13 @@ public class CuratorModule implements Module */ @Provides @LazySingleton - public CuratorFramework makeCurator(ZkEnablementConfig zkEnablementConfig, CuratorConfig config, Lifecycle lifecycle) + public CuratorFramework makeCurator( + final ZkEnablementConfig zkEnablementConfig, + final CuratorConfig config, + final DruidConnectionStateListener connectionStateListener, + final ServiceEmitter emitter, + final Lifecycle lifecycle + ) { if (!zkEnablementConfig.isEnabled()) { throw new RuntimeException("Zookeeper is disabled, cannot create CuratorFramework."); @@ -113,7 +123,34 @@ public class CuratorModule implements Module final CuratorFramework framework = createCurator(config); + framework.getConnectionStateListenable().addListener(connectionStateListener); + addUnhandledErrorListener(framework, emitter, lifecycle); + addLifecycleHandler(framework, lifecycle); + + return framework; + } + + /** + * Provide an instance of {@link DruidConnectionStateListener} for monitoring connection state. + */ + @Provides + @LazySingleton + public DruidConnectionStateListener makeConnectionStateListener(final ServiceEmitter emitter) + { + return new DruidConnectionStateListener(emitter); + } + + /** + * Add unhandled error listener that shuts down the JVM. + */ + private void addUnhandledErrorListener( + final CuratorFramework framework, + final ServiceEmitter emitter, + final Lifecycle lifecycle + ) + { framework.getUnhandledErrorListenable().addListener((message, e) -> { + emitter.emit(AlertBuilder.create("Unhandled Curator error").addThrowable(e)); log.error(e, "Unhandled error in Curator, stopping server."); if (haltOnFailedStart) { @@ -140,7 +177,13 @@ public class CuratorModule implements Module shutdown(lifecycle); }); + } + /** + * Add unhandled error listener that shuts down the JVM. + */ + private void addLifecycleHandler(final CuratorFramework framework, final Lifecycle lifecycle) + { lifecycle.addHandler( new Lifecycle.Handler() { @@ -159,23 +202,6 @@ public class CuratorModule implements Module } } ); - - return framework; - } - - static class SecuredACLProvider implements ACLProvider - { - @Override - public List getDefaultAcl() - { - return ZooDefs.Ids.CREATOR_ALL_ACL; - } - - @Override - public List getAclForPath(String path) - { - return ZooDefs.Ids.CREATOR_ALL_ACL; - } } private void shutdown(Lifecycle lifecycle) @@ -191,4 +217,19 @@ public class CuratorModule implements Module System.exit(1); } } + + private static class SecuredACLProvider implements ACLProvider + { + @Override + public List getDefaultAcl() + { + return ZooDefs.Ids.CREATOR_ALL_ACL; + } + + @Override + public List getAclForPath(String path) + { + return ZooDefs.Ids.CREATOR_ALL_ACL; + } + } } diff --git a/server/src/main/java/org/apache/druid/curator/DruidConnectionStateListener.java b/server/src/main/java/org/apache/druid/curator/DruidConnectionStateListener.java new file mode 100644 index 00000000000..b0ba274d501 --- /dev/null +++ b/server/src/main/java/org/apache/druid/curator/DruidConnectionStateListener.java @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.curator; + +import com.google.errorprone.annotations.concurrent.GuardedBy; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.state.ConnectionState; +import org.apache.curator.framework.state.ConnectionStateListener; +import org.apache.druid.java.util.emitter.service.AlertBuilder; +import org.apache.druid.java.util.emitter.service.ServiceEmitter; +import org.apache.druid.java.util.emitter.service.ServiceMetricEvent; +import org.apache.druid.java.util.metrics.AbstractMonitor; + +/** + * Curator {@link ConnectionStateListener} that uses a {@link ServiceEmitter} to send alerts on ZK connection loss, + * and emit metrics about ZK connection status. + */ +public class DruidConnectionStateListener extends AbstractMonitor implements ConnectionStateListener +{ + private static final String METRIC_IS_CONNECTED = "zk/connected"; + private static final String METRIC_RECONNECT_TIME = "zk/reconnect/time"; + private static final int NIL = -1; + + private final ServiceEmitter emitter; + + /** + * Current connection state. + */ + @GuardedBy("this") + private ConnectionState currentState; + + /** + * Time given by {@link System#currentTimeMillis()} at last disconnect. + */ + @GuardedBy("this") + private long lastDisconnectTime = NIL; + + public DruidConnectionStateListener(final ServiceEmitter emitter) + { + this.emitter = emitter; + } + + @Override + public void stateChanged(CuratorFramework curatorFramework, ConnectionState newState) + { + if (newState.isConnected()) { + final long disconnectDuration; + + synchronized (this) { + if (lastDisconnectTime != NIL) { + disconnectDuration = Math.max(0, System.currentTimeMillis() - lastDisconnectTime); + } else { + disconnectDuration = NIL; + } + + currentState = newState; + lastDisconnectTime = NIL; + } + + if (disconnectDuration != NIL) { + emitter.emit(ServiceMetricEvent.builder().build(METRIC_RECONNECT_TIME, disconnectDuration)); + } + } else { + synchronized (this) { + currentState = newState; + lastDisconnectTime = Math.max(lastDisconnectTime, System.currentTimeMillis()); + } + + emitter.emit(AlertBuilder.create("ZooKeeper connection[%s]", newState)); + } + } + + public boolean isConnected() + { + synchronized (this) { + return currentState != null && currentState.isConnected(); + } + } + + @Override + public boolean doMonitor(ServiceEmitter emitter) + { + emitter.emit(ServiceMetricEvent.builder().build(METRIC_IS_CONNECTED, isConnected() ? 1 : 0)); + return true; + } +} diff --git a/server/src/test/java/org/apache/druid/curator/CuratorModuleTest.java b/server/src/test/java/org/apache/druid/curator/CuratorModuleTest.java index 5b88cb59f43..46257efc055 100644 --- a/server/src/test/java/org/apache/druid/curator/CuratorModuleTest.java +++ b/server/src/test/java/org/apache/druid/curator/CuratorModuleTest.java @@ -27,6 +27,8 @@ import org.apache.curator.retry.BoundedExponentialBackoffRetry; import org.apache.curator.retry.ExponentialBackoffRetry; import org.apache.druid.guice.LifecycleModule; import org.apache.druid.guice.StartupInjectorBuilder; +import org.apache.druid.java.util.emitter.service.ServiceEmitter; +import org.apache.druid.server.metrics.NoopServiceEmitter; import org.apache.druid.testing.junit.LoggerCaptureRule; import org.apache.logging.log4j.Level; import org.apache.logging.log4j.core.LogEvent; @@ -69,23 +71,25 @@ public final class CuratorModuleTest Assert.assertEquals(CuratorModule.MAX_SLEEP_TIME_MS, retryPolicy.getMaxSleepTimeMs()); } - @Test + @Test(timeout = 60_000L) public void exitsJvmWhenMaxRetriesExceeded() throws Exception { Properties props = new Properties(); props.setProperty(CURATOR_CONNECTION_TIMEOUT_MS_KEY, "0"); Injector injector = newInjector(props); - CuratorFramework curatorFramework = createCuratorFramework(injector, 0); - curatorFramework.start(); - exit.expectSystemExitWithStatus(1); logger.clearLogEvents(); + exit.expectSystemExitWithStatus(1); // This will result in a curator unhandled error since the connection timeout is 0 and retries are disabled + CuratorFramework curatorFramework = createCuratorFramework(injector, 0); + curatorFramework.start(); curatorFramework.create().inBackground().forPath("/foo"); // org.apache.curator.framework.impl.CuratorFrameworkImpl logs "Background retry gave up" unhandled error twice + logger.awaitLogEvents(); List loggingEvents = logger.getLogEvents(); + Assert.assertTrue( "Logging events: " + loggingEvents, loggingEvents.stream() @@ -121,8 +125,9 @@ public final class CuratorModuleTest .add( new LifecycleModule(), new CuratorModule(false), + binder -> binder.bind(ServiceEmitter.class).to(NoopServiceEmitter.class), binder -> binder.bind(Properties.class).toInstance(props) - ) + ) .build(); } diff --git a/server/src/test/java/org/apache/druid/curator/DruidConnectionStateListenerTest.java b/server/src/test/java/org/apache/druid/curator/DruidConnectionStateListenerTest.java new file mode 100644 index 00000000000..b173015959d --- /dev/null +++ b/server/src/test/java/org/apache/druid/curator/DruidConnectionStateListenerTest.java @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.curator; + +import com.google.common.collect.ImmutableList; +import com.google.errorprone.annotations.concurrent.GuardedBy; +import org.apache.curator.framework.state.ConnectionState; +import org.apache.druid.java.util.emitter.core.Event; +import org.apache.druid.server.metrics.NoopServiceEmitter; +import org.hamcrest.CoreMatchers; +import org.hamcrest.MatcherAssert; +import org.hamcrest.Matchers; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +public class DruidConnectionStateListenerTest +{ + private TestEmitter emitter; + private DruidConnectionStateListener listener; + + @Before + public void setUp() + { + emitter = new TestEmitter(); + listener = new DruidConnectionStateListener(emitter); + } + + @Test + public void test_isConnected() + { + Assert.assertFalse(listener.isConnected()); + + listener.stateChanged(null, ConnectionState.CONNECTED); + Assert.assertTrue(listener.isConnected()); + + listener.stateChanged(null, ConnectionState.SUSPENDED); + Assert.assertFalse(listener.isConnected()); + + listener.stateChanged(null, ConnectionState.RECONNECTED); + Assert.assertTrue(listener.isConnected()); + + listener.stateChanged(null, ConnectionState.LOST); + Assert.assertFalse(listener.isConnected()); + } + + @Test + public void test_doMonitor_init() + { + listener.doMonitor(emitter); + Assert.assertEquals(1, emitter.getEvents().size()); + + final Map eventMap = emitter.getEvents().get(0).toMap(); + Assert.assertEquals("zk/connected", eventMap.get("metric")); + Assert.assertEquals(0, eventMap.get("value")); + } + + @Test + public void test_doMonitor_connected() + { + listener.stateChanged(null, ConnectionState.CONNECTED); + listener.doMonitor(emitter); + Assert.assertEquals(1, emitter.getEvents().size()); + + final Map eventMap = emitter.getEvents().get(0).toMap(); + Assert.assertEquals("zk/connected", eventMap.get("metric")); + Assert.assertEquals(1, eventMap.get("value")); + } + + @Test + public void test_doMonitor_notConnected() + { + listener.stateChanged(null, ConnectionState.SUSPENDED); + listener.doMonitor(emitter); + Assert.assertEquals(2, emitter.getEvents().size()); // 2 because stateChanged emitted an alert + + final Map eventMap = emitter.getEvents().get(1).toMap(); + Assert.assertEquals("zk/connected", eventMap.get("metric")); + Assert.assertEquals(0, eventMap.get("value")); + } + + @Test + public void test_suspendedAlert() + { + listener.stateChanged(null, ConnectionState.SUSPENDED); + Assert.assertEquals(1, emitter.getEvents().size()); + + final Map alertMap = emitter.getEvents().get(0).toMap(); + Assert.assertEquals("alerts", alertMap.get("feed")); + Assert.assertEquals("ZooKeeper connection[SUSPENDED]", alertMap.get("description")); + } + + @Test + public void test_reconnectedMetric() + { + listener.stateChanged(null, ConnectionState.SUSPENDED); + Assert.assertEquals(1, emitter.getEvents().size()); // the first stateChanged emits an alert + + listener.stateChanged(null, ConnectionState.RECONNECTED); + Assert.assertEquals(2, emitter.getEvents().size()); // the second stateChanged emits a metric + + final Map eventMap = emitter.getEvents().get(1).toMap(); + Assert.assertEquals("metrics", eventMap.get("feed")); + Assert.assertEquals("zk/reconnect/time", eventMap.get("metric")); + MatcherAssert.assertThat(eventMap.get("value"), CoreMatchers.instanceOf(Long.class)); + MatcherAssert.assertThat(((Number) eventMap.get("value")).longValue(), Matchers.greaterThanOrEqualTo(0L)); + } + + private static class TestEmitter extends NoopServiceEmitter + { + @GuardedBy("events") + private final List events = new ArrayList<>(); + + @Override + public void emit(Event event) + { + synchronized (events) { + events.add(event); + } + } + + public List getEvents() + { + synchronized (events) { + return ImmutableList.copyOf(events); + } + } + } +} diff --git a/server/src/test/java/org/apache/druid/server/metrics/ExceptionCapturingServiceEmitter.java b/server/src/test/java/org/apache/druid/server/metrics/ExceptionCapturingServiceEmitter.java index c28744220f1..8a80b3df3e5 100644 --- a/server/src/test/java/org/apache/druid/server/metrics/ExceptionCapturingServiceEmitter.java +++ b/server/src/test/java/org/apache/druid/server/metrics/ExceptionCapturingServiceEmitter.java @@ -19,8 +19,8 @@ package org.apache.druid.server.metrics; -import org.apache.druid.java.util.emitter.EmittingLogger; import org.apache.druid.java.util.emitter.core.Event; +import org.apache.druid.java.util.emitter.service.AlertBuilder; import org.apache.druid.java.util.emitter.service.ServiceEmitter; import javax.annotation.Nullable; @@ -28,7 +28,7 @@ import java.util.Map; public class ExceptionCapturingServiceEmitter extends ServiceEmitter { - private volatile Class exceptionClass; + private volatile String exceptionClass; private volatile String exceptionMessage; private volatile String stackTrace; @@ -42,10 +42,10 @@ public class ExceptionCapturingServiceEmitter extends ServiceEmitter { //noinspection unchecked final Map dataMap = (Map) event.toMap().get("data"); - final Class exceptionClass = (Class) dataMap.get(EmittingLogger.EXCEPTION_TYPE_KEY); + final String exceptionClass = (String) dataMap.get(AlertBuilder.EXCEPTION_TYPE_KEY); if (exceptionClass != null) { - final String exceptionMessage = (String) dataMap.get(EmittingLogger.EXCEPTION_MESSAGE_KEY); - final String stackTrace = (String) dataMap.get(EmittingLogger.EXCEPTION_STACK_TRACE_KEY); + final String exceptionMessage = (String) dataMap.get(AlertBuilder.EXCEPTION_MESSAGE_KEY); + final String stackTrace = (String) dataMap.get(AlertBuilder.EXCEPTION_STACK_TRACE_KEY); this.exceptionClass = exceptionClass; this.exceptionMessage = exceptionMessage; this.stackTrace = stackTrace; @@ -53,7 +53,7 @@ public class ExceptionCapturingServiceEmitter extends ServiceEmitter } @Nullable - public Class getExceptionClass() + public String getExceptionClass() { return exceptionClass; }